Problem
I am using a completion service and spawning child threads to perform some ETL. As I debug in my IDE and then stop all processes, I notice I still have a bunch of zombie threads killing my CPU. This is due to the fact that I’m not terminating the child threads properly.
Minimum Example
Future<Boolean> future = completionService.submit(conversionProcessor); boolean isCompleted = false; while (!isCompleted && !closed.get()) { try { isCompleted = future.get(CONSUMER_HEARTBEAT_INTERVAL, TimeUnit.SECONDS); // Wait until heartbeat interval exceeds if (isCompleted) { // do some things here future.cancel(true); break; } } catch (TimeoutException e) { // Used to keep consumer alive in the cluster consumer.poll(Duration.ofSeconds(CONSUMER_HEARTBEAT_INTERVAL)); // does heart-beat } catch (CancellationException e) { future.cancel(true); break; } catch (InterruptedException e) { future.cancel(true); break; } catch (WakeupException we) { future.cancel(true); break; } catch (Exception e) { future.cancel(true); break; }
Thoughts
Essentially, I submit my Callable<Boolean>
to my completion service.
ExecutorCompletionService<Boolean> completionService = new ExecutorCompletionService<>( Executors.newSingleThreadExecutor());
If I stop the debugger, this thread is presumably still running. I just added this future.cancel(true)
piece, which seems to have stopped continuously uploaded files from my child thread, but I still see these java processes running on my activity monitor.
I’m wondering how I should be thinking about this? I want the callable as it tells me when the underlying ETL has completed or not (true/false)
edit: future.cancel actually seems to be helping quite a bit.. Is this what I want to be using?
Advertisement
Answer
Once you are done with your CompletionService
you need to shutdown underlying executor so you need to do the following
ExecutorService es = Executors.newSingleThreadExecutor(); ExecutorCompletionService<Boolean> completionService = new ExecutorCompletionService<>(es);
And a the end call
es.shutdown(); es.awaitTermination(1, TimeUnit.SECONDS);