
InvalidTypesException for Generic Pattern Transformation in Apache Flink

I have a problem with Apache Flink. I want to have an abstract class that consumes a stream, but the pattern applied to this stream should be interchangeable.

public abstract class AbstractVisitConsumer<TEventType>

TEventType is the type of the event that is generated from the pattern. Every pattern must implement an interface called IEventPattern:

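import java.io.Serializable;
import java.util.List;
import java.util.Map;
import org.apache.flink.cep.pattern.Pattern;

public interface IEventPattern<TStreamInput, TMatchedEvent> extends Serializable {
    TMatchedEvent create(Map<String, List<TStreamInput>> pattern);
    Pattern<TStreamInput, ?> getEventPattern();
}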

The abstract class has a method called applyPatternSelectToStream():

DataStream<TEventType> applyPatternSelectToStream(DataStream<VisitEvent> stream, IEventPattern<VisitEvent, TEventType> pattern) {
    DataStream<TEventType> patternStream = CEP.pattern(stream, pattern.getEventPattern())
            .select(new PatternSelectFunction<VisitEvent, TEventType>() {
                @Override
                public TEventType select(Map<String, List<VisitEvent>> map) throws Exception {
                    return pattern.create(map);
                }
            })
            .returns(this.typeInformation);
    return patternStream;
}

Running the job always fails with this exception:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'TEventType' in 'class com.felix.AbstractVisitConsumer' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s)

My class WorkPlaceConsumer extends the abstract class above to specify the event that should be generated from the stream:

public class WorkPlaceConsumer extends AbstractVisitConsumer<WorkPlaceEvent> {

    // Pass the concrete event type up to the abstract class
    public WorkPlaceConsumer() {
        super(TypeInformation.of(WorkPlaceEvent.class));
    }

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        WorkPlaceConsumer consumer = new WorkPlaceConsumer();
        DataStream<VisitEvent> visitStream = consumer.getVisitStream(env);
        DataStream<WorkPlaceEvent> workPlaceStream = consumer.applyPatternSelectToStream(visitStream, new WorkPlacePattern());

        visitStream.print();
        workPlaceStream
                .keyBy((KeySelector<WorkPlaceEvent, Integer>) event -> event.getUserId())
                .filter(new NewWorkPlaceEventFilter())
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

I have already tried implementing the ResultTypeQueryable interface, and storing the type information in the abstract class by passing it from the child class at runtime. I also used .returns(...) to provide the type information manually. Maybe I am just doing it wrong. Has anybody had similar issues with generic transformations on streams?
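Regarding passing the type information from the child class: because a subclass such as WorkPlaceConsumer extends AbstractVisitConsumer<WorkPlaceEvent> with a concrete type argument, that argument is kept in the subclass's class file, and the abstract class can read it back itself (the "super type token" trick). The plain-Java sketch below has no Flink dependency; TypedConsumer and StringConsumer are hypothetical stand-ins for the classes above, and it assumes the concrete type argument is declared on the direct subclass. In Flink, the recovered Class could then be passed to TypeInformation.of(...). On its own this does not fix the exception, since the type extractor still inspects the anonymous select function.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

class SuperTypeTokenDemo {
    public static void main(String[] args) {
        // Prints java.lang.String
        System.out.println(new StringConsumer().eventClass.getName());
    }
}

// Hypothetical stand-in for AbstractVisitConsumer<TEventType>
abstract class TypedConsumer<TEventType> {
    // Concrete event class, read back from the subclass declaration
    final Class<TEventType> eventClass;

    @SuppressWarnings("unchecked")
    protected TypedConsumer() {
        // getClass() is the runtime subclass, e.g. StringConsumer
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalStateException("Subclass must declare a type argument");
        }
        Type arg = ((ParameterizedType) superType).getActualTypeArguments()[0];
        if (!(arg instanceof Class)) {
            // e.g. a generic intermediate subclass that only forwards its own type variable
            throw new IllegalStateException("Type argument is not a concrete class: " + arg);
        }
        this.eventClass = (Class<TEventType>) arg;
    }
}

// Hypothetical stand-in for WorkPlaceConsumer extends AbstractVisitConsumer<WorkPlaceEvent>
class StringConsumer extends TypedConsumer<String> {
}
```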


Answer

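All right, I had a second look at your problem, and you were right: it has nothing to do with the lambda expression. The problem is that type erasure was applied to the PatternSelectFunction: the anonymous class only knows TEventType as a type variable, so Flink's type extractor cannot determine the output type when select() is called. This is also why .returns(...) does not help here: the exception is already thrown inside select(), before returns() is reached.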
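To see the erasure effect outside Flink, here is a small plain-Java sketch (no Flink dependency; ErasureDemo and GenericHolder are made-up names, with GenericHolder<T> playing the role of AbstractVisitConsumer<TEventType>). It reads the declared output type of an anonymous Function via reflection: inside the generic class only the type variable T is recorded, even though the caller instantiated it with Integer, whereas an anonymous class with a concrete type argument keeps it.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.function.Function;

class ErasureDemo {

    // Mirrors AbstractVisitConsumer<TEventType>: a generic class creating an anonymous function
    static class GenericHolder<T> {
        Function<String, T> create() {
            return new Function<String, T>() {
                @Override
                public T apply(String s) {
                    return null; // body irrelevant, only the declared types matter here
                }
            };
        }
    }

    // Returns the declared output type (second type argument) of the anonymous Function
    static Type outputTypeOf(Function<?, ?> fn) {
        ParameterizedType iface = (ParameterizedType) fn.getClass().getGenericInterfaces()[0];
        return iface.getActualTypeArguments()[1];
    }

    public static void main(String[] args) {
        // The caller picks Integer, but the anonymous class only recorded "T"
        Type generic = outputTypeOf(new GenericHolder<Integer>().create());
        // Prints "T, type variable: true"
        System.out.println(generic.getTypeName() + ", type variable: " + (generic instanceof TypeVariable));

        // A concrete anonymous class keeps its type argument in the class file
        Function<String, Integer> concrete = new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        };
        // Prints "java.lang.Integer"
        System.out.println(outputTypeOf(concrete).getTypeName());
    }
}
```

The first case is, presumably, all the information Flink's type extractor has to work with, which matches the "Type of TypeVariable 'TEventType' ... could not be determined" message.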

You can provide the return type manually by implementing the ResultTypeQueryable interface, for example like this:

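import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.streaming.api.datastream.DataStream;

// A select function that can also tell Flink which type it produces
public interface InnerPatternSelectFunction<T> extends PatternSelectFunction<String, T>, ResultTypeQueryable<T> {}

public DataStream<T> applyPatternSelectToStream(DataStream<String> stream, IEventPattern<String, T> pattern) {
    // Local copy of the type information passed in by the subclass
    TypeInformation<T> producedType = this.typeInformation;
    return CEP.pattern(stream, pattern.getEventPattern()).select(new InnerPatternSelectFunction<T>() {
        // Flink asks for the output type here instead of extracting it from the erased generics
        @Override
        public TypeInformation<T> getProducedType() {
            return producedType;
        }

        @Override
        public T select(Map<String, List<String>> map) throws Exception {
            return pattern.create(map);
        }
    });
}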

Of course, this is just a proposal, and I think the code can be improved ;). But it shows the general idea of implementing the ResultTypeQueryable interface.
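Why implementing ResultTypeQueryable helps can be modelled in plain Java. The sketch below is a heavily simplified, hypothetical model (ProducesType, QueryableFunction, MiniTypeExtractor and Selector are invented names, not Flink API): the extractor first asks the function for its produced type and only falls back to reflection when the function cannot answer, failing immediately on an erased type variable. Flink's TypeExtractor is understood to check for ResultTypeQueryable before reflecting in a similar way, but this is a model, not its implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

class MiniExtractorDemo {
    public static void main(String[] args) {
        Selector<Integer> selector = new Selector<>(Integer.class);
        // Succeeds: the function reports its own output type, prints "Integer"
        System.out.println(MiniTypeExtractor.outputType(selector.queryable()).getSimpleName());
        // Fails: reflection only sees the type variable T
        try {
            MiniTypeExtractor.outputType(selector.plain());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}

// Hypothetical stand-in for Flink's ResultTypeQueryable
interface ProducesType<T> {
    Class<T> producedType();
}

// Hypothetical stand-in for InnerPatternSelectFunction: function and type query in one interface
interface QueryableFunction<I, O> extends Function<I, O>, ProducesType<O> {
}

// Hypothetical, simplified model of eager output-type extraction
final class MiniTypeExtractor {
    static Class<?> outputType(Function<?, ?> fn) {
        // 1. A function that declares its output type is trusted directly
        if (fn instanceof ProducesType) {
            return ((ProducesType<?>) fn).producedType();
        }
        // 2. Otherwise look for Function<I, O> among the class's generic interfaces
        for (Type t : fn.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType && ((ParameterizedType) t).getRawType() == Function.class) {
                Type out = ((ParameterizedType) t).getActualTypeArguments()[1];
                if (out instanceof Class) {
                    return (Class<?>) out;
                }
                // Erased type variable: fail right away instead of returning a placeholder
                throw new IllegalStateException("Type of TypeVariable '" + out.getTypeName() + "' could not be determined");
            }
        }
        throw new IllegalStateException("No Function<I, O> declaration found");
    }
}

// Hypothetical stand-in for AbstractVisitConsumer<TEventType>
class Selector<T> {
    private final Class<T> type;

    Selector(Class<T> type) {
        this.type = type;
    }

    Function<String, T> plain() {
        return new Function<String, T>() {
            @Override
            public T apply(String s) {
                return null;
            }
        };
    }

    Function<String, T> queryable() {
        Class<T> produced = type; // local copy, like producedType in the answer
        return new QueryableFunction<String, T>() {
            @Override
            public T apply(String s) {
                return null;
            }

            @Override
            public Class<T> producedType() {
                return produced;
            }
        };
    }
}
```

Running it prints "Integer" for the queryable function and the "could not be determined" message for the plain one, which mirrors the difference between the question's code and the answer's.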
