I am learning Queue Length Limit(https://www.rabbitmq.com/maxlength.html), as it says, queue is set to 'x-max-length:10'
,and 'x-overflow:reject-publish'
, and also, I enable publisher confirms
. So, when the number of messages in the queue reaches 10, the publisher will be informed of the reject via a basic.nack
message.
And it is: my confirm callback got a false ack, but cause
is null
. I’m wondering shouldn’t it return something so that I can distinguish this situation. Part of the code is as follows:
@Bean
public AmqpTemplate amqpTemplate(@Autowired CachingConnectionFactory amqpConnectionFactory) {
amqpConnectionFactory.setPublisherReturns(true);
amqpConnectionFactory.setPublisherConfirms(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(amqpConnectionFactory);
rabbitTemplate.setMessageConverter(jsonMessageConverter());
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
return rabbitTemplate;
}
static RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println(ack); // when number of messages reach 10, print false
System.out.println(cause); // when number of messages reach 10, print null
}
};
@Bean
public Queue queue() {
return QueueBuilder.durable(DURABLE_QUEUE).withArgument("x-max-length", 10).withArgument("x-overflow", "reject-publish").build();
}
@Scheduled(fixedDelay = 1000L)
public void produce() {
Message msg = new Message(UUID.randomUUID().toString(), "sth");
amqpTemplate.convertAndSend("sth", "sth", msg );
}
Advertisement
Answer
Unfortunately, the AMQP protocol and Java client provides no information about why a publish failed. Only ack/nack and whether the confirmation is for multiple messages:
/**
* Implement this interface in order to be notified of Confirm events.
* Acks represent messages handled successfully; Nacks represent
* messages lost by the broker. Note, the lost messages could still
* have been delivered to consumers, but the broker cannot guarantee
* this.
* For a lambda-oriented syntax, use {@link ConfirmCallback}.
*/
public interface ConfirmListener {
void handleAck(long deliveryTag, boolean multiple)
throws IOException;
void handleNack(long deliveryTag, boolean multiple)
throws IOException;
}
We added the cause
because, in some circumstances, the framework synthesizes a nack (for example when a channel is closed while we are waiting for confirmations, where we add Channel closed by application
as the cause
.
The framework can’t speculate the reason for which we got a nack from the broker.