详解线程池工具类 Executors 的使用以及背后的 ThreadPoolExecutor 类

1 概述

  • java.util.concurrent.Executors

本文详细介绍 Executors 的使用以及背后的 ThreadPoolExecutor 类,并且详解介绍了 ThreadPoolExecutor 类的构造参数,通过这些构造参数可以理解线程池的基本运作原理。具体内容如下

  1. Executors 的四种线程池的创建
  2. ThreadPoolExecutor 构造参数介绍
  3. workQueue 工作队列
  4. RejectedExecutionHandler 拒绝执行策略接口

2 Executors 的四种线程池

2.1 newFixedThreadPool

创建一个固定线程数的线程池,任务队列中的任务数为 int 的最大值。 具体如下

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
  • corePoolSize 和 maximumPoolSize 参数一致
  • keepAliveTime 为 0
  • workQueue 为 LinkedBlockingQueue,意味着任务队列中的任务数为 int 的最大值
  • RejectedExecutionHandler 为 AbortPolicy,意味着如果队列中的任务满了,还有新增的任务,就会抛出 RejectedExecutionException 异常

2.2 newCachedThreadPool

创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
  • corePoolSize 为 0, maximumPoolSize 为 int 的最大值
  • keepAliveTime 为 60 s, 意味着工作队列中的任务数为 0 的情况下,活动线程最终都会被销毁,
  • workQueue 为 SynchronousQueue,意味着它没有实际的容量,最终取决于向线程池中提交的任务数
  • RejectedExecutionHandler 为 AbortPolicy

2.3 newScheduledThreadPool

创建一个固定线程数 线程池,支持定时及周期性任务执行。

1
2
3
4
5
6
7
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}
  • corePoolSize 为 指定的固定线程数, maximumPoolSize 为 int 的最大值
  • keepAliveTime 为 0
  • workQueue 为 DelayedWorkQueue,这是实现定时功能的关键所在
  • RejectedExecutionHandler 为 AbortPolicy

2.4 newSingleThreadExecutor

创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行,具体如下

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
  • corePoolSize 为 1, maximumPoolSize 为 1
  • keepAliveTime 为 0
  • workQueue 为 LinkedBlockingQueue
  • RejectedExecutionHandler 为 AbortPolicy

3 ThreadPoolExecutor 构造参数介绍

  • java.util.concurrent.ThreadPoolExecutor

从上面可见,Executors 提供的固定线程池都是由 ThreadPoolExecutor 构造函数创建的。

ThreadPoolExecutor 有几个重要的成员变量:

  1. keepAliveTime : poolSize > corePoolSize 的空闲线程的最大存活时间
  2. allowCoreThreadTimeOut:[true/false] 是否允许核心线程超时退出
  3. poolSize:当前线程池中的线程数
  4. corePoolSize:核心线程数
  5. maximumPoolSize:线程池中最大的线程数
  6. workQueue:任务队列,用于存放 Runnable 对象
  7. ThreadFactory:创建 Thread 对象
  8. RejectedExecutionHandler:任务数超过队列最大容量后对新增任务的处理策略

3.1 线程池任务调度

具体的流程如下

ThreadPoolExecutor

3.2 poolSize、corePoolSize、maximumPoolSize 三者的关系是如何的呢

当新提交一个任务时:

  • 如果 poolSize 小于 corePoolSize,新增加一个线程处理新的任务。
  • 如果 poolSize 等于 corePoolSize,新任务会被放入阻塞队列等待。
  • 如果工作队列的容量达到上限,且这时 poolSize 小于 maximumPoolSize,新增线程来处理任务。
  • 如果工作队列队列满了,且 poolSize 等于 maximumPoolSize,那么线程池已经达到极限,会根据饱和策略 RejectedExecutionHandler 来处理新的任务

3.3 allowCoreThreadTimeOut 的详细介绍

  • 如果该值为 false,且 poolSize <= corePoolSize,线程池都会保证这些核心线程处于存活状态,不会超时退出。
  • 如果为true,则不论 poolSize 的大小,都允许超时退出。
  • 如果 poolSize > corePoolSize,则该参数不论 true 还是false,都允许超时退出。
  • 相关判断如下:
1
(poolSize > corePoolSize || allowCoreThreadTimeOut)

4 workQueue 工作队列

上面提到阻塞队列的饱和,那么这个饱和值是多少呢?

通过上面的代码可以看到

4.1 LinkedBlockingQueue

  • 定长线程池和单线程线程都使用 LinkedBlockingQueue,而 LinkedBlockingQueue 默认的大小是 int的最大值,如下:
1
2
3
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}

4.2 DelayedWordQueue

  • 计划线程池使用的是 DelayedWordQueue,它默认大小是 16,但是可以动态增长,最大值则是int的最大值,如下:
1
2
3
4
5
6
7
8
9
10
11
private static final int INITIAL_CAPACITY = 16;
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];

private void grow() {
int oldCapacity = queue.length;
int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
if (newCapacity < 0) // overflow
newCapacity = Integer.MAX_VALUE;
queue = Arrays.copyOf(queue, newCapacity);
}

4.3 SynchronousQueue

缓存线程池使用的则是 SynchronousQueue,这个比较特殊没有所谓的饱和值,而且前面也看到了缓存线程池的 corePoolSize 默认是 0。

所以它新建一个线程与 SynchronousQueue 的机制有关,具体跟 工作队列 中的任务数有关。

5 RejectedExecutionHandler 拒绝执行策略接口

