Filtering duplicates out of an infinite DataStream with windows

I want to filter duplicates out of an infinite DataStream in Flink. I know the duplicates only occur within a small time window (max 10 seconds). I found a promising and fairly simple approach here, but it doesn’t work for me. It uses a keyed DataStream and returns only the first message of every window. This is my window code:

DataStream<Row> outputStream = inputStream
                .keyBy(new MyKeySelector())
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.minutes(5)))
                .process(new DuplicateFilter());

MyKeySelector() is just a class that selects the first two attributes of the Row message as the key. This key works as a primary key, so only messages with the same key are assigned to the same window (classic keyed stream behaviour).

This is the DuplicateFilter class, which is very similar to the answer proposed for the above-mentioned question. I only used the newer process() function instead of apply().

public class DuplicateFilter extends ProcessWindowFunction<Row, Row, Tuple2<String, String>, TimeWindow> {
    private static final Logger LOG = LoggerFactory.getLogger(DuplicateFilter.class);

    @Override
    public void process(Tuple2<String, String> key, Context context, Iterable<Row> iterable, Collector<Row> collector) throws Exception {
        // this is just for debugging and can be ignored
        int count = 0;
        for (Row record : iterable) {
            LOG.info("Row number {}: {}", count, record);
            count++;
        }
        LOG.info("first Row: {}", iterable.iterator().next());

        collector.collect(iterable.iterator().next()); // output only the first message in this window
    }
}

My messages arrive at most one second apart, so a 30-second window should handle that well. But messages that arrive less than one second apart are assigned to different windows. From the logs I can see that it only rarely works correctly.

Has someone got an idea or another approach for this task? Please let me know if you need more information.

Answer

Flink’s time windows are aligned to the clock, rather than to the events, so two events that are close together in time can be assigned to different windows. Windows are often not very well suited for deduplication, but you might get good results if you use session windows.
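To make the alignment point concrete, here is a minimal plain-Java sketch (not Flink code) of how a clock-aligned tumbling window could be chosen for a timestamp. The class and method names are made up for illustration, and the arithmetic assumes non-negative timestamps and windows aligned to the epoch with an optional offset. The two sample timestamps are hypothetical: a message and its duplicate 200 ms later.

```java
// Plain-Java illustration of clock-aligned window assignment; names are hypothetical.
final class WindowAlignment {

    /** Start of the window of length sizeMs (shifted by offsetMs) that contains timestampMs. */
    static long windowStart(long timestampMs, long offsetMs, long sizeMs) {
        // Windows begin at offset, offset + size, offset + 2 * size, ... regardless of the events.
        return timestampMs - (timestampMs - offsetMs + sizeMs) % sizeMs;
    }

    public static void main(String[] args) {
        long size = 10_000L;      // 10-second tumbling windows
        long first = 9_900L;      // hypothetical arrival time of a message
        long duplicate = 10_100L; // its duplicate, only 200 ms later

        System.out.println(windowStart(first, 0L, size));     // 0
        System.out.println(windowStart(duplicate, 0L, size)); // 10000
    }
}
```

Although the two messages are only 200 ms apart, they straddle the 10-second boundary and land in different windows, so a "first element per window" filter emits both.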

Personally, I would use a keyed flatmap (or a process function), and use state TTL (or timers) to clear the state for keys once it’s no longer needed.
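The idea behind that approach can be modelled without Flink. The sketch below is plain Java, not Flink API: the map stands in for the per-key state a keyed flatmap would hold, and the eviction step stands in for what state TTL or a timer would do. `TtlDeduplicator`, `accept`, and `trackedKeys` are hypothetical names, and the TTL is assumed to run from the first time a key is seen.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of per-key "seen" state with a time-to-live; names are hypothetical.
final class TtlDeduplicator {
    private final long ttlMs;
    // key -> time at which the key was first seen (models the keyed state)
    private final Map<String, Long> firstSeenAt = new HashMap<>();

    TtlDeduplicator(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    /** Returns true if the record should be emitted, false if it is a duplicate. */
    boolean accept(String key, long nowMs) {
        // Drop state for keys whose TTL has elapsed (models TTL cleanup or a timer firing).
        firstSeenAt.values().removeIf(seenAt -> nowMs - seenAt >= ttlMs);
        if (firstSeenAt.containsKey(key)) {
            return false; // same key seen within the TTL: duplicate
        }
        firstSeenAt.put(key, nowMs); // first occurrence: remember it and emit
        return true;
    }

    /** Number of keys that currently hold state. */
    int trackedKeys() {
        return firstSeenAt.size();
    }
}
```

Unlike a window, the 10-second interval here starts at each key's first message rather than at a clock boundary, so two copies arriving 200 ms apart are always compared against each other. The linear scan on every call is only for brevity; in Flink the state backend handles expiry.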

You can also do deduplication with Flink SQL: https://ci.apache.org/projects/flink/flink-docs-stable/docs/dev/table/sql/queries/deduplication/ (but you would need to set an idle state retention interval).

