
Cannot infer type-variable(s) R in Flink ProcessWindowFunction

I am having trouble resolving this compile error in Flink (version 1.11.0):

java: no suitable method found for process(com.xyz.myPackage.operators.windowed.ComputeFeatures)
    method org.apache.flink.streaming.api.datastream.WindowedStream.<R>process(org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<com.xyz.myPackage.entities.StreamElement,R,java.lang.Long,org.apache.flink.streaming.api.windowing.windows.TimeWindow>) is not applicable
      (cannot infer type-variable(s) R
        (argument mismatch; com.xyz.myPackage.operators.windowed.ComputeFeatures cannot be converted to org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<com.xyz.myPackage.entities.StreamElement,R,java.lang.Long,org.apache.flink.streaming.api.windowing.windows.TimeWindow>))
    method org.apache.flink.streaming.api.datastream.WindowedStream.<R>process(org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<com.xyz.myPackage.entities.StreamElement,R,java.lang.Long,org.apache.flink.streaming.api.windowing.windows.TimeWindow>,org.apache.flink.api.common.typeinfo.TypeInformation<R>) is not applicable
      (cannot infer type-variable(s) R
        (actual and formal argument lists differ in length))

This is how I create a keyed windowed stream:

timestampedStreamElementDataStream
        .keyBy(StreamElement::getId)
        .window(SlidingEventTimeWindows.of(Time.seconds(600),Time.seconds(60)))
        .process(new ComputeFeatures());

And here is what my ComputeFeatures function looks like:

public class ComputeFeatures extends ProcessWindowFunction<
        StreamElement,
        StreamElement,
        Long,
        TimeWindow> {

    @Override
    public void process(Long key,
                        Context context,
                        Iterable<StreamElement> elements,
                        Collector<StreamElement> out) throws Exception {


        System.out.println("In windowed function");

    }
}

StreamElement::getId returns a Long, so all the types should line up, yet the compiler still cannot infer the type variable R. I am looking for ideas on how to solve this.

NOTE: This issue seems related but it didn’t fit my problem: LINK

EDIT 1: As suggested by David, I tried auto-generating the overridden process function with IntelliJ, but the issue remains. The generated code looks like this when I specify the type parameters:

public class ComputeFeatures extends ProcessWindowFunction<StreamElement, StreamElement, Long, TimeWindow> {

    @Override
    public void process(Long aLong,
                        ProcessWindowFunction<StreamElement, StreamElement, Long, TimeWindow>.Context context,
                        Iterable<StreamElement> elements,
                        Collector<StreamElement> out) throws Exception {

    }
}

And like this when I omit the type parameters:

public class ComputeFeatures extends ProcessWindowFunction {
    @Override
    public void process(Object o, Context context, Iterable elements, Collector out) throws Exception {
        System.out.println("In windowed function");
    }
}

EDIT 2: Possibly relevant: when I hover over new ComputeFeatures(), IntelliJ displays this infobox:

Required type: ProcessWindowFunction<com.xyz.myPackage.entities.StreamElement, R, java.lang.Long, org.apache.flink.streaming.api.windowing.windows.TimeWindow>
Provided: ComputeFeatures

reason: no instance(s) of type variable(s) R exist so that ComputeFeatures conforms to ProcessWindowFunction<StreamElement, R, Long, TimeWindow>


Answer

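Silly mistake on my part: the code works as written. The problem was that IntelliJ had imported the wrong ProcessWindowFunction (the Scala variant). After switching the import to the Java class named in the compiler error, org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction, everything worked as expected.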
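For anyone hitting the same error: both Flink APIs ship a class with the simple name ProcessWindowFunction, so the class body can look correct while the import line points at the wrong package. The comments at the top of the sketch below show the two import lines side by side. The small helper underneath is only an illustration of one way to confirm at runtime which class is really being extended. SuperclassCheck and its method names are made up for this example and are not part of Flink, and main uses JDK classes as stand-ins because Flink is assumed not to be on the classpath here.

```java
import java.util.ArrayList;
import java.util.List;

// Import expected by the Java DataStream API (the type named in the compiler error):
//   import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
// Scala variant with the same simple name, which an IDE may auto-import by mistake:
//   import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction;

// Hypothetical helper, not part of Flink: inspects a class's superclass chain.
final class SuperclassCheck {

    /** Returns the fully qualified names of all superclasses of c, nearest first. */
    static List<String> superclassNames(Class<?> c) {
        List<String> names = new ArrayList<>();
        for (Class<?> s = c.getSuperclass(); s != null; s = s.getSuperclass()) {
            names.add(s.getName());
        }
        return names;
    }

    /** True if some superclass of c has exactly the given fully qualified name. */
    static boolean extendsClassNamed(Class<?> c, String fullyQualifiedName) {
        return superclassNames(c).contains(fullyQualifiedName);
    }

    public static void main(String[] args) {
        // JDK stand-ins; in a Flink project you would pass ComputeFeatures.class and
        // "org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction".
        System.out.println(superclassNames(ArrayList.class));
        System.out.println(extendsClassNamed(ArrayList.class, "java.util.AbstractList"));
        System.out.println(extendsClassNamed(String.class, "java.util.AbstractList"));
    }
}
```

In practice, reading the import block at the top of ComputeFeatures.java is usually enough. A check like this is mainly useful when the class comes from a compiled dependency whose source is not at hand.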

User contributions licensed under: CC BY-SA