I’m using ActiveMQ Artemis 2.18.0 with version 2.5.5 of the spring-boot-starter-artemis dependency in a Spring Boot client. In my use case, clients communicate with each other via topics. The problem is that the string jms.topic. is prefixed to every topic defined on the client. For example, the topic foo.sendInfo becomes jms.topic.foo.sendInfo.
The broker.xml file is shown below. The Spring Boot client uses the netty-ssl-acceptor acceptor on port 61617.
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns:xi="http://www.w3.org/2001/XInclude"
               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

   <core xmlns="urn:activemq:core"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="urn:activemq:core ">

      <name>0.0.0.0</name>
      <persistence-enabled>true</persistence-enabled>
      <journal-type>NIO</journal-type>
      <paging-directory>data/paging</paging-directory>
      <bindings-directory>data/bindings</bindings-directory>
      <journal-directory>data/journal</journal-directory>
      <large-messages-directory>data/large-messages</large-messages-directory>
      <journal-datasync>true</journal-datasync>
      <journal-min-files>2</journal-min-files>
      <journal-pool-files>10</journal-pool-files>
      <journal-device-block-size>4096</journal-device-block-size>
      <journal-file-size>10M</journal-file-size>

      <!-- This value was determined through a calculation.
           Your system could perform 1.18 writes per millisecond on the current journal configuration.
           That translates as a sync write every 844000 nanoseconds.
           Note: If you specify 0 the system will perform writes directly to the disk.
           We recommend this to be 0 if you are using journalType=MAPPED and journal-datasync=false. -->
      <journal-buffer-timeout>844000</journal-buffer-timeout>
      <journal-max-io>1</journal-max-io>

      <!-- how often we are looking for how many bytes are being used on the disk in ms -->
      <disk-scan-period>5000</disk-scan-period>

      <!-- once the disk hits this limit the system will block, or close the connection in certain protocols that won't support flow control. -->
      <max-disk-usage>90</max-disk-usage>

      <!-- should the broker detect dead locks and other issues -->
      <critical-analyzer>true</critical-analyzer>
      <critical-analyzer-timeout>120000</critical-analyzer-timeout>
      <critical-analyzer-check-period>60000</critical-analyzer-check-period>
      <critical-analyzer-policy>HALT</critical-analyzer-policy>
      <page-sync-timeout>844000</page-sync-timeout>

      <acceptors>
         <!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
         <!-- amqpCredits: The number of credits sent to AMQP producers -->
         <!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
         <!-- amqpDuplicateDetection: If you are not using duplicate detection, set this to false as duplicate detection requires applicationProperties to be parsed on the server. -->
         <!-- amqpMinLargeMessageSize: Determines how many bytes are considered large, so we start using files to hold their data. default: 102400, -1 would mean to disable large mesasge control -->
         <!-- Note: If an acceptor needs to be compatible with HornetQ and/or Artemis 1.x clients add "anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to the acceptor url. See https://issues.apache.org/jira/browse/ARTEMIS-1644 for more information. -->

         <!-- Acceptor for every supported protocol -->
         <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor>

         <!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
         <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>

         <!-- STOMP Acceptor. -->
         <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>

         <!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
         <acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>

         <!-- MQTT Acceptor -->
         <acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=false</acceptor>

         <!-- SSL Acceptor -->
         <acceptor name="netty-ssl-acceptor">tcp://0.0.0.0:61617?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;anycastPrefix=jms.queue;multicastPrefix=jms.topic.;sslEnabled=true;keyStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprink.jks;keyStorePassword=changeit;trustStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprinktrust.ts;trustStorePassword=changeit;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE</acceptor>

         <acceptor name="mqtt+ssl">tcp://0.0.0.0:8883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;sslEnabled=true;keyStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprink.jks;keyStorePassword=changeit;trustStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprinktrust.ts;trustStorePassword=changeit;needClientAuth=true;protocols=MQTT;useEpoll=true</acceptor>
      </acceptors>

      <security-settings>
         <security-setting match="#">
            <permission type="createNonDurableQueue" roles="admins, users"/>
            <permission type="deleteNonDurableQueue" roles="admins, users"/>
            <permission type="createDurableQueue" roles="admins, users"/>
            <permission type="deleteDurableQueue" roles="admins, users"/>
            <permission type="createAddress" roles="admins, users"/>
            <permission type="deleteAddress" roles="admins, users"/>
            <permission type="consume" roles="admins, users"/>
            <permission type="browse" roles="admins, users"/>
            <permission type="send" roles="admins, users"/>
            <!-- we need this otherwise ./artemis data imp wouldn't work -->
            <permission type="manage" roles="admins"/>
         </security-setting>
      </security-settings>

      <address-settings>
         <!-- if you define auto-create on certain queues, management has to be auto-create -->
         <address-setting match="activemq.management#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
         </address-setting>
         <!--default for catch all-->
         <address-setting match="#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
            <auto-delete-queues>false</auto-delete-queues>
            <auto-delete-addresses>false</auto-delete-addresses>
         </address-setting>
      </address-settings>

      <addresses>
         <address name="DLQ">
            <anycast>
               <queue name="DLQ" />
            </anycast>
         </address>
         <address name="ExpiryQueue">
            <anycast>
               <queue name="ExpiryQueue" />
            </anycast>
         </address>
      </addresses>

   </core>
</configuration>
The connection factory on the Spring Boot client is configured as shown below.
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;

@Configuration
@EnableJms
public class MQTTConfig {

    @Value("${JMS_BROKER_TRUSTSTORE}")
    private String pathToTrustStore;

    @Value("${JMS_BROKER_KEYSTORE}")
    private String pathToKeystore;

    @Value("${JMS_BROKER_TRUSTSTORE_PASSWORD}")
    private String truststorePassword;

    @Value("${JMS_BROKER_KEYSTORE_PASSWORD}")
    private String keystorePassword;

    @Bean
    public ActiveMQConnectionFactory artemisSSLConnectionFactory() {
        ActiveMQConnectionFactory artemisConnectionFactory = new ActiveMQConnectionFactory(
                "tcp://localhost:61617?&" +
                "sslEnabled=true&" +
                "trustStorePath=" + pathToTrustStore +
                "&trustStorePassword=changeit");
        artemisConnectionFactory.setUser("user");
        artemisConnectionFactory.setPassword("password");
        return artemisConnectionFactory;
    }

    /**
     * Initialise {@link JmsTemplate} as required
     */
    @Bean
    public JmsTemplate jmsTemplate() throws JMSException {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory(artemisSSLConnectionFactory());
        jmsTemplate.setExplicitQosEnabled(true);
        // setting PubSubDomain to true configures JmsTemplate to work with topics instead of queues
        jmsTemplate.setPubSubDomain(true);
        return jmsTemplate;
    }

    /**
     * Initialise {@link DefaultJmsListenerContainerFactory} as required
     */
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() throws JMSException {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(artemisSSLConnectionFactory());
        // setting PubSubDomain to true configures the DefaultJmsListenerContainerFactory
        // to work with topics instead of queues
        factory.setPubSubDomain(true);
        return factory;
    }
}
Below is the POM file, with only the relevant dependencies.
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.4.1.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-artemis</artifactId>
    <version>2.5.5</version>
</dependency>
The code snippet below shows a producer that publishes to the topic server.weatherForecast and a consumer that subscribes to the same topic. Messages are exchanged without issue between this producer and consumer, because jms.topic. is prefixed to every topic defined on the Spring Boot client. However, when I use an external tool to subscribe over MQTT, it receives nothing unless I change the topic it subscribes to from server.weatherForecast to jms.topic.server.weatherForecast.
import org.json.JSONObject; // assumed: the original does not show which JSON library is used
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class SamplePC {

    @Autowired
    private JmsTemplate jmsTemplate;

    // producer that is called by a cron job
    public void tester() {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("serialNumber", "105");
        jmsTemplate.convertAndSend("server.weatherForecast", jsonObject.toString().toCharArray());
    }

    // consumer that subscribes to the same topic as the producer
    @JmsListener(destination = "server.weatherForecast")
    public void consumeWeatherForecastRequest(char[] incomingMessage) {
        // some logic
        jmsTemplate.convertAndSend("someTopic", "someMessage");
    }
}
After enabling TRACE logging for RemotingConnectionImpl, I saw that the serverVersion attribute in the CreateSessionResponseMessage had a value of 131, while the version attribute in the CreateSessionMessage had a value of 127.
How do I ensure that jms.topic. is not prefixed to topic names?
A minimal reproducible example can be downloaded from this GitHub repository.
I tried to log the prefix from the code but found no way to do so; all the logs simply showed the topic name without the prefix. However, subscribing from an external client to the topic being published to does reveal the prefixing: if you subscribe to both topicName and jms.topic.topicName, the message is delivered only to the latter. I’ve noticed that some clients treat “.” as “/”, so that is something else to try if the “.” doesn’t work.
Answer
I took your reproducer and managed to re-create the problem, where the client uses jms.topic.test.topic. However, once I added multicastPrefix=jms.topic. to the “artemis” acceptor in broker.xml, the problem went away. The broker now strips the client’s prefix and uses test.topic instead.
You did have multicastPrefix=jms.topic. set on the “netty-ssl-acceptor” acceptor, but your client wasn’t actually using that acceptor.
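As a sketch of that change, the “artemis” acceptor from the broker.xml in the question could look like the following once the prefix is added. Only multicastPrefix=jms.topic. at the end of the URL is new; every other parameter is copied from the question. The note in broker.xml also suggests anycastPrefix=jms.queue. for queues, which is left out here on the assumption that only topics matter for this use case.

```xml
<!-- Acceptor for every supported protocol.
     multicastPrefix tells the broker to strip "jms.topic." from addresses
     sent by older clients that still add that prefix. -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false;multicastPrefix=jms.topic.</acceptor>
```

The broker needs to be restarted, or allowed to reload its configuration, before the new acceptor URL takes effect.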
I also ran mvn dependency:tree to see why your application is using the ActiveMQ Artemis 1.3.0 client. This is what it output (in part):
[INFO] \- org.springframework.boot:spring-boot-starter-artemis:jar:2.5.5:compile
[INFO]    +- jakarta.jms:jakarta.jms-api:jar:2.0.3:compile
[INFO]    +- jakarta.json:jakarta.json-api:jar:1.1.6:compile
[INFO]    \- org.apache.activemq:artemis-jms-client:jar:1.3.0:compile
[INFO]       +- org.apache.activemq:artemis-core-client:jar:1.3.0:compile
[INFO]       |  +- org.jgroups:jgroups:jar:3.6.9.Final:compile
[INFO]       |  +- org.apache.activemq:artemis-commons:jar:1.3.0:compile
[INFO]       |  |  +- commons-beanutils:commons-beanutils:jar:1.9.2:compile
[INFO]       |  |  |  \- commons-collections:commons-collections:jar:3.2.2:compile
[INFO]       |  |  \- com.google.guava:guava:jar:18.0:compile
[INFO]       |  \- io.netty:netty-all:jar:4.0.32.Final:compile
[INFO]       +- org.apache.activemq:artemis-selector:jar:1.3.0:compile
[INFO]       \- javax.inject:javax.inject:jar:1:compile
So it appears that the dependency on org.apache.activemq:artemis-jms-client:jar:1.3.0 is coming directly from org.springframework.boot:spring-boot-starter-artemis:jar:2.5.5, which is really strange since that starter clearly defines a dependency on org.apache.activemq:artemis-jms-client:jar:2.17.0. However, if I change the <parent> to use 2.5.5 instead of 1.4.1.RELEASE, the problem goes away, e.g.:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.5.5</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
This is what mvn dependency:tree outputs now (in part):
[INFO] \- org.springframework.boot:spring-boot-starter-artemis:jar:2.5.5:compile
[INFO]    +- jakarta.jms:jakarta.jms-api:jar:2.0.3:compile
[INFO]    +- jakarta.json:jakarta.json-api:jar:1.1.6:compile
[INFO]    \- org.apache.activemq:artemis-jms-client:jar:2.17.0:compile
[INFO]       +- org.apache.activemq:artemis-core-client:jar:2.17.0:compile
[INFO]       |  +- org.jgroups:jgroups:jar:3.6.13.Final:compile
[INFO]       |  +- org.apache.johnzon:johnzon-core:jar:1.2.14:compile
[INFO]       |  +- io.netty:netty-transport-native-epoll:jar:linux-x86_64:4.1.68.Final:compile
[INFO]       |  |  \- io.netty:netty-transport-native-unix-common:jar:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-transport-native-kqueue:jar:osx-x86_64:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-codec-http:jar:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-buffer:jar:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-transport:jar:4.1.68.Final:compile
[INFO]       |  |  \- io.netty:netty-resolver:jar:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-handler:jar:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-handler-proxy:jar:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-codec:jar:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-codec-socks:jar:4.1.68.Final:compile
[INFO]       |  \- io.netty:netty-common:jar:4.1.68.Final:compile
[INFO]       +- org.apache.activemq:artemis-commons:jar:2.17.0:compile
[INFO]       |  +- org.jboss.logging:jboss-logging:jar:3.4.2.Final:compile
[INFO]       |  \- commons-beanutils:commons-beanutils:jar:1.9.4:compile
[INFO]       |     \- commons-collections:commons-collections:jar:3.2.2:compile
[INFO]       \- org.apache.activemq:artemis-selector:jar:2.17.0:compile
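A likely explanation for the 1.3.0 client, offered as an assumption rather than something verified against the reproducer, is that the 1.4.1.RELEASE parent's dependency management pins the Artemis client version, and a managed version takes precedence over the one a transitive dependency declares. One way to keep the starter and the client in step is to leave the version off the starter so that the parent supplies it. The sketch below assumes the build inherits from spring-boot-starter-parent and wraps the dependency in a dependencies element, which the question's excerpt omits.

```xml
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.5.5</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
    <!-- No version element here: the parent's dependency management supplies
         the starter version and a matching Artemis client version. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-artemis</artifactId>
    </dependency>
</dependencies>
```

Running mvn dependency:tree again after a change like this shows which artemis-jms-client version is actually resolved.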