Skip to content
Advertisement

Better way to wait to receive Async messages in ActiveMQ

I have used ActiveMQ to Send messages and Receive them Asynchronously.

There, I’m having a problem with deciding the best way to waiting in the for messages. Sleeping thread in a loop is one option. But it feels doesn’t look good for me.

Can anyone suggest a better way for this.

AsyncReceiver.java

public class AsyncReceiver implements MessageListener, ExceptionListener{

    public static void main(String[] args) throws Exception{

        Properties env = new Properties();                                  
        env.put(Context.INITIAL_CONTEXT_FACTORY,"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        env.put(Context.PROVIDER_URL, "tcp://localhost:61616");
        env.put("queue.queueSampleQueue","MyNewQueue");

        InitialContext ctx = new InitialContext(env);
        Queue queue = (Queue) ctx.lookup("queueSampleQueue");
        QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");
        QueueConnection queueConn = connFactory.createQueueConnection();
        QueueSession queueSession = queueConn.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);

        QueueReceiver queueReceiver = queueSession.createReceiver(queue);
        AsyncReceiver asyncReceiver = new AsyncReceiver();
        queueReceiver.setMessageListener(asyncReceiver);
        queueConn.setExceptionListener(asyncReceiver);
        queueConn.start();

        // Waiting for messages
        System.out.print("waiting for messages");
        for (int i = 0; i < 10; i++) {
            Thread.sleep(1000);
        }

        queueConn.close();
    }

    public void onMessage(Message message){
        TextMessage msg = (TextMessage) message;
        try {
            System.out.println("received: " + msg.getText());
        } catch (JMSException ex) {
            ex.printStackTrace();
        }
    }

    public void onException(JMSException exception){
        System.err.println("an error occurred: " + exception);
    }
}

Sender.java

public class Sender{

    public static void main(String[] args) throws Exception{

        Properties env = new Properties();
        env.put(Context.INITIAL_CONTEXT_FACTORY,"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        env.put(Context.PROVIDER_URL, "tcp://localhost:61616");
        env.put("queue.queueSampleQueue", "MyNewQueue");

        InitialContext ctx = new InitialContext(env);
        Queue queue = (Queue) ctx.lookup("queueSampleQueue");
        QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");
        QueueConnection queueConn = connFactory.createQueueConnection();
        QueueSession queueSession = queueConn.createQueueSession(false,Session.DUPS_OK_ACKNOWLEDGE);

        QueueSender queueSender = queueSession.createSender(queue);
        queueSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        TextMessage message = queueSession.createTextMessage("Hello");
        queueSender.send(message);
        System.out.println("sent: " + message.getText());

        queueConn.close();
    }
}

Advertisement

Answer

There are two ways to process/consume messages in the Queue.

  1. Periodically check the queue for new messages – This is suitable if you run your program periodically. You can do this by implementing a loop with some thread sleeps. Ex. twice a day, once a day etc.

  2. Register consumers (use MessageListener) with the queue. You can do this as below example.

Consumer.java

        javax.jms.Connection connection = null;
        Session session = null;
        Destination destination = null;
        MessageConsumer consumer = null;


        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
        connection = connectionFactory.createConnection();
        connection.start();


        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue(queueName);


        consumer = session.createConsumer(destination);
        consumer.setMessageListener(new YourClass());

YourClass.java

public class YourClass implements MessageListener {
@Override 
public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                inputJsonString = textMessage.getText();
               //do what ever you want with inputJsonString
                message.acknowledge(); 
           }

}

User contributions licensed under: CC BY-SA
9 People found this is helpful
Advertisement