
Camel – Acknowledgement management with spring-rabbitmq

I am trying to write a simple consumer for a RabbitMQ queue that has a DLX configured (bound to another queue). I am using Camel 3.14.5 at the moment.

My Camel route declaration looks like this:

from("spring-rabbitmq:my-exchange?connectionFactory=#rabbitMQConnectionFactory&queues=my-queue")
            .onException(Exception.class)
                .log(LoggingLevel.ERROR, "Something went wrong!")
            .end()
            .process("myProcessor");

And my connection factory:

    <bean id="rabbitMQConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <property name="uri" value="amqps://myhost:5671"/>
        <property name="username" value="my-user"/>
        <property name="password" value="my-password"/>
        <property name="virtualHost" value="my-virtual-host"/>
    </bean>

But when my processor throws an exception, I get the following logs:

 16:33:10,441 ERROR [org.apache.camel.component.springrabbit.CamelDirectMessageListenerContainer] (pool-4-thread-5) Failed to invoke listener: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Retry Policy Exhausted
    at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover(RejectAndDontRequeueRecoverer.java:76) [spring-rabbit-2.4.6.jar:2.4.6]
    at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean.recover(StatelessRetryOperationsInterceptorFactoryBean.java:78) [spring-rabbit-2.4.6.jar:2.4.6]
    at org.springframework.retry.interceptor.RetryOperationsInterceptor$ItemRecovererCallback.recover(RetryOperationsInterceptor.java:157) [spring-retry-1.3.3.jar:]
    at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:539) [spring-retry-1.3.3.jar:]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:387) [spring-retry-1.3.3.jar:]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225) [spring-retry-1.3.3.jar:]
    at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:122) [spring-retry-1.3.3.jar:]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.3.22.jar:5.3.22]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) [spring-aop-5.3.22.jar:5.3.22]
    at org.springframework.amqp.rabbit.listener.$Proxy52.invokeListener(Unknown Source) [:2.4.6]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1577) [spring-rabbit-2.4.6.jar:2.4.6]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1568) [spring-rabbit-2.4.6.jar:2.4.6]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1512) [spring-rabbit-2.4.6.jar:2.4.6]
    at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:1108) [spring-rabbit-2.4.6.jar:2.4.6]
    at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:1068) [spring-rabbit-2.4.6.jar:2.4.6]
    at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) [amqp-client-5.13.1.jar:5.13.1]
    at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104) [amqp-client-5.13.1.jar:5.13.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [rt.jar:1.8.0_342]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [rt.jar:1.8.0_342]
    at java.lang.Thread.run(Thread.java:750) [rt.jar:1.8.0_342]
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException
    ... 20 more
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1784) [spring-rabbit-2.4.6.jar:2.4.6]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1674) [spring-rabbit-2.4.6.jar:2.4.6]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1589) [spring-rabbit-2.4.6.jar:2.4.6]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) [rt.jar:1.8.0_342]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) [rt.jar:1.8.0_342]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) [rt.jar:1.8.0_342]
    at java.lang.reflect.Method.invoke(Method.java:498) [rt.jar:1.8.0_342]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) [spring-aop-5.3.22.jar:5.3.22]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) [spring-aop-5.3.22.jar:5.3.22]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) [spring-aop-5.3.22.jar:5.3.22]
    at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:97) [spring-retry-1.3.3.jar:]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) [spring-retry-1.3.3.jar:]
    ... 15 more
Caused by: org.apache.camel.RuntimeCamelException: my processing exception

How should I configure the queue or connection factory so that this message is nacked and sent to the DLX configured on the RabbitMQ broker?


Answer

I don’t know Camel, but from a Spring perspective, you could use a RepublishMessageRecoverer instead of a RejectAndDontRequeueRecoverer; this will re-publish the failed message to the DLQ with exception information in the headers.
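
For reference, the Spring AMQP recoverer that re-publishes a failed message with the exception details in its headers is RepublishMessageRecoverer. Below is a minimal sketch of wiring it into a stateless retry interceptor. The exchange name my-dlx, the routing key my-dlq-key, the class name and the retry count are assumptions for illustration, and how the interceptor is handed to the Camel endpoint is not covered here.

```java
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;

public class RepublishingRetryConfig {

    // Hypothetical names: replace with the exchange and routing key that lead to your DLQ.
    private static final String ERROR_EXCHANGE = "my-dlx";
    private static final String ERROR_ROUTING_KEY = "my-dlq-key";

    /**
     * Builds a retry interceptor that, once the attempts are exhausted,
     * publishes the failed message to ERROR_EXCHANGE instead of rejecting it.
     */
    public static RetryOperationsInterceptor republishingRetry(ConnectionFactory connectionFactory) {
        // Template used only to publish the failed message.
        RabbitTemplate template = new RabbitTemplate(connectionFactory);

        // Adds the exception message and stack trace as headers before publishing.
        MessageRecoverer recoverer =
                new RepublishMessageRecoverer(template, ERROR_EXCHANGE, ERROR_ROUTING_KEY);

        return RetryInterceptorBuilder.stateless()
                .maxAttempts(3) // assumed value, tune as needed
                .recoverer(recoverer)
                .build();
    }
}
```

Because this recoverer returns normally after publishing, the original delivery is acknowledged rather than rejected, so the copy in the DLQ comes from the application and not from the broker's own dead-lettering.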

If you just want the original message moved to the DLQ by the broker, you must configure the queue with a dead-letter-exchange and routing key.

See QueueBuilder.

    /**
     * Set the dead-letter exchange to which to route expired or rejected messages.
     * @param dlx the dead-letter exchange.
     * @return the builder.
     * @since 2.2
     * @see #deadLetterRoutingKey(String)
     */
    public QueueBuilder deadLetterExchange(String dlx) {
        return withArgument("x-dead-letter-exchange", dlx);
    }

    /**
     * Set the routing key to use when routing expired or rejected messages to the
     * dead-letter exchange.
     * @param dlrk the dead-letter routing key.
     * @return the builder.
     * @since 2.2
     * @see #deadLetterExchange(String)
     */
    public QueueBuilder deadLetterRoutingKey(String dlrk) {
        return withArgument("x-dead-letter-routing-key", dlrk);
    }
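
A minimal sketch of how these builder methods might be used to declare a queue whose rejected messages the broker routes to a dead-letter queue. The names my-dlx, my-dlq and my-dlq-key and the class name are assumptions for illustration; my-queue comes from the question.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DeadLetterTopology {

    // Hypothetical dead-letter exchange.
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange("my-dlx");
    }

    // Hypothetical queue that receives the dead-lettered messages.
    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable("my-dlq").build();
    }

    // Routes messages arriving at my-dlx with key my-dlq-key into my-dlq.
    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue())
                .to(deadLetterExchange())
                .with("my-dlq-key");
    }

    // Work queue: sets x-dead-letter-exchange and x-dead-letter-routing-key.
    @Bean
    public Queue workQueue() {
        return QueueBuilder.durable("my-queue")
                .deadLetterExchange("my-dlx")
                .deadLetterRoutingKey("my-dlq-key")
                .build();
    }
}
```

These beans are only declared on the broker when a RabbitAdmin is present in the application context. If my-queue already exists with different arguments, redeclaring it fails with PRECONDITION_FAILED; a queue that already has a DLX configured on the broker dead-letters any message rejected with requeue=false, which is what RejectAndDontRequeueRecoverer triggers.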
User contributions licensed under: CC BY-SA