Skip to content
Advertisement

Parsing Avro messages in flink , giving null pointer exception if field is nullable in Avro Schema

I need to parse the messages from confluent Kafka stored in Avro. But while applying filter it is giving null pointer exception, without filter i was able to write back into kafka but while applying filter, it is giving null pointer exception.

 public static void main(String[] args) throws Exception {

        Properties config = new Properties();
        config.setProperty("bootstrap.servers", "localhost:9092");
        config.setProperty("group.id","topic");
        String schemaRegistryUrl  = "http://localhost:8091";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        DataStream<topic1> streamIn = env
                .addSource(
                        new FlinkKafkaConsumer<>(
                                "topic1",
                                ConfluentRegistryAvroDeserializationSchema.forSpecific(topic1.class, schemaRegistryUrl),
                                config
                        ).setStartFromEarliest());
                        
        //Question here : want to return only rows which need to qulify below as with below it is giving null pointer exception

        DataStreamSink fltrtsrm_so=streamIn.filter((new FilterFunction<topic1>() {
            public boolean filter(topic1 user) throws Exception {
                return user.get("col3").toString().equals("New");
            }
        })).print();        
        //Also let me know if there is any better way to do it as for me its just the start..)

Here is the Schema:

{
  "namespace": "testclass",
  "type": "record",
  "name": "topic1",
  "fields": [
    {
      "name": "col1",
      "type": "string"
    },
    {
      "name": "col2",
      "type": "string"
    },
    {
      "default": null,
      "name": "col3",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "col4",
      "type": [
        "null",
        "string"
      ]
    }
    ]
}

Advertisement

Answer

When you have null vallue in your topic for col3 and you trying to access it .toString() you have your exception and it’s normal behavior for null.toString().

You should test if col3 is not null

User contributions licensed under: CC BY-SA
8 People found this is helpful
Advertisement