I have developed a pipeline as shown below:
SingleOutputStreamOperator<String> stream = ...

DataStream<String> branch2 = stream
        .getSideOutput(outputTag2)
        .keyBy(MetricObject::getRootAssetId)
        .window(TumblingEventTimeWindows.of(Time.seconds(180)))
        .trigger(ContinuousEventTimeTrigger.of(Time.seconds(15)))
        .aggregate(new CountDistinctAggregate(), new CountDistinctProcess())
        .name("windowed-count-distinct")
        .uid("windowed-count-distinct")
        .map(AggregationObject::toString)
        .name("get-toString");
I am trying to find a way to measure the latency from input to output, but the relationship between input and output is not one-to-one: there are many transformations in between, which makes the latency evaluation conceptually hard. I know about the latency metrics enabled with:
env.getConfig().setLatencyTrackingInterval(1000)
, but I can't understand what they represent or how to use them in a test where I stress the application by sending different numbers of records per second (10/s, 20/s, 50/s, and so on) and monitor when the throughput starts to decrease, the latency starts to increase, and backpressure begins.
Answer
Flink's built-in latency metrics measure the time it takes for latency-tracking markers to travel from the sources to each downstream operator instance. These markers travel with your stream records and wait their turn in the network queues, but they skip over your user functions, so the actual end-to-end latency will be larger than what these metrics report.
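As a side note, the same tracking you enable with setLatencyTrackingInterval can also be switched on through the metrics.latency.interval and metrics.latency.granularity configuration options, which additionally let you choose whether the markers are tracked per source, per operator, or per subtask. A minimal sketch is below; these keys normally live in flink-conf.yaml, and passing them programmatically here is just for illustration:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LatencyTrackingSetup {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Emit a latency marker from every source once per second (value is in milliseconds).
        conf.setLong("metrics.latency.interval", 1000L);
        // Track latency per operator; other valid values are "single" and "subtask".
        conf.setString("metrics.latency.granularity", "operator");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build the pipeline on env as in the question ...
    }
}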
For a comprehensive overview of how to measure and improve latency, see Getting into Low-Latency Gears with Apache Flink; the accompanying code is at https://github.com/ververica/lab-flink-latency. In those jobs, a custom "eventTimeLag" histogram metric is used to measure and report latency.
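A sketch of that idea, not the exact code from the repository: a ProcessFunction can register a histogram and record, for every result, the difference between the current processing time and the record's event timestamp. EventTimeLagMeter is a made-up name here, and DescriptiveStatisticsHistogram comes from flink-runtime:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Records the event-time lag (current processing time minus the record's
 * event timestamp) in a histogram metric, similar in spirit to the
 * "eventTimeLag" metric used in the lab-flink-latency jobs.
 */
public class EventTimeLagMeter<T> extends ProcessFunction<T, T> {

    private transient Histogram eventTimeLag;

    @Override
    public void open(Configuration parameters) {
        // Keep the last 10_000 samples; tune the window size to your throughput.
        eventTimeLag = getRuntimeContext()
                .getMetricGroup()
                .histogram("eventTimeLag", new DescriptiveStatisticsHistogram(10_000));
    }

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) {
        // ctx.timestamp() is the record's event timestamp (null if none is assigned).
        if (ctx.timestamp() != null) {
            eventTimeLag.update(System.currentTimeMillis() - ctx.timestamp());
        }
        out.collect(value);
    }
}

Attached right after your window aggregation, e.g. .process(new EventTimeLagMeter<>()) before the final map, the histogram then shows how far the emitted results lag behind the event time of the windows they come from, which you can watch while you increase the input rate in your test.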
https://github.com/apache/flink-benchmarks might also be of interest.