I need to post-process result of CompletableFuture.supplyAsync
execution to get intermediate result.
My code looks following
var executor = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Integer.MAX_VALUE, 2L, TimeUnit.SECONDS, // size of queue has to be restricted since Java Heap Space could appear; // default size of queue is Integer.MAX_VALUE new LinkedBlockingQueue<>(10_000_000)); var resultOfBatch = new ResultOfBatch(); var lock = new ReentrantLock(); // usually `settings.getRuns()` could be up to 1_000_000_000 runs LongStream.range(0, settings.getRuns()) .forEach(l -> { CompletableFuture.supplyAsync(task, executor) // collecting result per run to resultOfBatch (mainly simple operations like adding values to primitives) .thenApply(resultPerRun -> { lock.lock(); return resultOfBatch.addResultPerBatch(resultPerRun); }) // the idea in logging partial result - ex.,every 10K passes .thenAccept(resultPerBatch -> { if (resultPerBatch.getRuns() % 10_000 == 0) { // log intermediate result of execution resultOfBatch.reset(); } lock.unlock(); }); });
In a result I’m facing with java.util.concurrent.CompletionException: java.lang.IllegalMonitorStateException
on .thenAccept(resultPerBatch -> {
line
Seems like I’m using lock
in wrong way but I cannot figure out how to avoid this kind of exception.
Advertisement
Answer
There’s no guarantee that the Function
passed to thenApply
and the Consumer
passed to thenAccept
will execute on the same thread.
In this case, there is no need to separate them into separate steps:
CompletableFuture.supplyAsync(() -> l, executor) .thenAcceptAsync(resultPerRun -> { lock.lock(); try { var resultPerBatch = resultOfBatch.addResultPerBatch(resultPerRun); if (resultPerBatch.getRuns() % 10_000 == 0) { System.out.println(resultPerBatch.getRuns()); resultOfBatch.reset(); } } finally { lock.unlock(); } }, executor);
However, it is probably a better idea to process this data in batches rather than trying to create so many threads. This will ultimately either run out of memory, out of available native threads, or reject work because the queue is full.