If I create queues on ActiveMQ Artemis, test.A and a wildcard queue test.#, then I can send a message to test.A and it will also be delivered to test.#. However, I am surprised to learn that when I consume the message from test.# then the message is still present on test.A.

How can I change my code or configuration to get the expected behavior?
Example code:
    import org.apache.activemq.artemis.api.core.*;
    import org.apache.activemq.artemis.api.core.client.*;
    import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
    import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;

    public class Main {
        static String AMQ = "vm://0";

        public static void main(String[] args) throws Exception {
            EmbeddedActiveMQ server = null;
            try {
                server = createEmbeddedBroker();
                var serverLocator = ActiveMQClient.createServerLocator(AMQ);
                var clientSessionFactory = serverLocator.createSessionFactory();
                createQueues(clientSessionFactory);

                // queues are empty on creation
                try (var session = clientSessionFactory.createSession()) {
                    assertQueueLength(session, "test.#", 0);
                    assertQueueLength(session, "test.A", 0);
                }

                sendMessage(clientSessionFactory, "test.A");

                // expect message is delivered to both
                try (var session = clientSessionFactory.createSession()) {
                    assertQueueLength(session, "test.#", 1);
                    assertQueueLength(session, "test.A", 1);
                }

                consumeMessage(clientSessionFactory, "test.#");

                // expect message is consumed from both
                try (var session = clientSessionFactory.createSession()) {
                    assertQueueLength(session, "test.#", 0); // ok - message gone
                    assertQueueLength(session, "test.A", 0); // fails!
                }
            } finally {
                if (server != null) server.stop();
            }
        }

        private static EmbeddedActiveMQ createEmbeddedBroker() throws Exception {
            var config = new ConfigurationImpl();
            config.addAcceptorConfiguration("vm", AMQ);
            config.setSecurityEnabled(false);
            config.setPersistenceEnabled(false);
            var server = new EmbeddedActiveMQ();
            server.setConfiguration(config);
            server.start();
            return server;
        }

        private static void createQueues(ClientSessionFactory csf) throws Exception {
            var session = csf.createSession();

            /*
            <address name="test.A">
              <anycast>
                <queue name="test.A" />
              </anycast>
            </address>
            */
            var testA = new QueueConfiguration("test.A")
                    .setRoutingType(RoutingType.ANYCAST)
                    .setAddress("test.A");
            session.createQueue(testA);

            /*
            <address name="test.#">
              <anycast>
                <queue name="test.#" />
              </anycast>
            </address>
            */
            var testWildcard = new QueueConfiguration("test.#")
                    .setRoutingType(RoutingType.ANYCAST)
                    .setAddress("test.#");
            session.createQueue(testWildcard);

            // also tried to create address without a queue, but the message to test.A is not delivered to test.#
            // session.createAddress(new SimpleString("test.#"), RoutingType.ANYCAST, false);
        }

        private static void sendMessage(ClientSessionFactory csf, String queue) throws Exception {
            var session = csf.createSession();
            var producer = session.createProducer(queue);
            producer.send(session.createMessage(true));
            producer.close();
            session.close();
        }

        private static void consumeMessage(ClientSessionFactory csf, String queue) throws Exception {
            var session = csf.createSession();
            var consumer = session.createConsumer(queue);
            consumer.setMessageHandler(message -> {
                try {
                    log("Consuming one message from " + queue);
                    message.acknowledge();
                    log("Consumed one message from " + queue);
                } catch (ActiveMQException e) {
                    throw new IllegalStateException(e);
                }
            });
            session.start();
            Thread.sleep(1000); // hack to wait
            consumer.close();
            session.close();
        }

        private static void assertQueueLength(ClientSession session, String queue, long expected) throws Exception {
            long actual = session.queueQuery(SimpleString.toSimpleString(queue)).getMessageCount();
            if (actual != expected) {
                throw new IllegalStateException("Queue " + queue + " has " + actual + " messages. Expected " + expected);
            } else {
                log("Queue " + queue + " has " + actual + " messages as expected");
            }
        }

        private static void log(String msg) {
            System.out.println(">>> " + msg);
        }
    }
Dependencies:
    org.apache.activemq:artemis-core-client:2.17.0
    org.apache.activemq:artemis-server:2.17.0
Answer
What you are seeing is the expected behavior. The key thing to keep in mind here is that you’re leveraging wildcard routing and not wildcard consuming. With wildcard routing messages are not only routed to the queues of the address where the message is explicitly sent but also to any queues on matching wildcard addresses. Each queue to which the message is routed has its own copy of the message.
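To make the per-queue copy semantics concrete, here is a small toy model in plain Java. It is not Artemis code and does not call the Artemis API: the class `WildcardRoutingSketch` and all of its methods are hypothetical names invented for illustration. It assumes one queue per address and the default wildcard syntax (`.` as the word delimiter, `#` for zero or more words, `*` for exactly one word).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of wildcard routing. Hypothetical illustration only, NOT the Artemis implementation.
class WildcardRoutingSketch {

    // Assumption: one queue per address, keyed by the address name.
    private final Map<String, Deque<String>> queues = new LinkedHashMap<>();

    void createQueue(String address) {
        queues.put(address, new ArrayDeque<>());
    }

    // Route the message to every queue whose address matches the target address.
    void send(String address, String body) {
        String[] words = address.split("\\.");
        for (Map.Entry<String, Deque<String>> entry : queues.entrySet()) {
            if (matches(entry.getKey().split("\\."), 0, words, 0)) {
                entry.getValue().add(body); // each matching queue holds its own copy
            }
        }
    }

    // Consuming removes the message from this queue only.
    String consume(String queue) {
        return queues.get(queue).poll();
    }

    int count(String queue) {
        return queues.get(queue).size();
    }

    // '#' matches zero or more words, '*' matches exactly one word.
    static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            for (int next = w; next <= words.length; next++) {
                if (matches(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            return false;
        }
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            return matches(pattern, p + 1, words, w + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        WildcardRoutingSketch broker = new WildcardRoutingSketch();
        broker.createQueue("test.A");
        broker.createQueue("test.#");

        broker.send("test.A", "hello");
        System.out.println("test.A=" + broker.count("test.A") + " test.#=" + broker.count("test.#"));
        // prints "test.A=1 test.#=1"

        broker.consume("test.#");
        System.out.println("test.A=" + broker.count("test.A") + " test.#=" + broker.count("test.#"));
        // prints "test.A=1 test.#=0"
    }
}
```

In this model, consuming from `test.#` touches only that queue's copy, which mirrors why the second `assertQueueLength` check on `test.A` in the question fails.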
Wildcard routing was implemented with multicast (i.e. pub/sub) use-cases in mind (e.g. hierarchical topics), but there are a few options if you want to use it with anycast:
- Accept the semantics as-is.
- Create the address test.A without a queue, e.g.:

      session.createAddress(SimpleString.toSimpleString("test.A"), RoutingType.ANYCAST, false);

  This is a perfectly valid configuration, but you won’t be able to consume messages from test.A directly since no such queue exists. You’d only be able to consume messages from the queue(s) on the wildcard address.
- Set purge-on-no-consumers to true on the test.A queue, e.g.:

      var testA = new QueueConfiguration("test.A")
              .setRoutingType(RoutingType.ANYCAST)
              .setPurgeOnNoConsumers(true)
              .setAddress("test.A");

  This setting will allow the queue to receive messages while a consumer is connected, but as soon as the last consumer disconnects the queue will be purged of all messages, and as long as there are no consumers, messages will not be routed to it.
Using a wildcard from a consumer to receive messages from multiple queues is a nice convenience feature, but it’s not implemented in ActiveMQ Artemis. However, it shouldn’t be very difficult to create multiple consumers.