Flink: does Flink support an abstract operator that can process different data streams with common fields?




Suppose we have multiple data streams and they share some common features.

For example, we have a stream of Teacher and a stream of Student, and they both have an age field. If I want to find the eldest student or teacher in the real-time stream, I can implement an operator like this:

import org.apache.flink.api.common.functions.RichMapFunction;

public class MaxiumAgeFunc extends RichMapFunction<Student, Integer> {
    int maxAge;

    @Override
    public Integer map(Student s) throws Exception {
        // keep the largest age seen so far and emit it for every element
        if (s.age > maxAge) {
            maxAge = s.age;
        }
        return maxAge;
    }
}

To find the eldest teacher, we need a nearly identical operator:

import org.apache.flink.api.common.functions.RichMapFunction;

public class MaxiumAgeFunc extends RichMapFunction<Teacher, Integer> {
    int maxAge;

    @Override
    public Integer map(Teacher t) throws Exception {
        // same logic as the Student version, only the input type differs
        if (t.age > maxAge) {
            maxAge = t.age;
        }
        return maxAge;
    }
}

But these two operators share the same processing logic, so my idea is to define a common parent class, such as People.

public class People{
    public Integer age;
}

Then Student and Teacher can be defined as its subclasses, each keeping its own fields.

public class Student extends People {
    public Integer grade;  // student grade
    ...
}
public class Teacher extends People {
    public Integer subject;  // the subject the teacher teaches
    ...
}

With this hierarchy, I can define a single operator:

import org.apache.flink.api.common.functions.RichMapFunction;

public class MaxiumAgeFunc extends RichMapFunction<People, Integer> {
    int maxAge;

    @Override
    public Integer map(People p) throws Exception {
        if (p.age > maxAge) {
            maxAge = p.age;
        }
        return maxAge;
    }
}

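But when I try to use this operator in a Flink job topology, it does not compile because of a type mismatch: DataStream<Student>.map() expects a MapFunction<Student, R>, and a RichMapFunction<People, Integer> is not accepted in its place.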

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Student> studentStream = env.addSource(...);
DataStream<Teacher> teacherStream = env.addSource(...);

studentStream.map(new MaxiumAgeFunc()).print();
teacherStream.map(new MaxiumAgeFunc()).print();

So my question is: is it possible to write one abstract operator for input streams that share common fields?

Answer

This is more a Java than a Flink question:

What you want is to make MaxiumAgeFunc generic, with a type parameter bounded by People, like this:

import org.apache.flink.api.common.functions.RichMapFunction;

// T is fixed per instance, so MaxiumAgeFunc<Student> is a MapFunction<Student, Integer>
public class MaxiumAgeFunc<T extends People> extends RichMapFunction<T, Integer> {
    int maxAge;

    @Override
    public Integer map(T p) throws Exception {
        if (p.age > maxAge) {
            maxAge = p.age;
        }
        return maxAge;
    }
}

and then use it on both streams:

studentStream.map(new MaxiumAgeFunc<>()).print();
teacherStream.map(new MaxiumAgeFunc<>()).print();
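Since this is a Java generics issue, it can be reproduced without Flink. The sketch below is a hypothetical, Flink-free model: MiniStream, MapFn and MaxAgeFn are illustrative names, not Flink API. MiniStream.map takes a function typed exactly on the stream's element type, as DataStream.map does, so a function written for People would be rejected while a function with a type parameter bounded by People is accepted for each subtype.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class GenericOperatorDemo {
    // Hypothetical stand-in for Flink's MapFunction<IN, OUT>.
    interface MapFn<IN, OUT> {
        OUT map(IN value);
    }

    // Hypothetical stand-in for DataStream<T>: map() takes a function whose
    // input type is exactly T (no wildcard).
    static class MiniStream<T> {
        private final List<T> elements;

        MiniStream(List<T> elements) {
            this.elements = elements;
        }

        <R> List<R> map(MapFn<T, R> fn) {
            List<R> out = new ArrayList<>();
            for (T e : elements) {
                out.add(fn.map(e));
            }
            return out;
        }
    }

    static class People {
        Integer age;
        People(Integer age) { this.age = age; }
    }

    static class Student extends People {
        Student(Integer age) { super(age); }
    }

    static class Teacher extends People {
        Teacher(Integer age) { super(age); }
    }

    // Generic operator: the logic is written once against People.age,
    // but each instance is typed for one concrete stream element type.
    static class MaxAgeFn<T extends People> implements MapFn<T, Integer> {
        private int maxAge = Integer.MIN_VALUE;

        @Override
        public Integer map(T p) {
            if (p.age > maxAge) {
                maxAge = p.age;
            }
            return maxAge;
        }
    }

    public static void main(String[] args) {
        MiniStream<Student> students = new MiniStream<>(
                Arrays.asList(new Student(19), new Student(22), new Student(20)));
        MiniStream<Teacher> teachers = new MiniStream<>(
                Arrays.asList(new Teacher(45), new Teacher(38)));

        // A class implementing MapFn<People, Integer> would be rejected here:
        // students.map(...) requires MapFn<Student, R>, and Java generics are invariant.
        // The diamond infers MaxAgeFn<Student> and MaxAgeFn<Teacher> respectively.
        List<Integer> studentMax = students.map(new MaxAgeFn<>());
        List<Integer> teacherMax = teachers.map(new MaxAgeFn<>());

        System.out.println(studentMax); // [19, 22, 22]
        System.out.println(teacherMax); // [45, 45]
    }
}
```

The design choice is that the type parameter is fixed per instance: new MaxAgeFn<>() infers T = Student for one stream and T = Teacher for the other, so each instance matches its stream type exactly while the max-age logic exists only once.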

Edit:

By the way, your function does not work with checkpointing: maxAge is a plain field, not Flink-managed state, so it is not included in checkpoints and the job will produce wrong results after recovering from one. I'd rather use an aggregate function over a global window.

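import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;

// Generic so the same aggregate works for DataStream<Student> and DataStream<Teacher>;
// an AggregateFunction<People, ...> would hit the same type mismatch as the map version.
public class MaxAgeAggregate<T extends People> implements AggregateFunction<T, Integer, Integer> {
    @Override
    public Integer createAccumulator() {
        return -1;
    }

    @Override
    public Integer add(T value, Integer accumulator) {
        return Math.max(value.age, accumulator);
    }

    @Override
    public Integer getResult(Integer accumulator) {
        return accumulator;
    }

    @Override
    public Integer merge(Integer a, Integer b) {
        return Math.max(a, b);
    }
}

studentStream
    .windowAll(GlobalWindows.create())
    // a global window never fires on its own; fire after every element to emit the running max
    .trigger(CountTrigger.of(1))
    .aggregate(new MaxAgeAggregate<>())
    .print();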
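To illustrate why the aggregate form is checkpoint-friendly, here is a Flink-free sketch: the running maximum lives only in the accumulator value that is passed in and returned, so the framework (not the user function) owns the state and can snapshot it. The interface below only mirrors the method names of Flink's AggregateFunction; Aggregate, MaxAge and runningResults are hypothetical names, and the driver loop is a simplification of emitting a result after every element, not Flink's window operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MaxAgeAggregateSketch {
    // Local interface mirroring AggregateFunction<IN, ACC, OUT>; not the Flink type.
    interface Aggregate<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // Running maximum over ages (plain Integers stand in for People.age).
    static class MaxAge implements Aggregate<Integer, Integer, Integer> {
        @Override public Integer createAccumulator() { return -1; }
        @Override public Integer add(Integer age, Integer acc) { return Math.max(age, acc); }
        @Override public Integer getResult(Integer acc) { return acc; }
        @Override public Integer merge(Integer a, Integer b) { return Math.max(a, b); }
    }

    // Simplified driver: emits the result after every element. The only
    // state is the local `acc`, i.e. the value a framework would checkpoint.
    static <IN, ACC, OUT> List<OUT> runningResults(Aggregate<IN, ACC, OUT> fn, List<IN> input) {
        List<OUT> out = new ArrayList<>();
        ACC acc = fn.createAccumulator();
        for (IN value : input) {
            acc = fn.add(value, acc);
            out.add(fn.getResult(acc));
        }
        return out;
    }

    public static void main(String[] args) {
        MaxAge fn = new MaxAge();
        List<Integer> running = runningResults(fn, Arrays.asList(19, 22, 20));
        System.out.println(running); // [19, 22, 22]

        // merge() combines two partial accumulators (Flink uses it when windows merge).
        Integer left = fn.add(22, fn.add(19, fn.createAccumulator()));
        Integer right = fn.add(45, fn.createAccumulator());
        System.out.println(fn.getResult(fn.merge(left, right))); // 45
    }
}
```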


Source: stackoverflow