I have a case where I want to publish a message from a Kafka producer. My message is just a POJO, e.g. CreateRequest. For consuming, I have added the code below:
    @Bean
    @InboundChannelAdapter(channel = "inputchannel", poller = @Poller(fixedDelay = "5000", errorChannel = "errorChannel"))
    public KafkaMessageSource consumeMsg() {
        ConsumerFactory<String, String> cf = consumerFactory();
        KafkaMessageSource kafkaMessageSource =
                new KafkaMessageSource(cf, new ConsumerProperties("Kafka_Topic"));
        kafkaMessageSource.getConsumerProperties().setGroupId("group_id");
        kafkaMessageSource.getConsumerProperties().setClientId("clientid");
        kafkaMessageSource.setMessageConverter(messageConverter());
        kafkaMessageSource.setPayloadType(CreateResponse.class);
        return kafkaMessageSource;
    }

    @Bean
    public ConsumerFactory consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, CreateResponse.class);
        return new DefaultKafkaConsumerFactory(props);
    }

    @Bean
    RecordMessageConverter messageConverter() {
        return new StringJsonMessageConverter();
    }
Also, I have added setMessageConverter and setPayloadType to get a response of type CreateResponse, but I am still getting a response of type KafkaMessageSource, which throws java.lang.ClassCastException: KafkaMessageSource cannot be cast to CreateResponse.

    kafkaMessageSource.setMessageConverter(messageConverter());
    kafkaMessageSource.setPayloadType(CreateResponse.class);

Stack trace:
org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@6e6d85cd]; nested exception is java.lang.ClassCastException: org.springframework.integration.kafka.inbound.KafkaMessageSource cannot be cast to com.kafka.response.domain.CreateResponse at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE] at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:111) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE] at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:104) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE] at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE] at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:62) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE] at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE] at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE] at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE] at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) 
~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE] at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE] at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE] at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.2.9.RELEASE.jar!/:5.2.9.RELEASE] at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.2.9.RELEASE.jar!/:5.2.9.RELEASE] at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.2.9.RELEASE.jar!/:5.2.9.RELEASE] at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.2.9.RELEASE.jar!/:5.2.9.RELEASE] at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:196) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE] at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:444) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE] at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:428) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE] at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:376) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE] at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$3(AbstractPollingEndpoint.java:323) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE] at 
org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE] at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) ~[spring-core-5.2.19.RELEASE.jar!/:5.2.19.RELEASE] at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE] at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:320) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE] at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) [spring-context-5.2.9.RELEASE.jar!/:5.2.9.RELEASE] at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93) [spring-context-5.2.9.RELEASE.jar!/:5.2.9.RELEASE] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_311] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_311] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_311] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_311] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_311] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_311] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_311] Caused by: java.lang.ClassCastException: org.springframework.integration.kafka.inbound.KafkaMessageSource cannot be cast to com.kafka.response.domain.CreateResponse at com.kafka.response.service.MessageConsumerQueue.consume(MessageConsumerQueue.java:55) ~[classes!/:1.0-SNAPSHOT] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method) ~[?:1.8.0_311] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_311] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_311] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_311] at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:129) ~[spring-expression-5.2.9.RELEASE.jar!/:5.2.9.RELEASE] at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:112) ~[spring-expression-5.2.9.RELEASE.jar!/:5.2.9.RELEASE] at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:55) ~[spring-expression-5.2.9.RELEASE.jar!/:5.2.9.RELEASE] at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:387) ~[spring-expression-5.2.9.RELEASE.jar!/:5.2.9.RELEASE] at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:92) ~[spring-expression-5.2.9.RELEASE.jar!/:5.2.9.RELEASE] at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:117) ~[spring-expression-5.2.9.RELEASE.jar!/:5.2.9.RELEASE] at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:375) ~[spring-expression-5.2.9.RELEASE.jar!/:5.2.9.RELEASE] at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:171) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE] at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:156) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE] at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeExpression(MessagingMethodInvokerHelper.java:637) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE] at 
org.springframework.integration.handler.support.MessagingMethodInvokerHelper.fallbackToInvokeExpression(MessagingMethodInvokerHelper.java:630) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE] at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInvokeExceptionAndFallbackToExpressionIfAny(MessagingMethodInvokerHelper.java:614) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE] at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:585) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE] at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:477) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE] at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:355) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE] at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:108) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE] ... 31 more
Can somebody tell me where I am going wrong?
Answer
Here is a simple Spring Boot app which demonstrates that the intent of the mentioned configuration is correct:
    @SpringBootApplication
    public class So71806313Application {

        public static void main(String[] args) {
            SpringApplication.run(So71806313Application.class, args);
        }

        @Bean
        RecordMessageConverter messageConverter() {
            return new StringJsonMessageConverter();
        }

        @Bean
        @InboundChannelAdapter(channel = "inputChannel", poller = @Poller(fixedDelay = "5000"))
        public KafkaMessageSource<String, String> consumeMsg(ConsumerFactory<String, String> consumerFactory,
                RecordMessageConverter messageConverter) {

            KafkaMessageSource<String, String> kafkaMessageSource =
                    new KafkaMessageSource<>(consumerFactory, new ConsumerProperties("Kafka_Topic"));
            kafkaMessageSource.getConsumerProperties().setGroupId("group_id");
            kafkaMessageSource.getConsumerProperties().setClientId("clientid");
            kafkaMessageSource.setMessageConverter(messageConverter);
            kafkaMessageSource.setPayloadType(CreateResponse.class);
            return kafkaMessageSource;
        }

        @Bean
        QueueChannel inputChannel() {
            return new QueueChannel();
        }

    }
I'm not sure what your CreateResponse looks like, so I made it as simple as this:
    public class CreateResponse {

        private String name;

        public void setName(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }

    }
Then I have an integration test to verify it against an embedded Kafka:
    @SpringBootTest
    @DirtiesContext
    @EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers", topics = "Kafka_Topic")
    class So71806313ApplicationTests {

        @Autowired
        QueueChannel inputChannel;

        @Autowired
        KafkaTemplate<String, String> kafkaTemplate;

        @Test
        void contextLoads() {
            // The inner quotes of the JSON string literal must be escaped.
            this.kafkaTemplate.send("Kafka_Topic", "{\"name\" : \"foo\"}");

            Message<?> receive = this.inputChannel.receive(10_000);
            assertThat(receive).isNotNull();
            assertThat(receive.getPayload())
                    .isInstanceOf(CreateResponse.class)
                    .extracting("name")
                    .isEqualTo("foo");
        }

    }
If I change that consumeMsg() to this (pay attention to the commented-out @Bean):
    @Autowired
    ConsumerFactory<String, String> consumerFactory;

    // @Bean
    @InboundChannelAdapter(channel = "inputChannel", poller = @Poller(fixedDelay = "5000"))
    public KafkaMessageSource consumeMsg() {
        KafkaMessageSource kafkaMessageSource =
                new KafkaMessageSource(consumerFactory, new ConsumerProperties("Kafka_Topic"));
        kafkaMessageSource.getConsumerProperties().setGroupId("group_id");
        kafkaMessageSource.getConsumerProperties().setClientId("clientid");
        kafkaMessageSource.setMessageConverter(new StringJsonMessageConverter());
        kafkaMessageSource.setPayloadType(CreateResponse.class);
        return kafkaMessageSource;
    }
Then it indeed fails with:

    java.lang.AssertionError:
    Expecting actual:
      org.springframework.integration.kafka.inbound.KafkaMessageSource@305289b3
    to be an instance of:
      org.springframework.integration.stackoverflow.so71806313.CreateResponse
    but was instance of:
      org.springframework.integration.kafka.inbound.KafkaMessageSource
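Without @Bean, the annotated method is not registered as a message source. Instead, it is invoked on every poll and its return value, the KafkaMessageSource instance itself, becomes the message payload. So, please, revise the configuration on your side and make sure consumeMsg() really is processed as a @Bean method.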
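To make the difference concrete, here is a minimal plain-Java sketch of the two behaviours. It does not use Spring at all: `Source`, `FakeKafkaSource`, `pollWithBean` and `pollWithoutBean` are hypothetical names invented for illustration, and the `CreateResponse` here is a simplified stand-in. It only models the idea that a registered source is polled through its `receive()` method, whereas a plain annotated method is called directly and whatever it returns is used as the payload.

```java
import java.util.function.Supplier;

public class InboundAdapterSketch {

    /** Hypothetical stand-in for a message source: each poll yields a payload. */
    interface Source {
        Object receive();
    }

    /** Simplified stand-in for the CreateResponse payload type. */
    static final class CreateResponse {
        final String name;

        CreateResponse(String name) {
            this.name = name;
        }
    }

    /** Hypothetical stand-in for KafkaMessageSource: receive() yields a converted record. */
    static final class FakeKafkaSource implements Source {
        @Override
        public Object receive() {
            return new CreateResponse("foo");
        }
    }

    /** Models the @Bean case: the method creates the source, and the source is polled. */
    static Object pollWithBean(Supplier<Source> factoryMethod) {
        Source source = factoryMethod.get();
        return source.receive();
    }

    /** Models the missing-@Bean case: the method's return value is the payload. */
    static Object pollWithoutBean(Supplier<Source> annotatedMethod) {
        return annotatedMethod.get();
    }

    public static void main(String[] args) {
        Supplier<Source> consumeMsg = FakeKafkaSource::new;

        // Payload produced by polling the source.
        System.out.println(pollWithBean(consumeMsg).getClass().getSimpleName());

        // Payload is the source object itself, which is what the ClassCastException reports.
        System.out.println(pollWithoutBean(consumeMsg).getClass().getSimpleName());
    }
}
```

In this model the first call yields a `CreateResponse` and the second yields the source object, which mirrors the `KafkaMessageSource cannot be cast to CreateResponse` error in the question.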