Given a subscription in a Quarkus application:
Uni.createFrom.Item(1) .chain { it -> processA(it) } .emitOn(Infrastructure.getDefaultWorkerPool()) .chain { it -> processB(it) } .chain { it -> processC(it) } .subscribe().with{ it -> processD(it) }
If I understand correctly, processA
will be executed on the caller thread (so if it is in a Verticle, it should be on a IO thread), processB
and processC
will be executed on the worker threads, processD
will be on the caller/IO thread again.
How can I make processC
being called on the IO thread, while processB
still on the worker thread? Is there a simple method to bring the event back to the caller thread?
Edit: Right now I am using the following workaround:
Uni.createFrom.Item(1) .chain { it -> processA(it) } .chain { i -> Uni.createFrom().future { Infrastructure.getDefaultWorkerPool().submit{ processB(i) } } } .chain { it -> processC(it) } .subscribe().with{ it -> processD(it) }
Advertisement
Answer
You would need to capture the Vert.x Context and switch back to it. Something like this would work (using Java, as my Kotlin is not great):
Context context = Vertx.currentContext(); Uni.createFrom().item(1) .chain(it -> processA(it)) .emitOn(Infrastructure.getDefaultWorkerPool()) .chain(it -> processB(it)) .emitOn(runnable -> context.runOnContext(ignored -> runnable.run()) .chain(it -> process(it)) .subscribe().with(it -> processD(it));