I want to implement Kafka producer which sends and receives Java Serialized Objects. I tried this:
Producer:
@Configuration public class KafkaProducerConfig { @Value(value = "${kafka.bootstrapAddress}") private String bootstrapAddress; @Bean public ProducerFactory<String, SaleRequestFactory> saleRequestFactoryProducerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, SaleRequestFactory.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SaleRequestFactory.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public ProducerFactory<String, String> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); /* Serialization configuration */ return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate() { return new KafkaTemplate<>(saleRequestFactoryProducerFactory()); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); }
}
Send object:
@Autowired private KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate; private static String topic = "tp-sale"; private void perform(){ SaleRequestFactory obj = new SaleRequestFactory(); obj.setId(100); ListenableFuture<SendResult<String, SaleRequestFactory>> send = saleRequestFactoryKafkaTemplate.send(topic, obj); }
Consumer:
@EnableKafka @Configuration public class KafkaConsumerConfig { @Value(value = "${kafka.bootstrapAddress}") private String bootstrapAddress; private String groupId = "test"; @Bean public ConsumerFactory<String, SaleResponseFactory> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SaleResponseFactory.class); return new DefaultKafkaConsumerFactory<>(props); } @Bean public ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }
// Receive Object
@KafkaListener(topics = "tp-sale") public SaleResponseFactory transactionElavonAuthorizeProcess(@Payload SaleRequestFactory tf, @Headers MessageHeaders headers) throws Exception { System.out.println(tf.getId()); SaleResponseFactory resObj = new SaleResponseFactory(); resObj.setUnique_id("123123"); return resObj; }
When I deploy the Producer I get error during deployment:
Caused by: org.apache.kafka.common.KafkaException: class org.engine.plugin.transactions.factory.SaleResponseFactory is not an instance of org.apache.kafka.common.serialization.Deserializer
Custom object
import org.apache.kafka.common.serialization.Serializer; @Getter @Setter @NoArgsConstructor @AllArgsConstructor @Builder(toBuilder = true) public class SaleRequestFactory implements Serializable, Serializer { private static final long serialVersionUID = 1744050117179344127L; private int id; @Override public byte[] serialize(String s, Object o) { return new byte[0]; } }
import org.apache.kafka.common.serialization.Deserializer; @Getter @Setter @NoArgsConstructor @AllArgsConstructor @Builder(toBuilder = true) public class SaleResponseFactory implements Serializable, Deserializer { private static final long serialVersionUID = 1744050117179344127L; private String unique_id; @Override public Object deserialize(String s, byte[] bytes) { return null; } }
Do you know how I can fix this issue?
EDIT: I tried this:
Producer:
@Configuration public class KafkaProducerConfig { @Value(value = "${kafka.bootstrapAddress}") private String bootstrapAddress; @Bean public ProducerFactory<String, SaleRequestFactory> saleRequestFactoryProducerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SaleRequestFactorySerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public ProducerFactory<String, String> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate() { return new KafkaTemplate<>(saleRequestFactoryProducerFactory()); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); }
}
Send object:
@Autowired private KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate; private static String topic = "tp-sale"; private void perform(){ SaleRequestFactory obj = new SaleRequestFactory(); obj.setId(100); ListenableFuture<SendResult<String, SaleRequestFactory>> send = saleRequestFactoryKafkaTemplate.send(topic, obj); }
Consumer:
@EnableKafka @Configuration public class KafkaConsumerConfig { @Value(value = "${kafka.bootstrapAddress}") private String bootstrapAddress; private String groupId = "test"; @Bean public ConsumerFactory<String, SaleResponseFactory> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SaleResponseFactoryDeserializer.class); return new DefaultKafkaConsumerFactory<>(props); } @Bean public ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }
// Receive Object
@KafkaListener(topics = "tp-sale") public SaleResponseFactory transactionElavonAuthorizeProcess(@Payload SaleRequestFactory tf, @Headers MessageHeaders headers) throws Exception { System.out.println(tf.getId()); SaleResponseFactory resObj = new SaleResponseFactory(); resObj.setUnique_id("123123"); return resObj; }
Custom objects
@Getter @Setter @NoArgsConstructor @AllArgsConstructor @Builder(toBuilder = true) public class SaleRequestFactory implements Serializable{ private static final long serialVersionUID = 1744050117179344127L; private int id; } public class SaleRequestFactorySerializer implements Serializable, Serializer<SaleRequestFactory> { @Override public byte[] serialize(String topic, SaleRequestFactory data) { // convert data to byte[] ByteArrayOutputStream out = new ByteArrayOutputStream(); try { ObjectOutputStream outputStream = new ObjectOutputStream(out); outputStream.writeObject(data); out.close(); } catch (IOException e) { e.printStackTrace(); } return out.toByteArray(); } } @Getter @Setter @NoArgsConstructor @AllArgsConstructor @Builder(toBuilder = true) public class SaleResponseFactory implements Serializable{ private static final long serialVersionUID = 1744050117179344127L; private String unique_id; } public class SaleResponseFactoryDeserializer implements Serializable, Deserializer<SaleResponseFactory> { @Override public SaleResponseFactory deserialize(String topic, byte[] data) { // convert data to SaleResponseFactory SaleResponseFactory saleResponseFactory = null; try { ByteArrayInputStream bis = new ByteArrayInputStream(data); ObjectInputStream in = new ObjectInputStream(bis); saleResponseFactory = (SaleResponseFactory) in.readObject(); in.close(); } catch (IOException | ClassNotFoundException e) { e.printStackTrace(); } return saleResponseFactory; } }
When I try to send message I get error:
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale-0 at offset 0. If needed, please seek past the record to continue consumption. Caused by: java.lang.ClassCastException: null 21:27:51.152 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:296] - Commit list: {} 21:27:51.153 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR KafkaMessageListenerContainer$ListenerConsumer[error:149] - Consumer exception java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:145) at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:103) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1265) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1022) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.lang.Thread.run(Thread.java:835) Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale-0 at offset 0. If needed, please seek past the record to continue consumption. Caused by: java.lang.ClassCastException: null
Do you know how I can fix this issue?
EDIT: I managed to implement these improvements:
Producer:
@Configuration public class KafkaProducerConfig { @Value(value = "${kafka.bootstrapAddress}") private String bootstrapAddress; @Bean public ProducerFactory<String, SaleRequestFactory> saleRequestFactoryProducerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SaleRequestFactorySerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public ProducerFactory<String, String> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate() { return new KafkaTemplate<>(saleRequestFactoryProducerFactory()); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); }
}
Send object:
@Autowired private KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate; private static String topic = "tp-sale"; private void perform(){ SaleRequestFactory obj = new SaleRequestFactory(); obj.setId(100); ListenableFuture<SendResult<String, SaleRequestFactory>> send = saleRequestFactoryKafkaTemplate.send(topic, obj); }
Consumer:
@EnableKafka @Configuration public class KafkaConsumerConfig { @Value(value = "${kafka.bootstrapAddress}") private String bootstrapAddress; private String groupId = "test"; @Bean public ConsumerFactory<String, SaleResponseFactory> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SaleResponseFactoryDeserializer.class); return new DefaultKafkaConsumerFactory<>(props); } @Bean public ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }
Receive Object
@KafkaListener(topics = "tp-sale") public SaleResponseFactory transactionElavonAuthorizeProcess(@Payload SaleRequestFactory tf, @Headers MessageHeaders headers) throws Exception { System.out.println(tf.getId()); SaleResponseFactory resObj = new SaleResponseFactory(); resObj.setUnique_id("123123"); return resObj; }
Custom objects
@Getter @Setter @NoArgsConstructor @AllArgsConstructor @Builder(toBuilder = true) public class SaleRequestFactory implements Serializable{ private static final long serialVersionUID = 1744050117179344127L; private int id; } public class SaleRequestFactorySerializer implements Serializable, Serializer<SaleRequestFactory> { @Override public byte[] serialize(String topic, SaleRequestFactory data) { // convert data to byte[] ByteArrayOutputStream out = new ByteArrayOutputStream(); try { ObjectOutputStream outputStream = new ObjectOutputStream(out); outputStream.writeObject(data); out.close(); } catch (IOException e) { e.printStackTrace(); } return out.toByteArray(); } } @Getter @Setter @NoArgsConstructor @AllArgsConstructor @Builder(toBuilder = true) public class SaleResponseFactory implements Serializable{ private static final long serialVersionUID = 1744050117179344127L; private String unique_id; } public class SaleResponseFactoryDeserializer implements Serializable, Deserializer<SaleResponseFactory> { @Override public SaleResponseFactory deserialize(String topic, byte[] data) { // convert data to SaleResponseFactory SaleResponseFactory saleResponseFactory = null; try { ByteArrayInputStream bis = new ByteArrayInputStream(data); ObjectInputStream in = new ObjectInputStream(bis); saleResponseFactory = (SaleResponseFactory) in.readObject(); in.close(); } catch (IOException | ClassNotFoundException e) { e.printStackTrace(); } return saleResponseFactory; } }
When I send some message I get error:
13:03:53.675 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG RecordMessagingMessageListenerAdapter[debug:296] - Listener method returned result [org.factory.SaleResponseFactory@69c400ab] - generating response message for it 13:03:53.675 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG RecordMessagingMessageListenerAdapter[debug:296] - No replyTopic to handle the reply: org.factory.SaleResponseFactory@69c400ab
Do you know how I can solve this issue?
Advertisement
Answer
You are using different type to cast the object than what it was serialize with. Not sure why you need to do that. You can update your deserialize to something like below.
public class SaleRequestFactoryDeserializer implements Serializable, Deserializer<SaleRequestFactory> { @Override public SaleRequestFactory deserialize(String topic, byte[] data) { ... saleRequestFactory = (SaleRequestFactory) in.readObject(); } }
java.lang.ClassCastException: null
This also means your serialization didn’t work as expected. Make sure you have valid payload before you try to cast.