I have a scenario which I have somehow simulated into the following:
I have a list of Callable tasks that are being executed in 4 threads. The execution should return a value(a Map<String, String>
). Each task has an item attribute.
String[] itemList = {"WINDOWS", "MATRIX", "3D", "FACEBOOK", "HOTDOG", "SECRET"};
Random rand = new Random();
ArrayList<CallableTask> taskQueue = new ArrayList<>();
for(int i = 0; i<16; i++){
int x = rand.nextInt(6);
taskQueue.add(new CallableTask(itemList[x]));
}
If for a task I get timeout (I have a Timeout flag in a Map<>
that each Callable returns) or an exception then I want to cancel or remove all the tasks which has the same item attribute or I want to notify the threads to avoid those tasks.
I am using invokeAll() for execution.
ExecutorService executorService = Executors.newFixedThreadPool(numOfThreads);
List<Future<Map<String, String>>> executionResults = executorService.invokeAll(taskQueue);
Below is my Callable class code:
public class CallableTask implements Callable<Map<String, String>>{
private String item = "";
CallableTask(String input){
super();
item = input;
}
@Override
public Map<String, String> call() throws Exception {
Map<String, String> resultMap = new HashMap<>();
String[] sample = {"Y", "N", "N", "N", "Y"};
Random rand = new Random();
int x = 0;
try{
Thread.sleep(2000);
x = rand.nextInt(5);
resultMap.put("Thread", Thread.currentThread().getName());
int temp = 9/(x-1);
}
catch (Exception e){
throw new Exception("Divide by 0!!");
}
finally{
resultMap.put("Timeout", sample[x]);
resultMap.put("Item", item);
return resultMap;
}
//return resultMap;
}
}
How do I achieve this? I wanted to create a custom ThreadPoolExecutor and try the afterExecute()
or beforeExecute()
hooks to intervene between tasks assuming I could access Future<?>
in there, but only Runnable is allowed in ThreadPoolExecutor. On the otherhand, with Runnable I cant get the results.
Advertisement
Answer
When creating the tasks, share a concurrent structure with each thread so that they can coordinate:
Map<String, Boolean> abort = new ConcurrentHashMap<>();
taskQueue.add(new CallableTask(abort, itemList[x]));
Then, in each task, check the shared state before starting, and update the shared state as appropriate:
@Override
public Map<String, String> call() throws Exception {
if (abort.getOrDefault(item, false)) throw new ExecutionException();
try {
} catch (Exception ex) {
abort.put(item, true);
}