Skip to content

How to emit filtered out error if flux is empty

I have synchronous code that I want to make non-blocking with reactor.

I want to call in parallel different URI, the calls can return a response, an error or nothing.

There are 3 cases:

  • A request returns a response, I return it without waiting for the other requests to complete. If other requests returned errors earlier, I drop the errors
  • At least one request returned an error and no other requests returned a response, I return an error
  • All requests returned nothing (no response, no error), I return nothing

I already did this in a synchronous manner :

    AtomicReference<WebClientResponseException> responseException = new AtomicReference<>();
    
    String responseBody = Flux.fromIterable(uriList)
            .flatMap(url -> repo.sendRequest(uri)) 
            // sendRequest  returns a Mono that either emit a response, an error or nothing
            .onErrorContinue(WebClientResponseException.class, (error, element) -> {
                var webclientError = (WebClientResponseException) error;
                responseException.set(webclientError);
            })
            .blockFirst();
    
    return Pair.of(responseBody, responseException.get());

I want to remove the blocking call and return a Mono

From what I understand, I’m kind of keeping a state with the error that happened and I can’t have a state with reactor.

How can I keep track of errors that happened but not emit them as I want to see if other request emit result later ?

Does this version works ?

AtomicReference<WebClientResponseException> responseException = new AtomicReference<>();
    
    return Flux.fromIterable(uriList)
            .flatMap(url -> repo.sendRequest(uri)) 
            // sendRequest  returns a Mono that either emit a response, an error or nothing
            .onErrorContinue(WebClientResponseException.class, (error, element) -> {
                var webclientError = (WebClientResponseException) error;
                responseException.set(webclientError);
            })
            .next()
            .switchIfEmpty(Mono.defer(() -> Mono.error(responseException.get())));

Will the AtomicReference be closed over like a closure ?

Answer

I think flatMapDelayError may achieve what you are looking for, see this example:

int concurrency = 10;
int prefetch = 1;

Flux.just(
        Mono.error(new IOException("error")).delaySubscription(Duration.ofSeconds(2)),
        Mono.just("fast").delaySubscription(Duration.ofSeconds(4)),
        Mono.just("slow").delaySubscription(Duration.ofSeconds(6)))
    .flatMapDelayError(
        request -> request,
        concurrency,
        prefetch)
    .next()
    .doOnNext(result -> System.out.println("Result: " + result))

In this example the error completes first, but the -DelayError operator holds it, then fast completes and is emitted as the result. Finally slow never completes, due to .next() canceling the remaining requests because we have a result.