Bring Uni event back to the caller thread

Given a subscription in a Quarkus application:

Uni.createFrom().item(1)
    .chain { it -> processA(it) }
    .emitOn(Infrastructure.getDefaultWorkerPool())
    .chain { it -> processB(it) }
    .chain { it -> processC(it) }
    .subscribe().with{ it -> processD(it) }

If I understand correctly, processA will be executed on the caller thread (so if this is in a Verticle, it should be an IO thread), processB and processC will be executed on worker threads, and processD will be on the caller/IO thread again.

How can I make processC run on the IO thread while processB still runs on a worker thread? Is there a simple way to bring the event back to the caller thread?


Edit: Right now I am using the following workaround:

Uni.createFrom().item(1)
    .chain { it -> processA(it) }
    .chain { i -> Uni.createFrom().future { Infrastructure.getDefaultWorkerPool().submit{ processB(i) } } }
    .chain { it -> processC(it) }
    .subscribe().with{ it -> processD(it) }

Answer

You would need to capture the Vert.x Context and switch back to it. Something like this would work (using Java, as my Kotlin is not great):

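import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.vertx.core.Context;
import io.vertx.core.Vertx;

// Capture the Vert.x context of the calling thread (null if not called from a Vert.x thread)
Context context = Vertx.currentContext();
Uni.createFrom().item(1)
   .chain(it -> processA(it))
   // Switch to the worker pool for processB
   .emitOn(Infrastructure.getDefaultWorkerPool())
   .chain(it -> processB(it))
   // Switch back to the captured context for processC and processD
   .emitOn(runnable -> context.runOnContext(ignored -> runnable.run()))
   .chain(it -> processC(it))
   .subscribe().with(it -> processD(it));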
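
If you want to see the thread hopping without pulling in Quarkus, here is a rough stand-alone sketch of the same idea using only the JDK. It is an analogy, not Mutiny code: CompletableFuture stands in for Uni, and two single-thread executors with made-up names ("event-loop" and "worker") stand in for the Vert.x context and the worker pool. The class name, step helper and trace list are hypothetical too. The point is the backToLoop executor, which has the same shape as the lambda passed to emitOn above: it just reschedules the task onto the captured context.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadHopSketch {

    // Runs a four-stage pipeline and returns "<stage>@<thread name>" for each stage.
    static List<String> run() throws Exception {
        // Assumption: one named thread plays the event loop, another the worker pool.
        ExecutorService loop = Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop"));
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));

        // Same shape as the Executor handed to emitOn: reschedule onto the captured context.
        Executor backToLoop = task -> loop.execute(task);

        List<String> trace = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture
                .supplyAsync(() -> step("A", 1, trace), loop)          // starts on the "caller"
                .thenApplyAsync(i -> step("B", i, trace), worker)      // hop to the worker
                .thenApplyAsync(i -> step("C", i, trace), backToLoop)  // hop back
                .thenAcceptAsync(i -> step("D", i, trace), backToLoop) // stay on the loop
                .get();                                                // wait for the pipeline
        } finally {
            loop.shutdown();
            worker.shutdown();
        }
        return trace;
    }

    // Records which thread ran the stage, then passes an incremented value along.
    static int step(String name, int value, List<String> trace) {
        trace.add(name + "@" + Thread.currentThread().getName());
        return value + 1;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

Running main should print [A@event-loop, B@worker, C@event-loop, D@event-loop]. Without the hop back before stage C, everything after B would stay on the worker thread, which is the behaviour emitOn gives you in the Mutiny pipeline as well.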
User contributions licensed under: CC BY-SA