Message producer using the Kafka binder of Spring Cloud Stream:
@Component public static class PageViewEventSource implements ApplicationRunner { private final MessageChannel pageViewsOut; private final Log log = LogFactory.getLog(getClass()); public PageViewEventSource(AnalyticsBinding binding) { this.pageViewsOut = binding.pageViewsOut(); } @Override public void run(ApplicationArguments args) throws Exception { List<String> names = Arrays.asList("priya", "dyser", "Ray", "Mark", "Oman", "Larry"); List<String> pages = Arrays.asList("blog", "facebook", "instagram", "news", "youtube", "about"); Runnable runnable = () -> { String rPage = pages.get(new Random().nextInt(pages.size())); String rName = pages.get(new Random().nextInt(names.size())); PageViewEvent pageViewEvent = new PageViewEvent(rName, rPage, Math.random() > .5 ? 10 : 1000); Serializer<PageViewEvent> serializer = new JsonSerde<>(PageViewEvent.class).serializer(); byte[] m = serializer.serialize(null, pageViewEvent); Message<byte[]> message = MessageBuilder .withPayload(m).build(); try { this.pageViewsOut.send(message); log.info("sent " + message); } catch (Exception e) { log.error(e); } }; Executors.newScheduledThreadPool(1).scheduleAtFixedRate(runnable, 1, 1, TimeUnit.SECONDS); }
It uses the serialization settings below:
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$BytesSerde
I am trying to consume these messages in a separate consumer application via Spring Kafka's @KafkaListener:
@Service public class PriceEventConsumer { private static final Logger LOG = LoggerFactory.getLogger(PriceEventConsumer.class); @KafkaListener(topics = "test1" , groupId = "json", containerFactory = "kafkaListenerContainerFactory") public void receive(Bytes data){ //public void receive(@Payload PageViewEvent data,@Headers MessageHeaders headers) { LOG.info("Message received"); LOG.info("received data='{}'", data); }
Container factory configuration
/**
 * Builds the consumer factory used by the listener container: String keys,
 * raw {@code Bytes} values, broker at {@code localhost:9092}, consumer group
 * {@code json}, reading from the earliest offset when none is committed.
 */
@Bean
public ConsumerFactory<String, Bytes> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(config);
}

/**
 * Listener container factory backed by {@link #consumerFactory()}; no further
 * customisation is applied.
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Bytes> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Bytes> containerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConsumerFactory(consumerFactory());
    return containerFactory;
}
With this configuration, the consumer is not picking up the messages (Bytes). If I change the Kafka listener to accept a String, it gives me the exception below:
/**
 * Variant of the listener that asks for the record value as a {@code String}
 * and logs it.
 *
 * @param data record value
 */
@KafkaListener(
        topics = "test1",
        groupId = "json",
        containerFactory = "kafkaListenerContainerFactory")
public void receive(String data) {
    LOG.info("Message received");
    LOG.info("received data='{}'", data);
}
Caused by:
org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [org.apache.kafka.common.utils.Bytes] to [java.lang.String] for GenericMessage [payload={“userId”:”facebook”,”page”:”about”,”duration”:10}, headers={kafka_offset=4213, kafka_consumer=brave.kafka.clients.TracingConsumer@9a75f94, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=test1, kafka_receivedTimestamp=1553007593670}], failedMessage=GenericMessage [payload={“userId”:”facebook”,”page”:”about”,”duration”:10}, headers={kafka_offset=4213, kafka_consumer=brave.kafka.clients.TracingConsumer@9a75f94, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=test1, kafka_receivedTimestamp=1553007593670}] … 23 more Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [org.apache.kafka.common.utils.Bytes] to [java.lang.String] for GenericMessage [payload={“userId”:”facebook”,”page”:”about”,”duration”:10}, headers={kafka_offset=4213, kafka_consumer=brave.kafka.clients.TracingConsumer@9a75f94, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=test1, kafka_receivedTimestamp=1553007593670}] at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:144) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE] at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) 
~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE] at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE] at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283) ~[spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE] … 22 more
Any pointers will be very helpful.
Updating POJO Part
Pojo Part ——
/**
 * Variant of the listener that asks for the payload as a {@code PageViewEvent}
 * together with the message headers, and logs the payload.
 *
 * @param data    record value bound as the message payload
 * @param headers headers of the received message (not used by the body)
 */
