I need to parse messages from Confluent Kafka that are stored in Avro. Without a filter I am able to write the stream back into Kafka, but as soon as I apply a filter I get a NullPointerException.
public static void main(String[] args) throws Exception {
    Properties config = new Properties();
    config.setProperty("bootstrap.servers", "localhost:9092");
    config.setProperty("group.id", "topic");
    String schemaRegistryUrl = "http://localhost:8091";

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<topic1> streamIn = env.addSource(
        new FlinkKafkaConsumer<>(
            "topic1",
            ConfluentRegistryAvroDeserializationSchema.forSpecific(topic1.class, schemaRegistryUrl),
            config
        ).setStartFromEarliest());

    // Question: I want to keep only the rows that satisfy the condition below,
    // but this filter throws a NullPointerException.
    DataStreamSink fltrtsrm_so = streamIn.filter(new FilterFunction<topic1>() {
        public boolean filter(topic1 user) throws Exception {
            return user.get("col3").toString().equals("New");
        }
    }).print();

    // Also, let me know if there is a better way to do this; I am just getting started.
}
Here is the Schema:
{
  "namespace": "testclass",
  "type": "record",
  "name": "topic1",
  "fields": [
    { "name": "col1", "type": "string" },
    { "name": "col2", "type": "string" },
    { "default": null, "name": "col3", "type": [ "null", "string" ] },
    { "default": null, "name": "col4", "type": [ "null", "string" ] }
  ]
}
Answer
The schema allows col3 to be null. When a message in your topic has a null value for col3 and you call .toString() on it, you get this exception; that is the normal behavior of calling a method on null.
You should test that col3 is not null before comparing it.
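
A minimal sketch of that null check follows. So that it runs without Flink or Avro on the classpath, it uses a hypothetical `Row` class as a stand-in for the generated `topic1` class, and the class and method names (`NullSafeFilterSketch`, `isNew`) are made up for illustration. In the actual job, the body of `isNew` would presumably go inside `FilterFunction<topic1>.filter`.

```java
import java.util.Arrays;
import java.util.List;

public class NullSafeFilterSketch {

    // Hypothetical stand-in for the Avro-generated topic1 class; only col3 is modelled.
    static final class Row {
        private final CharSequence col3; // may be null, as the ["null", "string"] union allows

        Row(CharSequence col3) {
            this.col3 = col3;
        }

        Object get(String field) {
            return "col3".equals(field) ? col3 : null;
        }
    }

    // Null-safe version of the predicate from the question:
    // check for null first, and only then call toString().
    static boolean isNew(Row row) {
        Object col3 = row.get("col3");
        return col3 != null && "New".equals(col3.toString());
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(new Row("New"), new Row(null), new Row("Old"));
        // Rows with a null col3 are dropped instead of throwing.
        long kept = rows.stream().filter(NullSafeFilterSketch::isNew).count();
        System.out.println("rows kept: " + kept);
    }
}
```

Putting the literal first in `"New".equals(...)` is a common defensive habit, but it does not replace the explicit null check here, because `toString()` is still called on the field value.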