Skip to content
Advertisement

Messages not consumed when using wildcard queues in ActiveMQ Artemis

If I create queues on ActiveMQ Artemis test.A and a wildcard queue test.# then I can send a message to test.A and it will also be delivered to test.#. However, I am surprised to learn that when I consume the message from test.# then the message is still present on test.A

How can I change my code or configuration to get the expected behavior?

Example code:

import org.apache.activemq.artemis.api.core.*;
import org.apache.activemq.artemis.api.core.client.*;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;

public class Main {
    static String AMQ = "vm://0";

    public static void main(String[] args) throws Exception {
        EmbeddedActiveMQ server = null;
        try {
            server = createEmbeddedBroker();

            var serverLocator = ActiveMQClient.createServerLocator(AMQ);
            var clientSessionFactory = serverLocator.createSessionFactory();

            createQueues(clientSessionFactory);

            // queues are empty on creation
            try (var session = clientSessionFactory.createSession()) {
                assertQueueLength(session, "test.#", 0);
                assertQueueLength(session, "test.A", 0);
            }
            
            sendMessage(clientSessionFactory, "test.A");

            // expect message is delivered to both
            try (var session = clientSessionFactory.createSession()) {
                assertQueueLength(session, "test.#", 1);
                assertQueueLength(session, "test.A", 1);
            }

            consumeMessage(clientSessionFactory, "test.#");

            // expect message is consumed from both
            try (var session = clientSessionFactory.createSession()) {
                assertQueueLength(session, "test.#", 0); // ok - message gone
                assertQueueLength(session, "test.A", 0); // fails! 
            }

        } finally {
            if (server != null) server.stop();
        }
    }

    private static EmbeddedActiveMQ createEmbeddedBroker() throws Exception {
        var config = new ConfigurationImpl();
        config.addAcceptorConfiguration("vm", AMQ);
        config.setSecurityEnabled(false);
        config.setPersistenceEnabled(false);

        var server = new EmbeddedActiveMQ();
        server.setConfiguration(config);
        server.start();

        return server;
    }

    private static void createQueues(ClientSessionFactory csf)  throws Exception {
        var session = csf.createSession();

        /*
        <address name="test.A">
            <anycast>
                <queue name="test.A" />
            </anycast>
        </address>
        */
        var testA = new QueueConfiguration("test.A")
            .setRoutingType(RoutingType.ANYCAST)
            .setAddress("test.A");
        session.createQueue(testA);

        /*
        <address name="test.#">
            <anycast>
                <queue name="test.#" />
            </anycast>
        </address>
        */
        var testWildcard = new QueueConfiguration("test.#")
            .setRoutingType(RoutingType.ANYCAST)
            .setAddress("test.#");
        session.createQueue(testWildcard);

        // also tried to create address without a queue, but the message to test.A is not delivered to test.#
        // session.createAddress(new SimpleString("test.#"), RoutingType.ANYCAST, false);
    }

    private static void sendMessage(ClientSessionFactory csf, String queue) throws Exception {
        var session = csf.createSession();
        var producer = session.createProducer(queue);
        producer.send(session.createMessage(true));
        producer.close();
        session.close();
    }

    private static void consumeMessage(ClientSessionFactory csf, String queue) throws Exception {
        var session = csf.createSession();
        var consumer = session.createConsumer(queue);
        consumer.setMessageHandler(message -> {
            try {
                log("Consuming one message from " + queue);
                message.acknowledge();
                log("Consumed one message from " + queue);
            } catch (ActiveMQException e) {
                throw new IllegalStateException(e);
            }
        });
        session.start();

        Thread.sleep(1000); // hack to wait

        consumer.close();
        session.close();
    }

    private static void assertQueueLength(ClientSession session, String queue, long expected) throws Exception {
        long actual = session.queueQuery(SimpleString.toSimpleString(queue)).getMessageCount();
        if (actual != expected) {
            throw new IllegalStateException("Queue " + queue + " has " + actual + " messages. Expected " + expected);
        } else {
            log("Queue " + queue + " has " + actual + " messages as expected");
        }
    }

    private static void log(String msg) {
        System.out.println(">>> " + msg);
    }
}

Dependencies:

org.apache.activemq:artemis-core-client:2.17.0
org.apache.activemq:artemis-server:2.17.0

Advertisement

Answer

What you are seeing is the expected behavior. The key thing to keep in mind here is that you’re leveraging wildcard routing and not wildcard consuming. With wildcard routing messages are not only routed to the queues of the address where the message is explicitly sent but also to any queues on matching wildcard addresses. Each queue to which the message is routed has its own copy of the message.

Wildcard routing was implemented with multicast (i.e. pub/sub) use-cases in mind (e.g. hierarchical topics), but there are a few options if you want to use it with anycast:

  • Accept the semantics as-is.
  • Create the address test.A without a queue, e.g.:
    session.createAddress(SimpleString.toSimpleString("test.A"), RoutingType.ANYCAST, false);
    
    This is a perfectly valid configuration, but you won’t be able to consume messages from test.A directly since no such queue exists. You’d only be able to consume messages from the queue(s) on the wildcard address.
  • Set purge-on-no-consumer to true on the test.A queue, e.g.:
    var testA = new QueueConfiguration("test.A")
        .setRoutingType(RoutingType.ANYCAST)
        .setPurgeOnNoConsumers(true)
        .setAddress("test.A");
    
    This setting will allow the queue to receive messages while a consumer is connected, but as soon as the last consumer disconnects the queue will be purged of all messages and as long as there are no consumers messages will not be routed to it.

Using a wildcard from a consumer to receive messages from multiple queues is a nice convenience feature, but it’s not implemented in ActiveMQ Artemis. However, it shouldn’t be very difficult to create multiple consumers.

User contributions licensed under: CC BY-SA
5 People found this is helpful
Advertisement