5.1 AbortPolicy

  1. AbortPolicy:终止策略是默认的饱和策略,当队列满时,会抛出一个 RejectExecutionException 异常,实际项目中可以捕获这个异常,根据需求编写自己的处理代码。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static class AbortPolicy implements RejectedExecutionHandler {

public AbortPolicy() { }

/**
* 总是抛出异常.
*
* @param r 工作队列中即将执行的 runnable 对象
* @param e ThreadPoolExecutor 对象
* @throws 抛出 RejectedExecutionException
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
}
}

5.2 DiscardPolicy

  1. DiscardPolicy:策略会悄悄抛弃该任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
public static class DiscardPolicy implements RejectedExecutionHandler {

public DiscardPolicy() { }

/**
* 什么都不做
*
* @param r 工作队列中即将执行的 runnable 对象
* @param e ThreadPoolExecutor 对象
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}

5.3 DiscardOldestPolicy

该策略将会抛弃下一个将要执行的任务,如果此策略配合优先队列 PriorityBlockingQueue,该策略将会抛弃优先级最高的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static class DiscardOldestPolicy implements RejectedExecutionHandler {

public DiscardOldestPolicy() { }

/**
* 从工作队列中抛出下一个将要执行的任务,并执行参数中的任务
*
* @param r 工作队列中即将执行的 runnable 对象
* @param e ThreadPoolExecutor 对象
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}

5.4 CallerRunsPolicy

调用者运行策略,该策略不会抛出异常,不会抛弃任务,而是将任务回退给调用者线程执行(调用 execute/submit 方法的线程),由于任务需要执行一段时间,所以在此期间不能提交任务,从而使工作线程有时间执行正在执行的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }

/**
*
* 在调用 execute/submit 方法的线程线程中执行任务,并且线程池没有被关闭
*
* @param r 工作队列中即将执行的 runnable 对象
* @param e ThreadPoolExecutor 对象
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}

6 测试验证

6.1 CallerRunsPolicy 策略

任务数在超过工作队列最大容量的情况下采用 CallerRunsPolicy 的策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// 任务数在超过工作队列最大容量的情况下采用 CallerRunsPolicy 的策略
@Test
public void test1() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(2,
3,
30,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1),
new MyThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());

IntStream.range(0, 5).forEach(i -> executor.execute(new Task(String.valueOf(i))));

// 等待任务执行完毕
WaitUtils.waitUntil(() -> executor.isTerminated(), 100000l);
}

// 任务对象
private static class Task implements Runnable {

private String taskName;
public Task(String taskName) {
this.taskName = taskName;
}
@Override
public void run() {
// 模拟执行任务
WaitUtils.sleep(500);
logger.info(String.format("%s finish task_%s", Thread.currentThread().getName(), taskName));
}
}

// 自定义线程工程
private static class MyThreadFactory implements ThreadFactory {
private final SecurityManager s = System.getSecurityManager();
private final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix = "test-pool-" + poolNumber.getAndIncrement() + "-thread-";

public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
logger.info(String.format("%s create", t.getName()));
return t;
}
}
  • 输出如下
1
2
3
4
5
6
7
8
18:50:10.266 [main] INFO  c.c.ExecuteService.TestExecutors - test-pool-1-thread-1 create
18:50:10.269 [main] INFO c.c.ExecuteService.TestExecutors - test-pool-1-thread-2 create
18:50:10.269 [main] INFO c.c.ExecuteService.TestExecutors - test-pool-1-thread-3 create
18:50:10.770 [test-pool-1-thread-3] INFO c.c.ExecuteService.TestExecutors - test-pool-1-thread-3 finish task_3
18:50:10.770 [test-pool-1-thread-1] INFO c.c.ExecuteService.TestExecutors - test-pool-1-thread-1 finish task_0
18:50:10.770 [main] INFO c.c.ExecuteService.TestExecutors - main finish task_4
18:50:10.771 [test-pool-1-thread-2] INFO c.c.ExecuteService.TestExecutors - test-pool-1-thread-2 finish task_1
18:50:11.272 [test-pool-1-thread-3] INFO c.c.ExecuteService.TestExecutors - test-pool-1-thread-3 finish task_2
  • 从输出可见:在采用 CallerRunsPolicy 的策略下,task_4 由 main 线程执行了。

7 java.util.concurrent.RejectedExecutionException 异常

出现 RejectedExecutionException 异常有两种原因:

  1. 线程池关闭以后,再次提交任务
1
2
3
4
5
6
7
8
9
10
@Test
public void test2() {
ExecutorService service = Executors.newCachedThreadPool();
//执行下面的会发生RejectedExecutionException
service.shutdown();
service.submit(new Task("1"));

// 等待任务执行完毕
WaitUtils.waitUntil(() -> service.isTerminated(), 100000l);
}
  • 具体如下
1
2
3
4
5
6
7
8
9
10
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@4d15107f rejected from java.util.concurrent.ThreadPoolExecutor@50378a4[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at com.ckjava.ExecuteService.TestExecutors.test2(TestExecutors.java:34)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
  1. 提交线程的数量大于最大线程数+任务队列中排队的个数
1
2
3
4
5
6
7
8
@Test
public void test3() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(2,3,30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));
IntStream.range(0, 5).forEach(i -> executor.execute(new Task(String.valueOf(i))));

// 等待任务执行完毕
WaitUtils.waitUntil(() -> executor.isTerminated(), 100000l);
}

这里我们创建了一个线程池,最大线程数是 3,任务队列中允许排队的线程个数是1,然后提交了5个任务,由于此时提交的任务个数大于最大线程树和排队的个数总和,所以发生了异常。

Buy me a cup of coffee