Hey, I just started to dive into reactive programming and I can't figure out how to process a List<String> in a synchronous way after flatMap.
What I am trying to achieve:
- Get domain list from external service
- Filter out existing domains in database
- Make another HTTP request to the external service to get domain info. These calls should be executed one after another, not in parallel, with a Duration.ofSeconds(new Random().nextInt(5)) delay applied between them, like Thread.sleep.
- Store the new domain data in the database
    client.fetchDomainList() // Flux<DomainListResponse>
        .flatMap(response -> Flux.fromIterable(response.getDomainList()))
        .filter(hostname -> !domainRepository.existsByHostname(hostname))
        .collectList()
        // this next bit is sketchy.
        // flatMap won't work here (in my mind)
        // because it will apply the delay in parallel
        .map(list -> Flux.fromIterable(list)
            .map(hostname -> client.fetchDomainInfo(hostname)
                .delayElements(Duration.ofSeconds(new Random().nextInt(3))))
            .map(domainInfoResponse -> {
                return new Domain();
            })
        )
        .flatMap(s -> {
            // s -> Flux<Domain> here. Should be simply Domain
            // save into database?
        })
Answer
You might want to wrap that synchronous call in a Mono.fromCallable, which yields 0-1 items depending on whether the condition is met.
    <T> Mono<T> checkDomain(T domain) {
        // consider adding (if appropriate) subscribeOn to switch to another scheduler
        // suitable for this, such as Schedulers.boundedElastic(), which is meant for blocking work
        return Mono.fromCallable(() -> {
            boolean filterMatches = ... your blocking HTTP request ...
            return filterMatches;
        }).flatMap(filterMatches -> filterMatches ? Mono.just(domain) : Mono.empty());
    }
    ...
    client.fetchDomainList() // Flux<DomainListResponse>
        .flatMap(response -> Flux.fromIterable(response.getDomainList()))
        .delayElements(/* your Duration */)
        // we are mapping 1 domain to 0-1 domain, depending on whether the condition is met.
        .flatMap(domain -> checkDomain(domain))
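One caveat with the chain above: delayElements only spaces out when elements are emitted, and flatMap subscribes to each inner Mono as soon as its element arrives. If a request takes longer than the delay, two requests can still overlap. A rough sketch of the difference, assuming reactor-core is on the classpath: fetchDomainInfo here is a made-up stand-in for the real client call (it just sleeps), and the 10 ms / 50 ms timings are arbitrary. concatMap waits for the previous inner Mono to complete before subscribing to the next one, so putting the delay inside it gives "delay, request, delay, request".

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ConcatMapVsFlatMap {

    // Hypothetical stand-in for a blocking HTTP call; records the peak number of concurrent calls.
    static Mono<String> fetchDomainInfo(String hostname, AtomicInteger inFlight, AtomicInteger peak) {
        return Mono.fromCallable(() -> {
            peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
            try {
                Thread.sleep(50); // deliberately slower than the 10 ms spacing below
                return "info:" + hostname;
            } finally {
                inFlight.decrementAndGet();
            }
        }).subscribeOn(Schedulers.boundedElastic()); // keep the blocking work off the timer threads
    }

    // delayElements + flatMap: elements are spaced out, but slow requests may overlap.
    static int peakWithFlatMap(List<String> hostnames) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        Flux.fromIterable(hostnames)
                .delayElements(Duration.ofMillis(10))
                .flatMap(h -> fetchDomainInfo(h, inFlight, peak))
                .blockLast();
        return peak.get();
    }

    // concatMap: the next delay + request starts only after the previous request completed.
    static int peakWithConcatMap(List<String> hostnames) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        Flux.fromIterable(hostnames)
                .concatMap(h -> Mono.delay(Duration.ofMillis(10))
                        .then(fetchDomainInfo(h, inFlight, peak)))
                .blockLast();
        return peak.get();
    }

    public static void main(String[] args) {
        List<String> hostnames = List.of("a.com", "b.com", "c.com", "d.com");
        System.out.println("flatMap peak concurrency:   " + peakWithFlatMap(hostnames));
        System.out.println("concatMap peak concurrency: " + peakWithConcatMap(hostnames));
    }
}
```

With concatMap the peak stays at 1. With flatMap it will typically be higher whenever the request outlasts the delay. If the requests are always much faster than the delay, the flatMap version behaves sequentially in practice, which is what the PoC further down shows.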
Docs for Flux.delayElements:
    /**
     * Delay each of this {@link Flux} elements ({@link Subscriber#onNext} signals)
     * by a given {@link Duration}. Signals are delayed and continue on the
     * {@link Schedulers#parallel() parallel} default Scheduler, but empty sequences or
     * immediate error signals are not delayed.
     *
     * <p>
     * <img class="marble" src="doc-files/marbles/delayElements.svg" alt="">
     *
     * @param delay duration by which to delay each {@link Subscriber#onNext} signal
     * @return a delayed {@link Flux}
     * @see #delaySubscription(Duration) delaySubscription to introduce a delay at the beginning of the sequence only
     */
    public final Flux<T> delayElements(Duration delay) {
        return delayElements(delay, Schedulers.parallel());
    }
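Note that delayElements takes a single Duration, so an argument like Duration.ofSeconds(new Random().nextInt(5)) is evaluated once when the chain is assembled, and every element then gets that same delay. If a different random delay per element is wanted, one option is to draw the delay inside concatMap and apply it with Mono.delayElement. A minimal sketch, where delayEachRandomly and the Supplier parameter are illustrative names and not part of Reactor:

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class RandomDelayPerElement {

    // Hypothetical helper: asks the supplier for a fresh delay for each element.
    // concatMap keeps the original order and handles one element at a time.
    static <T> Flux<T> delayEachRandomly(Flux<T> source, Supplier<Duration> delaySupplier) {
        return source.concatMap(item -> Mono.just(item).delayElement(delaySupplier.get()));
    }

    public static void main(String[] args) {
        // Draws a new delay between 1 and 50 ms on every call (range chosen arbitrarily).
        Supplier<Duration> randomDelay =
                () -> Duration.ofMillis(1 + ThreadLocalRandom.current().nextInt(50));

        List<Integer> result = delayEachRandomly(Flux.just(1, 2, 3), randomDelay)
                .collectList()
                .block();
        System.out.println(result);
    }
}
```

The lambda passed to concatMap runs once per element, so the supplier is called once per element as well.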
I made a small PoC to test this out myself and came up with the following:
    import java.time.Duration;
    import java.time.LocalDateTime;
    import java.util.stream.Stream;

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    public class Poc { // class name is arbitrary; added so the snippet compiles

        static Mono<Integer> verifyDomain(Integer t) {
            return Mono.fromCallable(() -> {
                Thread.sleep(100); // simulate HTTP request
                return t % 2 == 0; // is even?
            }).flatMap(condition -> condition ? Mono.just(t) : Mono.empty());
        }

        public static void main(String[] args) throws InterruptedException {
            Flux.fromStream(Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
                .delayElements(Duration.ofSeconds(5))
                .flatMap(i -> verifyDomain(i))
                .subscribe(i -> System.out.println(LocalDateTime.now() + " - " + i));
            Thread.sleep(100000); // just to stay alive
        }
    }
And the output is as expected:
    2021-10-19T16:58:28.644437500 - 2
    2021-10-19T16:58:38.898397500 - 4
    2021-10-19T16:58:49.130059 - 6
    2021-10-19T16:58:59.376111100 - 8
    2021-10-19T16:59:09.603982600 - 10
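For completeness, here is a rough end-to-end sketch of the four steps from the question (fetch list, filter out existing, fetch info one at a time with a pause, save). Everything in it is a stand-in: the in-memory set replaces the database, existsByHostname, fetchDomainInfo and save are hypothetical placeholders for the asker's repository and client, and domain info is just a String. It assumes the repository calls are blocking, so they are wrapped in Mono.fromCallable and moved to boundedElastic.

```java
import java.time.Duration;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class DomainPipelineSketch {

    // In-memory stand-in for the database (hypothetical).
    static final Set<String> savedHostnames = ConcurrentHashMap.newKeySet();

    // Stand-in for a blocking domainRepository.existsByHostname(hostname).
    static Mono<Boolean> existsByHostname(String hostname) {
        return Mono.fromCallable(() -> savedHostnames.contains(hostname))
                .subscribeOn(Schedulers.boundedElastic());
    }

    // Stand-in for client.fetchDomainInfo(hostname).
    static Mono<String> fetchDomainInfo(String hostname) {
        return Mono.fromCallable(() -> "info:" + hostname);
    }

    // Stand-in for a blocking repository save; returns what was saved.
    static Mono<String> save(String hostname, String info) {
        return Mono.fromCallable(() -> {
            savedHostnames.add(hostname);
            return info;
        }).subscribeOn(Schedulers.boundedElastic());
    }

    static Flux<String> importNewDomains(Flux<List<String>> domainLists, Duration pause) {
        return domainLists
                .flatMapIterable(list -> list)                        // one hostname per element
                .filterWhen(h -> existsByHostname(h).map(e -> !e))    // keep only unknown hostnames
                .concatMap(h -> Mono.delay(pause)                     // pause, then fetch, one at a time
                        .then(fetchDomainInfo(h))
                        .flatMap(info -> save(h, info)));
    }

    public static void main(String[] args) {
        savedHostnames.add("b.com"); // pretend this one is already stored
        List<String> saved = importNewDomains(
                Flux.just(List.of("a.com", "b.com", "c.com")), Duration.ofMillis(100))
                .collectList()
                .block();
        System.out.println(saved);
    }
}
```

The collectList() from the question is not needed here: the filtered hostnames can flow straight into concatMap, which already processes them one after another.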