Skip to content
Advertisement

How to control child-thread lifecycle and synchonize it with main-thread?

I am trying to create another thread that processes data while main thread doing some more. Main thread must wait till another thread finishes doStuff with all elements. And my implementation is pretty straight forward.

Please, take a look at processData and tell me is there some more Java-like way to do it?

I read about Phaser but still can’t imagine how to use it or what else can I try?

public class MyClass {
    private final NodeQueue queue;
    
    MyClass() {
        queue = new NodeQueue();
    }
    
    public void processData(Set<String> dataSet) {
        // allow transfer
        queue.transferEnable()
        Thread transfer = new Thread(() -> {
            queue.transferData();
        })
        transfer.start();
        
        // doStuff in another thread
        for (String element : dataSet) {
            queue.add(element);
            // do something more
        }
        
        // stop transfer
        queue.waitTillEmptyQueue();
        queue.transferDisable();
        try {
            transfer.join();
        } catch (...) {
            // catch
        }
    }
    
    
    
    public class NodeQueue {
        private final ConcurrentLinkedQueue<String> queue;
        
        private boolean transferEnabled;
        
        protected NodeQueue() {
            queue = new ConcurrentLinkedQueue<>();
            transferEnabled = true;
        }
        
        protected void transfer() {
            while (!queue.isEmpty()) {
                doStuff(queue.poll());
            }
        }
        
        public void transferData() {
            while (tranfserEnabled) {
                transfer();
            }
        }
        
        public synchronized void transferEnable() {
            transferEnabled = true;
        }
        
        public synchronized void transferDisable() {
            transferEnabled = false;
        }
        
        public void add(String s) {
            queue.add(s);
        }
        
        public synchronized void waitTillEmptyQueue() {
            while (!queue.isEmpty()) {
                if (queue.isEmpty()) {
                    break;
                }
            }
        }
    }
}

Advertisement

Answer

Let me copy the Phaser example from my own post

Main thread

// Add producer as a party
Phaser phaser = new Phaser(1);

for (int i=0; i<10000; ++i) {
  // Add each task as a party
  phaser.register();
  queue.put(new Task());
}

// Producer arrived and wait for completion of all tasks
phaser.arriveAndAwaitAdvance();

// At the end, there is only 1 party left which is the producer itself

Consumer

while (true) {
  Task task = queue.take();
  processTask(task);
  // Task completed and remove itself as a party
  phaser.arriveAndDeregister();
}
User contributions licensed under: CC BY-SA
4 People found this is helpful
Advertisement