I have a piece of code that only works inside static methods. If I put the code in a static method and call it from a non-static method, it works. I've never heard of anything like this and couldn't find any information about it online.
This works:
public void start(StreamExecutionEnvironment streamExecutionEnvironment) {
    startStatic(streamExecutionEnvironment);
}

private static void startStatic(StreamExecutionEnvironment streamExecutionEnvironment) {
    DataStream<String> input = Consumer.createKafkaConsumer(streamExecutionEnvironment,
            BookIndex.SINK_TOPIC_NAME, new SimpleStringSchema(), "book_index_es_group_v1", true, false);

    List<HttpHost> httpHosts = new ArrayList<>();
    httpHosts.add(new HttpHost("xxx.xxx.xxx.xxx", 9200, "http"));

    // use an ElasticsearchSink.Builder to create an ElasticsearchSink
    ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
            httpHosts,
            new ElasticsearchSinkFunction<String>() {
                public IndexRequest createIndexRequest(String element) {
                    Map<String, String> json = new HashMap<>();
                    json.put("data", element);

                    int endIndexExclusive = element.indexOf('"', 8);
                    String id = element.substring(7, endIndexExclusive);

                    IndexRequest indexRequest = Requests.indexRequest()
                            .index("myindexzzz")
                            .id(id)
                            .source(element, XContentType.JSON);

                    return indexRequest;
                }

                @Override
                public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                    indexer.add(createIndexRequest(element));
                }
            }
    );

    // configuration for the bulk requests; this instructs the sink to emit after every element,
    // otherwise they would be buffered
    esSinkBuilder.setBulkFlushMaxActions(1);

    // finally, build and add the sink to the job's pipeline
    input.addSink(esSinkBuilder.build());
}
This doesn’t work:
public void start(StreamExecutionEnvironment streamExecutionEnvironment) {
    DataStream<String> input = Consumer.createKafkaConsumer(streamExecutionEnvironment,
            BookIndex.SINK_TOPIC_NAME, new SimpleStringSchema(), "book_index_es_group_v1", true, false);

    List<HttpHost> httpHosts = new ArrayList<>();
    httpHosts.add(new HttpHost("xxx.xxx.xxx.xxx", 9200, "http"));

    // use an ElasticsearchSink.Builder to create an ElasticsearchSink
    ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
            httpHosts,
            new ElasticsearchSinkFunction<String>() {
                public IndexRequest createIndexRequest(String element) {
                    Map<String, String> json = new HashMap<>();
                    json.put("data", element);

                    int endIndexExclusive = element.indexOf('"', 8);
                    String id = element.substring(7, endIndexExclusive);

                    IndexRequest indexRequest = Requests.indexRequest()
                            .index("myindexzzz")
                            .id(id)
                            .source(element, XContentType.JSON);

                    return indexRequest;
                }

                @Override
                public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                    indexer.add(createIndexRequest(element));
                }
            }
    );

    // configuration for the bulk requests; this instructs the sink to emit after every element,
    // otherwise they would be buffered
    esSinkBuilder.setBulkFlushMaxActions(1);

    // finally, build and add the sink to the job's pipeline
    input.addSink(esSinkBuilder.build());
}
The implementation of the provided ElasticsearchSinkFunction is not serializable. The object probably contains or references non-serializable fields.
A possible reason is that when an anonymous class is created (at new ElasticsearchSinkFunction<String>() { ... }) in a non-static context, it retains a reference to the enclosing instance (and you can access its fields). So the issue could be that when it tries to serialize the anonymous class instance, it reaches the enclosing instance and fails to serialize it. This doesn't happen in a static context because there the anonymous class has no enclosing instance. However, I tried creating a separate class that extends ElasticsearchSinkFunction<String> and using that instead, but it still failed to serialize, giving the same error but also saying that the enclosing instance is not serializable, meaning it is still trying to serialize the enclosing instance.
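This capture behavior can be reproduced in plain Java, without Flink. The sketch below (class and field names are made up for illustration; Task stands in for a serializable Flink function) serializes two anonymous classes, one created in a static method and one in an instance method that reads an enclosing field. Only the first succeeds, because the second drags the non-serializable enclosing instance along via its hidden reference:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a pipeline class; deliberately NOT Serializable.
public class EnclosingCapture {

    // Stand-in for ElasticsearchSinkFunction, which Flink requires to be Serializable.
    interface Task extends Serializable {
        void run();
    }

    private final String name = "pipeline";

    // Anonymous class in instance context: it captures EnclosingCapture.this
    // because run() reads the enclosing field "name".
    Task makeInstanceTask() {
        return new Task() {
            @Override
            public void run() {
                System.out.println(name);
            }
        };
    }

    // Anonymous class in static context: there is no enclosing instance to capture.
    static Task makeStaticTask() {
        return new Task() {
            @Override
            public void run() {
                System.out.println("static");
            }
        };
    }

    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            // NotSerializableException: the non-serializable enclosing instance
            // is reached through the anonymous class's hidden outer reference.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("staticContext=" + serializes(makeStaticTask()));
        System.out.println("instanceContext=" + serializes(new EnclosingCapture().makeInstanceTask()));
    }
}
```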
Note: Thanks to “Techno Chess, Atomic variation#1325” on the Programming Discussions Discord server for suggesting this possible reason.
Answer
The enclosing class was indeed being serialized. To make it work, have the class implement Serializable and add a serialVersionUID field. Example below:
public abstract class Pipeline implements Serializable {

    private static final long serialVersionUID = 1L;

    ...
}
This makes the classes that extend Pipeline serializable and work properly. You could, of course, also implement the Serializable interface in a non-abstract class and add the field, and it would work. The classes that must be serializable are the ones that provide the Flink functions, such as ElasticsearchSinkFunction.
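The effect of the fix can likewise be checked in plain Java (class and field names are again made up). Once the enclosing class implements Serializable and declares a serialVersionUID, an anonymous class created in one of its instance methods serializes cleanly, even though it still captures the enclosing instance:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Same shape as the failing case, but the enclosing class is now Serializable.
public class SerializablePipeline implements Serializable {

    private static final long serialVersionUID = 1L;

    interface Task extends Serializable {
        void run();
    }

    private final String name = "pipeline";

    Task makeInstanceTask() {
        // Still captures SerializablePipeline.this, but that instance
        // (and its String field) can now be serialized as well.
        return new Task() {
            @Override
            public void run() {
                System.out.println(name);
            }
        };
    }

    public static void main(String[] args) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(new SerializablePipeline().makeInstanceTask());
            System.out.println("serialized=true");
        }
    }
}
```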