Resubmit Callable to executorService on exception

My situation

I'm trying to build functionality that sends n requests (where n >= 0) to a given endpoint. The endpoint might sometimes fail to respond because of a 500 error or another issue, so I want to repeat each failed request (with a small interval in between [not yet implemented]) until I either get a response or hit an unknown error indicating that I can't retry, for reasons other than a crashed server.

So I tried to implement this using the Executors and concurrency utilities in Java 11, but it does not work the way I want.

I can't resubmit failed tasks until I get all the responses, and I don't know why.

I have a method

private void DoMyTasks(List<MyRequest> requests) {
    final ExecutorService executorService = Executors.newFixedThreadPool(10);
    final ExecutorCompletionService<MyReqResDto> completionService =
            new ExecutorCompletionService<>(executorService);
    for (final MyRequest MyRequest : requests) {
        completionService.submit(new MyCallableRequest(webClient, MyRequest));
    }

    List<MyReqResDto> responses = new ArrayList<>();

    for (int i = 0; i < requests.size(); ++i) {
        try {
            final Future<MyReqResDto> future = completionService.take();
            if (future.get().getEx() != null) {
                completionService.submit(new MyCallableRequest(webClient, future.get().getMyRequest()));
            }
            responses.add(future.get());
        } catch (ExecutionException | InterruptedException e) {
            log.warn("Error");
        } catch (Exception exception) {
            log.error("Other error");
        } finally {
            executorService.shutdown();
            try {
                if (!executorService.awaitTermination(10, TimeUnit.MINUTES)) {
                    executorService.shutdownNow();
                }
            } catch (InterruptedException e) {
                executorService.shutdownNow();
            }
        }
    }
    responses.size();
}

I’m trying to repeat failed tasks with

if (future.get().getEx() != null) {
   completionService.submit(new MyCallableRequest(webClient, future.get().getMyRequest()));
}

and yet, at the end of execution, I don't get responses for all my requests: when I execute 10 requests, I get at most 3 to 5 responses. Why? How can I fix it?

My callable class is

public class MyCallableRequest implements Callable<MyReqResDto> {

    private final WebClient webClient;

    private final MyRequest myRequest;

    public MyCallableRequest(WebClient webClient, MyRequest myRequest) {
        this.webClient = webClient;
        this.myRequest = myRequest;
    }

    @Override
    public MyReqResDto call() throws Exception {
        try {
            if (new Random().nextInt(10) % 2 == 0) {
                throw new TestException();
            }
            if (new Random().nextInt(10) % 7 == 0) {
                throw new RuntimeException();
            }
            WebClient.UriSpec<WebClient.RequestBodySpec> uriSpec = webClient.post();
            WebClient.RequestBodySpec bodySpec = uriSpec.uri(
                    s -> s.path("/myEndpoint").build());
            MyRequestDto myMyRequestDto = new MyRequestDto();
            WebClient.RequestHeadersSpec<?> headersSpec =
                    bodySpec.body(Mono.just(myMyRequestDto), MyRequestDto.class);
            ResponseDto responseDto = headersSpec.exchangeToMono(s -> {
                if (s.statusCode().equals(HttpStatus.OK)) {
                    return s.bodyToMono(ResponseDto.class);
                } else if (s.statusCode().is1xxInformational()) {
                    return s.createException().flatMap(Mono::error);
                } else if (s.statusCode().is3xxRedirection()) {
                    return s.createException().flatMap(Mono::error);
                } else if (s.statusCode().is4xxClientError()) {
                    return s.createException().flatMap(Mono::error);
                } else if (s.statusCode().is5xxServerError()) {
                    return s.createException().flatMap(Mono::error);
                } else {
                    return s.createException().flatMap(Mono::error);
                }
                //return null;
            }).block();
            return new MyReqResDto(myRequest, responseDto, null);
        } catch (Exception exception) {
            return new MyReqResDto(myRequest, null, exception);
        }
    }
}

Update NO. 1

I changed the for loop to a while loop, following a comment by Slaw and an answer by erickson. This solution works, meaning that it keeps hammering the endpoint until all responses are received without errors. But I'm still not sure about it; it feels like I'm building a sh** tower. Are there any thread-related issues I should be aware of when using an executor like this?

    while (true) {
        Future<MyReqResDto> future = null;
        try {
            future = completionService.take();
            if (future.get().getEx() != null /* and check whether the exception can be handled; if not, break out of the loop */) {
                completionService.submit(new MyCallableRequest(webClient, future.get().getRequestCT()));
            } else {
                responseDtos.add(future.get());
            }
        } catch (ExecutionException | InterruptedException e) {
            log.warn("Error while downloading", e.getCause());
            // test whether I can recover from these exceptions; if not:
            break;
        }
        if (responseDtos.size() == requests.size()) {
            executorService.shutdown();
            try {
                if (!executorService.awaitTermination(10, TimeUnit.MINUTES)) {
                    executorService.shutdownNow();
                }
            } catch (InterruptedException e) {
                executorService.shutdownNow();
            }
            break;
        }
    }

Answer

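You are shutting down the executor as soon as you get one response, because the finally block runs on the first pass through the loop. Tasks that were already submitted may still finish, but once shutdown() has been called the executor rejects new tasks, so none of the failed requests can be resubmitted.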

Your logic here is wrong. The executor should only be shut down when you are sure no more tasks will be submitted; at the earliest, that is after the loop responsible for resubmitting failures.

Here is a simplified view of your code to highlight the premature shutdown:

for (int i = 0; i < requests.size(); ++i) {
  try {
    final Future<MyReqResDto> future = completionService.take();
    ...
    responses.add(future.get());
    ...
  } finally {
    executorService.shutdown();
  }
}
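
One way to apply this, as a minimal sketch rather than a drop-in replacement: count the tasks that are still in flight, keep calling take() until that count reaches zero, and shut the executor down only after the loop. The names RetryingBatch, Result, flakyEndpoint and runAll are hypothetical stand-ins for your class, MyReqResDto, the WebClient call and DoMyTasks; the simulated endpoint is assumed to fail each request a fixed number of times before succeeding.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class RetryingBatch {

    // Hypothetical stand-in for MyReqResDto: exactly one of response / ex is non-null.
    static final class Result {
        final int requestId;
        final String response;
        final Exception ex;

        Result(int requestId, String response, Exception ex) {
            this.requestId = requestId;
            this.response = response;
            this.ex = ex;
        }
    }

    // Hypothetical endpoint: each request fails `failuresPerRequest` times, then succeeds.
    static Function<Integer, Result> flakyEndpoint(int failuresPerRequest) {
        Map<Integer, AtomicInteger> attempts = new ConcurrentHashMap<>();
        return id -> {
            int attempt = attempts.computeIfAbsent(id, k -> new AtomicInteger()).incrementAndGet();
            return attempt <= failuresPerRequest
                    ? new Result(id, null, new IllegalStateException("simulated 500"))
                    : new Result(id, "ok-" + id, null);
        };
    }

    static List<Result> runAll(List<Integer> requestIds, Function<Integer, Result> call) {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        CompletionService<Result> completionService = new ExecutorCompletionService<>(executor);
        List<Result> responses = new ArrayList<>();
        try {
            for (Integer id : requestIds) {
                completionService.submit(() -> call.apply(id));
            }
            int pending = requestIds.size(); // tasks submitted but not yet taken
            while (pending > 0) {
                Result result = completionService.take().get();
                pending--;
                if (result.ex != null) {
                    // Failed: resubmit, so one more completion has to be awaited.
                    completionService.submit(() -> call.apply(result.requestId));
                    pending++;
                } else {
                    responses.add(result);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop early, keep what was collected
        } catch (ExecutionException e) {
            // The task threw instead of returning a Result; this sketch treats that as unrecoverable.
        } finally {
            executor.shutdown(); // only here, once nothing more will be submitted
        }
        return responses;
    }

    public static void main(String[] args) {
        List<Result> responses = runAll(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), flakyEndpoint(2));
        System.out.println(responses.size()); // prints 10
    }
}
```

Note that this sketch has no retry limit and no delay between attempts, so a request that never succeeds would keep the loop running forever; real code would probably cap the number of attempts per request and stop retrying on errors that are not recoverable.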
User contributions licensed under: CC BY-SA