I have a chain of async and sync method invocations on a Mutiny Uni. Some of the methods are long-running processes with a void return type.
What is the proper way of invoking/calling them without blocking downstream?
Below is a simplified example of the code.
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniSubscribe;
import io.smallrye.mutiny.infrastructure.Infrastructure;

class Root {

    public static void main(String[] args) {
        final Response response = getResponsePayload(); // Gets the payload from the upstream service
        Uni.createFrom().item(response)
                .onItem().invoke(() -> System.out.println("Process Started"))
                .onItem().call(res -> {
                    longRunningMethodAsync(res); // long-running blocking method, I want to run it on a worker thread
                    return Uni.createFrom().voidItem(); // I added this line because the pipeline breaks if a Uni is not returned from here
                })
                .onItem().transform(item -> item.hello + " mutiny")
                .onItem().transform(String::toUpperCase)
                .subscribe().with(
                        item -> System.out.println(">> " + item)); // This is printed to the console
    }

    // Boilerplate method - I created it to invoke the actual method, `longRunningMethod`; it is basically an adapter.
    // This is the best approach I could come up with, but I'm looking for something better, as I'm not convinced I'm doing it right.
    private static UniSubscribe<Void> longRunningMethodAsync(final Response response) {
        return Uni.createFrom().voidItem().invoke(() -> longRunningMethod(response))
                .runSubscriptionOn(Infrastructure.getDefaultExecutor()).subscribe();
    }

    // Important - this is the method I want to run asynchronously, independently of the main *event-loop* thread.
    private static void longRunningMethod(final Response response) {
        // Doesn't get printed, which means this is never called at all, not even in a blocking manner by the main event-loop thread
        System.out.println("Long running process started");
    }

    // Not as important, I provided this in case you'd like to run it on your local box
    private static Response getResponsePayload() {
        return new Response();
    }

    private static class Response {
        public final String hello = "hello";
    }
}
Answer
In general, use runSubscriptionOn and pass a specific executor:
longRunningMethodAsync .runSubscriptionOn(executor);
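Applied to the code in the question, this could look roughly like the sketch below. It is only an illustration under a few assumptions: the class name `WorkerOffloadSketch`, the `ranOn` field, the two-thread pool, and the `String` payload are made up for the example and are not from the question. The key difference from the question's adapter is that it returns the lazy `Uni<Void>` itself rather than the result of `.subscribe()`, so `call` is what subscribes to it and the blocking method actually runs, on the executor passed to `runSubscriptionOn`.

```java
import io.smallrye.mutiny.Uni;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class WorkerOffloadSketch {

    // Hypothetical helper state: remembers which thread ran the blocking method.
    static final AtomicReference<String> ranOn = new AtomicReference<>();

    // Stand-in for the question's long-running void method.
    static void longRunningMethod(String payload) {
        ranOn.set(Thread.currentThread().getName());
    }

    // Adapter: returns a lazy Uni<Void>. Nothing runs until something subscribes;
    // when it does, the subscription (and so the blocking call) happens on `executor`.
    static Uni<Void> longRunningMethodAsync(String payload, Executor executor) {
        return Uni.createFrom().voidItem()
                .onItem().invoke(() -> longRunningMethod(payload))
                .runSubscriptionOn(executor);
    }

    static String run() {
        // Assumed executor for the example: a small fixed pool with named threads.
        ExecutorService executor = Executors.newFixedThreadPool(2, r -> new Thread(r, "worker"));
        try {
            return Uni.createFrom().item("hello")
                    // call() subscribes to the returned Uni and forwards the original item
                    .onItem().call(res -> longRunningMethodAsync(res, executor))
                    .onItem().transform(item -> item + " mutiny")
                    .onItem().transform(String::toUpperCase)
                    // Blocking here only so the demo can return a value from main
                    .await().indefinitely();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(">> " + run());
        System.out.println("long-running method ran on: " + ranOn.get());
    }
}
```

Keep in mind that `call` forwards the item downstream only after the returned Uni completes, so the rest of the pipeline still waits for the long-running method, even though the subscribing thread is not the one doing the blocking work.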
Note that runSubscriptionOn constrains the concurrency to the number of threads available in the executor.
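To make the concurrency limit concrete, here is a small sketch that uses only `java.util.concurrent`, with no Mutiny involved. It assumes a fixed-size pool, which is the kind of executor one might pass to runSubscriptionOn; the class and method names are invented for the illustration. It submits more blocking jobs than there are threads and records how many were ever running at once.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ExecutorBoundSketch {

    // Runs `tasks` short blocking jobs on a pool of `poolSize` threads and
    // returns the highest number of jobs observed running at the same time.
    static int maxConcurrent(int poolSize, int tasks) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);
        try {
            for (int i = 0; i < tasks; i++) {
                executor.execute(() -> {
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(50); // simulated blocking work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                        done.countDown();
                    }
                });
            }
            done.await(); // wait until every job has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            executor.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // Six jobs on two threads: the peak can never exceed the pool size.
        System.out.println("peak concurrency: " + maxConcurrent(2, 6));
    }
}
```

With a pool of two threads the reported peak cannot exceed two, however many jobs are queued; the remaining jobs wait for a free thread. The same bound applies to blocking work offloaded to that pool.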
Reference: