I am trying out Kafka Streams. I am reading messages from one topic, doing a groupByKey, and then counting each group. The problem is that the message counts come out as unreadable "boxes".
If I run the console consumer, they come through as empty strings.
This is the WordCount code I wrote:
package streams;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Arrays;
import java.util.Properties;

public class WordCount {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "streams-demo-2");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
        properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());

        // Topology
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("temp-in");
        KStream<String, Long> fil = input
                .flatMapValues(val -> Arrays.asList(val.split(" "))) // turn each line of text into a stream of words
                .selectKey((k, v) -> v)                              // re-key the stream on the word itself
                .groupByKey()
                .count()                                             // count occurrences per word
                .toStream();
        fil.to("temp-out");

        KafkaStreams streams = new KafkaStreams(builder.build(), properties);
        streams.start();
        System.out.println(streams.toString());
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
This is the output I am getting in the consumer (shown on the right-hand side of the attached image).

I tried casting the count to a long again to see if that would help, but it did not work.

I am attaching the consumer code too, in case it helps:
package tutorial;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class Consumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Once the consumer starts running, it keeps running even after we stop it in the console.
        // We have to create a new consumer group to read from the earliest offset, because the
        // previous one has already committed up to a certain offset.
        // When we run the same consumer in two consoles, Kafka detects it and rebalances:
        // the consoles split the partitions they consume, forming a consumer group.
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer-application-1"); // consumer group id
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");      // where a new group starts reading

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singleton("temp-out"));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : consumerRecords) {
                System.out.println(record.key() + " " + record.value());
                System.out.println(record.partition() + " " + record.offset());
            }
        }
    }
}
Any help is appreciated. Thanks in advance.
Answer
The message value you're writing with Kafka Streams is a Long, and you're consuming it as a String.
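Those "boxes" are exactly what a terminal shows for non-printable bytes: the count is serialized as an 8-byte big-endian long, and StringDeserializer then decodes those raw bytes as UTF-8, so for small counts you mostly get 0x00 control characters. A minimal sketch that reproduces this (the class name WhyBoxes is just for illustration):

import org.apache.kafka.common.serialization.LongSerializer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class WhyBoxes {
    public static void main(String[] args) {
        // How a count of 3 is written to the topic: eight big-endian bytes.
        byte[] bytes = new LongSerializer().serialize("temp-out", 3L);
        System.out.println(Arrays.toString(bytes)); // [0, 0, 0, 0, 0, 0, 0, 3]

        // What StringDeserializer effectively does with those bytes: decode them
        // as UTF-8, producing unprintable control characters that the console
        // renders as boxes (or as an apparently empty string).
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}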
If you make the following changes to your Consumer class, you'll be able to see the count printed correctly to stdout:
// Change this from StringDeserializer to LongDeserializer
// (and import org.apache.kafka.common.serialization.LongDeserializer).
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());

...

// The value you're consuming here is a Long, not a String.
KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singleton("temp-out"));

while (true) {
    ConsumerRecords<String, Long> consumerRecords = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, Long> record : consumerRecords) {
        System.out.println(record.key() + " " + record.value());
        System.out.println(record.partition() + " " + record.offset());
    }
}
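The same mismatch explains the empty strings you saw in the console consumer, which also decodes values as strings by default. If you want to inspect the topic from the command line, you can point it at LongDeserializer via formatter properties (the topic name temp-out comes from your code; adjust the script name and path to your installation):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic temp-out \
    --from-beginning \
    --property print.key=true \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer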