Flink ElasticsearchSinkFunction not serializable in non-static method, but serializable in static method

I have a piece of code that only works inside a static method. If I move the code into a static method and call that from a non-static method, it works. I have never seen anything like this and couldn’t find any information about it online.

This works:

    public void start(StreamExecutionEnvironment streamExecutionEnvironment) {
        startStatic(streamExecutionEnvironment);
    }

    private static void startStatic(StreamExecutionEnvironment streamExecutionEnvironment) {
        DataStream<String> input = Consumer.createKafkaConsumer(streamExecutionEnvironment, BookIndex.SINK_TOPIC_NAME, new SimpleStringSchema(), "book_index_es_group_v1", true, false);

        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("xxx.xxx.xxx.xxx", 9200, "http"));

        // use an ElasticsearchSink.Builder to create an ElasticsearchSink
        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
                httpHosts,
                new ElasticsearchSinkFunction<String>() {
                    public IndexRequest createIndexRequest(String element) {
                        Map<String, String> json = new HashMap<>();
                        json.put("data", element);


                        int endIndexExclusive = element.indexOf('"', 8);
                        String id = element.substring(7, endIndexExclusive);
                        IndexRequest indexRequest = Requests.indexRequest()
                                .index("myindexzzz")
                                .id(id)
                                .source(element, XContentType.JSON);

                        return indexRequest;
                    }

                    @Override
                    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(createIndexRequest(element));
                    }
                }
        );

        // configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
        esSinkBuilder.setBulkFlushMaxActions(1);

        // finally, build and add the sink to the job's pipeline
        input.addSink(esSinkBuilder.build());
    }

This doesn’t work:

    public void start(StreamExecutionEnvironment streamExecutionEnvironment) {
        DataStream<String> input = Consumer.createKafkaConsumer(streamExecutionEnvironment, BookIndex.SINK_TOPIC_NAME, new SimpleStringSchema(), "book_index_es_group_v1", true, false);

        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("xxx.xxx.xxx.xxx", 9200, "http"));

        // use an ElasticsearchSink.Builder to create an ElasticsearchSink
        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
                httpHosts,
                new ElasticsearchSinkFunction<String>() {
                    public IndexRequest createIndexRequest(String element) {
                        Map<String, String> json = new HashMap<>();
                        json.put("data", element);


                        int endIndexExclusive = element.indexOf('"', 8);
                        String id = element.substring(7, endIndexExclusive);
                        IndexRequest indexRequest = Requests.indexRequest()
                                .index("myindexzzz")
                                .id(id)
                                .source(element, XContentType.JSON);

                        return indexRequest;
                    }

                    @Override
                    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(createIndexRequest(element));
                    }
                }
        );

        // configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
        esSinkBuilder.setBulkFlushMaxActions(1);

        // finally, build and add the sink to the job's pipeline
        input.addSink(esSinkBuilder.build());
    }

Error message from the (full) stack trace:

The implementation of the provided ElasticsearchSinkFunction is not serializable. The object probably contains or references non-serializable fields.

A possible reason: when an anonymous class is created (at new ElasticsearchSinkFunction<String>() { ... }) in a non-static context, it keeps an implicit reference to the enclosing instance (which is how it can access that instance's fields). When Flink tries to serialize the anonymous class instance, it reaches the enclosing instance and fails to serialize it. This doesn’t happen in a static context, because there the anonymous class has no enclosing instance.

However, I also tried creating a separate class that implements ElasticsearchSinkFunction<String> and using that instead, but it still failed to serialize. It gave the same error and also reported that the enclosing instance is not serializable, meaning that it is still trying to serialize the enclosing instance.
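The capture described above can be reproduced without Flink, using only JDK serialization. This is a minimal sketch, not the code from the question: CaptureDemo, SinkFn, and Job are hypothetical names, and SinkFn merely stands in for a Serializable function interface such as ElasticsearchSinkFunction. The instance-method version reads a field of the enclosing object to make the capture explicit.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class CaptureDemo {

    // Hypothetical stand-in for a Serializable function interface.
    public interface SinkFn extends Serializable {
        String process(String element);
    }

    // Deliberately NOT Serializable, like the enclosing class in the question.
    public static class Job {
        private final String prefix = "id-";

        // Anonymous class created in an instance method:
        // it holds a hidden reference to the surrounding Job.
        public SinkFn fromInstanceMethod() {
            return new SinkFn() {
                @Override
                public String process(String element) {
                    return prefix + element; // reads a field of the enclosing Job
                }
            };
        }

        // Anonymous class created in a static method: there is no enclosing instance.
        public static SinkFn fromStaticMethod() {
            return new SinkFn() {
                @Override
                public String process(String element) {
                    return element;
                }
            };
        }
    }

    // Returns true if Java serialization can write the object graph rooted at o.
    public static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false; // something reachable from o is not Serializable
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(new Job().fromInstanceMethod())); // false
        System.out.println(isSerializable(Job.fromStaticMethod()));         // true
    }
}
```

The first check fails because writing the anonymous object also has to write the Job it points to, and Job does not implement Serializable.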

Note: Thanks to “Techno Chess, Atomic variation#1325” on the Programming Discussions Discord server for suggesting this possible reason.

Answer

The enclosing class was indeed being serialized. To make it work, implement Serializable on the enclosing class and add a serialVersionUID field. Example below:

    public abstract class Pipeline implements Serializable {

        private static final long serialVersionUID = 1L;

        ...
    }

This makes the classes that extend Pipeline serializable, so they work properly. You could, of course, also implement the Serializable interface directly in a non-abstract class, add the field, and it would work the same way. The classes that must be serializable are the ones that create the Flink functions, such as ElasticsearchSinkFunction.
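To check the fix outside Flink, here is a small sketch using only JDK serialization. SerializablePipelineDemo, SinkFn, BookPipeline, and roundTrip are hypothetical names introduced for illustration; only Pipeline and its serialVersionUID come from the answer. Because the enclosing class is now Serializable, the anonymous function and the pipeline instance it references can be written and read back together.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializablePipelineDemo {

    // Hypothetical stand-in for a Serializable function interface.
    public interface SinkFn extends Serializable {
        String process(String element);
    }

    // The fix from the answer: the enclosing class hierarchy is Serializable.
    public abstract static class Pipeline implements Serializable {
        private static final long serialVersionUID = 1L;

        public abstract SinkFn createSinkFunction();
    }

    // Hypothetical concrete pipeline; it inherits Serializable from Pipeline.
    public static class BookPipeline extends Pipeline {
        private static final long serialVersionUID = 1L;
        private final String indexName;

        public BookPipeline(String indexName) {
            this.indexName = indexName;
        }

        @Override
        public SinkFn createSinkFunction() {
            // Created in an instance method, so it references this BookPipeline.
            return new SinkFn() {
                @Override
                public String process(String element) {
                    return indexName + ":" + element;
                }
            };
        }
    }

    // Serializes the function to bytes and reads it back.
    public static SinkFn roundTrip(SinkFn fn) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(fn);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SinkFn) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        SinkFn copy = roundTrip(new BookPipeline("myindexzzz").createSinkFunction());
        System.out.println(copy.process("abc")); // prints "myindexzzz:abc"
    }
}
```

One consequence of this approach is that the whole pipeline object is serialized along with the function, so every non-transient field of the pipeline must itself be serializable.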



Source: stackoverflow