I need to parse messages stored in Avro from Confluent Kafka. Without a filter I was able to write back into Kafka, but when I apply a filter it throws a NullPointerException.
Java
public static void main(String[] args) throws Exception {
    Properties config = new Properties();
    config.setProperty("bootstrap.servers", "localhost:9092");
    config.setProperty("group.id", "topic");
    String schemaRegistryUrl = "http://localhost:8091";

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<topic1> streamIn = env
        .addSource(
            new FlinkKafkaConsumer<>(
                "topic1",
                ConfluentRegistryAvroDeserializationSchema.forSpecific(topic1.class, schemaRegistryUrl),
                config
            ).setStartFromEarliest());

    // Question here: I want to return only the rows that qualify below,
    // but with this filter it throws a NullPointerException.
    DataStreamSink fltrtsrm_so = streamIn.filter(new FilterFunction<topic1>() {
        public boolean filter(topic1 user) throws Exception {
            return user.get("col3").toString().equals("New");
        }
    }).print();
}
// Also let me know if there is a better way to do it, as I'm just getting started.
Here is the Schema:
JSON
{
"namespace": "testclass",
"type": "record",
"name": "topic1",
"fields": [
{
"name": "col1",
"type": "string"
},
{
"name": "col2",
"type": "string"
},
{
"default": null,
"name": "col3",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "col4",
"type": [
"null",
"string"
]
}
]
}
Answer
When a record has a null value for col3 and you call .toString() on it, you get exactly this exception; that is the normal behavior of null.toString(). In your schema, col3 is a union of "null" and "string", so null values are allowed. You should test whether col3 is not null before calling .toString() on it.
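A minimal sketch of the null-safe check pattern, outside of Flink so it runs standalone. In your job, the filter body would become `user.get("col3") != null && user.get("col3").toString().equals("New")`. Note that Avro deserializes strings as CharSequence (Utf8), so contentEquals is used here; the sample values are assumptions for illustration:

```java
import java.util.Arrays;
import java.util.List;

public class NullSafeFilter {
    // Null check first: null rows are dropped instead of throwing a
    // NullPointerException when the value is accessed.
    static boolean keep(CharSequence col3) {
        return col3 != null && "New".contentEquals(col3);
    }

    public static void main(String[] args) {
        // Simulates col3 values arriving from the topic, including a null
        // allowed by the ["null", "string"] union in the schema.
        List<CharSequence> col3Values = Arrays.asList("New", null, "Old", "New");
        long kept = col3Values.stream().filter(NullSafeFilter::keep).count();
        System.out.println(kept); // prints 2
    }
}
```

Putting the literal first ("New".contentEquals(...) after the null guard, or "New".equals(x.toString()) only once x is known non-null) is the usual defensive idiom here.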