深入理解 Spring 中的 ThreadPoolTaskExecutor 与 ListenableFuture 对象

1 概述

以 jdk1.8 和 Spring Framework 4.3.4.RELEASE 为基准

  1. 本文详细分析 Spring 中的 ThreadPoolTaskExecutor 与 ListenableFutureTask 对象;并且比较 ThreadPoolTaskExecutor 和 ThreadPoolExecutor 之间的区别。
  2. 介绍 ThreadPoolTaskExecutor 的基本使用
  3. 比较 ListenableFuture 与 Future
  4. 深入解析 ListenableFuture 对象

2 ThreadPoolTaskExecutor 对比 ThreadPoolExecutor

  1. ThreadPoolExecutor 是 JDK 自带,ThreadPoolTaskExecutor 是 Spring 在 ThreadPoolExecutor 的基础上进行了一层封装。
  • java.util.concurrent.ThreadPoolExecutor
  • org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
  1. 相比 ThreadPoolExecutor,ThreadPoolTaskExecutor 增加了 submitListenable 方法,该方法返回 ListenableFuture 接口对象,该接口完全抄袭了 google 的 guava。
  2. ListenableFuture 接口对象,增加了线程执行完毕后成功和失败的回调方法。从而避免了 Future 需要以阻塞的方式调用 get,然后再执行成功和失败的方法。
  • ThreadPoolTaskExecutor 中具体的初始化线程池方法如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Override
protected ExecutorService initializeExecutor(
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);

ThreadPoolExecutor executor;
if (this.taskDecorator != null) {
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler) {
@Override
public void execute(Runnable command) {
super.execute(taskDecorator.decorate(command));
}
};
}
else {
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler);

}

if (this.allowCoreThreadTimeOut) {
executor.allowCoreThreadTimeOut(true);
}

this.threadPoolExecutor = executor;
return executor;
}

3 如何使用 ThreadPoolTaskExecutor

具体使用如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) throws Exception {
ThreadPoolTaskExecutor executorService = new ThreadPoolTaskExecutor();
executorService.setCorePoolSize(2);
executorService.setMaxPoolSize(2);
executorService.setKeepAliveSeconds(60);
executorService.setQueueCapacity(Integer.MAX_VALUE);
executorService.initialize();

executorService.submitListenable(() -> {

// 休息 5 秒,模拟工作的情况
TimeUnit.SECONDS.sleep(5);
// 通过抛出 RuntimeException 异常来模拟异常
//throw new RuntimeException("出现异常");
return true;

}).addCallback(data -> logger.info("success,result = {}", data), ex -> logger.info("**异常信息**:{}", ExceptionUtils.getExceptionMsg(ex)));
}
  1. 通过 new 获取 ThreadPoolTaskExecutor 对象
  2. 通过 setCorePoolSize 等方法可以配置线程池相关参数
  3. 最重要的是通过 initialize 方法完成线程池初始化,否则抛出:java.lang.IllegalStateException: ThreadPoolTaskExecutor not initialized 异常
  4. 调用 submitListenable 方法返回 ListenableFuture 对象
  5. 调用 ListenableFuture 对象的 addCallback 方法增加 成功和失败的回调处理
  6. 其中成功的回调对象实现了 SuccessCallback 接口,其中的方法为:void onSuccess(T result)
  7. 其中失败的回调对象实现了 FailureCallback 接口,其中的方法为:void onFailure(Throwable ex)

4 比较 ListenableFuture 与 Future

  • org.springframework.util.concurrent.ListenableFuture
  • java.util.concurrent.Future

ListenableFuture 接口继承 Future 接口,并增加了如下两个方法

1
2
3
void addCallback(ListenableFutureCallback<? super T> callback);

void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);

5 深入解析 ListenableFuture 对象

首先看看下面这张图

ListenableFutureTask 类图

  1. ThreadPoolTaskExecutor 的 submitListenable 方法,传入一个 Runnable 或者 Callable 对象,实际上 Runnable 或者 Callable 对象被包装到 ListenableFutureTask 对象中,然后提交到 ExecutorService 对象,最后返回的是 ListenableFutureTask 对象,具体如下
