I have synchronous code that I want to make non-blocking with Reactor.
I want to call several URIs in parallel; each call can return a response, an error, or nothing.
There are 3 cases:
- A request returns a response: I return it without waiting for the other requests to complete. If other requests returned errors earlier, I drop those errors.
- At least one request returned an error and no request returned a response: I return an error.
- All requests returned nothing (no response, no error): I return nothing.
I already did this in a blocking manner:
    AtomicReference<WebClientResponseException> responseException = new AtomicReference<>();
    String responseBody = Flux.fromIterable(uriList)
            // sendRequest returns a Mono that emits a response, an error, or nothing
            .flatMap(uri -> repo.sendRequest(uri))
            .onErrorContinue(WebClientResponseException.class, (error, element) -> {
                var webclientError = (WebClientResponseException) error;
                responseException.set(webclientError);
            })
            .blockFirst();
    return Pair.of(responseBody, responseException.get());
I want to remove the blocking call and return a Mono.
From what I understand, I am effectively keeping state (the error that happened), and I can't have state with Reactor.
How can I keep track of errors that happened without emitting them, since I want to see whether another request emits a result later?
Does this version work?
    AtomicReference<WebClientResponseException> responseException = new AtomicReference<>();
    return Flux.fromIterable(uriList)
            // sendRequest returns a Mono that emits a response, an error, or nothing
            .flatMap(uri -> repo.sendRequest(uri))
            .onErrorContinue(WebClientResponseException.class, (error, element) -> {
                var webclientError = (WebClientResponseException) error;
                responseException.set(webclientError);
            })
            .next()
            .switchIfEmpty(Mono.defer(() -> Mono.error(responseException.get())));
Will the AtomicReference be captured by the lambdas, as in a closure?
Answer
I think flatMapDelayError may achieve what you are looking for; see this example:
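    // Imports needed: java.io.IOException, java.time.Duration,
    // reactor.core.publisher.Flux, reactor.core.publisher.Mono
    int concurrency = 10;
    int prefetch = 1;
    Flux.just(
            Mono.<String>error(new IOException("error")).delaySubscription(Duration.ofSeconds(2)),
            Mono.just("fast").delaySubscription(Duration.ofSeconds(4)),
            Mono.just("slow").delaySubscription(Duration.ofSeconds(6)))
        // Errors are held back while the other inner publishers keep running
        .flatMapDelayError(request -> request, concurrency, prefetch)
        // Take the first emitted value and cancel the rest
        .next()
        .doOnNext(result -> System.out.println("Result: " + result))
        .block(); // blocking here only so that the demo waits for the result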
In this example the error completes first, but the -DelayError operator holds it back; then fast completes and is emitted as the result. Finally, slow never completes, because .next() cancels the remaining requests once we have a result.
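To connect this to the three cases in the question, here is a minimal sketch that wraps the same operator chain in a helper returning a Mono, with no AtomicReference and no blocking inside the helper. The class name FirstResponseDemo and the method firstResponse are hypothetical names chosen for illustration, and the list of Monos stands in for the results of repo.sendRequest(uri) over uriList; the block() calls in main exist only to print the outcome of each case.

```java
import java.io.IOException;
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FirstResponseDemo {

    // Hypothetical helper (not part of Reactor): first response wins,
    // errors are held back until every request has terminated,
    // and the result is an empty Mono if all requests were empty.
    static <T> Mono<T> firstResponse(List<Mono<T>> requests) {
        int concurrency = Math.max(1, requests.size()); // must be > 0
        int prefetch = 1;
        return Flux.fromIterable(requests)
                .flatMapDelayError(request -> request, concurrency, prefetch)
                .next();
    }

    public static void main(String[] args) {
        // Case 1: an error arrives first, then a response. The response wins.
        List<Mono<String>> withResponse = List.of(
                Mono.<String>error(new IOException("boom"))
                        .delaySubscription(Duration.ofMillis(100)),
                Mono.just("fast").delaySubscription(Duration.ofMillis(200)),
                Mono.just("slow").delaySubscription(Duration.ofMillis(300)));
        System.out.println(firstResponse(withResponse).block()); // prints "fast"

        // Case 2: one error and no response. The delayed error is emitted
        // once all requests have terminated.
        List<Mono<String>> onlyError = List.of(
                Mono.<String>error(new IOException("boom"))
                        .delaySubscription(Duration.ofMillis(100)),
                Mono.<String>empty().delaySubscription(Duration.ofMillis(200)));
        System.out.println(firstResponse(onlyError)
                .onErrorResume(e -> Mono.just("error: " + e.getMessage()))
                .block()); // prints "error: boom"

        // Case 3: every request completes empty, so the Mono is empty
        // and block() returns null.
        List<Mono<String>> allEmpty = List.of(Mono.empty(), Mono.empty());
        System.out.println(firstResponse(allEmpty).block()); // prints "null"
    }
}
```

Because the delayed errors live inside the operator, each subscription gets its own error state, which avoids sharing a mutable reference between subscribers. One caveat to check against your Reactor version: when several requests fail, the delayed errors may be delivered as a single composite exception rather than as the last WebClientResponseException, so the caller may need to unwrap it.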