@KafkaListener(
        topics = "test1",
        groupId = "json",
        containerFactory = "kafkaListenerContainerFactory")
public void receive(@Payload PageViewEvent data, @Headers MessageHeaders headers) {
    LOG.info("Message received");
    LOG.info("received data='{}'", data);
}
Container factory configuration
/**
 * Consumer factory for String keys and JSON-decoded {@code PageViewEvent} values,
 * connecting to {@code localhost:9092} in consumer group {@code json}.
 *
 * <p>Deserializer classes are placed in the property map and deserializer
 * instances are also handed to the factory constructor, exactly as posted.
 */
@Bean
public ConsumerFactory<String, PageViewEvent> priceEventConsumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

    StringDeserializer keyDeserializer = new StringDeserializer();
    JsonDeserializer<PageViewEvent> valueDeserializer = new JsonDeserializer<>(PageViewEvent.class);
    return new DefaultKafkaConsumerFactory<>(config, keyDeserializer, valueDeserializer);
}

/**
 * Listener container factory backed by {@link #priceEventConsumerFactory()}.
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, PageViewEvent> priceEventsKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, PageViewEvent> containerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConsumerFactory(priceEventConsumerFactory());
    return containerFactory;
}
Producer –
@Override public void run(ApplicationArguments args) throws Exception { List<String> names = Arrays.asList("priya", "dyser", "Ray", "Mark", "Oman", "Larry"); List<String> pages = Arrays.asList("blog", "facebook", "instagram", "news", "youtube", "about"); Runnable runnable = () -> { String rPage = pages.get(new Random().nextInt(pages.size())); String rName = pages.get(new Random().nextInt(names.size())); PageViewEvent pageViewEvent = new PageViewEvent(rName, rPage, Math.random() > .5 ? 10 : 1000); Message<PageViewEvent> message = MessageBuilder .withPayload(pageViewEvent).build(); try { this.pageViewsOut.send(message); log.info("sent " + message); } catch (Exception e) { log.error(e); } };
Advertisement
Answer
You can deserialize the record from Kafka into a POJO. For versions before 2.2.x, use a MessageConverter.
Starting with version 2.2, you can explicitly configure the deserializer to use the supplied target type and ignore type information in headers by using one of the overloaded constructors that have a boolean `useHeadersIfPresent` parameter (which is `true` by default):
/**
 * Consumer factory for String keys and JSON-decoded {@code PageViewEvent} values.
 *
 * <p>The value deserializer is created with the two-argument constructor, passing
 * {@code false} so that it uses the supplied target type and ignores type
 * information carried in record headers.
 */
@Bean
public ConsumerFactory<String, PageViewEvent> priceEventConsumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

    StringDeserializer keyDeserializer = new StringDeserializer();
    JsonDeserializer<PageViewEvent> valueDeserializer =
            new JsonDeserializer<>(PageViewEvent.class, false);
    return new DefaultKafkaConsumerFactory<>(config, keyDeserializer, valueDeserializer);
}
Or by using MessageConverter
/**
 * Listener container factory that keeps the {@code Bytes}-valued consumer factory
 * and installs a {@code StringJsonMessageConverter} on the container factory.
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Bytes> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Bytes> containerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConsumerFactory(consumerFactory());

    StringJsonMessageConverter jsonConverter = new StringJsonMessageConverter();
    containerFactory.setMessageConverter(jsonConverter);
    return containerFactory;
}