I have a Spring Integration QueueChannel that I want to consume in a backpressure-aware Flux pipeline: prefetch n Messages from the queue, make async calls to an external system such as fun remoteCall(message: Message): Mono<Void>, and pull the next message(s) from the queue after the external calls complete. I don’…
Tag: project-reactor
Reduce a flux to mono using data from flux
I have this scenario. I have one paginated API which gives me the data for the last 12 months. The response of the API is like: Now I have to collect all the data, calculate the sum of all the netReceivable values, and return it as a Mono<CompanyIncome>. This POJO is like To do this I have written something li…
Is Reactor chain still running
I have an application based on Spring WebFlux and Reactor. The application starts every day on a schedule and loads statistics. A manual launch option is also available through a controller. So if …
Java Reactor: how to properly start an async cancellable side effect
I’m trying to write something using Reactor that I know how to write using completable futures. I’m getting a “Calling subscribe in non-blocking scope” warning in it. My goal is to call turnOn() with a timeout which should call turnOff() after the timeout. If turnOn() is called again it…
How to create a generic wrapper for just any method call?
I want to create a helper method that can wrap/convert any sync method call into an async Mono. The following is close, but shows an error: This is my code: Answer First you call Mono.fromCallable with a Callable<Callable<? extends T>>. You need to change the call like this: Mono.fromCallable(…
Continue consuming subsequent records in reactor kafka after deserialization exception
I am using reactor kafka and have a custom AvroDeserializer class for deserialization of messages. Now I have a case where for certain payloads the deserialization class throws an exception. My Kafka listener dies as soon as it tries to read such records. I tried handling this exception using onErrorReturn an…
How to handle file access in a reactive environment
I’m refactoring some blocking code to reactive (using Reactor). I think most of the methods in the java.nio.file.Files class are blocking. Is it right if I replace a method like: with: I think it’s necessary, especially when cheap HDDs are used. Or does a library exist in Reactor for this kind of…
Combine two Stream into one Flux
How can I combine two Stream<String> instances into one Flux? What I understand is that I might need to use the Flux create method for this, but I am not really sure about it: Please help. Answer Concat the Streams into one and then invoke Flux#fromStream: Another way of doing this would be to create a Flux …
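The first approach in the answer above (concatenate, then convert) can be sketched as follows. This is only an illustration, not the answerer's original code: the class name CombineStreams and the helper combine are hypothetical, and the Reactor hand-off is left as a comment so the sketch runs on the JDK alone.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CombineStreams {

    // Hypothetical helper (not from the original answer): joins two streams,
    // keeping all elements of the first ahead of all elements of the second.
    static Stream<String> combine(Stream<String> first, Stream<String> second) {
        return Stream.concat(first, second);
    }

    public static void main(String[] args) {
        Stream<String> combined = combine(Stream.of("a", "b"), Stream.of("c", "d"));

        // Assumption: with reactor-core on the classpath, the combined stream
        // would be handed over here instead of being collected:
        //   Flux<String> flux = Flux.fromStream(combined);
        List<String> items = combined.collect(Collectors.toList());
        System.out.println(items); // prints [a, b, c, d]
    }
}
```

A Java Stream can be consumed only once, so a Flux built from an existing Stream instance supports a single subscriber; Flux.fromStream also has an overload that takes a Supplier<Stream<T>>, which creates a fresh stream for each subscription.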
Is ReactorDebugAgent suitable for production?
I read how to enable ReactorDebugAgent here: https://projectreactor.io/docs/core/release/reference/#reactor-tools-debug But I don’t know whether it’s feasible to keep such a debug agent enabled by default in the production environment, or only when something unusual happens. Answer The section name, as of…
Obtaining a nested objects using Spring Data R2DBC
I’m new to Project Reactor and R2DBC. How do I properly receive and merge a Flux<Child> with a Mono<Parent> using Spring Data R2DBC reactive repositories? Parent: ParentRepository: Child: ChildRepository: ParentPersistenceAdapter: My solution is: Answer Assuming the existence of a withChildren(Flu…