I am trying to write a simple consumer for a rabbitMQ queue on which a DLX (binded to another queue is configured). I am using camel 3.14.5 at the moment.
My camel route declaration looks like :
from("spring-rabbitmq:my-exchange?connectionFactory=#rabbitMQConnectionFactory&queues=my-queue") .onException(Exception.class) .log(LoggingLevel.ERROR, "Something went wrong!") .end() .process("myProcessor");
And my connectionFactory :
<bean id="rabbitMQConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> <property name="uri" value="amqps://myhost:5671"/> <property name="username" value="my-user"/> <property name="password" value="my-password"/> <property name="virtualHost" value="my-virtual-host"/> </bean>
But then when my processor throw an exception I get the following logs :
16:33:10,441 ERROR [org.apache.camel.component.springrabbit.CamelDirectMessageListenerContainer] (pool-4-thread-5) Failed to invoke listener: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Retry Policy Exhausted at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover(RejectAndDontRequeueRecoverer.java:76) [spring-rabbit-2.4.6.jar:2.4.6] at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean.recover(StatelessRetryOperationsInterceptorFactoryBean.java:78) [spring-rabbit-2.4.6.jar:2.4.6] at org.springframework.retry.interceptor.RetryOperationsInterceptor$ItemRecovererCallback.recover(RetryOperationsInterceptor.java:157) [spring-retry-1.3.3.jar:] at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:539) [spring-retry-1.3.3.jar:] at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:387) [spring-retry-1.3.3.jar:] at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225) [spring-retry-1.3.3.jar:] at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:122) [spring-retry-1.3.3.jar:] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.3.22.jar:5.3.22] at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) [spring-aop-5.3.22.jar:5.3.22] at org.springframework.amqp.rabbit.listener.$Proxy52.invokeListener(Unknown Source) [:2.4.6] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1577) [spring-rabbit-2.4.6.jar:2.4.6] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1568) [spring-rabbit-2.4.6.jar:2.4.6] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1512) [spring-rabbit-2.4.6.jar:2.4.6] at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:1108) [spring-rabbit-2.4.6.jar:2.4.6] at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:1068) [spring-rabbit-2.4.6.jar:2.4.6] at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) [amqp-client-5.13.1.jar:5.13.1] at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104) [amqp-client-5.13.1.jar:5.13.1] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [rt.jar:1.8.0_342] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [rt.jar:1.8.0_342] at java.lang.Thread.run(Thread.java:750) [rt.jar:1.8.0_342] Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException ... 20 more Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1784) [spring-rabbit-2.4.6.jar:2.4.6] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1674) [spring-rabbit-2.4.6.jar:2.4.6] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1589) [spring-rabbit-2.4.6.jar:2.4.6] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) [rt.jar:1.8.0_342] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) [rt.jar:1.8.0_342] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) [rt.jar:1.8.0_342] at java.lang.reflect.Method.invoke(Method.java:498) [rt.jar:1.8.0_342] at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) [spring-aop-5.3.22.jar:5.3.22] at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) [spring-aop-5.3.22.jar:5.3.22] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) [spring-aop-5.3.22.jar:5.3.22] at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:97) [spring-retry-1.3.3.jar:] at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) [spring-retry-1.3.3.jar:] ... 15 more Caused by: org.apache.camel.RuntimeCamelException: my processing exception
How should I configure the queue/connection factory for this message to be nack and send to the DLX configure on the rabbitMQ broker ?
Advertisement
Answer
I don’t know camel, but from a Spring perspective, you could use a DeadLetterPublishingRecoverer
instead of a RejectAndDontRequeueRecoverer
; this will re-publish the failed message to the DLQ with exception information in the headers.
If you just want the original message moved to the DLQ by the broker, you must configure the queue with a dead-letter-exchange and routing key.
See QueueBuilder
.
/** * Set the dead-letter exchange to which to route expired or rejected messages. * @param dlx the dead-letter exchange. * @return the builder. * @since 2.2 * @see #deadLetterRoutingKey(String) */ public QueueBuilder deadLetterExchange(String dlx) { return withArgument("x-dead-letter-exchange", dlx); } /** * Set the routing key to use when routing expired or rejected messages to the * dead-letter exchange. * @param dlrk the dead-letter routing key. * @return the builder. * @since 2.2 * @see #deadLetterExchange(String) */ public QueueBuilder deadLetterRoutingKey(String dlrk) { return withArgument("x-dead-letter-routing-key", dlrk); }