
Process List with synchronous delay after flatMap?

Hey, I just started diving into reactive programming, and I can't figure out how to process a List<String> in a synchronous, one-at-a-time way after flatMap.

What I am trying to achieve:

  1. Get the domain list from an external service
  2. Filter out domains that already exist in the database
  3. Make another HTTP request to the external service to get the domain info. These calls should run one after another, not in parallel, with a delay of Duration.ofSeconds(new Random().nextInt(5)) between them, much like Thread.sleep.
  4. Store the new domain data in the database
client.fetchDomainList() // Flux<DomainListResponse>
.flatMap(response -> Flux.fromIterable(response.getDomainList()))
.filter(hostname -> ! domainRepository.existsByHostname(hostname))
.collectList()

// this next bit is sketchy.
// flatMap doesn't work here (in my mind)
// because it would apply the delays in parallel
.map(list -> Flux.fromIterable(list)
    .map(hostname -> client.fetchDomainInfo(hostname)
        .delayElements(Duration.ofSeconds(new Random().nextInt(3))))
    .map(domainInfoResponse -> {
        return new Domain();
    })
)

.flatMap(s -> { // s -> Flux<Domain> here. Should be simply Domain
 // save into database?
})


Answer

You might want to wrap that synchronous call in Mono.fromCallable and then flatMap the result into a Mono that yields 0 or 1 items, depending on whether the condition is met.

<T> Mono<T> checkDomain(T domain) {
   // if appropriate, add subscribeOn to move the blocking call to a scheduler suited to it, such as Schedulers.boundedElastic()
   return Mono.fromCallable(() -> {
      boolean filterMatches = ... your blocking HTTP request ...
      return filterMatches;
   }).flatMap(filterMatches -> filterMatches ? Mono.just(domain) : Mono.empty());
}
...

client.fetchDomainList() // Flux<DomainListResponse>
    .flatMap(response -> Flux.fromIterable(response.getDomainList()))
    .delayElements(/* your Duration */)
    // each domain maps to 0 or 1 domains, depending on whether the condition is met
    .flatMap(domain -> checkDomain(domain))

Docs for Flux.delayElements:

/**
 * Delay each of this {@link Flux} elements ({@link Subscriber#onNext} signals)
 * by a given {@link Duration}. Signals are delayed and continue on the
 * {@link Schedulers#parallel() parallel} default Scheduler, but empty sequences or
 * immediate error signals are not delayed.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/delayElements.svg" alt="">
 *
 * @param delay duration by which to delay each {@link Subscriber#onNext} signal
 * @return a delayed {@link Flux}
 * @see #delaySubscription(Duration) delaySubscription to introduce a delay at the beginning of the sequence only
 */
public final Flux<T> delayElements(Duration delay) {
    return delayElements(delay, Schedulers.parallel());
}

I made a small PoC to test this out myself and came up with the following:

    static Mono<Integer> verifyDomain(Integer t) {
        return Mono.fromCallable(() -> {
            Thread.sleep(100); // simulate HTTP request
            return t % 2 == 0; // is even?
        }).flatMap(condition -> condition ? Mono.just(t) : Mono.empty());
    }

    public static void main(String[] args) throws InterruptedException {
        Flux.fromStream(Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
                .delayElements(Duration.ofSeconds(5))
                .flatMap(i -> verifyDomain(i))
                .subscribe(i -> System.out.println(LocalDateTime.now() + " - " + i));

        Thread.sleep(100000); // just to stay alive
    }

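The output is as expected. Only the even numbers pass the check, so with a 5-second delay per element the printed values arrive about 10 seconds apart: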

2021-10-19T16:58:28.644437500 - 2
2021-10-19T16:58:38.898397500 - 4
2021-10-19T16:58:49.130059 - 6
2021-10-19T16:58:59.376111100 - 8
2021-10-19T16:59:09.603982600 - 10
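
One caveat with the PoC above: flatMap subscribes to inner publishers eagerly, so if a request ever took longer than the delay between elements, two requests could overlap. Also, a Duration built from new Random().nextInt(...) and passed to delayElements is computed once, when the pipeline is assembled, so every element gets the same delay. The following is a minimal sketch of one possible variation, not a definitive implementation: it uses concatMap, which subscribes to one inner publisher at a time, and draws a fresh random delay per element with Mono.delay. The class name, fetchSequentially, and the stand-in fetchDomainInfo are hypothetical names made up for illustration, and the delay is in milliseconds only so that the example finishes quickly.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class SequentialDelaySketch {

    // Hypothetical stand-in for client.fetchDomainInfo(hostname).
    // A real blocking call would likely also need subscribeOn(Schedulers.boundedElastic()).
    static Mono<String> fetchDomainInfo(String hostname) {
        return Mono.fromCallable(() -> "info:" + hostname);
    }

    // Processes hostnames strictly one after another, waiting a fresh random
    // delay (0..maxDelayMillis) before each request.
    static Flux<String> fetchSequentially(List<String> hostnames, int maxDelayMillis) {
        return Flux.fromIterable(hostnames)
                // concatMap waits for the current inner Mono to complete
                // before subscribing to the next one, so requests never overlap
                .concatMap(hostname ->
                        Mono.delay(Duration.ofMillis(
                                        ThreadLocalRandom.current().nextInt(maxDelayMillis + 1)))
                                // defer so the request is only created after the delay has elapsed
                                .then(Mono.defer(() -> fetchDomainInfo(hostname))));
    }

    public static void main(String[] args) {
        List<String> result = fetchSequentially(
                List.of("a.example", "b.example", "c.example"), 50)
                .collectList()
                .block(); // blocking here only to keep the demo alive until completion
        System.out.println(result); // prints [info:a.example, info:b.example, info:c.example]
    }
}
```

Because concatMap preserves source order, the results come out in the same order as the input list. In the original pipeline, a step like this could replace the collectList() / map(list -> ...) part, followed by another concatMap or flatMap that saves each Domain.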