I am using ActiveMQ to send messages and receive them asynchronously.
My problem is deciding on the best way to wait for incoming messages. Sleeping the thread in a loop is one option, but it doesn't feel right to me.
Can anyone suggest a better way to do this?
AsyncReceiver.java
public class AsyncReceiver implements MessageListener, ExceptionListener {

    public static void main(String[] args) throws Exception {
        Properties env = new Properties();
        env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        env.put(Context.PROVIDER_URL, "tcp://localhost:61616");
        env.put("queue.queueSampleQueue", "MyNewQueue");

        InitialContext ctx = new InitialContext(env);
        Queue queue = (Queue) ctx.lookup("queueSampleQueue");
        QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");
        QueueConnection queueConn = connFactory.createQueueConnection();
        QueueSession queueSession = queueConn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        QueueReceiver queueReceiver = queueSession.createReceiver(queue);

        AsyncReceiver asyncReceiver = new AsyncReceiver();
        queueReceiver.setMessageListener(asyncReceiver);
        queueConn.setExceptionListener(asyncReceiver);
        queueConn.start();

        // Waiting for messages
        System.out.print("waiting for messages");
        for (int i = 0; i < 10; i++) {
            Thread.sleep(1000);
        }
        queueConn.close();
    }

    public void onMessage(Message message) {
        TextMessage msg = (TextMessage) message;
        try {
            System.out.println("received: " + msg.getText());
        } catch (JMSException ex) {
            ex.printStackTrace();
        }
    }

    public void onException(JMSException exception) {
        System.err.println("an error occurred: " + exception);
    }
}
Sender.java
public class Sender {

    public static void main(String[] args) throws Exception {
        Properties env = new Properties();
        env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        env.put(Context.PROVIDER_URL, "tcp://localhost:61616");
        env.put("queue.queueSampleQueue", "MyNewQueue");

        InitialContext ctx = new InitialContext(env);
        Queue queue = (Queue) ctx.lookup("queueSampleQueue");
        QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");
        QueueConnection queueConn = connFactory.createQueueConnection();
        QueueSession queueSession = queueConn.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);
        QueueSender queueSender = queueSession.createSender(queue);
        queueSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

        TextMessage message = queueSession.createTextMessage("Hello");
        queueSender.send(message);
        System.out.println("sent: " + message.getText());
        queueConn.close();
    }
}
Answer
There are two ways to consume messages from a queue.
1. Poll the queue periodically for new messages. This suits a program that only runs at intervals, for example once or twice a day. You can implement it as a loop with thread sleeps.
2. Register a consumer with the queue using a MessageListener, as in the example below.
Consumer.java
javax.jms.Connection connection = null;
Session session = null;
Destination destination = null;
MessageConsumer consumer = null;

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(queueName);
consumer = session.createConsumer(destination);
consumer.setMessageListener(new YourClass());
YourClass.java
public class YourClass implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            String inputJsonString = ((TextMessage) message).getText();
            // do whatever you want with inputJsonString
            message.acknowledge(); // ignored under AUTO_ACKNOWLEDGE
        } catch (JMSException e) {
            e.printStackTrace(); // getText() and acknowledge() can both throw
        }
    }
}
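Registering a listener still leaves open how the main thread should wait without a sleep loop. One common option is to block on a java.util.concurrent.CountDownLatch that the listener counts down. The sketch below is only an illustration of that idea, not part of the original answer: it uses plain JDK classes so it runs without a broker, and the names LatchWaitDemo, CountingListener and receiveAll are hypothetical. The background thread stands in for the JMS provider calling onMessage.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class LatchWaitDemo {

    /** Hypothetical stand-in for a JMS MessageListener: records each text and counts down. */
    static final class CountingListener implements Consumer<String> {
        private final CountDownLatch done;
        private final List<String> received = new CopyOnWriteArrayList<>();

        CountingListener(int expected) {
            this.done = new CountDownLatch(expected);
        }

        @Override
        public void accept(String text) { // plays the role of onMessage(Message)
            received.add(text);
            done.countDown();
        }

        boolean await(long timeout, TimeUnit unit) throws InterruptedException {
            return done.await(timeout, unit);
        }

        List<String> received() {
            return received;
        }
    }

    /** Delivers count messages from a background thread and blocks until all have arrived. */
    public static List<String> receiveAll(int count, long timeoutMillis) throws InterruptedException {
        CountingListener listener = new CountingListener(count);

        // Stand-in for the provider's delivery thread (assumption: with JMS the
        // provider invokes the registered listener on its own thread).
        Thread delivery = new Thread(() -> {
            for (int i = 1; i <= count; i++) {
                listener.accept("message " + i);
            }
        });
        delivery.setDaemon(true);
        delivery.start();

        // Replaces the Thread.sleep loop: returns as soon as the latch reaches zero,
        // or fails once the timeout expires.
        if (!listener.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
            throw new IllegalStateException("timed out waiting for messages");
        }
        return listener.received();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(receiveAll(3, 5000));
    }
}
```

In a receiver like AsyncReceiver, the same latch could be counted down from onMessage (or from a shutdown hook for a long-running consumer), with main calling await before it closes the connection. Whether to wait for a fixed number of messages or until shutdown depends on the application.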