
How to take items from a queue in chunks?

I have multiple producer threads that add objects to a shared queue concurrently.

I want to create a single-threaded consumer that reads from that shared queue for further data processing (database batch insert).

Problem: I want to take the data from the queue only in chunks, for better performance during the batch insert. So I somehow have to detect how many items are in the queue, take all of those items from the queue, and leave the queue empty again.

 BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<>();

 ExecutorService pes = Executors.newFixedThreadPool(4);
 ExecutorService ces = Executors.newFixedThreadPool(1);

 pes.submit(new Producer(sharedQueue, 1));
 pes.submit(new Producer(sharedQueue, 2));
 pes.submit(new Producer(sharedQueue, 3));
 pes.submit(new Producer(sharedQueue, 4));
 ces.submit(new Consumer(sharedQueue, 1));

class Producer implements Runnable {
    public void run() {
        ...
        sharedQueue.put(obj);
    }
}

class Consumer implements Runnable {
    public void run() {
        ...
        sharedQueue.take();
    }
}

Question for the Consumer: how can I poll the shared queue, wait until it holds X items, then take all of them and empty the queue at the same time (so the consumer can go back to polling and waiting)?

I'm open to any suggestions and am not necessarily bound to the code above.


Answer

Instead of checking the size of the queue, you'd better keep an internal List in the consumer: take objects from the queue one at a time and add them to that list. Once the list holds X items, do your processing and then clear the internal list.

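class Consumer implements Runnable {
  // sharedQueue and the batch size X are assumed to be provided as in the question
  private final List<Integer> itemsToProcess = new ArrayList<>();

  @Override
  public void run() {
    try {
      while (true) { // or until producers are stopped and the queue is empty
        // take() blocks until an item is available, so this loop waits for X items
        while (itemsToProcess.size() < X) {
          itemsToProcess.add(sharedQueue.take());
        }
        // do process
        itemsToProcess.clear();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the interrupt flag and stop
    }
  }
}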

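Instead of BlockingQueue.take(), you might also use BlockingQueue.poll(timeout, unit) with some reasonable timeout and check the result for null. A null result means the queue stayed empty for the whole timeout, which lets you detect the situation where all producers are done and the queue is empty, so you can shut down your consumer. Any items already collected in the list still need to be processed before the consumer exits.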
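
The poll-with-timeout variant could look roughly like the sketch below. It is only an illustration under some assumptions: the names BatchingConsumer, batchSize (the X above), idleTimeoutMillis and batchHandler are made up for this example, the queue holds Integer as in the question, and a timed-out poll is treated as "all producers are done", which only holds if the timeout is comfortably longer than the gap between two produced items.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical class for illustration; none of these names come from the question.
final class BatchingConsumer implements Runnable {
    private final BlockingQueue<Integer> sharedQueue;
    private final int batchSize;                        // the "X" from the answer
    private final long idleTimeoutMillis;               // assumed: how long a quiet queue means "done"
    private final Consumer<List<Integer>> batchHandler; // e.g. the database batch insert

    BatchingConsumer(BlockingQueue<Integer> sharedQueue, int batchSize,
                     long idleTimeoutMillis, Consumer<List<Integer>> batchHandler) {
        this.sharedQueue = sharedQueue;
        this.batchSize = batchSize;
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.batchHandler = batchHandler;
    }

    @Override
    public void run() {
        List<Integer> itemsToProcess = new ArrayList<>(batchSize);
        try {
            while (true) {
                // Wait up to the timeout for the next item; null means the queue stayed empty.
                Integer item = sharedQueue.poll(idleTimeoutMillis, TimeUnit.MILLISECONDS);
                if (item == null) {
                    break; // assumption: a quiet queue means all producers have finished
                }
                itemsToProcess.add(item);
                if (itemsToProcess.size() >= batchSize) {
                    flush(itemsToProcess);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag for the caller
        }
        flush(itemsToProcess); // hand over the last, possibly partial, batch
    }

    private void flush(List<Integer> items) {
        if (!items.isEmpty()) {
            batchHandler.accept(new ArrayList<>(items)); // pass a copy so clearing is safe
            items.clear();
        }
    }
}
```

With the setup from the question, this would be submitted as something like ces.submit(new BatchingConsumer(sharedQueue, 100, 1000, batch -> insertBatch(batch))), where insertBatch stands for your own batch insert code. Note that a timeout alone cannot distinguish slow producers from finished ones; if that matters, an explicit end-of-stream signal from the producers is probably the safer choice.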

User contributions licensed under: CC BY-SA