Skip to content
Advertisement

Flux – parallel flatMap with webclient – limit to fixed batched rate

The code I have is this:

return Flux.fromIterable(new Generator()).log()
        .flatMap(
            s ->
                webClient
                    .head()
                    .uri(
                        MessageFormat.format(
                            "/my-{2,number,#00}.xml",
                            channel, timestamp, s))
                    .exchangeToMono(r -> Mono.just(r.statusCode()))
                    .filter(HttpStatus::is2xxSuccessful)
                    .map(r -> s),
            6)  //only request 6 segments in parallel via webClient
        .take(6) //we need only 6 200 OK responses
        .sort();

It just requests HEAD, until first 6 requests are successful.

Parallelization works here, but the problem is that after 1 of the requests is complete, it immediatley triggers next request (to maintain parallelization level of 6). What I need here is to have parallelization level of 6, but in batches. So – trigger 6 requests, wait until all complete, trigger again 6 requests …

This is the output of the log() above:

: | request(6)
: | onNext(7)
: | onNext(17)
: | onNext(27)
: | onNext(37)
: | onNext(47)
: | onNext(57)
: | request(1) <---- from here NOT OK; wait until all complete and request(6)
: | onNext(8)
: | request(1)
: | onNext(18)
: | request(1)
: | onNext(28)
: | request(1)
: | onNext(38)
: | request(1)
: | onNext(48)
: | request(1)
: | onNext(58)
: | cancel()

UPDATE

this is what I tried with the buffer:

return Flux.fromIterable(new Generator())
        .buffer(6)
        .flatMap(Flux::fromIterable)
        .log()
        .flatMap(
            s ->
                webClient
                    .head()
                    .uri(
                        MessageFormat.format(
                            "/my-{2,number,#00}.xml",
                            channel, timestamp, s))
                    .exchangeToMono(r -> Mono.just(r.statusCode()))
                    .filter(HttpStatus::is2xxSuccessful)
                    .map(r -> s),
            6)  //only request 6 segments in parallel via webClient
        .take(6)
        .sort();

Advertisement

Answer

OK, It seems I have the code that works. Here I use window:

return Flux.fromIterable(new Generator())
        .window(6) //group 1,2,3,4,5,6,7... into [0,1,2,3,4,5],[6,7..,11],[12,..,17]
        .log()
        .flatMap(
            s -> s.log().flatMap(x -> webClient
                .head()
                .uri(
                    MessageFormat.format(
                        "/my-{2,number,#00}.xml",
                        channel, timestamp, x))
                .exchangeToMono(r -> Mono.just(r.statusCode()))
                .filter(HttpStatus::is2xxSuccessful)
                .map(r -> x), 6), 1)  //1 means take only 1 array ([0,1,2,3,4,5]). 6 means take in parallel all from array (0,1,2,3,4,5)
        .take(6, true) //pass through only 6 elements (cancel afterwards)
        .sort();
Advertisement