Suppose we have multiple data streams and they share some common features.
For example, we have a stream of Teacher and a stream of Student, and they both have an age field. If I want to find out the eldest student or teacher from the realtime stream, I can implement an operator as below.
public class MaxiumAgeFunc extends RichMapFunction<Student, Integer> {
    // Highest age seen so far by this operator instance.
    int maxAge;

    @Override
    public Integer map(Student s) throws Exception {
        if (s.age > maxAge) {
            maxAge = s.age;
        }
        return maxAge;
    }
}
To find the eldest Teacher, we need to implement a nearly identical operator:
public class MaxiumAgeFunc extends RichMapFunction<Teacher, Integer> {
    // Highest age seen so far by this operator instance.
    int maxAge;

    @Override
    public Integer map(Teacher t) throws Exception {
        if (t.age > maxAge) {
            maxAge = t.age;
        }
        return maxAge;
    }
}
But these two operators share the same processing logic, so my idea is to define a parent class, such as People.
public class People{ public Integer age; }
Then Student and Teacher can be defined as their child class, and also keep their own fields.
public class Student extends People {
    public Integer grade; // student grade
    ...
}
public class Teacher extends People {
    public Integer subject; // the subject that the teacher teaches
    ...
}
In this case, I can define an operator as below.
public class MaxiumAgeFunc extends RichMapFunction<People, Integer> {
    // Highest age seen so far by this operator instance.
    int maxAge;

    @Override
    public Integer map(People p) throws Exception {
        if (p.age > maxAge) {
            maxAge = p.age;
        }
        return maxAge;
    }
}
But when I try to use this operator to implement a Flink execution topology, it won’t work because of the unmatched data type.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Student> studentStream = env.addSource(...);
DataStream<Teacher> teacherStream = env.addSource(...);
studentStream.map(new MaxiumAgeFunc()).print();
teacherStream.map(new MaxiumAgeFunc()).print();
So my question is: is it possible to write a single abstract operator for input streams that share common fields?
Answer
This is more a Java than a Flink question. What you want to do is parameterize MaxiumAgeFunc like this:
public class MaxiumAgeFunc<T extends People> extends RichMapFunction<T, Integer> {
    // Highest age seen so far by this operator instance.
    int maxAge;

    @Override
    public Integer map(T p) throws Exception {
        if (p.age > maxAge) {
            maxAge = p.age;
        }
        return maxAge;
    }
}
and then use it like this:
studentStream.map(new MaxiumAgeFunc<>()).print();
teacherStream.map(new MaxiumAgeFunc<>()).print();
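Since the point here is about Java generics rather than Flink, the bounded type parameter can be tried out without a Flink dependency. The following is only a minimal sketch: java.util.function.Function stands in for Flink's map function interface, RunningMaxAge and RunningMaxAgeDemo are hypothetical names, and the constructors on People, Student and Teacher are added purely for illustration.

```java
import java.util.function.Function;

// Stand-ins for the classes in the question; the constructors are an assumption.
class People {
    Integer age;
    People(Integer age) { this.age = age; }
}

class Student extends People {
    Integer grade;
    Student(Integer age, Integer grade) { super(age); this.grade = grade; }
}

class Teacher extends People {
    Integer subject;
    Teacher(Integer age, Integer subject) { super(age); this.subject = subject; }
}

// Plain-Java analogue of the parameterized operator: T is bounded by People,
// so the body may read p.age while the input type stays Student or Teacher.
class RunningMaxAge<T extends People> implements Function<T, Integer> {
    private int maxAge;

    @Override
    public Integer apply(T p) {
        if (p.age > maxAge) {
            maxAge = p.age;
        }
        return maxAge;
    }
}

class RunningMaxAgeDemo {
    public static void main(String[] args) {
        // The diamond lets the compiler infer T from the target type.
        Function<Student, Integer> forStudents = new RunningMaxAge<>();
        Function<Teacher, Integer> forTeachers = new RunningMaxAge<>();

        System.out.println(forStudents.apply(new Student(12, 6)));  // 12
        System.out.println(forStudents.apply(new Student(15, 9)));  // 15
        System.out.println(forStudents.apply(new Student(13, 7)));  // 15
        System.out.println(forTeachers.apply(new Teacher(41, 3)));  // 41
    }
}
```

A RunningMaxAge<People> would not be accepted where a Function<Student, Integer> is required, because Java generics are invariant; the bounded parameter T avoids that mismatch.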
edit:
By the way, your function does not work with checkpointing: maxAge is a plain field rather than Flink managed state, so it is not part of a checkpoint and the function will produce wrong results after recovery. I'd rather go with an aggregate function over the global window.
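// Assumes `students` is a DataStream<People>; the first type parameter of
// AggregateFunction must match the stream's element type.
students
    .windowAll(GlobalWindows.create())
    // GlobalWindows never fires on its own; CountTrigger.of(1) emits the
    // running maximum after every element.
    .trigger(CountTrigger.of(1))
    .aggregate(new AggregateFunction<People, Integer, Integer>() {
        @Override
        public Integer createAccumulator() {
            return -1;
        }

        @Override
        public Integer add(People value, Integer accumulator) {
            return Math.max(value.age, accumulator);
        }

        @Override
        public Integer getResult(Integer accumulator) {
            return accumulator;
        }

        @Override
        public Integer merge(Integer a, Integer b) {
            return Math.max(a, b);
        }
    });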
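To see what the four methods of the aggregate do, here is a rough sketch that runs without Flink. The Aggregate interface is a hypothetical stand-in that only mirrors the method names of Flink's AggregateFunction; Person, MaxAgeAggregate and the fold helper are also made up for illustration. It also shows that the aggregate itself can be made generic over any subclass of the parent type.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in with the same four methods as Flink's AggregateFunction.
interface Aggregate<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Hypothetical parent type with the shared field.
class Person {
    final int age;
    Person(int age) { this.age = age; }
}

// Maximum-age aggregate, generic over any subclass of Person.
class MaxAgeAggregate<T extends Person> implements Aggregate<T, Integer, Integer> {
    @Override
    public Integer createAccumulator() { return -1; }

    @Override
    public Integer add(T value, Integer accumulator) {
        return Math.max(value.age, accumulator);
    }

    @Override
    public Integer getResult(Integer accumulator) { return accumulator; }

    @Override
    public Integer merge(Integer a, Integer b) { return Math.max(a, b); }
}

class MaxAgeAggregateDemo {
    // Feeds every value into a fresh accumulator, as a window would.
    static <T extends Person> Integer fold(Aggregate<T, Integer, Integer> agg, List<T> values) {
        Integer acc = agg.createAccumulator();
        for (T v : values) {
            acc = agg.add(v, acc);
        }
        return acc;
    }

    public static void main(String[] args) {
        MaxAgeAggregate<Person> agg = new MaxAgeAggregate<>();
        Integer left = fold(agg, Arrays.asList(new Person(30), new Person(45)));
        Integer right = fold(agg, Arrays.asList(new Person(52), new Person(28)));
        // Merging two partial accumulators gives the maximum over all inputs.
        System.out.println(agg.getResult(agg.merge(left, right))); // 52
    }
}
```

Because the running maximum lives in the accumulator rather than in a field of the function, the framework that owns the accumulator is responsible for keeping and restoring it.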