Kafka Consumer stuck at deserialization

I created a simple producer-consumer app that uses a custom Serializer and Deserializer.

After I added a new method to the Message class that I produce, the consumer started getting stuck at deserialization. My producer uses the new class (with the new method) and the consumer uses the old class (without the method).

I didn’t add any new data members to the message that is sent!

I have multiple message classes that are all based on the following base class:

public enum MessageType{
    A,
    B,
    C,
}

import java.io.Serializable;

public class Message implements Serializable {

    protected long id;
    protected MessageType type;

    public Message(int id, MessageType type) {
        this.id = id;
        this.type = type;
    }
}

Each of the message classes adds some data members that are relevant to that message type.

For example:

public class MessageA extends Message {
    private String name;

    public MessageA(int id, String name) {
        super(id, MessageType.A);
        this.name = name;
    }
}

My Serializer:

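import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
// SerializationUtils comes from a utility library; the import is not shown here.

public class MessageSerializer implements Serializer<Message> {
    @Override
    public byte[] serialize(String topic, Message msg) {
        return SerializationUtils.serialize(msg);
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {}

    @Override
    public void close() {}
}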

My Deserializer:

import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

public class MessageDeserializer implements Deserializer<Message> {

    // "log" is a logger field whose declaration is not shown here.

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {}

    @Override
    public Message deserialize(String topic, byte[] data) {
        if (data == null)
            return null;
        log.info("Deserializing msg");
        Message msg = (Message) SerializationUtils.deserialize(data);
        if (msg == null) {
            log.warn("Deserialization failed, got null as a result.");
            return null;
        }
        log.info("Deserialization complete");
        return msg;
    }

    @Override
    public void close() {}
}

I added a new method to the MessageC class, which is a subclass of Message. The new class version is available only on the producer and not on the consumer. Although I didn’t change the schema of the class, can that change cause a problem with deserialization in the consumer?

After producing a MessageC message, my consumer printed "Deserializing msg", but then it appears to be stuck or to have failed silently: it printed neither an exception nor "Deserialization complete".

Can JsonSerializer/JsonDeserializer handle this type of change? If I use JsonSerializer, it should only care about the schema of the class, right?

Answer

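If you use JsonSerializer, it should not be affected by changes to the methods, only by changes to the data fields. Java's built-in serialization (which SerializationUtils most likely wraps, given that Message implements Serializable) does not write methods into the byte stream either. However, when a class does not declare a serialVersionUID, Java computes one from the class structure, including its non-private method signatures. Adding such a method on the producer side changes that ID, so the consumer's old class rejects the bytes with an InvalidClassException. Declaring an explicit serialVersionUID in every message class avoids this.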
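
To see the version check of Java's built-in serialization in isolation, here is a minimal sketch that uses only the JDK (no Kafka and no SerializationUtils). The class names SerialVersionDemo and PinnedMessage and the helper methods are hypothetical and exist only for this illustration. Since the producer's and consumer's class versions cannot both be loaded in one small program, the sketch simulates the mismatch by flipping one bit of the serialVersionUID stored in the serialized bytes; this is an assumption-laden stand-in for what happens when the two sides compute different default IDs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class SerialVersionDemo {

    // Hypothetical stand-in for a message class such as MessageC.
    static class PinnedMessage implements Serializable {
        // Explicit version: it stays 1L even if methods are added or removed later.
        private static final long serialVersionUID = 1L;

        private final long id;
        private final String name;

        PinnedMessage(long id, String name) {
            this.id = id;
            this.name = name;
        }

        String describe() {
            return id + ":" + name;
        }
    }

    // Plain Java serialization into a byte array.
    static byte[] toBytes(Serializable obj) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Simulates a sender whose class has a different serialVersionUID.
    // Stream layout: 4 header bytes, TC_OBJECT, TC_CLASSDESC, 2-byte name
    // length, the class name, then the 8-byte serialVersionUID.
    static byte[] withOtherVersion(byte[] data, Class<?> type) {
        byte[] patched = data.clone();
        int nameLength = type.getName().getBytes(StandardCharsets.UTF_8).length;
        int lastVersionByte = 8 + nameLength + 7;
        patched[lastVersionByte] ^= 1; // flip the lowest bit of the stored ID
        return patched;
    }

    // Deserializes and reports the outcome instead of letting the exception escape.
    static String tryRead(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return "ok: " + ((PinnedMessage) in.readObject()).describe();
        } catch (IOException | ClassNotFoundException e) {
            return "failed: " + e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        byte[] good = toBytes(new PinnedMessage(7, "hello"));
        // The declared ID is what the serialization runtime reports.
        System.out.println(ObjectStreamClass.lookup(PinnedMessage.class).getSerialVersionUID()); // 1
        System.out.println(tryRead(good)); // ok: 7:hello
        System.out.println(tryRead(withOtherVersion(good, PinnedMessage.class))); // failed: InvalidClassException
    }
}
```

The same idea applies to the deserializer in the question: wrapping the deserialize call in a try/catch and logging the exception should make a mismatch like this visible instead of looking like a hang, although where the exception ends up in your application depends on how the consumer's poll loop handles errors.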