1
2
3
4
5
6
7
8
9
10
11
12
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
ExecutorService executor = getThreadPoolExecutor();
try {
ListenableFutureTask<T> future = new ListenableFutureTask<T>(task);
executor.execute(future);
return future;
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}
  1. 在 ListenableFutureTask 中可以发现其继承了 FutureTask 对象并实现了 ListenableFuture 对象, 其中 FutureTask 对象中的 run 是最终线程执行的方法,具体如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
  1. 上面 run 方法中的 set 方法将线程的执行结果通知出去,在 set 方法中可以发现其调用了 finishCompletion 方法,finishCompletion 方法会一直循环判断线程池中的队列的任务是否执行完毕,一旦执行完毕就会调用 done 方法
  2. ListenableFutureTask 重写了 done 方法, 在正常执行完毕的情况下通过 this.callbacks.success(result) 调用成功回调函数,在出现 InterruptedException 异常的情况下既不会调用 成功的回调,也不会调用失败的回调,其他类型的异常出现的时候才会通过 this.callbacks.failure(cause) 调用失败回调函数,ListenableFutureTask 中的 done 方法具体如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
protected void done() {
Throwable cause;
try {
T result = get();
this.callbacks.success(result);
return;
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
return;
}
catch (ExecutionException ex) {
cause = ex.getCause();
if (cause == null) {
cause = ex;
}
}
catch (Throwable ex) {
cause = ex;
}
this.callbacks.failure(cause);
}
  • 从上面的流程分析中,可以发现:因为 FutureTask 中重写了 run 方法,所以才实现了线程执行完毕后可以执行回调方法,其中使用了模板方法设计模式。
  • 模板方法设计模式的主要特点在于:在接口中定义方法,在抽象类中实现方法,并定义抽象方法,实现的方法中又调用抽象方法,最终的子类中重写抽象方法。
  • Runnable 和 ListenableFuture 是接口, FutureTask 是抽象类,ListenableFutureTask 是最终的子类

6 ListenableFuture 的好处以及 Future 带来的阻塞问题

  1. ListenableFuture 相比 Future 是不需要知道 执行结果的情况下就可以将 成功或者失败的业务代码 通过回调的方式 预埋,带来的好处就是异步,不需要阻塞当前线程,从而可以提高系统的吞吐量
  2. Future 需要通过 get() 方法阻塞当前线程,在获取线程的执行结果后再根据执行结果编写相关的业务代码

下面通过一个例子来演示下:

  1. 通过 1 个线程,循环执行 10000 次,使用 submitListenable 方法,具体如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public static void main(String[] args) {
ThreadPoolTaskExecutor executorService = new ThreadPoolTaskExecutor();
executorService.setCorePoolSize(1);
executorService.setMaxPoolSize(1);
executorService.initialize();

long start = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
ListenableFuture<Boolean> asyncResult = executorService.submitListenable(() -> {

// 休息5毫秒,模拟执行
TimeUnit.MILLISECONDS.sleep(5);
//throw new RuntimeException("出现异常");
return true;

});
asyncResult.addCallback(data -> {
try {
// 休息3毫秒模拟获取到执行结果后的操作
TimeUnit.MILLISECONDS.sleep(3);
} catch (Exception e) {
e.printStackTrace();
}
}, ex -> logger.info("**异常信息**:{}", ExceptionUtils.getExceptionMsg(ex)));
}
System.out.println(String.format("总结耗时:%s", System.currentTimeMillis() - start));
}

通过测试3次,总计耗时在 50 - 60 毫秒之间

  1. 通过 1 个线程,循环执行 10000 次,使用 submit 方法和 Future 对象,具体如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public static void main(String[] args) {
ThreadPoolTaskExecutor executorService = new ThreadPoolTaskExecutor();
executorService.setCorePoolSize(1);
executorService.setMaxPoolSize(1);
executorService.initialize();

long start = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
Future<Boolean> future = executorService.submit(() -> {
try {
// 休息5毫秒,模拟执行
TimeUnit.MILLISECONDS.sleep(5);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
});

try {
// 以阻塞的方式获取执行结果
Boolean result = future.get();
// logger.info(String.format("执行结果:%s", result));
// 休息3毫秒模拟获取到执行结果后的操作
TimeUnit.MILLISECONDS.sleep(3);
} catch (Exception e) {
e.printStackTrace();
}

}
System.out.println(String.format("总结耗时:%s", System.currentTimeMillis() - start));
}

通过测试3次,总计耗时基本在 87000 毫秒以上

Buy me a cup of coffee