详解 JUC 之 TransferQueue 接口以及 LinkedTransferQueue 实现类

1 概述

  • java.util.concurrent.TransferQueue

TransferQueue

TransferQueue 接口继承 BlockingQueue 接口,也属于阻塞队列。

主要用于协调多个生产者线程和消费者线程的消息传递,关键是通过 transfer 方法来阻塞生产者线程,只有当 生产者线程 的消息被 消费者线程消费后,才能继续通过 transfer 方法生产新的消息。

2 关键点

  1. 生产者线程通过 transfer 方法来生产消息,并且会被阻塞
  2. 消费者线程通过 take 或者 poll 方法来消费消息
  3. 队列中的消息数量 小于或者等于 生产者线程数

3 方法

  1. boolean tryTransfer(E e) 生产者线程尝试向队列中存放消息,如果有消费者线程就立即返回 true, 否则返回 false,不会阻塞。
  2. void transfer(E e) throws InterruptedException 生产者线程向队列中存放消息,如果没有消费者线程就被会阻塞。
  3. boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException 生产者线程向队列中存放消息,如果没有消费者线程就被会阻塞,并且可以设置阻塞的最长时间。
  4. boolean hasWaitingConsumer() 判断是否有消费者线程尝试通过 take 或者 poll 方法从队列中取出数据。
  5. int getWaitingConsumerCount() 返回 消费者线程 数量

4 实现生产者和消费者

  • java.util.concurrent.LinkedTransferQueue

LinkedTransferQueue

这里以 TransferQueue 的实现类 LinkedTransferQueue 进行举例

4.1 生产者线程对象

生产者线程向容器存入指定总量的 消息

  • TransferQueueProducer, 通过 transfer 或者 tryTransfer 方法向队列生产消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import com.ckjava.xutils.WaitUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TransferQueue;
import java.util.stream.IntStream;

/**
* 生产者线程向容器存入指定总量的 消息
*
*/
public class TransferQueueProducer implements Runnable {

private static final Logger logger = LoggerFactory.getLogger(TransferQueueProducer.class);

// 容器
private TransferQueue<String> queue;
// 生产指定的数量
private Integer numberOfElementsToProduce;

public TransferQueueProducer(TransferQueue<String> queue, Integer numberOfElementsToProduce) {
this.queue = queue;
this.numberOfElementsToProduce = numberOfElementsToProduce;
}

@Override
public void run() {
IntStream.range(0, numberOfElementsToProduce).forEach(i -> {
try {
// 向队列中存入任务
String msg = String.format("task_%s", i);
logger.info(String.format("生产者线程生产消息:%s", msg));
//queue.tryTransfer(msg, 3, TimeUnit.SECONDS); // 设置最长阻塞时间
queue.transfer(msg); // 阻塞当前线程
WaitUtils.sleep(10);
} catch (Exception e) {
e.printStackTrace();
}
});
}
}

4.2 消费者线程对象

消费者线程向容器 消费 指定总量的 消息

  • TransferQueueConsumer 通过 take 方法从队列中消费消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TransferQueue;
import java.util.stream.IntStream;

/**
* 消费者线程向容器 消费 指定总量的 消息
*
*/
public class TransferQueueConsumer implements Runnable {

private static final Logger logger = LoggerFactory.getLogger(TransferQueueConsumer.class);

private TransferQueue<String> queue;
private Integer numberOfElementsToProduce;

public TransferQueueConsumer(TransferQueue<String> queue, Integer numberOfElementsToProduce) {
this.queue = queue;
this.numberOfElementsToProduce = numberOfElementsToProduce;
}

@Override
public void run() {
IntStream.range(0, numberOfElementsToProduce).forEach(i -> {

try {
// 从队列中获取任务,并执行任务
String msg = queue.take();
logger.info(String.format("消费者线程从队列中消费消息:%s", msg));
} catch (Exception e) {
e.printStackTrace();
}

});
}
}

4.3 测试

  1. 启动两个线程
  2. 生产者生产 3 个消息
  3. 消费者消费这 3 个消息

具体如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

import com.ckjava.synchronizeds.appCache.WaitUtils;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

/**
* 启动两个线程 <br>
* 生产者生产 3 个消息 <br>
* 消费者消费这 3 个消息 <br>
*/
public class TestTransferQueue {
public static void main(String[] args) {

TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
ExecutorService executorService = Executors.newFixedThreadPool(2);

executorService.submit(new TransferQueueProducer(transferQueue, 3));
executorService.submit(new TransferQueueConsumer(transferQueue, 3));

executorService.shutdown();
WaitUtils.waitUntil(() -> executorService.isTerminated(), 100000l);
}
}
  • 输出如下
1
2
3
4
5
6
16:47:14.747 [pool-1-thread-1] INFO  c.c.T.TransferQueueProducer - 生产者线程生产消息:task_0
16:47:14.749 [pool-1-thread-2] INFO c.c.T.TransferQueueConsumer - 消费者线程从队列中消费消息:task_0
16:47:14.749 [pool-1-thread-1] INFO c.c.T.TransferQueueProducer - 生产者线程生产消息:task_1
16:47:14.749 [pool-1-thread-2] INFO c.c.T.TransferQueueConsumer - 消费者线程从队列中消费消息:task_1
16:47:14.749 [pool-1-thread-1] INFO c.c.T.TransferQueueProducer - 生产者线程生产消息:task_2
16:47:14.749 [pool-1-thread-2] INFO c.c.T.TransferQueueConsumer - 消费者线程从队列中消费消息:task_2
  • 从输出上可以看出
  1. 队列中最多的时候只有一个消息,并且和生产者线程数 相等
  2. 生产者线程生成消息后,消费者线程在消费完消息后,生产者线程才继续生产新的消息。

5 使用场景分析

常见的队列只有当队列满的的时候或者为空的时候才会阻塞 生产者线程 或者 消费者线程。如果当队列的容量为 Integer.MAX_VALUE, 生产者线程不停的存放消息数据,而没有 消费者线程消费,这种场景下就可能会导致 OutOfMemory 错误。

TransferQueue 就非常适合这种场景,其中的 transfer 方法可以确保 生产者线程 生产的消息在被消费后 才能够 继续生产新的消息。

6 参考

Buy me a cup of coffee