Skip to content
Advertisement

Locking execution within CompletableFuture

I need to post-process result of CompletableFuture.supplyAsync execution to get intermediate result. My code looks following

var executor = new ThreadPoolExecutor(
                       Runtime.getRuntime().availableProcessors(),
                       Integer.MAX_VALUE,
                       2L,
                       TimeUnit.SECONDS,
                       // size of queue has to be restricted since Java Heap Space could  appear; 
                       // default size of queue is Integer.MAX_VALUE
                       new LinkedBlockingQueue<>(10_000_000));
var resultOfBatch = new ResultOfBatch();
var lock = new ReentrantLock();
// usually `settings.getRuns()` could be up to 1_000_000_000 runs
LongStream.range(0, settings.getRuns())
    .forEach(l -> {
            CompletableFuture.supplyAsync(task, executor)

                // collecting result per run to resultOfBatch (mainly simple operations like adding values to primitives)
                .thenApply(resultPerRun -> {
                    lock.lock();
                    return resultOfBatch.addResultPerBatch(resultPerRun);
                })

                // the idea in logging partial result - ex.,every 10K passes
                .thenAccept(resultPerBatch -> {
                    if (resultPerBatch.getRuns() % 10_000 == 0) {
                        // log intermediate result of execution
                        resultOfBatch.reset();
                    }
                    lock.unlock();
                });
    });

In a result I’m facing with java.util.concurrent.CompletionException: java.lang.IllegalMonitorStateException on .thenAccept(resultPerBatch -> { line

Seems like I’m using lock in wrong way but I cannot figure out how to avoid this kind of exception.

Advertisement

Answer

There’s no guarantee that the Function passed to thenApply and the Consumer passed to thenAccept will execute on the same thread.

In this case, there is no need to separate them into separate steps:

CompletableFuture.supplyAsync(() -> l, executor)
        .thenAcceptAsync(resultPerRun -> {
            lock.lock();
            try {
                var resultPerBatch = resultOfBatch.addResultPerBatch(resultPerRun);
                if (resultPerBatch.getRuns() % 10_000 == 0) {
                    System.out.println(resultPerBatch.getRuns());
                    resultOfBatch.reset();
                }
            } finally {
                lock.unlock();
            }
        }, executor);

However, it is probably a better idea to process this data in batches rather than trying to create so many threads. This will ultimately either run out of memory, out of available native threads, or reject work because the queue is full.

User contributions licensed under: CC BY-SA
6 People found this is helpful
Advertisement