I create a thread pool to run a batch of tasks. After the first batch has been processed, I find that I cannot submit and start another batch. How can I fix this? If I change the executor to
executor = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("timeOutThread"));
it runs fine. But if a task is canceled because of a timeout, will this cause a memory leak?
ExecutorService executor = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>(1), new NamedThreadFactory("timeOutThread"));

List<Callable<String>> callableList = new ArrayList<>();
IntStream.range(0, 3).forEach(index -> callableList.add(() -> request(index)));

// First batch
List<Future<String>> futureList = executor.invokeAll(callableList, 1, TimeUnit.SECONDS);
for (int i = 0; i < futureList.size(); i++) {
    Future<String> future = futureList.get(i);
    try {
        list.add(future.get());
    } catch (CancellationException e) {
        log.info("timeOut task:{}", i);
    } catch (Exception e) {
        log.error(e.getMessage(), e);
    }
}

Thread.sleep(1000);
callableList.clear();
IntStream.range(0, 3).forEach(index -> callableList.add(() -> request(index)));
long start1 = System.currentTimeMillis();

// Second batch
// Task java.util.concurrent.FutureTask@5fdcaa40 rejected from java.util.concurrent.ThreadPoolExecutor@6dc17b83
futureList = executor.invokeAll(callableList, 1, TimeUnit.SECONDS);
for (int i = 0; i < futureList.size(); i++) {
    Future<String> future = futureList.get(i);
    try {
        list.add(future.get());
    } catch (CancellationException e) {
        log.info("timeOut Task:{}", i);
    } catch (Exception e) {
        log.error(e.getMessage(), e);
    }
}

public String request(int index) throws InterruptedException {
    TimeUnit.MILLISECONDS.sleep(200000);
    return "A";
}
Answer
I can reproduce your error with the following simplified code:
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws InterruptedException {
        var pool = new ThreadPoolExecutor(
                3, 3, 0L, TimeUnit.NANOSECONDS, new LinkedBlockingQueue<>(1));
        try {
            System.out.println("Executing first batch of tasks...");
            submitTasks(pool);

            System.out.println("Executing second batch of tasks...");
            submitTasks(pool);
        } finally {
            pool.shutdown();
        }
    }

    private static void submitTasks(ExecutorService executor) throws InterruptedException {
        var tasks = new ArrayList<Callable<Void>>(3);
        for (int i = 0; i < 3; i++) {
            tasks.add(() -> {
                Thread.sleep(2_000L);
                return null;
            });
        }
        executor.invokeAll(tasks);
    }
}
Which gives this output:
Executing first batch of tasks...
Executing second batch of tasks...
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@87aac27[Not completed, task = Main$$Lambda$1/0x0000000800c009f0@816f27d] rejected from java.util.concurrent.ThreadPoolExecutor@3e3abc88[Running, pool size = 3, active threads = 0, queued tasks = 1, completed tasks = 3]
    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2070)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365)
    at java.base/java.util.concurrent.AbstractExecutorService.invokeAll(AbstractExecutorService.java:247)
    at Main.submitTasks(Main.java:32)
    at Main.main(Main.java:18)
The problem is caused by the queue being too small. The LinkedBlockingQueue is created with a capacity of only one, but three tasks are submitted to the pool at once. So the question becomes: why does it only fail on the second call to invokeAll?
The reason has to do with how ThreadPoolExecutor is implemented. When an instance is first created, none of the core threads are started. They are started lazily as tasks are submitted. When the submission of a task results in a thread being started, that task is immediately given to the thread. The queue is bypassed. So, when invokeAll is called the first time, each of the three core threads is started and none of the tasks go into the queue.
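To make the lazy start visible, here is a minimal sketch that inspects the pool before and after three submissions. The class name LazyCoreThreadsDemo and the latch-based blocking task are only for illustration; the pool is configured the same way as in the code above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LazyCoreThreadsDemo {

    /** Returns {pool size before any submission, pool size after three, queue size after three}. */
    static int[] observe() {
        var pool = new ThreadPoolExecutor(
                3, 3, 0L, TimeUnit.NANOSECONDS, new LinkedBlockingQueue<>(1));
        var release = new CountDownLatch(1); // keeps the tasks running while we inspect the pool
        try {
            int before = pool.getPoolSize(); // no core thread has been started yet
            for (int i = 0; i < 3; i++) {
                // Fewer than corePoolSize threads exist, so each call starts a new
                // thread and hands the task to it directly, bypassing the queue.
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] {before, pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] sizes = observe();
        // prints "before=0 after=3 queued=0"
        System.out.println("before=" + sizes[0] + " after=" + sizes[1] + " queued=" + sizes[2]);
    }
}
```

All three tasks are accepted even though the queue has room for only one, because none of them ever enters the queue.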
But the second time invokeAll is called, the core threads have already been started. Since submitting the tasks does not result in a thread being created, the tasks are put into the queue. But the queue only has room for one task, and the pool is already at its maximum size, so a task that finds the queue full is rejected with a RejectedExecutionException (note the "pool size = 3" and "queued tasks = 1" in the exception message above). If you’re wondering why the core threads are still alive despite the keep-alive time being set to zero, that’s because core threads are not allowed to die due to timeout by default (you have to explicitly configure the pool to allow that).
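For completeness, the explicit configuration mentioned above is ThreadPoolExecutor.allowCoreThreadTimeOut(true). The sketch below only illustrates that setting and is not meant as a recommended fix. It assumes a 50 ms keep-alive (the method rejects a keep-alive of zero with an IllegalArgumentException) and polls for up to five seconds, both of which are arbitrary choices; the class name CoreTimeoutDemo is made up.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CoreTimeoutDemo {

    /** Returns the pool size observed once the idle core threads have had time to expire. */
    static int poolSizeAfterIdle() {
        // allowCoreThreadTimeOut(true) requires a keep-alive time greater than zero.
        var pool = new ThreadPoolExecutor(
                3, 3, 50L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1));
        pool.allowCoreThreadTimeOut(true);
        try {
            for (int i = 0; i < 3; i++) {
                pool.execute(() -> { }); // trivial tasks; each one starts a core thread
            }
            // Wait (at most five seconds) for the idle core threads to time out.
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("pool size after idling: " + poolSizeAfterIdle());
    }
}
```

With this setting the pool should shrink back to zero threads once it has been idle for longer than the keep-alive time, so a later submission starts fresh threads again instead of going through the queue.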
You can confirm that the lazily started core threads are the cause of the problem by modifying the code slightly. Simply adding:
pool.prestartAllCoreThreads();
just after creating the pool causes the first call to invokeAll to fail with a RejectedExecutionException.
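As a small sketch of what that call does (the class name PrestartDemo is hypothetical), the snippet below checks the return value of prestartAllCoreThreads() and the pool size before any task has been submitted:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PrestartDemo {

    /** Returns {threads started by prestartAllCoreThreads(), pool size before any task is submitted}. */
    static int[] observe() {
        var pool = new ThreadPoolExecutor(
                3, 3, 0L, TimeUnit.NANOSECONDS, new LinkedBlockingQueue<>(1));
        try {
            int started = pool.prestartAllCoreThreads(); // starts every core thread, idle
            return new int[] {started, pool.getPoolSize()};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] result = observe();
        // prints "started=3 poolSize=3"
        System.out.println("started=" + result[0] + " poolSize=" + result[1]);
    }
}
```

Because the pool already holds corePoolSize threads, every task submitted afterwards is offered to the queue, which is the situation the original code only reaches on its second batch.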
Also, if you change the queue’s capacity from one to three, then the RejectedExecutionException will no longer occur.
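Here is a sketch of that change, assuming batches of three tasks as in the code above. The class name CapacityThreeDemo is made up, and the sleep is shortened to 100 ms to keep the run quick.

```java
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CapacityThreeDemo {

    /** Runs two batches of three tasks and returns how many tasks completed normally. */
    static int runTwoBatches() {
        // The queue capacity now matches the batch size, so a whole batch fits in the queue.
        var pool = new ThreadPoolExecutor(
                3, 3, 0L, TimeUnit.NANOSECONDS, new LinkedBlockingQueue<>(3));
        try {
            int completed = 0;
            for (int batch = 0; batch < 2; batch++) {
                var tasks = new ArrayList<Callable<Void>>(3);
                for (int i = 0; i < 3; i++) {
                    tasks.add(() -> {
                        Thread.sleep(100L);
                        return null;
                    });
                }
                // invokeAll blocks until every task in the batch has finished.
                for (Future<Void> future : pool.invokeAll(tasks)) {
                    if (future.isDone() && !future.isCancelled()) {
                        completed++;
                    }
                }
            }
            return completed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints "completed tasks: 6"
        System.out.println("completed tasks: " + runTwoBatches());
    }
}
```

The second batch still goes through the queue, but now all three offers succeed regardless of how quickly the idle threads pick the tasks up.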
Here’s some relevant documentation:
Any BlockingQueue may be used to transfer and hold submitted tasks. The use of this queue interacts with pool sizing:
- If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
- If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
- If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
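The three rules can be observed one at a time with a deliberately tiny pool. This is only a sketch: the sizes (corePoolSize 1, maximumPoolSize 2, queue capacity 1), the class name QueueRulesDemo, and the latch that keeps every task blocked are all choices made for the illustration, not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QueueRulesDemo {

    /** Submits four blocking tasks and records what the pool did with each one. */
    static List<String> submitFour() {
        var pool = new ThreadPoolExecutor(
                1, 2, 0L, TimeUnit.NANOSECONDS, new LinkedBlockingQueue<>(1));
        var release = new CountDownLatch(1);
        // Every task blocks until released, so no thread becomes free during the test.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        var outcomes = new ArrayList<String>();
        try {
            for (int i = 0; i < 4; i++) {
                try {
                    pool.execute(blocker);
                    outcomes.add("pool=" + pool.getPoolSize()
                            + " queue=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    outcomes.add("rejected");
                }
            }
            return outcomes;
        } finally {
            release.countDown(); // let the accepted tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints, one per line:
        // pool=1 queue=0   (rule 1: below corePoolSize, a thread is added)
        // pool=1 queue=1   (rule 2: at corePoolSize, the task is queued)
        // pool=2 queue=1   (rule 3: queue full, a thread is added up to maximumPoolSize)
        // rejected         (rule 3: queue full and maximumPoolSize reached)
        submitFour().forEach(System.out::println);
    }
}
```

In the pool from the question, corePoolSize and maximumPoolSize are both three, so the third step is never available: once the core threads exist and the single queue slot is taken, the next task goes straight to rejection.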