I have synchronous code that I want to make non-blocking with reactor.
I want to call in parallel different URI, the calls can return a response, an error or nothing.
There are 3 cases:
- A request returns a response, I return it without waiting for the other requests to complete. If other requests returned errors earlier, I drop the errors
- At least one request returned an error and no other requests returned a response, I return an error
- All requests returned nothing (no response, no error), I return nothing
I already did this in a synchronous manner :
AtomicReference<WebClientResponseException> responseException = new AtomicReference<>();
String responseBody = Flux.fromIterable(uriList)
.flatMap(url -> repo.sendRequest(uri))
// sendRequest returns a Mono that either emit a response, an error or nothing
.onErrorContinue(WebClientResponseException.class, (error, element) -> {
var webclientError = (WebClientResponseException) error;
responseException.set(webclientError);
})
.blockFirst();
return Pair.of(responseBody, responseException.get());
I want to remove the blocking call and return a Mono
From what I understand, I’m kind of keeping a state with the error that happened and I can’t have a state with reactor.
How can I keep track of errors that happened but not emit them as I want to see if other request emit result later ?
Does this version works ?
AtomicReference<WebClientResponseException> responseException = new AtomicReference<>();
return Flux.fromIterable(uriList)
.flatMap(url -> repo.sendRequest(uri))
// sendRequest returns a Mono that either emit a response, an error or nothing
.onErrorContinue(WebClientResponseException.class, (error, element) -> {
var webclientError = (WebClientResponseException) error;
responseException.set(webclientError);
})
.next()
.switchIfEmpty(Mono.defer(() -> Mono.error(responseException.get())));
Will the AtomicReference be closed over like a closure ?
Advertisement
Answer
I think flatMapDelayError
may achieve what you are looking for, see this example:
int concurrency = 10;
int prefetch = 1;
Flux.just(
Mono.error(new IOException("error")).delaySubscription(Duration.ofSeconds(2)),
Mono.just("fast").delaySubscription(Duration.ofSeconds(4)),
Mono.just("slow").delaySubscription(Duration.ofSeconds(6)))
.flatMapDelayError(
request -> request,
concurrency,
prefetch)
.next()
.doOnNext(result -> System.out.println("Result: " + result))
In this example the error
completes first, but the -DelayError
operator holds it, then fast
completes and is emitted as the result. Finally slow
never completes, due to .next()
canceling the remaining requests because we have a result.