I have an application that connects to a pub/sub and processes messages when the pub/sub subscription publishes. I want to be able to put these messages into a Queue Channel to avoid processing loads of messages at once. However, when i try to add a queue channel i get the below error ? So the way i see it, a message arrives in the inboundChannelAdaptor, outputs the message to the Queue channel, then the messageReciever pulls and actions the messages in the QueueChannel ?
java.lang.IllegalArgumentException: No poller has been defined for Annotation-based endpoint, and no default poller is available within the context. at org.springframework.util.Assert.notNull(Assert.java:201) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE] at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.configurePollingEndpoint(AbstractMethodAnnotationPostProcessor.java:435) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE] at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.doCreateEndpoint(AbstractMethodAnnotationPostProcessor.java:377) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE] at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.createEndpoint(AbstractMethodAnnotationPostProcessor.java:367) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE] at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.postProcess(AbstractMethodAnnotationPostProcessor.java:172) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE] at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor.postProcessMethodAndRegisterEndpointIfAny(MessagingAnnotationPostProcessor.java:230) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE] at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor.lambda$processAnnotationTypeOnMethod$1(MessagingAnnotationPostProcessor.java:220) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE] at java.base/java.util.ArrayList.forEach(ArrayList.java:1541) ~[na:na] at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor.afterSingletonsInstantiated(MessagingAnnotationPostProcessor.java:141) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE] at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:914) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE] at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:879) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE] at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:551) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE] at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE] at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE] at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE] at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE] at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE] at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE] at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE] at Application.main(Application.java:65) ~[classes/:na] at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na] at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na] at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na] at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na] at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) ~[spring-boot-devtools-2.3.3.RELEASE.jar:2.3.3.RELEASE]
Here is my implementation:
@Bean public MessageChannel inputMessageQueueChannel() { return new QueueChannel(50); } @Bean public PubSubInboundChannelAdapter inboundChannelAdapter( @Qualifier("inputMessageQueueChannel") MessageChannel messageChannel, PubSubTemplate pubSubTemplate) { PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate,'sub-one'); adapter.setOutputChannel(messageChannel); adapter.setAckMode(AckMode.MANUAL); adapter.setPayloadType(String.class); return adapter; } @ServiceActivator(inputChannel = "inputMessageQueueChannel") public void messageReceiver( String payload, @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) { log.info("Payload: " + payload); message.ack(); }
Advertisement
Answer
The queue channel does nothing by itself unless it stores sent messages into a message store internally. To be able to receive messages from this queue you need to poll such a queue periodically. For this purpose the PollingConsumer
pattern is used, but it cannot do anything by itself: you need to say it how to poll that queue. So, the poller must be provided. See the poller
attribute of that @ServiceActivator
annotation. Or you can provide a global default one via PollerMetadata
bean definition with a PollerMetadata.DEFAULT_POLLER
name. See more info in docs: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#endpoint-pollingconsumer.
See also this question and its answers: Spring Integration No poller has been defined for endpoint