I am working through my first sample of Kafka Streams:
package com.example;

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

class DslExample {

  public static void main(String[] args) {
    // the builder is used to construct the topology
    StreamsBuilder builder = new StreamsBuilder();

    // read from the source topic, "users"
    KStream<Void, String> stream = builder.stream("users");

    // for each record that appears in the source topic,
    // print the value
    stream.foreach(
        (key, value) -> {
          System.out.println("(DSL) Hello, " + value);
        });

    // you can also print using the `print` operator
    // stream.print(Printed.<String, String>toSysOut().withLabel("source"));

    // set the required properties for running Kafka Streams
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "dev1");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.0.24:29092");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Void().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    // build the topology and start streaming
    KafkaStreams streams = new KafkaStreams(builder.build(), config);
    streams.start();

    // close Kafka Streams when the JVM shuts down (e.g. SIGTERM)
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  }
}
When I attempt to run it I get this error:
Caused by: java.lang.IllegalArgumentException: Data should be null for a VoidDeserializer.
This is a sample message from the “users” topic:
Value:
{ "registertime": 1517518703752, "userid": "User_8", "regionid": "Region_7", "gender": "OTHER" }
Header:
[ { "key": "task.generation", "stringValue": "0" }, { "key": "task.id", "stringValue": "0" }, { "key": "current.iteration", "stringValue": "86144" } ]
Key:
User_8
What should I do to avoid this problem?
Answer
If the key actually has data, you shouldn’t be using Serdes.Void()
or KStream<Void,