
ActiveMQ batch consumer

I have a requirement to consume messages from an ActiveMQ topic and persist them in MongoDB. Is there a way (or a configuration) to consume messages from the topic in batches, instead of reading them one by one and making a DB call for every message?

I imagine the end solution would do something like this:

  1. Consume messages in batches of 100
  2. Use a Mongo bulk insert to save each batch to the DB
  3. Send an ACK to the broker for successfully inserted messages and a NAK for the failed ones

Answer

The JMS API only allows you to receive one message at a time, whether that’s via an asynchronous javax.jms.MessageListener or a synchronous call to javax.jms.MessageConsumer#receive() in JMS 1.1 or javax.jms.JMSConsumer#receive() in JMS 2. However, you can group the receipt of multiple messages into one unit of work by using a transacted session. Here’s what the javax.jms.Session JavaDoc says about transacted sessions:

A session may be specified as transacted. Each transacted session supports a single series of transactions. Each transaction groups a set of message sends and a set of message receives into an atomic unit of work. In effect, transactions organize a session’s input message stream and output message stream into series of atomic units. When a transaction commits, its atomic unit of input is acknowledged and its associated atomic unit of output is sent. If a transaction rollback is done, the transaction’s sent messages are destroyed and the session’s input is automatically recovered.

So you can receive 100 messages individually using a transacted session, insert that data into Mongo, and then commit the transacted session. If there’s a failure, you can roll back the transacted session instead (which essentially acts as a negative acknowledgement). For example:

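import java.util.ArrayList;
import java.util.List;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;

final int TX_SIZE = 100;
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = cf.createConnection();
try {
   // 'true' makes the session transacted; the acknowledge mode is then ignored
   Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
   Topic topic = session.createTopic("myTopic");
   MessageConsumer consumer = session.createConsumer(topic);
   connection.start();
   while (true) {
      // collect up to TX_SIZE messages, waiting at most 1 second for each one
      List<Message> messages = new ArrayList<>();
      for (int i = 0; i < TX_SIZE; i++) {
         Message message = consumer.receive(1000);
         if (message == null) {
            break; // no more messages available for this batch
         }
         messages.add(message);
      }

      if (messages.isEmpty()) {
         break; // no more messages in the subscription
      }

      try {
         // bulk insert data from messages List into Mongo
         session.commit(); // acknowledges every message received in this transaction
      } catch (Exception e) {
         e.printStackTrace();
         session.rollback(); // the whole batch becomes eligible for redelivery
      }
   }
} finally {
   connection.close(); // also closes the session and the consumer
}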
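To see the commit/rollback behaviour described in the JavaDoc without a running broker, here is a minimal in-memory sketch. TxQueueModel is a hypothetical class written only for this illustration, not part of JMS or ActiveMQ, and it ignores broker details such as redelivery policies and delays.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory model of a transacted consumer (not a JMS class).
class TxQueueModel {
    private final Deque<String> pending = new ArrayDeque<>();  // not yet delivered
    private final List<String> inFlight = new ArrayList<>();   // delivered, not yet committed

    void send(String msg) {
        pending.addLast(msg);
    }

    // Returns the next message, or null when nothing is available
    // (similar in spirit to a receive call that times out).
    String receive() {
        String msg = pending.pollFirst();
        if (msg != null) {
            inFlight.add(msg);
        }
        return msg;
    }

    // Acknowledges everything received since the last commit or rollback.
    void commit() {
        inFlight.clear();
    }

    // Puts every unacknowledged message back at the head, in original order.
    void rollback() {
        for (int i = inFlight.size() - 1; i >= 0; i--) {
            pending.addFirst(inFlight.get(i));
        }
        inFlight.clear();
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Receiving two of three messages and then calling rollback() leaves all three pending again, which is the "negative acknowledgement" effect for the whole batch; calling commit() instead removes them for good.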

It’s worth noting that if you are only using JMS transacted sessions and not full XA transactions, there is still some risk of duplicates in Mongo (e.g. if your application crashes after successfully inserting data into Mongo but before committing the transacted session, the broker will redeliver the same messages). XA transactions would mitigate this risk for you, at the cost of a fair amount of additional complexity depending on your environment.
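Short of XA, one common way to reduce this risk is to make the write idempotent, for example by using each message's ID as the document's unique key so that a redelivered message is skipped rather than inserted twice. The sketch below is only an in-memory model of that idea, not something from the answer above: IdempotentStore is a hypothetical stand-in for a collection with a unique key, and the string IDs stand in for the value of Message#getJMSMessageID().

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a collection with a unique key.
class IdempotentStore {
    private final Map<String, String> docs = new LinkedHashMap<>();

    // Stores every (id, body) pair whose id is not present yet and
    // returns how many documents were newly inserted.
    int insertBatch(List<Map.Entry<String, String>> batch) {
        int inserted = 0;
        for (Map.Entry<String, String> entry : batch) {
            // putIfAbsent returns null only when the id was not stored before
            if (docs.putIfAbsent(entry.getKey(), entry.getValue()) == null) {
                inserted++;
            }
        }
        return inserted;
    }

    int size() {
        return docs.size();
    }
}

class IdempotentStoreDemo {
    public static void main(String[] args) {
        IdempotentStore store = new IdempotentStore();
        List<Map.Entry<String, String>> batch =
                List.of(Map.entry("ID:1", "first"), Map.entry("ID:2", "second"));

        System.out.println(store.insertBatch(batch)); // first delivery of the batch
        System.out.println(store.insertBatch(batch)); // same batch redelivered after a crash
        System.out.println(store.size());
    }
}
```

With a real database the same effect would depend on a unique index on the chosen key and on how the driver reports duplicate-key failures within a bulk write, so treat this as a design sketch rather than a drop-in solution.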

Lastly, if you run into performance limitations with ActiveMQ “Classic”, consider using ActiveMQ Artemis, the next-generation message broker from ActiveMQ.
