I want to use Flux to limit the number of concurrent HTTP requests issued from a List of Mono.
When some requests are done (their responses have been received), the service should send further requests, so that the number of requests waiting for a response stays at 15 at most.
A single request returns a list and triggers another request depending on the result.
At this point, I want to send the requests with limited concurrency, because too many simultaneous HTTP requests from the consumer side put the remote server in trouble.
I used flatMapMany as shown below.
    public Flux<JsonNode> syncData() {
        return service1
            .getData(param1)
            .flatMapMany(res -> {
                List<Mono<JsonNode>> totalTask = new ArrayList<>();
                Map<String, Object> originData = service2.getDataFromDB(param2);
                res.withArray("data").forEach(row -> {
                    String id = row.get("id").asText();
                    if (originData.containsKey(id)) {
                        totalTask.add(service1.updateRequest(param3));
                    } else {
                        totalTask.add(service1.deleteRequest(param4));
                    }
                    originData.remove(id);
                });
                for (left) {
                    totalTask.add(service1.createRequest(param5));
                }
                return Flux.merge(totalTask);
            });
    }
    void syncData() {
        syncDataService.syncData().????;
    }
I tried chaining .window(15), but it doesn't work: all the requests are sent simultaneously.
How can I handle the Flux to reach my goal?
Answer
I am afraid Project Reactor doesn't provide a built-in implementation of either rate limiting or time-based limiting.
However, you can find several third-party libraries that provide such functionality and are compatible with Project Reactor. As far as I know, resilience4j-reactor supports that and is also compatible with the Spring and Spring Boot frameworks.
The RateLimiterOperator checks whether a downstream subscriber/observer can acquire a permission to subscribe to an upstream Publisher. If the rate limit would be exceeded, the RateLimiterOperator can either delay requesting data from the upstream or emit a RequestNotPermitted error to the downstream subscriber.
    RateLimiter rateLimiter = RateLimiter.ofDefaults("name");
    Mono.fromCallable(backendService::doSomething)
        .transformDeferred(RateLimiterOperator.of(rateLimiter));
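As a rough sketch of how the operator could be applied to many requests with a custom configuration instead of the defaults: the class name, the limiter name "syncData", fakeRequest, and the numbers (15 permits per second, 10 second timeout) are assumptions made up for illustration and are not taken from the question. Note that a rate limiter caps how many calls may start per refresh period; it does not cap how many are in flight at once.

```java
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterOperator;
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class RateLimitedCallsSketch {

    // Assumed settings: at most 15 permits per 1-second period; a subscriber
    // may wait up to 10 seconds for a permit before RequestNotPermitted.
    static RateLimiter buildLimiter() {
        RateLimiterConfig config = RateLimiterConfig.custom()
                .limitForPeriod(15)
                .limitRefreshPeriod(Duration.ofSeconds(1))
                .timeoutDuration(Duration.ofSeconds(10))
                .build();
        return RateLimiter.of("syncData", config);
    }

    // Hypothetical stand-in for a real HTTP call.
    static Mono<String> fakeRequest(int id) {
        return Mono.fromCallable(() -> "response-" + id);
    }

    // Wraps one request so that subscribing to it first needs a permit.
    static Mono<String> limitedRequest(int id, RateLimiter limiter) {
        return fakeRequest(id).transformDeferred(RateLimiterOperator.of(limiter));
    }

    // Sends `total` requests through one shared limiter and collects the results.
    static List<String> runAll(int total) {
        RateLimiter limiter = buildLimiter();
        return Flux.range(0, total)
                .flatMap(id -> limitedRequest(id, limiter))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(runAll(20).size() + " responses received");
    }
}
```

All requests share one RateLimiter instance here; creating a new limiter per request would give each request its own budget and limit nothing.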
More about the RateLimiter module itself: https://resilience4j.readme.io/docs/ratelimiter
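Separately from rate limiting, the question asks for a cap on the number of requests in flight. Flux.merge(totalTask) subscribes to every Mono in the list eagerly, and window(15) only groups the results that come out afterwards, which would explain why all requests were sent at once. Reactor's flatMap has an overload that takes a concurrency argument and subscribes to at most that many inner publishers at a time. The following is a minimal sketch of that idea, assuming the Monos are cold (the request is only sent on subscription); fakeRequest, the counters, and the class name are hypothetical and exist only to make the effect observable.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class BoundedConcurrencySketch {

    // Hypothetical stand-in for one HTTP call. The counter goes up when the
    // Mono is subscribed and down when its (simulated) response arrives.
    static Mono<Integer> fakeRequest(int id, AtomicInteger inFlight, AtomicInteger maxSeen) {
        return Mono.defer(() -> {
            int now = inFlight.incrementAndGet();
            maxSeen.accumulateAndGet(now, Math::max);
            return Mono.delay(Duration.ofMillis(50))
                    .doOnNext(tick -> inFlight.decrementAndGet())
                    .map(tick -> id);
        });
    }

    // Runs `total` fake requests with at most `concurrency` subscribed at once
    // and returns the highest in-flight count that was observed.
    static int maxInFlight(int total, int concurrency) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        List<Mono<Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < total; i++) {
            tasks.add(fakeRequest(i, inFlight, maxSeen));
        }
        Flux.fromIterable(tasks)
                // The second argument bounds how many inner Monos are
                // subscribed at the same time.
                .flatMap(task -> task, concurrency)
                .blockLast();
        return maxSeen.get();
    }

    public static void main(String[] args) {
        System.out.println("max in flight: " + maxInFlight(100, 15));
    }
}
```

Applied to the code in the question, this would presumably mean returning Flux.fromIterable(totalTask).flatMap(task -> task, 15) instead of Flux.merge(totalTask). If updateRequest, deleteRequest, or createRequest already start the HTTP call when they are invoked rather than when the returned Mono is subscribed, the bound has no effect and each call would need to be wrapped in Mono.defer.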