
ActiveMQ Artemis prefixes “jms.topic.” to all topic names defined on Spring Boot Client

I’m using ActiveMQ Artemis 2.18.0 and version 2.5.5 of the spring-boot-starter-artemis dependency on a Spring Boot client. In my use case, clients are required to communicate with each other via topics. The issue is that the string jms.topic. is prefixed to every topic defined on the client. For example, the topic foo.sendInfo becomes jms.topic.foo.sendInfo.

The broker.xml file is as shown below. The acceptor used by the Spring Boot client is the netty-ssl-acceptor on port 61617.

<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->

<configuration xmlns="urn:activemq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns:xi="http://www.w3.org/2001/XInclude"
               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

   <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="urn:activemq:core ">

      <name>0.0.0.0</name>

      <persistence-enabled>true</persistence-enabled>

      <journal-type>NIO</journal-type>

      <paging-directory>data/paging</paging-directory>

      <bindings-directory>data/bindings</bindings-directory>

      <journal-directory>data/journal</journal-directory>

      <large-messages-directory>data/large-messages</large-messages-directory>

      <journal-datasync>true</journal-datasync>

      <journal-min-files>2</journal-min-files>

      <journal-pool-files>10</journal-pool-files>

      <journal-device-block-size>4096</journal-device-block-size>

      <journal-file-size>10M</journal-file-size>
      
      <!--
       This value was determined through a calculation.
       Your system could perform 1.18 writes per millisecond
       on the current journal configuration.
       That translates as a sync write every 844000 nanoseconds.

       Note: If you specify 0 the system will perform writes directly to the disk.
             We recommend this to be 0 if you are using journalType=MAPPED and journal-datasync=false.
      -->
      <journal-buffer-timeout>844000</journal-buffer-timeout>

      <journal-max-io>1</journal-max-io>

      <!-- how often we are looking for how many bytes are being used on the disk in ms -->
      <disk-scan-period>5000</disk-scan-period>

      <!-- once the disk hits this limit the system will block, or close the connection in certain protocols
           that won't support flow control. -->
      <max-disk-usage>90</max-disk-usage>

      <!-- should the broker detect dead locks and other issues -->
      <critical-analyzer>true</critical-analyzer>

      <critical-analyzer-timeout>120000</critical-analyzer-timeout>

      <critical-analyzer-check-period>60000</critical-analyzer-check-period>

      <critical-analyzer-policy>HALT</critical-analyzer-policy>

      
      <page-sync-timeout>844000</page-sync-timeout>

      <acceptors>

         <!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
         <!-- amqpCredits: The number of credits sent to AMQP producers -->
         <!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
         <!-- amqpDuplicateDetection: If you are not using duplicate detection, set this to false
                                      as duplicate detection requires applicationProperties to be parsed on the server. -->
         <!-- amqpMinLargeMessageSize: Determines how many bytes are considered large, so we start using files to hold their data.
                                       default: 102400, -1 would mean to disable large message control -->

         <!-- Note: If an acceptor needs to be compatible with HornetQ and/or Artemis 1.x clients add
                    "anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to the acceptor url.
                    See https://issues.apache.org/jira/browse/ARTEMIS-1644 for more information. -->


         <!-- Acceptor for every supported protocol -->
         <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor>

         <!-- AMQP Acceptor.  Listens on default AMQP port for AMQP traffic.-->
         <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>

         <!-- STOMP Acceptor. -->
         <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>

         <!-- HornetQ Compatibility Acceptor.  Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
         <acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>

         <!-- MQTT Acceptor -->
         <acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=false</acceptor>
         
         <!-- SSL Acceptor -->
         <acceptor name="netty-ssl-acceptor">tcp://0.0.0.0:61617?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;anycastPrefix=jms.queue;multicastPrefix=jms.topic.;sslEnabled=true;keyStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprink.jks;keyStorePassword=changeit;trustStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprinktrust.ts;trustStorePassword=changeit;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE</acceptor>

         <acceptor name ="mqtt+ssl">tcp://0.0.0.0:8883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;sslEnabled=true;keyStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprink.jks;keyStorePassword=changeit;trustStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprinktrust.ts;trustStorePassword=changeit;needClientAuth=true;protocols=MQTT;useEpoll=true</acceptor>

      </acceptors>

      <security-settings>
         <security-setting match="#">
            <permission type="createNonDurableQueue" roles="admins, users"/>
            <permission type="deleteNonDurableQueue" roles="admins, users"/>
            <permission type="createDurableQueue" roles="admins, users"/>
            <permission type="deleteDurableQueue" roles="admins, users"/>
            <permission type="createAddress" roles="admins, users"/>
            <permission type="deleteAddress" roles="admins, users"/>
            <permission type="consume" roles="admins, users"/>
            <permission type="browse" roles="admins, users"/>
            <permission type="send" roles="admins, users"/>
            <!-- we need this otherwise ./artemis data imp wouldn't work -->
            <permission type="manage" roles="admins"/>
         </security-setting>
      </security-settings>

      <address-settings>
         <!-- if you define auto-create on certain queues, management has to be auto-create -->
         <address-setting match="activemq.management#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
         </address-setting>
         <!--default for catch all-->
         <address-setting match="#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
            <auto-delete-queues>false</auto-delete-queues>
            <auto-delete-addresses>false</auto-delete-addresses>
         </address-setting>
      </address-settings>

      <addresses>
         <address name="DLQ">
            <anycast>
               <queue name="DLQ" />
            </anycast>
         </address>
         <address name="ExpiryQueue">
            <anycast>
               <queue name="ExpiryQueue" />
            </anycast>
         </address>

      </addresses>
   </core>
</configuration>

The connection factory on the Spring Boot client is configured as shown below.

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;


@Configuration
@EnableJms
public class MQTTConfig {

    @Value("${JMS_BROKER_TRUSTSTORE}")
    private String pathToTrustStore;

    @Value("${JMS_BROKER_KEYSTORE}")
    private String pathToKeystore;

    @Value("${JMS_BROKER_TRUSTSTORE_PASSWORD}")
    private String truststorePassword;

    @Value("${JMS_BROKER_KEYSTORE_PASSWORD}")
    private String keystorePassword;



    @Bean
    public ActiveMQConnectionFactory artemisSSLConnectionFactory() {
        ActiveMQConnectionFactory artemisConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61617?&" + "sslEnabled=true&" +
                "trustStorePath=" + pathToTrustStore + "&trustStorePassword=changeit");
        artemisConnectionFactory.setUser("user");
        artemisConnectionFactory.setPassword("password");
        return artemisConnectionFactory;
    }

    /**
     * Initialise {@link JmsTemplate} as required
     */
    @Bean
    public JmsTemplate jmsTemplate() throws JMSException {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory(artemisSSLConnectionFactory());
        jmsTemplate.setExplicitQosEnabled(true);

        //setting PubSubDomain to true configures JmsTemplate to work with topics instead of queues
        jmsTemplate.setPubSubDomain(true);
        return jmsTemplate;
    }

    /**
     * Initialise {@link DefaultJmsListenerContainerFactory} as required
     */
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() throws JMSException {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(artemisSSLConnectionFactory());
        //setting PubSubDomain to true configures the DefaultJmsListenerContainerFactory to work with topics instead of queues
        factory.setPubSubDomain(true);
        return factory;
    }
}

Below is the POM file, with only the relevant dependencies.

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.4.1.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-artemis</artifactId>
    <version>2.5.5</version>
</dependency>

The code snippet below shows a producer that publishes to the topic server.weatherForecast and a consumer that subscribes to the same topic. The producer and consumer exchange messages without issue, because jms.topic. is prefixed to every topic defined on the Spring Boot client. However, when I subscribe with an external MQTT tool, it receives nothing unless I change the subscribed topic from server.weatherForecast to jms.topic.server.weatherForecast.

import org.json.JSONObject; // assumption: the org.json library; the original post did not show this import
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class SamplePC {

    @Autowired
    private JmsTemplate jmsTemplate;

    //producer that is called by a cron job
    public void tester() {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("serialNumber", "105");
        jmsTemplate.convertAndSend("server/forecast", jsonObject.toString().toCharArray());
    }

    //consumer (a message from the producer should be received here, but nothing arrives)
    @JmsListener(destination = "server/forecast")
    private void consumeWeatherForecastRequest(char[] incomingMessage) {
        //some logic
        jmsTemplate.convertAndSend("someTopic", "someMessage");
    }
}

Upon enabling TRACE logging for the RemotingConnectionImpl, I saw that in the CreateSessionResponseMessage, the serverVersion attribute had a value of 131, and in the CreateSessionMessage, the version attribute had a value of 127.

How do I ensure that jms.topic. is not prefixed to topic names?

A minimal reproducible example can be downloaded from this GitHub repository. I tried to log the prefix in the code but found no way of doing so; all the logs simply show the topic name without the prefix. However, subscribing from an external client to the topic being published to should reveal the prefixing: if you subscribe to both topicName and jms.topic.topicName, the message is delivered to the latter. I’ve noticed that some clients parse the “.” as a “/”, so that is something else to try in case the “.” doesn’t work.


Answer

I took your reproducer and I managed to re-create the problem you’re seeing where the client is using jms.topic.test.topic. However, once I added multicastPrefix=jms.topic. to the “artemis” acceptor in broker.xml the problem went away. The broker now strips the client’s prefix and uses test.topic instead.

You did have multicastPrefix=jms.topic. set on the “netty-ssl-acceptor” acceptor, but your client wasn’t actually using that acceptor.
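
As a rough sketch of the change described above, the “artemis” acceptor from the posted broker.xml could look like this with the prefix added. Everything except the trailing prefix parameters is copied from the question; anycastPrefix=jms.queue. is an addition not mentioned in this answer, included only because the stock comment in broker.xml recommends both prefixes for Artemis 1.x clients.

```xml
<!-- Acceptor for every supported protocol, with prefixes for Artemis 1.x / HornetQ clients -->
<!-- multicastPrefix is the change described in the answer; anycastPrefix is an assumed, optional companion for queues -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false;anycastPrefix=jms.queue.;multicastPrefix=jms.topic.</acceptor>
```

Note the trailing dot on each prefix value. The broker has to be restarted (or the configuration reloaded) for an acceptor change to take effect.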

I also ran mvn dependency:tree to see why your application is using the ActiveMQ Artemis 1.3.0 client. This is what it output (in part):

[INFO] \- org.springframework.boot:spring-boot-starter-artemis:jar:2.5.5:compile
[INFO]    +- jakarta.jms:jakarta.jms-api:jar:2.0.3:compile
[INFO]    +- jakarta.json:jakarta.json-api:jar:1.1.6:compile
[INFO]    \- org.apache.activemq:artemis-jms-client:jar:1.3.0:compile
[INFO]       +- org.apache.activemq:artemis-core-client:jar:1.3.0:compile
[INFO]       |  +- org.jgroups:jgroups:jar:3.6.9.Final:compile
[INFO]       |  +- org.apache.activemq:artemis-commons:jar:1.3.0:compile
[INFO]       |  |  +- commons-beanutils:commons-beanutils:jar:1.9.2:compile
[INFO]       |  |  |  \- commons-collections:commons-collections:jar:3.2.2:compile
[INFO]       |  |  \- com.google.guava:guava:jar:18.0:compile
[INFO]       |  \- io.netty:netty-all:jar:4.0.32.Final:compile
[INFO]       +- org.apache.activemq:artemis-selector:jar:1.3.0:compile
[INFO]       \- javax.inject:javax.inject:jar:1:compile

So the dependency on org.apache.activemq:artemis-jms-client:jar:1.3.0 appears to come directly from org.springframework.boot:spring-boot-starter-artemis:jar:2.5.5, which is strange because that starter clearly declares a dependency on org.apache.activemq:artemis-jms-client:jar:2.17.0. The most likely explanation is that the dependency management inherited from the 1.4.1.RELEASE parent pins the Artemis client to the older version and overrides what the starter declares. If I change the <parent> to use 2.5.5 instead of 1.4.1.RELEASE, the problem goes away, e.g.:

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

This is what mvn dependency:tree outputs now (in part):

[INFO] \- org.springframework.boot:spring-boot-starter-artemis:jar:2.5.5:compile
[INFO]    +- jakarta.jms:jakarta.jms-api:jar:2.0.3:compile
[INFO]    +- jakarta.json:jakarta.json-api:jar:1.1.6:compile
[INFO]    \- org.apache.activemq:artemis-jms-client:jar:2.17.0:compile
[INFO]       +- org.apache.activemq:artemis-core-client:jar:2.17.0:compile
[INFO]       |  +- org.jgroups:jgroups:jar:3.6.13.Final:compile
[INFO]       |  +- org.apache.johnzon:johnzon-core:jar:1.2.14:compile
[INFO]       |  +- io.netty:netty-transport-native-epoll:jar:linux-x86_64:4.1.68.Final:compile
[INFO]       |  |  \- io.netty:netty-transport-native-unix-common:jar:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-transport-native-kqueue:jar:osx-x86_64:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-codec-http:jar:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-buffer:jar:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-transport:jar:4.1.68.Final:compile
[INFO]       |  |  \- io.netty:netty-resolver:jar:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-handler:jar:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-handler-proxy:jar:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-codec:jar:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-codec-socks:jar:4.1.68.Final:compile
[INFO]       |  \- io.netty:netty-common:jar:4.1.68.Final:compile
[INFO]       +- org.apache.activemq:artemis-commons:jar:2.17.0:compile
[INFO]       |  +- org.jboss.logging:jboss-logging:jar:3.4.2.Final:compile
[INFO]       |  \- commons-beanutils:commons-beanutils:jar:1.9.4:compile
[INFO]       |     \- commons-collections:commons-collections:jar:3.2.2:compile
[INFO]       \- org.apache.activemq:artemis-selector:jar:2.17.0:compile
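
One way to keep the parent and the starter from drifting apart again is to let the parent manage the starter’s version. The fragment below is only a sketch of that idea, not something taken from your reproducer: it assumes the rest of the POM stays as posted and that the <dependency> sits inside a <dependencies> element.

```xml
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.5.5</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-artemis</artifactId>
        <!-- no explicit version: it is inherited from the parent's dependency management -->
    </dependency>
</dependencies>
```

Running mvn dependency:tree again after a change like this is a quick way to confirm which artemis-jms-client version is actually resolved.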