Skip to content
Advertisement

Configure Apache Qpid JMS (jakarta.jms) connectionfactory in Apache Camel XML DSL in AMQP component

Trying to use the jakarta.jms Apache Qpid AMQP client to process messages.

I am trying to use poolable connection factory using org.messagehub.

Standlone java code works, refer the java changes. When I try use the same in Spring XML DSL in Camel, the AMQP component doesn’t support the jakarta.jmx connection factory.

Does the apache-amqp component support JMS 2.0 from Apache Qpid client yet?

package org.example;


import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.DeliveryMode;
import jakarta.jms.Destination;
import jakarta.jms.ExceptionListener;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import org.apache.camel.component.jms.JmsConfiguration;
import org.messaginghub.pooled.jms.JmsPoolConnectionFactory;

import javax.naming.Context;
        import javax.naming.InitialContext;

public class AMQPArtemisClient {
    public static void main(String[] args) throws Exception {
 
        try {
            // The configuration for the Qpid InitialContextFactory has been supplied in
            // a jndi.properties file in the classpath, which results in it being picked
            // up automatically by the InitialContext constructor.
            Context context = new InitialContext();

            ConnectionFactory factory = (ConnectionFactory) context.lookup("myFactoryLookup");
            Destination queue = (Destination) context.lookup("myQueueLookup");

            System.setProperty("USER","admin");
            System.setProperty("PASSWORD","admin");

            //added for poolable connection
            JmsPoolConnectionFactory poolConnectionFactory = new JmsPoolConnectionFactory();
            poolConnectionFactory.setMaxConnections(5);
            poolConnectionFactory.setConnectionFactory(factory);

            //  Connection connection = factory.createConnection(System.getProperty("USER"), System.getProperty("PASSWORD"));
            Connection connection = poolConnectionFactory.createConnection(System.getProperty("USER"), System.getProperty("PASSWORD"));
            connection.setExceptionListener(new MyExceptionListener());
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            MessageProducer messageProducer = session.createProducer(queue);
            MessageConsumer messageConsumer = session.createConsumer(queue);

            TextMessage message = session.createTextMessage("Hello world!");
            messageProducer.send(message, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
            TextMessage receivedMessage = (TextMessage) messageConsumer.receive(2000L);

            if (receivedMessage != null) {
                System.out.println(receivedMessage.getText());
            } else {
                System.out.println("No message received within the given timeout!");
            }

            connection.close();

        } catch (Exception exp) {
            System.out.println("Caught exception, exiting.");
            exp.printStackTrace(System.out);
            System.exit(1);
        }
    }

    private static class MyExceptionListener implements ExceptionListener {
        @Override
        public void onException(JMSException exception) {
            System.out.println("Connection ExceptionListener fired, exiting.");
            exception.printStackTrace(System.out);
            System.exit(1);
        }
    }
}
  • resources/jndi.properties
java.naming.factory.initial = org.apache.qpid.jms.jndi.JmsInitialContextFactory
connectionfactory.myFactoryLookup = amqp://localhost:5672
queue.myQueueLookup = queue
topic.myTopicLookup = topic
  • pom.xml
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-amqp</artifactId> <!--version 3.17.0 is used-->
    </dependency>

<dependency>
          <groupId>org.messaginghub</groupId>
          <artifactId>pooled-jms</artifactId>
          <version>3.0.0</version>  <!-- supports jakarta.jms connecton factory -->
      </dependency>
      <dependency>
          <groupId>org.apache.qpid</groupId>
          <artifactId>qpid-jms-client</artifactId>
          <version>2.0.0</version>
      </dependency>
  • Camel component I tried to configure the AMPQ component
   <bean id="jmsConnectionFactory" class="org.apache.qpid.jms.JmsConnectionFactory" >
        <property name="username" value="admin"/>
        <property name="password" value="secret"/>
       <property name="remoteURI" value="amqp://localhost:5672" />
    </bean>

   <bean id="jmsPooledConnectionFactory" class="org.messaginghub.pooled.jms.JmsPoolConnectionFactory" init-method="start" destroy-method="stop">
        <property name="maxConnections" value="5" />
        <property name="connectionFactory" ref="jmsConnectionFactory" />
    </bean>


    <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="jmsPooledConnectionFactory" />
        <property name="concurrentConsumers" value="5" />
    </bean>
    <!-- uses the  AMQP component -->
   <bean id="jms" class="org.apache.camel.component.amqp.AMQPComponent">
        <property name="configuration" ref="jmsConfig" />
       <property name="connectionFactory" ref="jmsPooledConnectionFactory"/>
    </bean>

When I try configuration like below I get exception jakarta.jms exception where it expects javax.jms, for now i had to lower the version of messaging hub to 2.0.5 and use qpid-jms-client jar to 1.6.0.

<bean id="amqp" class="org.apache.camel.component.amqp.AmqpComponent">
   <property name="connectionFactory">
     <bean class="org.apache.qpid.jms.JmsConnectionFactory" factory-method="createFromURL">
       <property name="remoteURI" value="amqp://localhost:5672" />
       <property name="topicPrefix" value="topic://" />  <!-- only necessary when connecting to ActiveMQ over AMQP 1.0 -->
     </bean>
   </property>
 </bean>

Advertisement

Answer

The Camel AMQP component you are using doesn’t support Jakarta JMS an so you must revert to older versions of the Qpid JMS client and the Pooled JMS wrapper in order to make it work. This is your only options until there is a version of the Camel bits that supports Jakarta and you move on to it.

User contributions licensed under: CC BY-SA
3 People found this is helpful
Advertisement