Skip to content
Advertisement

Callable in ExecutorService with shared data

I have a scenario which I have somehow simulated into the following: I have a list of Callable tasks that are being executed in 4 threads. The execution should return a value(a Map<String, String>). Each task has an item attribute.

String[] itemList = {"WINDOWS", "MATRIX", "3D", "FACEBOOK", "HOTDOG", "SECRET"};
Random rand = new Random();
ArrayList<CallableTask> taskQueue = new ArrayList<>();
for(int i = 0; i<16; i++){
    int x = rand.nextInt(6);
    taskQueue.add(new CallableTask(itemList[x]));
}

If for a task I get timeout (I have a Timeout flag in a Map<> that each Callable returns) or an exception then I want to cancel or remove all the tasks which has the same item attribute or I want to notify the threads to avoid those tasks.
I am using invokeAll() for execution.

ExecutorService executorService = Executors.newFixedThreadPool(numOfThreads); 
List<Future<Map<String, String>>> executionResults = executorService.invokeAll(taskQueue);

Below is my Callable class code:

public class CallableTask implements Callable<Map<String, String>>{
    
    private String item = "";
    
    CallableTask(String input){
        super();
        item = input;
    }

    @Override
    public Map<String, String> call() throws Exception {
        
        Map<String, String> resultMap = new HashMap<>();
        
        
        String[] sample = {"Y", "N", "N", "N", "Y"};
        Random rand = new Random();
        int x = 0;
        
        try{
            Thread.sleep(2000);
            x = rand.nextInt(5);
            resultMap.put("Thread", Thread.currentThread().getName());
            int temp = 9/(x-1);
        } 
        catch (Exception e){
            throw new Exception("Divide by 0!!");
        }
        finally{
            resultMap.put("Timeout", sample[x]);
            resultMap.put("Item", item);
            
            return resultMap;
        }
        
        //return resultMap;
    }
    
}

How do I achieve this? I wanted to create a custom ThreadPoolExecutor and try the afterExecute() or beforeExecute() hooks to intervene between tasks assuming I could access Future<?> in there, but only Runnable is allowed in ThreadPoolExecutor. On the otherhand, with Runnable I cant get the results.

Answer

When creating the tasks, share a concurrent structure with each thread so that they can coordinate:

Map<String, Boolean> abort = new ConcurrentHashMap<>();
...
taskQueue.add(new CallableTask(abort, itemList[x]));

Then, in each task, check the shared state before starting, and update the shared state as appropriate:

@Override
public Map<String, String> call() throws Exception {
    if (abort.getOrDefault(item, false)) throw new ExecutionException();
    ...
    try {
        ...
    } catch (Exception ex) {
        abort.put(item, true);
    }
    ...
Advertisement