I have a Spring Integration QueueChannel that I want to consume in a backpressure-aware Flux pipeline: prefetch n messages from the queue, make async calls to an external system such as fun remoteCall(message: Message): Mono<Void>, and pull the next message(s) from the queue only after the external calls complete. I don’t want to use a Poller with a scheduler to pull messages from the queue ahead of time.
Tag: project-reactor
Reduce a flux to mono using data from flux
I have this scenario. I have a paginated API that gives me the data for the last 12 months. The response of the API is like: Now I have to collect all the data, calculate the sum of all the netReceivable values, and return it as a Mono<CompanyIncome>. This POJO is like: To do this I have written something like: Now
Is Reactor chain still running
I have an application based on Spring WebFlux and Reactor. The application starts every day on a schedule and loads statistics. A manual launch option is also available through a controller. So if …
Java reactor how to properly start async cancellable sideeffect
I’m trying to write something using Reactor which I know how to write using completable futures. I’m getting a “Calling subscribe in non-blocking scope” warning in it. My goal is to call turnOn() with a timeout, which should call turnOff() after the timeout. If turnOn() is called again, it should cancel the old timeout and wait for a new timeout. How
How to create a generic wrapper for just any method call?
I want to create a helper method that can wrap/convert any sync method call into an async Mono. The following is close, but shows an error: This is my code: Answer First you call Mono.fromCallable with a Callable<Callable<? extends T>>. You need to change the call like this: Mono.fromCallable(supplier). Then you will have a problem because Mono.fromCallable will be
Continue consuming subsequent records in reactor kafka after deserialization exception
I am using Reactor Kafka and have a custom AvroDeserializer class for deserialization of messages. Now I have a case where, for certain payloads, the deserialization class throws an exception. My Kafka listener dies as soon as it tries to read such records. I tried handling this exception using onErrorReturn and using a combination of doOnError and onErrorContinue; however, it helped
How to handle file access in a reactive environment
I’m refactoring some blocking code to reactive (using Reactor). I think most of the methods in the java.nio.file.Files class are blocking. Is it right if I replace a method like: with: I think it’s necessary, especially when cheap HDDs are used. Or does a library exist in Reactor for this kind of file manipulation? Answer Generally yes, but your code
Combine two Stream into one Flux
How can I combine two Stream<String> instances into a Flux? What I understand is that I might need to use the Flux create method for this, but I am not really sure about it: Please help. Answer Concat the Streams into one and then invoke Flux#fromStream: Another way of doing this would be to create a Flux using Flux#fromStream and then
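The first approach in the answer can be sketched roughly as follows. The code the answer originally showed is not included in this excerpt, so this is only an illustration: the class name StreamConcatSketch and the helper combine are hypothetical. To keep the sketch runnable on the JDK alone, the Flux#fromStream call appears only as a comment, since it needs reactor-core on the classpath.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper class for illustration; not taken from the original answer.
class StreamConcatSketch {

    // Joins two streams lazily: all elements of `first`, then all elements of `second`.
    static Stream<String> combine(Stream<String> first, Stream<String> second) {
        return Stream.concat(first, second);
    }

    public static void main(String[] args) {
        Stream<String> combined = combine(Stream.of("a", "b"), Stream.of("c", "d"));

        // With reactor-core available, the next step from the answer would be:
        //   Flux<String> flux = Flux.fromStream(combined);
        // Here the stream is collected instead so the sketch runs without Reactor.
        List<String> result = combined.collect(Collectors.toList());
        System.out.println(result); // prints [a, b, c, d]
    }
}
```

A java.util.stream.Stream can be consumed only once, so a Flux built directly from a single Stream instance can serve only one subscriber. If resubscription matters, the Flux#fromStream overload that takes a Supplier of the stream is probably the better fit.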
Is ReactorDebugAgent suitable for production?
I read how to enable ReactorDebugAgent here: https://projectreactor.io/docs/core/release/reference/#reactor-tools-debug But I don’t know whether it’s feasible to have such a debug agent enabled by default in the production environment, or only when something unusual happens. Answer The section name, as of 18th Feb 2022, is: 7.4. Production-ready Global Debugging which seems to suggest that it is indeed suitable to be used in
Obtaining nested objects using Spring Data R2DBC
I’m new to Project Reactor and R2DBC. How to receive and merge Flux<Child> with Mono<Parent> properly using Spring Data R2DBC reactive repositories? Parent: ParentRepository: Child: ChildRepository: ParentPersistenceAdapter: My solution is: Answer Assuming the existence of a withChildren(Flux<Child> children) type method, you can just do: However, this is a bit odd – you wouldn’t usually have a Flux on a DAO