The code I have is this:
return Flux.fromIterable(new Generator()).log() .flatMap( s -> webClient .head() .uri( MessageFormat.format( "/my-{2,number,#00}.xml", channel, timestamp, s)) .exchangeToMono(r -> Mono.just(r.statusCode())) .filter(HttpStatus::is2xxSuccessful) .map(r -> s), 6) //only request 6 segments in parallel via webClient .take(6) //we need only 6 200 OK responses .sort();
It just requests HEAD
, until first 6 requests are successful.
Parallelization works here, but the problem is that after 1 of the requests is complete, it immediatley triggers next request (to maintain parallelization level of 6). What I need here is to have parallelization level of 6, but in batches. So – trigger 6 requests, wait until all complete, trigger again 6 requests …
This is the output of the log()
above:
: | request(6) : | onNext(7) : | onNext(17) : | onNext(27) : | onNext(37) : | onNext(47) : | onNext(57) : | request(1) <---- from here NOT OK; wait until all complete and request(6) : | onNext(8) : | request(1) : | onNext(18) : | request(1) : | onNext(28) : | request(1) : | onNext(38) : | request(1) : | onNext(48) : | request(1) : | onNext(58) : | cancel()
UPDATE
this is what I tried with the buffer:
return Flux.fromIterable(new Generator()) .buffer(6) .flatMap(Flux::fromIterable) .log() .flatMap( s -> webClient .head() .uri( MessageFormat.format( "/my-{2,number,#00}.xml", channel, timestamp, s)) .exchangeToMono(r -> Mono.just(r.statusCode())) .filter(HttpStatus::is2xxSuccessful) .map(r -> s), 6) //only request 6 segments in parallel via webClient .take(6) .sort();
Advertisement
Answer
OK, It seems I have the code that works. Here I use window
:
return Flux.fromIterable(new Generator()) .window(6) //group 1,2,3,4,5,6,7... into [0,1,2,3,4,5],[6,7..,11],[12,..,17] .log() .flatMap( s -> s.log().flatMap(x -> webClient .head() .uri( MessageFormat.format( "/my-{2,number,#00}.xml", channel, timestamp, x)) .exchangeToMono(r -> Mono.just(r.statusCode())) .filter(HttpStatus::is2xxSuccessful) .map(r -> x), 6), 1) //1 means take only 1 array ([0,1,2,3,4,5]). 6 means take in parallel all from array (0,1,2,3,4,5) .take(6, true) //pass through only 6 elements (cancel afterwards) .sort();