1 概述
- java.util.concurrent.TransferQueue
TransferQueue 接口继承 BlockingQueue 接口,也属于阻塞队列。
主要用于协调多个生产者线程和消费者线程的消息传递,关键是通过 transfer 方法来阻塞生产者线程,只有当 生产者线程 的消息被 消费者线程消费后,才能继续通过 transfer 方法生产新的消息。
2 关键点
- 生产者线程通过 transfer 方法来生产消息,并且会被阻塞
- 消费者线程通过 take 或者 poll 方法来消费消息
- 队列中的消息数量 小于或者等于 生产者线程数
3 方法
boolean tryTransfer(E e)
生产者线程尝试向队列中存放消息,如果有消费者线程就立即返回 true, 否则返回 false,不会阻塞。void transfer(E e) throws InterruptedException
生产者线程向队列中存放消息,如果没有消费者线程就被会阻塞。boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException
生产者线程向队列中存放消息,如果没有消费者线程就被会阻塞,并且可以设置阻塞的最长时间。boolean hasWaitingConsumer()
判断是否有消费者线程尝试通过 take 或者 poll 方法从队列中取出数据。int getWaitingConsumerCount()
返回 消费者线程 数量
4 实现生产者和消费者
- java.util.concurrent.LinkedTransferQueue
这里以 TransferQueue 的实现类 LinkedTransferQueue 进行举例
4.1 生产者线程对象
生产者线程向容器存入指定总量的 消息
- TransferQueueProducer, 通过 transfer 或者 tryTransfer 方法向队列生产消息
1 | import com.ckjava.xutils.WaitUtils; |
4.2 消费者线程对象
消费者线程向容器 消费 指定总量的 消息
- TransferQueueConsumer 通过 take 方法从队列中消费消息
1 | import org.slf4j.Logger; |
4.3 测试
- 启动两个线程
- 生产者生产 3 个消息
- 消费者消费这 3 个消息
具体如下
1 |
|
- 输出如下
1 | 16:47:14.747 [pool-1-thread-1] INFO c.c.T.TransferQueueProducer - 生产者线程生产消息:task_0 |
- 从输出上可以看出
- 队列中最多的时候只有一个消息,并且和生产者线程数 相等
- 生产者线程生成消息后,消费者线程在消费完消息后,生产者线程才继续生产新的消息。
5 使用场景分析
常见的队列只有当队列满的的时候或者为空的时候才会阻塞 生产者线程 或者 消费者线程。如果当队列的容量为 Integer.MAX_VALUE
, 生产者线程不停的存放消息数据,而没有 消费者线程消费,这种场景下就可能会导致 OutOfMemory 错误。
TransferQueue 就非常适合这种场景,其中的 transfer 方法可以确保 生产者线程 生产的消息在被消费后 才能够 继续生产新的消息。