I am a beginner with Flink. I tried to use Flink to run LFM (latent factor model), one of the common recommendation algorithms, but my code throws the error below when it runs. I tried to track it down and fix it, without success. Could someone explain why this is happening?
Here is the main exception:
The implementation of the StreamExecutionEnvironment is not serializable
And here is my code:
Note that sourceDataStream comes from my custom source, which extends RichFunction<Tuple3<>>.
```java
// training model
for (int iter = 0; iter < iterations; iter++) {
    sourceDataStream // the exception appears here
        .process(new ProcessFunction<Tuple3<String, String, Double>, Object>() {
            @Override
            public void processElement(Tuple3<String, String, Double> in,
                    ProcessFunction<Tuple3<String, String, Double>, Object>.Context context,
                    Collector<Object> collector) throws Exception {
                Double hat_rui = predict(in.f0, in.f1, qbiTable, pbuTable, streamTableEnvironment);
                Double err_ui = in.f2 - hat_rui;
                Table pbuSelectTable = pbuTable.select($("buValue"), $("pList")).where($("userId").isEqual(in.f0));
                Table qbiSelectTable = qbiTable.select($("biValue"), $("qList")).where($("itemId").isEqual(in.f1));
                DataStream<Tuple2<Boolean, Tuple2<Double, List<Double>>>> pbuSelectDataStream =
                    streamTableEnvironment.toRetractStream(pbuSelectTable,
                        new TupleTypeInfo<>(Types.BOOLEAN, Types.DOUBLE, Types.LIST(Types.DOUBLE)));
                pbuSelectDataStream.process(new ProcessFunction<Tuple2<Boolean, Tuple2<Double, List<Double>>>, Object>() {
                    @Override
                    public void processElement(Tuple2<Boolean, Tuple2<Double, List<Double>>> userTuple,
                            ProcessFunction<Tuple2<Boolean, Tuple2<Double, List<Double>>>, Object>.Context context,
                            Collector<Object> collector) throws Exception {
                        DataStream<Tuple2<Boolean, Tuple2<Double, List<Double>>>> qbiSelectDataStream =
                            streamTableEnvironment.toRetractStream(qbiSelectTable,
                                new TupleTypeInfo<>(Types.BOOLEAN, Types.DOUBLE, Types.LIST(Types.DOUBLE)));
                        qbiSelectDataStream.process(new ProcessFunction<Tuple2<Boolean, Tuple2<Double, List<Double>>>, Object>() {
                            @Override
                            public void processElement(Tuple2<Boolean, Tuple2<Double, List<Double>>> itemTuple,
                                    ProcessFunction<Tuple2<Boolean, Tuple2<Double, List<Double>>>, Object>.Context context,
                                    Collector<Object> collector) throws Exception {
                                double bu = userTuple.f1.f0 + (alpha * (err_ui - lambd * userTuple.f1.f0));
                                double bi = itemTuple.f1.f0 + (alpha * (err_ui - lambd * itemTuple.f1.f0));
                                List<Double> pList = new ArrayList<>();
                                List<Double> qList = new ArrayList<>();
                                for (int fIter = 0; fIter < F; fIter++) {
                                    Double pValueLast = userTuple.f1.f1.get(fIter);
                                    Double qValueLast = itemTuple.f1.f1.get(fIter);
                                    Double qValueNew = qValueLast + (alpha * (err_ui * pValueLast - lambd * qValueLast));
                                    Double pValueNew = pValueLast + (alpha * (err_ui * qValueLast - lambd * pValueLast));
                                    pList.add(pValueNew);
                                    qList.add(qValueNew);
                                }
                                streamTableEnvironment.executeSql("INSERT OVERWRITE qbiTable VALUES ('qList', " + qList + "), ('biValue', " + bi + ")");
                                streamTableEnvironment.executeSql("INSERT OVERWRITE pbuTable VALUES ('pList', " + pList + "), ('buValue', " + bu + ")");
                            }
                        });
                    }
                });
            }
        });
}
```
Answer
There are a few things about this that aren’t going to work:
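In the implementation of any user function (such as a ProcessFunction) you cannot use a DataStream, a Table, a StreamExecutionEnvironment, or another ProcessFunction. All you can do is react to an incoming stream record, optionally using state you have built up inside that function from previously processed records. This is also most likely where your exception comes from: Flink serializes every user function so it can ship it to the task managers, and your anonymous ProcessFunction captures streamTableEnvironment, which holds a reference to a StreamExecutionEnvironment that cannot be serialized.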
The DataStream and Table APIs are organized around a builder paradigm, with which you describe a streaming dataflow pipeline. This pipeline must be a directed acyclic graph: it can split and merge, but it must flow from sources to sinks without any loops. The stages of that pipeline (e.g., a ProcessFunction) must be coded as independent blocks; they cannot reach outside of themselves to access data from other pipeline stages.
This paradigm isn’t well suited to training machine learning models, since training involves iterating over the data. If that’s your objective, take a look at https://github.com/apache/flink-ml.