I am trying to create another thread that processes data while main thread doing some more. Main thread must wait till another thread finishes doStuff
with all elements.
And my implementation is pretty straight forward.
Please, take a look at processData
and tell me is there some more Java-like way to do it?
I read about Phaser
but still can’t imagine how to use it or what else can I try?
public class MyClass { private final NodeQueue queue; MyClass() { queue = new NodeQueue(); } public void processData(Set<String> dataSet) { // allow transfer queue.transferEnable() Thread transfer = new Thread(() -> { queue.transferData(); }) transfer.start(); // doStuff in another thread for (String element : dataSet) { queue.add(element); // do something more } // stop transfer queue.waitTillEmptyQueue(); queue.transferDisable(); try { transfer.join(); } catch (...) { // catch } } public class NodeQueue { private final ConcurrentLinkedQueue<String> queue; private boolean transferEnabled; protected NodeQueue() { queue = new ConcurrentLinkedQueue<>(); transferEnabled = true; } protected void transfer() { while (!queue.isEmpty()) { doStuff(queue.poll()); } } public void transferData() { while (tranfserEnabled) { transfer(); } } public synchronized void transferEnable() { transferEnabled = true; } public synchronized void transferDisable() { transferEnabled = false; } public void add(String s) { queue.add(s); } public synchronized void waitTillEmptyQueue() { while (!queue.isEmpty()) { if (queue.isEmpty()) { break; } } } } }
Advertisement
Answer
Let me copy the Phaser
example from my own post
Main thread
// Add producer as a party Phaser phaser = new Phaser(1); for (int i=0; i<10000; ++i) { // Add each task as a party phaser.register(); queue.put(new Task()); } // Producer arrived and wait for completion of all tasks phaser.arriveAndAwaitAdvance(); // At the end, there is only 1 party left which is the producer itself
Consumer
while (true) { Task task = queue.take(); processTask(task); // Task completed and remove itself as a party phaser.arriveAndDeregister(); }