I am trying to understand what is the best way to achieve current time timestamps using Flink when producing a new record to Kafka
Does flink automatically fill the produced event with metadata containing the timestamp of the current time? Is that the best practice for the consumers or should we put the current time inside the event?
If I really want to put the current time of a processed event, how should I do it in Java? I am running flink in kubernetes, so I don’t know if a simple current_time() call would be the ideal way of doing it, because task managers may be in different nodes, and I am not sure if the clock in each of them are going to be in sync.
Advertisement
Answer
When initializing a KafkaSink
you have to provide a KafkaRecordSerializationSchema
, in the serialize
method you can set the timestamp associated to each element when building the org.apache.kafka.clients.producer.ProducerRecord
. The timestamp the serialize
method receives will depend on your pipeline configuration. You can get more information about assigning timestamps and how Flink handles time in here: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/event-time/generating_watermarks/
If you are not setting it, Kafka will automatically assign a timestamp to each record when receiving it (the ingestion time, which will basically be the processing time plus a slight delay).
In any case, achieving perfectly ordered processing time timestamps in a distributed application will face the problem you describe. Different nodes will have different clocks, even if all are synchronized using NTP. It is a big problem in distributed systems that requires significant effort to solve (if even possible).
A pragmatic approach that may be good enough is to just have all records that belong to the same key timestamped by the same node, this way you will have most of the time a perfectly ordered timestamp. Be aware that a rebalance or a correction of the clock (which NTP does periodically) will break these perfectly ordered timestamps per key for some records from time to time. If you have a KeyedStream
and you assign the timestamp in a keyed map
or let Kafka do it, you will get these mostly-ordered timestamps per key.