Skip to content
Advertisement

Killing threads from completion service?

Problem

I am using a completion service and spawning child threads to perform some ETL. As I debug in my IDE and then stop all processes, I notice I still have a bunch of zombie threads killing my CPU. This is due to the fact that I’m not terminating the child threads properly.

Minimum Example

  Future<Boolean> future = completionService.submit(conversionProcessor);
  boolean isCompleted = false;
  while (!isCompleted && !closed.get()) {
    try {
      isCompleted = future.get(CONSUMER_HEARTBEAT_INTERVAL,
          TimeUnit.SECONDS); // Wait until heartbeat interval exceeds
      if (isCompleted) {
        // do some things here
        future.cancel(true);
        break;
      }
    } catch (TimeoutException e) {
      // Used to keep consumer alive in the cluster
      consumer.poll(Duration.ofSeconds(CONSUMER_HEARTBEAT_INTERVAL)); // does heart-beat
    } catch (CancellationException e) {
      future.cancel(true);
      break;
    } catch (InterruptedException e) {
      future.cancel(true);
      break;
    } catch (WakeupException we) {
      future.cancel(true);
      break;
    } catch (Exception e) {
      future.cancel(true);
      break;
    }

Thoughts

Essentially, I submit my Callable<Boolean> to my completion service.

    ExecutorCompletionService<Boolean> completionService = new ExecutorCompletionService<>(
        Executors.newSingleThreadExecutor());

If I stop the debugger, this thread is presumably still running. I just added this future.cancel(true) piece, which seems to have stopped continuously uploaded files from my child thread, but I still see these java processes running on my activity monitor.

I’m wondering how I should be thinking about this? I want the callable as it tells me when the underlying ETL has completed or not (true/false)

edit: future.cancel actually seems to be helping quite a bit.. Is this what I want to be using?

Advertisement

Answer

Once you are done with your CompletionService you need to shutdown underlying executor so you need to do the following

ExecutorService es = Executors.newSingleThreadExecutor();
ExecutorCompletionService<Boolean> completionService = new ExecutorCompletionService<>(es);

And a the end call

es.shutdown();
es.awaitTermination(1, TimeUnit.SECONDS);
User contributions licensed under: CC BY-SA
4 People found this is helpful
Advertisement