
How to limit concurrent HTTP requests with Mono & Flux

I want to use Flux to limit the number of concurrent HTTP requests made from a List of Mono.

When some requests complete (their responses have been received), the service should send further requests, so that the number of requests waiting for a response never exceeds 15.

A single request returns a list and triggers further requests depending on the result.

At this point, I want to send the requests with limited concurrency, because too many simultaneous HTTP requests from the consumer side put the remote server in trouble.

I used flatMapMany as shown below.

public Flux<JsonNode> syncData() {
    return service1
        .getData(param1)
        .flatMapMany(res -> {
            List<Mono<JsonNode>> totalTask = new ArrayList<>();
            Map<String, Object> originData = service2.getDataFromDB(param2);
            res.withArray("data").forEach(row -> {
                String id = row.get("id").asText();
                if (originData.containsKey(id)) {
                    totalTask.add(service1.updateRequest(param3));
                } else {
                    totalTask.add(service1.deleteRequest(param4));
                }
                originData.remove(id);
            });
            // pseudocode: loop over whatever is left in originData
            for (left) {
                totalTask.add(service1.createRequest(param5));
            }
            // merges every Mono in the list into one Flux
            return Flux.merge(totalTask);
        });
}

// caller
void syncData() {
    syncDataService.syncData().????;
}

I tried chaining .window(15), but it doesn’t work. All the requests are sent simultaneously.

How can I set up the Flux to achieve this?


Answer

I am afraid Project Reactor doesn’t ship a dedicated rate limiter, whether count-based or time-based.

However, there are several third-party libraries that provide this functionality and are compatible with Project Reactor. As far as I know, resilience4j-reactor supports it and also works with the Spring and Spring Boot frameworks.

The RateLimiterOperator checks whether a downstream subscriber/observer can acquire permission to subscribe to an upstream Publisher. If the rate limit would be exceeded, the RateLimiterOperator can either delay requesting data from the upstream or emit a RequestNotPermitted error to the downstream subscriber.

// "name" only identifies this limiter instance; ofDefaults uses the default configuration
RateLimiter rateLimiter = RateLimiter.ofDefaults("name");
Mono.fromCallable(backendService::doSomething)
    .transformDeferred(RateLimiterOperator.of(rateLimiter));
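To make the limiter's behavior explicit rather than relying on the defaults, a custom RateLimiterConfig can be passed in. The following is only a sketch, assuming resilience4j-ratelimiter, resilience4j-reactor and reactor-core are on the classpath. The class name, the limiter name "syncData", the numbers (15 permits per second, 10 second wait timeout) and fakeRequest are hypothetical placeholders, not taken from the question. Note that this limits the rate at which requests start, which is not the same as capping how many are in flight.

```java
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterOperator;
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class RateLimitedRequestsExample {

    // Assumed settings: at most 15 permits per 1-second window; a caller may
    // wait up to 10 seconds for a permit before RequestNotPermitted is emitted.
    static RateLimiter newLimiter() {
        RateLimiterConfig config = RateLimiterConfig.custom()
            .limitForPeriod(15)
            .limitRefreshPeriod(Duration.ofSeconds(1))
            .timeoutDuration(Duration.ofSeconds(10))
            .build();
        return RateLimiter.of("syncData", config);
    }

    // Hypothetical stand-in for an HTTP call such as service1.updateRequest(...)
    static Mono<String> fakeRequest(int id) {
        return Mono.fromCallable(() -> "response-" + id);
    }

    // Every request shares the same limiter, so each subscription has to
    // acquire a permit before the underlying call is started.
    static List<String> runAll(int taskCount) {
        RateLimiter limiter = newLimiter();
        return Flux.range(0, taskCount)
            .flatMap(id -> fakeRequest(id)
                .transformDeferred(RateLimiterOperator.of(limiter)))
            .collectList()
            .block();
    }
}
```

With these assumed settings, calling runAll(20) should let the first 15 requests through immediately and hold the remaining ones until the next refresh period.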

More about the RateLimiter module itself: https://resilience4j.readme.io/docs/ratelimiter
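Separately from rate limiting, if the goal is only to cap the number of requests in flight at once (15 in the question), Reactor's flatMap has an overload that takes a concurrency argument, and that may already be enough. Flux.merge(totalTask) subscribes to every Mono in the list eagerly, which would explain why a later .window(15) changes nothing. The sketch below assumes the Monos are cold, meaning the HTTP call starts only on subscription. The class name, fakeRequest and the in-flight counters are hypothetical and exist only to make the effect observable.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class BoundedConcurrencyExample {

    // Counters used only to observe how many fake requests run at once.
    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger maxInFlight = new AtomicInteger();

    // Hypothetical stand-in for a cold HTTP call: nothing happens until
    // something subscribes, then the "response" arrives after 20 ms.
    static Mono<Integer> fakeRequest(int id) {
        return Mono.defer(() -> {
            int now = inFlight.incrementAndGet();
            maxInFlight.accumulateAndGet(now, Math::max);
            return Mono.delay(Duration.ofMillis(20))
                .map(tick -> id)
                .doOnNext(value -> inFlight.decrementAndGet());
        });
    }

    // Builds the task list up front (like totalTask in the question), then
    // lets flatMap subscribe to at most `concurrency` of them at a time.
    static List<Integer> runAll(int taskCount, int concurrency) {
        List<Mono<Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            tasks.add(fakeRequest(i));
        }
        return Flux.fromIterable(tasks)
            .flatMap(task -> task, concurrency)
            .collectList()
            .block();
    }
}
```

Applied to the question's code, this would mean replacing Flux.merge(totalTask) with Flux.fromIterable(totalTask).flatMap(task -> task, 15). Results arrive in completion order, not list order.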

User contributions licensed under: CC BY-SA