I am trying to understand the best way to attach current-time timestamps when producing a new record to Kafka from Flink. Does Flink automatically fill the produced event with metadata containing the current timestamp? Is that the best practice for consumers, or should we put the current time inside the event itself? If I …
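Not an authoritative answer, but a sketch of one way to control this explicitly: with the KafkaSerializationSchema used by FlinkKafkaProducer, you build the ProducerRecord yourself, so you can stamp it with the current wall-clock time instead of relying on producer defaults. The topic name and String payload below are placeholders.

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hedged sketch: explicitly stamp each record with the current wall-clock
// time so consumers can read it from the Kafka record metadata.
public class TimestampedSerializer implements KafkaSerializationSchema<String> {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        return new ProducerRecord<>(
                "output-topic",              // topic (placeholder)
                null,                        // partition: let Kafka decide
                System.currentTimeMillis(),  // explicit record timestamp
                null,                        // key
                element.getBytes(StandardCharsets.UTF_8));
    }
}
```

Consumers can then read the timestamp from the ConsumerRecord metadata rather than from the payload, which avoids duplicating it inside the event.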
Tag: apache-flink
Cannot infer type-variable(s) R in Flink ProcessWindowFunction
I am having trouble resolving this error in Flink (version 1.11.0): This is how I create a keyed windowed stream: And here is what my ComputeFeatures function looks like: StreamElement::getId returns a Long, so everything regarding types should be correct, but Flink still has trouble inferring a type. I am looking for ideas on how to …
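A common cause of "cannot infer type-variable(s) R" is generic-type erasure at the .process(...) call site. One hedged sketch, with the question's StreamElement as input and a hypothetical FeatureVector output, spelling out all four type parameters on the ProcessWindowFunction so nothing is left to inference:

```java
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// StreamElement comes from the question; FeatureVector is a placeholder
// output type. Declaring the generics on a named class (rather than a
// lambda or raw type) keeps them visible to Flink's type extraction.
public class ComputeFeatures
        extends ProcessWindowFunction<StreamElement, FeatureVector, Long, TimeWindow> {

    @Override
    public void process(Long key,
                        Context context,
                        Iterable<StreamElement> elements,
                        Collector<FeatureVector> out) {
        // ... compute features for this key/window and emit them via out
    }
}
```

If inference still fails at the key selector, the key type can also be supplied explicitly via the keyBy(StreamElement::getId, Types.LONG) overload.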
How to see restored value from savepoints?
I have tried to log info about the state that is recovered when I start a Flink app from a savepoint. I can see that the app was started from the savepoint correctly, but I can't log the state values that were restored. The app calculates how many products were bought. Can I use CheckpointedFunction in such a way? I see that context.isRestored() returns true, but …
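For reference, a minimal sketch of a CheckpointedFunction that logs restored values inside initializeState(), which is where restored state first becomes readable; the state name and types are assumptions:

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

// Sketch only: in a real job this class would also implement a function
// interface (e.g. MapFunction or SinkFunction) so it can sit in the pipeline.
public class ProductCounter implements CheckpointedFunction {

    private transient ListState<Long> countState;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        countState = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("product-counts", Types.LONG));

        if (context.isRestored()) {
            // The restored values are iterable here, right after registration.
            for (Long count : countState.get()) {
                System.out.println("Restored count: " + count);
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // write the current counts back into countState here
    }
}
```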
Direct buffer memory error when connecting to MQTT
We are running an Apache Beam application on a Flink cluster. For the past few days the application has been failing with the following error: The connection is established with the following method: But it runs on my machine: when I start the application from my Eclipse project, everything works fine. So the error only happens on the Flink cluster, which makes …
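"Works locally, fails on the cluster" fits how direct memory is capped: recent Flink versions derive an explicit -XX:MaxDirectMemorySize for task managers from the taskmanager.memory.* settings, while a JVM launched from Eclipse typically defaults the cap to the heap size, so raising taskmanager.memory.task.off-heap.size is often the actual fix. Since the connection method is elided in the question, here is a hedged reconstruction using Eclipse Paho; the broker URL and client id are placeholders:

```java
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

// Hypothetical reconstruction of the elided connect method.
MqttClient connect() throws Exception {
    MqttClient client = new MqttClient(
            "tcp://broker.example.com:1883",  // broker URL (placeholder)
            "beam-reader",                    // client id (placeholder)
            new MemoryPersistence());
    MqttConnectOptions options = new MqttConnectOptions();
    options.setAutomaticReconnect(true);
    options.setCleanSession(true);
    client.connect(options);
    return client;
}
```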
Flink error with ‘The implementation of the StreamExecutionEnvironment is not serializable’
I am a beginner with Flink, and I tried to use Flink to run LFM, a recommendation algorithm, but the following errors appeared when my code was running. I tried to find and fix them, but they were not solved. Could someone tell me why I am having these problems? Here is my main exception, followed by my code: …
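The usual culprit behind "The implementation of the StreamExecutionEnvironment is not serializable" is a user function whose closure captures the environment, often through an anonymous inner class that holds a reference to its enclosing object. A minimal sketch of the safe pattern, with the actual LFM logic elided:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LfmJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> ratings = env.fromElements("user1,item1,4.0");

        // The function is a static, self-contained class, so it captures
        // nothing from main() -- in particular, not `env`. An anonymous
        // inner class defined in a non-static context would drag its
        // enclosing instance (and anything it references) into the closure.
        ratings.map(new ParseRating()).print();

        env.execute("LFM demo");
    }

    public static class ParseRating implements MapFunction<String, String> {
        @Override
        public String map(String line) {
            return line.toUpperCase();  // stand-in for the real parsing
        }
    }
}
```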
Developing a job for Flink
I am building a simple data pipeline for learning purposes. I have real-time data coming from Kafka, and I would like to do some transformations using Flink. Unfortunately, I'm not sure I understand the deployment options correctly. In the Flink docs I found a section about Docker Compose and application mode. It says that I can deploy only one …
Parsing Avro messages in Flink throws NullPointerException if a field is nullable in the Avro schema
I need to parse messages from Confluent Kafka stored in Avro, but applying a filter throws a NullPointerException. Without the filter I was able to write back into Kafka; with the filter applied, I get the NullPointerException. Here is the schema:

Answer

When you have a null value in your topic for col3 and you try …
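A hedged sketch of the null-safe filter the answer is describing, assuming a GenericRecord stream and the question's col3 field; the comparison value is a placeholder:

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.FilterFunction;

// col3 is nullable in the Avro schema, so record.get("col3") can
// legitimately return null; guard before dereferencing it.
public class Col3Filter implements FilterFunction<GenericRecord> {
    @Override
    public boolean filter(GenericRecord record) {
        Object col3 = record.get("col3");
        return col3 != null && "expected".equals(col3.toString());
    }
}
```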
Flink Job submission throws java.nio.file.NoSuchFileException while the file actually exists
I tried to submit a Flink job that is already packaged in a JAR. Basically, it consumes a Kafka topic protected by SASL authentication, so it requires a .jks file, which I already include in the JAR and read in the code as: I tried to submit the job on two different standalone servers (with different VM specs) for the sake of …
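One likely explanation, offered as an assumption: an entry packaged inside a JAR is not addressable as a filesystem path, so anything that expects a real path (such as Kafka's ssl.truststore.location property) fails with NoSuchFileException even though the entry exists. A common workaround is to copy the classpath resource out to a temporary file; MyJob and the resource name below are placeholders:

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Copy the truststore out of the JAR so the Kafka client can be given a
// real filesystem path.
static String extractTruststore() throws Exception {
    try (InputStream in = MyJob.class.getResourceAsStream("/client.truststore.jks")) {
        Path tmp = Files.createTempFile("truststore", ".jks");
        Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING);
        tmp.toFile().deleteOnExit();
        return tmp.toAbsolutePath().toString();
    }
}
```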
What do latency metrics represent in Flink, and are they really valid for evaluating the latency of an application?
I have a pipeline developed as shown below: I'm thinking hard about a way to measure latency from input to output, but the relation between input and output isn't 1-to-1; there are many transformations in between that make the latency evaluation conceptually very hard. I know about the latency metrics enabled via env.getConfig().setLatencyTrackingInterval(1000), but I …
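For context: built-in latency tracking works by injecting latency markers at the sources, and the markers bypass the user functions, so the metric reflects transport and queueing delay between operators rather than true end-to-end record latency. A minimal sketch of enabling it:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Emit a latency marker from every source every 1000 ms. Because markers
// skip user-function logic, the resulting histograms measure network and
// queueing delay, not time spent inside transformations.
env.getConfig().setLatencyTrackingInterval(1000);
```

If input-to-output latency is what matters, a common workaround is to carry an ingestion timestamp inside each record and compare it against the wall clock at the sink.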
Flink: java.lang.NoSuchMethodError: AvroSchemaConverter
I am trying to connect to Kafka. When I run a simple JAR file, I get the following error: Yes, I am aware that NoSuchMethodError can potentially mean a conflict between versions; however, there is no Flink instance running on the YARN cluster. I have also tried to play with pom.xml, but no luck. The versions are indicated …
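One quick diagnostic, offered as a sketch: print where the class is actually loaded from at runtime. If the location is not your shaded flink-avro artifact, that usually exposes the conflicting JAR on the classpath:

```java
// Prints the JAR that AvroSchemaConverter was loaded from (assuming the
// class resolves at all), which narrows down version conflicts.
System.out.println(
        org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.class
                .getProtectionDomain().getCodeSource().getLocation());
```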