I need to parse messages from Confluent Kafka that are stored in Avro. Without a filter I can write the records back into Kafka, but as soon as I apply a filter I get a NullPointerException.
public static void main(String[] args) throws Exception {
    Properties config = new Properties();
    config.setProperty("bootstrap.servers", "localhost:9092");
    config.setProperty("group.id", "topic");
    String schemaRegistryUrl = "http://localhost:8091";

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<topic1> streamIn = env
        .addSource(
            new FlinkKafkaConsumer<>(
                "topic1",
                ConfluentRegistryAvroDeserializationSchema.forSpecific(topic1.class, schemaRegistryUrl),
                config
            ).setStartFromEarliest());

    // Question here: I want to return only the rows that qualify below,
    // but this filter throws a NullPointerException.
    DataStreamSink fltrtsrm_so = streamIn.filter((new FilterFunction<topic1>() {
        public boolean filter(topic1 user) throws Exception {
            return user.get("col3").toString().equals("New");
        }
    })).print();

    // Also let me know if there is a better way to do this, as I am just getting started.
}
Here is the Schema:
{
  "namespace": "testclass",
  "type": "record",
  "name": "topic1",
  "fields": [
    { "name": "col1", "type": "string" },
    { "name": "col2", "type": "string" },
    { "default": null, "name": "col3", "type": [ "null", "string" ] },
    { "default": null, "name": "col4", "type": [ "null", "string" ] }
  ]
}
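Your schema allows col3 to be null. When a record in the topic has a null value for col3, user.get("col3") returns null, and calling .toString() on it throws the NullPointerException. That is normal behavior for null.toString().
You should check that col3 is not null before calling .toString() on it.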
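As a minimal sketch of that null check, the example below isolates the predicate from Flink so it can be run on its own. The class name NullSafeCol3Filter and the helper isNew are hypothetical names chosen for illustration, and plain objects stand in for the values that user.get("col3") would return.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class NullSafeCol3Filter {

    // Hypothetical helper: true only when col3 is present and equals "New".
    // It takes Object because Avro may return a CharSequence rather than a String.
    static boolean isNew(Object col3) {
        return col3 != null && "New".equals(col3.toString());
    }

    public static void main(String[] args) {
        // Stand-in values for col3; null models a record where the field is unset.
        List<Object> col3Values = Arrays.asList("New", null, "Old", new StringBuilder("New"));

        // Keep only the values that pass the null-safe check.
        List<Object> kept = col3Values.stream()
                .filter(NullSafeCol3Filter::isNew)
                .collect(Collectors.toList());

        System.out.println("kept " + kept.size() + " of " + col3Values.size());
    }
}
```

In the job from the question, the body of filter(topic1 user) would then become something like return isNew(user.get("col3")); so records with a null col3 are dropped instead of raising an exception. Putting the literal first, as in "New".equals(...), also avoids calling a method on a possibly null value.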