详解 JUC 之 SynchronousQueue

1 概述

  • java.util.concurrent.SynchronousQueue

SynchronousQueue

  • SynchronousQueue 是一种阻塞队列,其实现了 BlockingQueue 接口。
  • 容量为 0,该队列仅用于数据的交换,比如有两个线程,一个新增数据,另一个必须删除数据,否则就会阻塞新增或者删除的线程。
  • 非常适合切换设计,在该设计中,一个线程中运行的对象必须与在另一个线程中运行的对象同步,以便向其传递一些信息,事件或任务。
  • 尽管 SynchronousQueue 也实现了 Queue 接口,但是真正有用的就只有 take 和 put 这两个方法,并且都会阻塞调用这两个方法的线程。
  • 功能上和 TransferQueue 以及 Exchanger 非常类似。
  • 线程池工具类 Executors 的 newCachedThreadPool 就是用 SynchronousQueue 作为任务队列的。

2 关键点

  1. 用于两个线程间以同步的方式来交换一些信息,事件或任务
  2. 内部通过 TransferQueue 来实现
  3. 容量为 0,调用其中的 size 和 remainingCapacity 方法都返回为 0,isEmpty 方法永远返回 true.

3 举例说明

比如去食堂打饭的时候,你把空餐盘递给阿姨后,阿姨在餐盘上放上饭菜,然后再递给你。

这里把打饭窗口比作 SynchronousQueue 队列,食客先将餐盘放到队列中,阿姨才能取出餐盘,阿姨打饭后将餐盘放到队列,食客才能取出。

3.1 餐盘对象 Plate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/**
* 餐盘,存储食物,根据食物进行结算
*/
public class Plate {

private Float rice; // 米饭
private Float soup; // 汤
private Float greens; // 蔬菜
private Float meat; // 肉类

public Plate(Float rice, Float soup, Float greens, Float meat) {
this.rice = rice;
this.soup = soup;
this.greens = greens;
this.meat = meat;
}

public Plate() {
}

/**
* 获取总金额
*
* @return Float
*/
public Float getMoney() {
return rice*1 + soup*0 + greens*2 + meat*5;
}

/**
* 获取食物明细
*
* @return String
*/
public String getFood() {
return String.format("rice=%s, soup=%s, greens=%s, meat=%s", rice, soup, greens, meat);
}

public Float getRice() {
return rice;
}

public void setRice(Float rice) {
this.rice = rice;
}

public Float getSoup() {
return soup;
}

public void setSoup(Float soup) {
this.soup = soup;
}

public Float getGreens() {
return greens;
}

public void setGreens(Float greens) {
this.greens = greens;
}

public Float getMeat() {
return meat;
}

public void setMeat(Float meat) {
this.meat = meat;
}
}

3.2 食客线程对象 EaterWorker

  1. 食客线程先将餐盘交给阿姨,在队列上 put
  2. 阿姨打饭完毕后,食客从队列中 take 出食物
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.SynchronousQueue;

/**
* 食客线程先将餐盘交给阿姨,在队列上 put <br>
* 阿姨打饭完毕后,食客从队列中 take 出食物 <br>
*/
public class EaterWorker implements Runnable {

private static final Logger logger = LoggerFactory.getLogger(EaterWorker.class);

private SynchronousQueue<Plate> foodQueue;
private Plate plate;

public EaterWorker(SynchronousQueue<Plate> foodQueue, Plate plate) {
this.foodQueue = foodQueue;
this.plate = plate;
}

public void run() {
try {
// 食客在窗口上放上餐盘,等待阿姨打饭
logger.info("食客在窗口上放上餐盘,等待阿姨打饭");
foodQueue.put(plate);

// 拿到食物
Plate plate = foodQueue.take();
logger.info("拿回餐盘,刷钱离开, 食物: {}, 共计: {}", plate.getFood(), plate.getMoney());
} catch (Exception e) {
logger.error(this.getClass().getName().concat("has error"), e);
}

}
}

3.3 打饭阿姨线程对象 AuntWorker

  1. 阿姨线程先从队列中拿到食客的餐盘 take
  2. 在餐盘上放置食物后,在把餐盘 put 到队列中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import com.ckjava.synchronizeds.appCache.WaitUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.SynchronousQueue;

/**
* 阿姨线程先拿到食客的餐盘 take
* 在餐盘上放置食物后,在把餐盘 put 到队列中
*/
public class AuntWorker implements Runnable {

private static final Logger logger = LoggerFactory.getLogger(AuntWorker.class);

private SynchronousQueue<Plate> foodQueue;

public AuntWorker(SynchronousQueue<Plate> foodQueue) {
this.foodQueue = foodQueue;
}

public void run() {
try {
Plate plate = foodQueue.take();
logger.info("阿姨拿到餐盘");

// 阿姨在餐盘放置食物
logger.info("阿姨开始在餐盘放置食物");
plate.setRice((float) 1.00);
plate.setSoup((float) 0.55);
plate.setGreens((float) 0.55);
plate.setMeat((float) 0.55);
// 模拟打饭
WaitUtils.sleep(3000);
logger.info("阿姨打饭完毕");
// 递给吃客

logger.info("递给吃客");
foodQueue.put(plate);

} catch (Exception e) {
logger.error(this.getClass().getName().concat("has error"), e);
}
}
}

3.4 测试

在线程池中启动了两个线程,一个是阿姨线程,一个是吃客线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import com.ckjava.synchronizeds.appCache.WaitUtils;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;

/**
* 1. 在线程池中启动了两个线程,一个是阿姨线程,一个是吃客线程
* 2. 吃客线程将餐盘 put 到队列
* 3. 阿姨线程从队列中取出 take 餐盘
* 4. 阿姨向餐盘中放置食物,然后 put 到队列
* 5. 食客线程从队列中 take 含有食物的餐盘,并结算
*/
public class TestSynchronizedQueue {

private static final SynchronousQueue<Plate> foodQueue = new SynchronousQueue<>();

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 吃客开始打饭
executorService.submit(new EaterWorker(foodQueue, new Plate()));
// 打饭阿姨
executorService.submit(new AuntWorker(foodQueue));

executorService.shutdown();
WaitUtils.waitUntil(() -> executorService.isTerminated(), 100000l);
}
}
  • 输出如下
1
2
3
4
5
6
11:33:08.510 [pool-1-thread-1] INFO  c.c.SynchronizedQueue.EaterWorker - 食客在窗口上放上餐盘,等待阿姨打饭
11:33:08.514 [pool-1-thread-2] INFO c.c.SynchronizedQueue.AuntWorker - 阿姨拿到餐盘
11:33:08.514 [pool-1-thread-2] INFO c.c.SynchronizedQueue.AuntWorker - 阿姨开始在餐盘放置食物
11:33:11.515 [pool-1-thread-2] INFO c.c.SynchronizedQueue.AuntWorker - 阿姨打饭完毕
11:33:11.515 [pool-1-thread-2] INFO c.c.SynchronizedQueue.AuntWorker - 递给吃客
11:33:11.526 [pool-1-thread-1] INFO c.c.SynchronizedQueue.EaterWorker - 拿回餐盘,刷钱离开, 食物=rice=1.0, soup=0.55, greens=0.55, meat=0.55, 共计=4.85
Buy me a cup of coffee