
Compare 2 Data streams in Flink to retrieve missing data

I have two data streams, “dataStream1” and “dataStream2”, both of type String, and I have created two tables from them:

   Table t1 = streamTableEnvironment.fromDataStream(dataStream1, $("a"), $("atime").proctime());
   Table t2 = streamTableEnvironment.fromDataStream(dataStream2, $("b"), $("btime").proctime());

I need to get all the records in T1 that don’t appear in T2 within a time frame of 5 seconds. That is, when a record is inserted into T1, it should show up in T2 within 5 seconds; otherwise it should be collected, because it is considered missing data.

Are there any hints? I tried to use SQL for this, but I don’t see the right way to do it. I know, for example, how to get the common data within the time interval using SQL, but I don’t see how to get the missing data, since the MINUS operator doesn’t exist in SQL for unbounded streams.


Answer

If you need or want to use SQL for some reason, you may try using the IN clause on an interval join. Basically, you select all elements from T1 that are also in T2 to create T3, and then select only the elements from T1 that are not in T3, using the NOT IN clause as described in the documentation. Note that this will only work if the elements are unique.

The other thing you may try is to handle this yourself with a CoProcessFunction, doing something like:

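// Assumes the String record itself is the key, so matching records from both streams share keyed state.
dataStream1.keyBy(value -> value).connect(dataStream2.keyBy(value -> value))
.process(new MyProcessFunction());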

Inside the function you would keep state holding every element from dataStream1. Whenever an element from dataStream2 arrives, you would check whether it can be joined, that is, whether its timestamp falls within the given bounds; if so, you would delete the element from state, since it is not going to be emitted. You could also register a timer that emits all elements that were not joined.
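
To make the buffering and timeout logic concrete, here is a minimal sketch that models it in plain Java, without any Flink classes. The class name MissingRecordDetector, its method names (chosen to mirror the callbacks of a co-process function), and the explicit nowMs arguments are all hypothetical and only for illustration. In a real job the map would presumably be replaced by Flink keyed state and the onTimer call by a registered processing-time timer. Like the SQL approach, it assumes records are unique.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the "buffer, match, time out" logic; not Flink API. */
class MissingRecordDetector {
    private final long timeoutMs;
    // Stands in for keyed state: record from stream 1 -> deadline for its match.
    private final Map<String, Long> pending = new HashMap<>();

    MissingRecordDetector(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Record from dataStream1: buffer it with a deadline (assumes unique records). */
    void processElement1(String record, long nowMs) {
        pending.putIfAbsent(record, nowMs + timeoutMs);
    }

    /** Record from dataStream2: drop the buffered record if it arrived in time. */
    void processElement2(String record, long nowMs) {
        Long deadline = pending.get(record);
        if (deadline != null && nowMs <= deadline) {
            pending.remove(record); // matched, so it will never be emitted
        }
    }

    /** Stands in for a timer callback: returns and clears records whose deadline has passed. */
    List<String> onTimer(long nowMs) {
        List<String> missing = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() < nowMs) {
                missing.add(entry.getKey());
                it.remove();
            }
        }
        return missing;
    }
}
```

With a 5000 ms timeout, a record buffered at time 0 and matched at 3000 is dropped silently, while one that is never matched (or matched only after 5000) is returned by the first onTimer call made after its deadline.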
