
RabbitMQ doesn’t flush acks?

The problem appeared in logs: Consumer failed to start in 60000 milliseconds; does the task executor have enough threads to support the container concurrency?

We dynamically open handlers for about 50 queues with SimpleMessageListenerContainer.addQueueNames(), and then the application starts. It consumes some messages, but the RabbitMQ admin panel shows them as unacked. Over a significant amount of time the unacked messages pile up to about 6 per queue (each queue receives fairly few messages per minute), roughly 300 in total; then something happens and they are all consumed and acked at once. While the messages are unacked, the container seems to keep trying to start another consumer until it hits the limit.

We rely on AUTO acknowledgment mode now; when it was MANUAL, everything was fine.

There are two questions:

  1. What can be the reason for unacked messages? Is there any flushing mechanism that doesn’t trigger often?

  2. What do I do about the “not enough threads” message?

These two seem to be closely related.

Here’s the setup:

@Bean
fun queueMessageListenerContainer(
    connectionFactory: ConnectionFactory,
    retryOperationsInterceptor: RetryOperationsInterceptor,
    vehicleQueueListenerFactory: QueueListenerFactory,
): SimpleMessageListenerContainer {
    return SimpleMessageListenerContainer().also {
        it.connectionFactory = connectionFactory
        it.setConsumerTagStrategy { queueName -> consumerTag(queueName) }
        it.setMessageListener(vehicleQueueListenerFactory.create())
        it.setConcurrentConsumers(2)
        it.setMaxConcurrentConsumers(5)
        it.setListenerId("queue-consumer")
        it.setAdviceChain(retryOperationsInterceptor)
        it.setRecoveryInterval(RABBIT_HEARTH_BEAT.toMillis())
        // tried 10-100 threads, didn't help
        it.setTaskExecutor(rabbitConsumersExecutorService)
        // AUTO is supposed to ack the messages, right?
        it.acknowledgeMode = AcknowledgeMode.AUTO
    }
}

@Bean
fun connectionFactory(rabbitProperties: RabbitProperties): AbstractConnectionFactory {
    val rabbitConnectionFactory = com.rabbitmq.client.ConnectionFactory().also { connectionFactory ->
        connectionFactory.isAutomaticRecoveryEnabled = true
        connectionFactory.isTopologyRecoveryEnabled = true
        connectionFactory.networkRecoveryInterval = RABBIT_HEARTH_BEAT.toMillis()
        connectionFactory.requestedHeartbeat = RABBIT_HEARTH_BEAT.toSeconds().toInt()
        // was up to 100 connections, didn't help
        connectionFactory.setSharedExecutor(rabbitConnectionExecutorService)
        connectionFactory.host = rabbitProperties.host
        connectionFactory.port = rabbitProperties.port ?: connectionFactory.port
    }
    return CachingConnectionFactory(rabbitConnectionFactory)
        .also {
            it.cacheMode = rabbitProperties.cache.connection.mode
            it.connectionCacheSize = rabbitProperties.cache.connection.size
            it.setConnectionNameStrategy { "simulation-gateway:${springProfiles.firstOrNull()}:event-consumer" }
        }
}

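class QueueListenerFactory {
    // Declare the return type explicitly; a block-bodied function without one returns Unit.
    fun create(): MessageListener {
        return MessageListener {
            try {
                // no explicit ack; rely on AUTO acknowledgement mode
                handle()
            } catch (e: Throwable) {
                ...
            }
        }
    }
}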


Answer

Okay, I figured out what the problem was. Basically, the container couldn’t start all of the queue consumers in time: not only is starting them a slow process for SimpleMessageListenerContainer, but we also called addQueueNames for the queues one by one.

userRepository.findAll()
    .map { user -> queueName(user) }
    .onEach { queueName ->
        simpleContainerListener.addQueueNames(queueName)
    }

But I had overlooked the following line in the documentation for SimpleMessageListenerContainer:

The existing consumers will be cancelled after they have processed any pre-fetched messages and new consumers will be created 

This means that what actually happened was the recreation of (1, 2, … N) consumers, one round per added queue. What made it even worse: when a request came in through the API, we called exactly the same simpleContainerListener.addQueueNames(queueName) after handling it, which recreated all of the consumers yet again.
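A rough way to see the cost is the simplified model below. It is plain Kotlin, not Spring AMQP code, and it rests on assumptions: the container is already running, each consumer subscribes to every queue the container currently knows about, and each addQueueNames call replaces all consumers. The function names are hypothetical.

```kotlin
// Simplified model (an assumption, not Spring AMQP internals):
// every addQueueNames call recreates all consumers, and each new consumer
// subscribes to every queue known to the container at that point.

// Queues added one at a time: after the k-th call each consumer subscribes to k queues.
fun subscriptionsOneByOne(queueCount: Int, consumers: Int): Int =
    (1..queueCount).sumOf { known -> known * consumers }

// All queues added in a single call: consumers are recreated once.
fun subscriptionsBatched(queueCount: Int, consumers: Int): Int =
    queueCount * consumers

fun main() {
    // 50 queues and 2 concurrent consumers, as in the question's setup.
    println(subscriptionsOneByOne(50, 2)) // prints 2550
    println(subscriptionsBatched(50, 2))  // prints 100
}
```

Under these assumptions, adding the 50 queues one by one costs about 25 times as many subscriptions as a single batched call, in addition to 50 consumer restarts instead of one.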

The recreation of the consumers was also the reason why AUTO acknowledgement appeared not to work: threads were hanging while trying to build enough consumers before the timeout.

I fixed this by adding a DirectMessageListenerContainer to handle recently added queues. For the particular case of adding just one new consumer, it is much faster than SimpleMessageListenerContainer.

DirectMessageListenerContainer(connectionFactory).also {
    it.setConsumerTagStrategy { queueName -> consumerTag(queueName, RECENT_CONSUMER_TAG) }
    it.setMessageListener(ListenerFactory.create())
    it.setListenerId("queue-consumer-recent")
    it.setAdviceChain(retryOperationsInterceptor)
    it.setTaskExecutor(recentQueuesTaskExecutor)
    it.acknowledgeMode = AcknowledgeMode.AUTO
}

The downside is that DirectMessageListenerContainer uses 1 thread per queue on every instance. This is exactly why I didn’t want to use it in the first place, but using DirectMessageListenerContainer for recent queues and SimpleMessageListenerContainer for already existing ones significantly reduces the number of threads required to handle those queues. As far as I understand, heavy use of DirectMessageListenerContainer will eventually lead to OOM, so the next step could be to transfer queues from the direct container to the simple one in batches.
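The batched transfer mentioned above could look roughly like the sketch below. It relies on addQueueNames and removeQueueNames accepting a vararg of queue names, so each container is touched once per batch. The helper name and its parameters are hypothetical, and whether to add before removing is a judgment call rather than something the answer prescribes.

```kotlin
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer

// Hypothetical helper: moves a batch of queues from the "recent" direct container
// to the simple container, calling each container once for the whole batch.
fun moveQueuesToSimpleContainer(
    queueNames: List<String>,
    recentContainer: DirectMessageListenerContainer,
    simpleContainer: SimpleMessageListenerContainer,
) {
    if (queueNames.isEmpty()) return
    val names = queueNames.toTypedArray()
    // One vararg call, so the simple container recreates its consumers once for the batch.
    simpleContainer.addQueueNames(*names)
    // Afterwards, stop consuming these queues on the direct container.
    recentContainer.removeQueueNames(*names)
}
```

Adding before removing means both containers may briefly consume from the same queues as competing consumers. Reversing the two calls avoids that overlap but leaves a short window in which nothing consumes from those queues.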

User contributions licensed under: CC BY-SA