I am having trouble resolving this error in Flink (version 1.11.0):
java: no suitable method found for process(com.xyz.myPackage.operators.windowed.ComputeFeatures)
    method org.apache.flink.streaming.api.datastream.WindowedStream.<R>process(org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<com.xyz.myPackage.entities.StreamElement,R,java.lang.Long,org.apache.flink.streaming.api.windowing.windows.TimeWindow>) is not applicable
      (cannot infer type-variable(s) R
        (argument mismatch; com.xyz.myPackage.operators.windowed.ComputeFeatures cannot be converted to org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<com.xyz.myPackage.entities.StreamElement,R,java.lang.Long,org.apache.flink.streaming.api.windowing.windows.TimeWindow>))
    method org.apache.flink.streaming.api.datastream.WindowedStream.<R>process(org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<com.xyz.myPackage.entities.StreamElement,R,java.lang.Long,org.apache.flink.streaming.api.windowing.windows.TimeWindow>,org.apache.flink.api.common.typeinfo.TypeInformation<R>) is not applicable
      (cannot infer type-variable(s) R
        (actual and formal argument lists differ in length))
This is how I create a keyed windowed stream:
timestampedStreamElementDataStream
    .keyBy(StreamElement::getId)
    .window(SlidingEventTimeWindows.of(Time.seconds(600), Time.seconds(60)))
    .process(new ComputeFeatures());
And here is what my ComputeFeatures function looks like:
public class ComputeFeatures extends ProcessWindowFunction<StreamElement, StreamElement, Long, TimeWindow> {

    @Override
    public void process(Long key, Context context, Iterable<StreamElement> elements, Collector<StreamElement> out) throws Exception {
        System.out.println("In windowed function");
    }
}
StreamElement::getId returns a Long, so all the types should be correct, but it seems that Flink still has trouble inferring a type. I am looking for ideas on how to solve this.
NOTE: This issue seems related but it didn’t fit my problem: LINK
EDIT 1:
As suggested by David, I tried autogenerating the overridden process function with IntelliJ, but the issue remains the same. When the type parameters are specified, the autogenerated code looks like this:
public class ComputeFeatures extends ProcessWindowFunction<StreamElement, StreamElement, Long, TimeWindow> {

    @Override
    public void process(Long aLong, ProcessWindowFunction<StreamElement, StreamElement, Long, TimeWindow>.Context context, Iterable<StreamElement> elements, Collector<StreamElement> out) throws Exception {
    }
}
And like this when I omit the type parameters:
public class ComputeFeatures extends ProcessWindowFunction {

    @Override
    public void process(Object o, Context context, Iterable elements, Collector out) throws Exception {
        System.out.println("In windowed function");
    }
}
EDIT 2:
Maybe relevant: when I hover over new ComputeFeatures(), IntelliJ displays this infobox:
Required type: ProcessWindowFunction<com.xyz.myPackage.entities.StreamElement, R, java.lang.Long, org.apache.flink.streaming.api.windowing.windows.TimeWindow>
Provided: ComputeFeatures
reason: no instance(s) of type variable(s) R exist so that ComputeFeatures conforms to ProcessWindowFunction<StreamElement, R, Long, TimeWindow>
Answer
Eh, stupid mistake: the code works as it is. The problem was that IntelliJ had imported the wrong ProcessWindowFunction (the Scala variant) instead of the Java one named in the compiler error. After fixing the import, everything worked as expected.
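For anyone hitting the same message: the compiler error names the class it expects, org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction, so the import at the top of ComputeFeatures should match that exactly. The sketch below shows one way to confirm which base class a function really extends. It does not use Flink at all: JavaApi, ScalaApi, the simplified process signature, and the helper methods are hypothetical stand-ins that only mimic two unrelated classes sharing one simple name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SuperclassCheck {

    // Hypothetical stand-in for the Java API package.
    public static class JavaApi {
        public abstract static class ProcessWindowFunction<IN, OUT, KEY, W> {
            public abstract void process(KEY key, Iterable<IN> elements, List<OUT> out);
        }
    }

    // Hypothetical stand-in for the Scala API package: same simple name, unrelated type.
    public static class ScalaApi {
        public abstract static class ProcessWindowFunction<IN, OUT, KEY, W> {
            public abstract void process(KEY key, Iterable<IN> elements, List<OUT> out);
        }
    }

    // Extends the intended base class, so process(...) below accepts it.
    public static class ComputeFeatures
            extends JavaApi.ProcessWindowFunction<String, String, Long, Object> {
        @Override
        public void process(Long key, Iterable<String> elements, List<String> out) {
            for (String e : elements) {
                out.add(key + ":" + e);
            }
        }
    }

    // Identical body, but extends the look-alike class (the "wrong import" case).
    public static class WrongComputeFeatures
            extends ScalaApi.ProcessWindowFunction<String, String, Long, Object> {
        @Override
        public void process(Long key, Iterable<String> elements, List<String> out) {
            for (String e : elements) {
                out.add(key + ":" + e);
            }
        }
    }

    // Simplified stand-in for WindowedStream.process: only the JavaApi type is accepted.
    public static <R> List<R> process(JavaApi.ProcessWindowFunction<String, R, Long, Object> fn) {
        List<R> out = new ArrayList<>();
        fn.process(42L, Arrays.asList("a", "b"), out);
        return out;
    }

    // Name of the class that c directly extends.
    public static String superclassOf(Class<?> c) {
        return c.getSuperclass().getName();
    }

    // True only if c is a subtype of the expected base class.
    public static boolean extendsExpected(Class<?> c) {
        return JavaApi.ProcessWindowFunction.class.isAssignableFrom(c);
    }

    public static void main(String[] args) {
        System.out.println(process(new ComputeFeatures()));
        // process(new WrongComputeFeatures()); // does not compile: no suitable method found
        System.out.println(superclassOf(ComputeFeatures.class));
        System.out.println(superclassOf(WrongComputeFeatures.class));
        System.out.println(extendsExpected(WrongComputeFeatures.class));
    }
}
```

In a real project the quicker check is usually to read the import line itself, or to Ctrl+click the ProcessWindowFunction name in IntelliJ and see which package it resolves to.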