
Kafka Streams: SerializationException: Size of data received by LongDeserializer is not 8

I have a small app that counts the number of each colour using Apache Kafka Streams:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;

public class FavouriteColor {


    private static final String INPUT_TOPIC_NAME = "favourite-colour-input";
    private static final String OUTPUT_TOPIC_NAME = "favourite-colour-output";
    private static final String INTERMEDIATE_TOPIC_NAME = "favourite-colour-output";

    private static final String APPLICATION_ID = "favourite-colour-java";


    public static void main(String[] args) {

        Properties config = new Properties();

        config.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> textLines = builder.stream(INPUT_TOPIC_NAME);

        KStream<String, String> usersAndColours = textLines
            .filter((key, value) -> value.contains(","))
            .selectKey((key, value) -> value.split(",")[0].toLowerCase())
            .mapValues(value -> value.split(",")[1].toLowerCase())
            .filter((user, colour) -> Arrays.asList("green", "blue", "red").contains(colour));

        usersAndColours.to(INTERMEDIATE_TOPIC_NAME);
        KTable<String, String> usersAndColoursTable = builder.table(INTERMEDIATE_TOPIC_NAME);

        KTable<String, Long> favouriteColours = usersAndColoursTable
            .groupBy((user, colour) -> new KeyValue<>(colour, colour))
            .count(Named.as("CountsByColours"));

        favouriteColours.toStream().to(OUTPUT_TOPIC_NAME, Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);

        streams.cleanUp();
        streams.start();

        System.out.println(streams);

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

The topics are created and the producer/consumer are started from the terminal:

kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic favourite-colour-input

kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic user-keys-and-colours --config cleanup.policy=compact

kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic favourite-colour-output --config cleanup.policy=compact



kafka-console-consumer --bootstrap-server localhost:9092 \
    --topic favourite-colour-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer



kafka-console-producer --bootstrap-server localhost:9092 --topic favourite-colour-input

I entered the following input in the producer terminal:

stephane,blue
john,green
stephane,red
alice,red

I received the error in the consumer terminal:

stephane    Processed a total of 1 messages
[2021-11-27 21:31:58,155] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
    at org.apache.kafka.common.serialization.LongDeserializer.deserialize(LongDeserializer.java:26)
    at org.apache.kafka.common.serialization.LongDeserializer.deserialize(LongDeserializer.java:21)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
    at kafka.tools.DefaultMessageFormatter.$anonfun$writeTo$2(ConsoleConsumer.scala:519)
    at scala.Option.map(Option.scala:242)
    at kafka.tools.DefaultMessageFormatter.deserialize$1(ConsoleConsumer.scala:519)
    at kafka.tools.DefaultMessageFormatter.writeTo(ConsoleConsumer.scala:568)
    at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:115)
    at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75)
    at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:52)
    at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)

What’s the issue here? I did some brief research and found similar questions asked by other people, but the solutions don’t seem to work for me.


Answer

You defined the value deserializer to be the one for Long, but the data on that topic is a String. Look at your constants: INTERMEDIATE_TOPIC_NAME is set to "favourite-colour-output", the same value as OUTPUT_TOPIC_NAME, so usersAndColours.to(INTERMEDIATE_TOPIC_NAME) writes its String colour values straight into the topic your console consumer is reading with LongDeserializer. Those strings are not 8 bytes long, hence the error.
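For reference, a minimal sketch of the fix, assuming the intermediate topic was meant to be the compacted user-keys-and-colours topic you already created (the constant name suggests so):

    private static final String INPUT_TOPIC_NAME = "favourite-colour-input";
    private static final String OUTPUT_TOPIC_NAME = "favourite-colour-output";
    // Point the intermediate stream at its own compacted topic instead of the output topic
    private static final String INTERMEDIATE_TOPIC_NAME = "user-keys-and-colours";

With that change, only the KTable counts (Long values) end up on favourite-colour-output, which matches the console consumer's LongDeserializer. Note that any String records already written to the output topic are still there and will still trip a consumer started with --from-beginning, so you may want to delete and recreate that topic before re-testing.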
