Tag: project-reactor

Spring Integration: How to consume a QueueChannel on demand as a backpressure-aware reactive (Flux) pipeline

I have a Spring Integration QueueChannel that I want to consume in a backpressure-aware Flux pipeline: prefetch n messages from the queue, make async calls to an external system such as fun remoteCall(message: Message): Mono<Void>, and pull the next message(s) from the queue only after the external calls complete. I don’t want to use a Poller with a scheduler to pull messages from the queue ahead of time.

Reduce a Flux to a Mono using data from the Flux

I have a paginated API that returns data for the last 12 months. The response of the API looks like: Now I have to collect all the data, calculate the sum of all the netReceivable values, and return it as a Mono<CompanyIncome>. This POJO looks like: To do this I have written something like: Now
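As a rough illustration of the reduction described in this question, the sketch below sums one field across a Flux and wraps the result in a Mono. The `MonthlyEntry` and `CompanyIncome` records, the `totalIncome` method, and the use of `BigDecimal` for `netReceivable` are all assumptions made for the example, since the excerpt does not show the real response type or POJO.

```java
import java.math.BigDecimal;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class IncomeSumSketch {

    // Hypothetical stand-ins: the real API response and POJO are not shown in the excerpt.
    record MonthlyEntry(BigDecimal netReceivable) {}
    record CompanyIncome(BigDecimal totalNetReceivable) {}

    // Folds every netReceivable into a running total, starting from zero,
    // then wraps the final total in the (assumed) result type.
    static Mono<CompanyIncome> totalIncome(Flux<MonthlyEntry> entries) {
        return entries
                .map(MonthlyEntry::netReceivable)
                .reduce(BigDecimal.ZERO, BigDecimal::add)
                .map(CompanyIncome::new);
    }

    public static void main(String[] args) {
        Flux<MonthlyEntry> entries = Flux.just(
                new MonthlyEntry(new BigDecimal("100.50")),
                new MonthlyEntry(new BigDecimal("200.25")));
        // block() is used only to print the result in this demo.
        System.out.println(totalIncome(entries).block().totalNetReceivable());
    }
}
```

Because `reduce` is given a seed value, an empty Flux still produces a `CompanyIncome` holding zero rather than an empty Mono; whether that is the desired behavior depends on the caller.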

Continue consuming subsequent records in Reactor Kafka after a deserialization exception

I am using Reactor Kafka and have a custom AvroDeserializer class for deserializing messages. For certain payloads, the deserializer throws an exception, and my Kafka listener dies as soon as it tries to read such records. I tried handling this exception using onErrorReturn and a combination of doOnError and onErrorContinue; however, it helped

How to handle file access in a reactive environment

I’m refactoring some blocking code to reactive (using Reactor). I think most of the methods in the java.nio.file.Files class are blocking. Is it right to replace a method like: with: I think it’s necessary, especially when cheap HDDs are used. Or does a library exist in Reactor for this kind of file manipulation? Answer Generally yes, but your code
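The code from this question and its answer is not included in the excerpt. As a separate illustration, one common Reactor pattern for wrapping a blocking call is `Mono.fromCallable` combined with `subscribeOn(Schedulers.boundedElastic())`. The sketch below applies it to `Files.readAllLines`; the class name `ReactiveFileSketch` and the method `readLines` are hypothetical, and this is not necessarily what the original answer recommended.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class ReactiveFileSketch {

    // Defers the blocking read until subscription and runs it on a scheduler
    // intended for blocking work, so event-loop threads are not tied up.
    static Mono<List<String>> readLines(Path path) {
        return Mono.fromCallable(() -> Files.readAllLines(path))
                .subscribeOn(Schedulers.boundedElastic());
    }

    public static void main(String[] args) throws Exception {
        Path tmp = Files.createTempFile("reactive-file-sketch", ".txt");
        Files.write(tmp, List.of("first", "second"));
        // block() is used only to print the result in this demo.
        List<String> lines = readLines(tmp).block();
        System.out.println(lines);
        Files.delete(tmp);
    }
}
```

An `IOException` thrown by the read surfaces as an error signal on the returned Mono, so callers can handle it with the usual error operators.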

Combine two Stream into one Flux

How can I combine two streams of type Stream<String> into a Flux? As I understand it, I might need to use the Flux create method for this, but I am not really sure about it: Please help. Answer Concat the Streams into one and then invoke Flux#fromStream: Another way of doing this would be to create a Flux using Flux#fromStream and then
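The answer's code is not part of the excerpt. The sketch below shows only the concatenation step it describes, using `Stream.concat` from the JDK; the class name `StreamConcatSketch` and the method `combine` are hypothetical. The combined stream could then be handed to `Flux#fromStream`, which is left out here so the example runs without reactor-core.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StreamConcatSketch {

    // Joins two streams lazily; all elements of `first` come before those of `second`.
    static Stream<String> combine(Stream<String> first, Stream<String> second) {
        return Stream.concat(first, second);
    }

    public static void main(String[] args) {
        Stream<String> combined = combine(Stream.of("a", "b"), Stream.of("c", "d"));
        // With reactor-core on the classpath, `combined` could be passed to
        // Flux.fromStream(combined) here; it is collected to a list instead.
        List<String> result = combined.collect(Collectors.toList());
        System.out.println(result); // prints [a, b, c, d]
    }
}
```

A Java Stream can be consumed only once, so a Flux built from it this way supports a single subscription.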

Is ReactorDebugAgent suitable for production?

I read how to enable ReactorDebugAgent here: But I don’t know whether it’s feasible to have such a debug agent enabled by default in the production environment, or only when something unusual happens. Answer The section name, as of 18th Feb 2022, is: 7.4. Production-ready Global Debugging which seems to suggest that it is indeed suitable to be used in

Obtaining nested objects using Spring Data R2DBC

I’m new to Project Reactor and R2DBC. How to receive and merge Flux<Child> with Mono<Parent> properly using Spring Data R2DBC reactive repositories? Parent: ParentRepository: Child: ChildRepository: ParentPersistenceAdapter: My solution is: Answer Assuming the existence of a withChildren(Flux<Child> children) type method, you can just do: However, this is a bit odd – you wouldn’t usually have a Flux on a DAO
