SynchronousQueue
: 一个不存储元素的阻塞队列;
JUC中的并发队列
阻塞队列可以用于多线程间的数据共享,支持阻塞的插入(队列慢时,队列开始阻塞插入元素的线程,直到队列不满)以及阻塞的移除(队列为空时,获取元素线程会等待队列变为非空)。
方法/处理方式 | 抛出异常 | 返回特殊值 | 阻塞 | 超时退出 |
---|---|---|---|---|
插入 | add | offer(e) | put | offer(e, time, unit) |
移除 | remove | poll() | take | poll(time, unit) |
查看 | element | peek | \ | \ |
常见的阻塞队列包括:
Class | 描述 |
---|---|
ArrayBlockingQueue | 由数组结构组成的有界阻塞队列 |
LinkedBlockingQueue | 由链表结构组成的有界阻塞队列 |
PriorityBlockingQueue | 支持优先级排序的无界阻塞队列 |
DelayQueue | 使用优先级队列实现的无界阻塞队列 |
SynchronousQueue | 不存储元素的阻塞队列 |
LinkedTransferQueue | 由链表结构组成的无界阻塞队列 |
LinkedBlockingDeque | 由链表结构组成的双向阻塞队列 |
ArrayBlockingQueue
数组实现,有界,阻塞由ReentrantLock
实现,默认非公平。其代码是生产者消费者的典型实现。
其内部有一个数组 items 用来存放队列元素,putIndex 表示入队元素下标, takeIndex 表示出队下标, count 统计队列元素个数。且这些变量并没有使用 volatile 修饰,这也是因为访问这些变量都是在锁块内,加锁己经保证了锁块内变量的内存可见性了。另外, notEmpty 、notFull 条件变量用来进行出、入队的 Condition。
LinkedBlockingQueue
链表实现,无界,默认及最大长度为Integer.MAX_VALUE
,但用户可以指定容量,所以一定长度上来讲,LinkedBlockingQueue
是有界阻塞的。
1 | /** The capacity bound, or Integer.MAX_VALUE if none */ |
LinkedBlockingQueue
使用单向链表实现,有分别代表队首和对尾的 Node,有一个记录队列元素个数的原子变量 count 和容量大小 capacity。此外,有两个 ReentrantLock,takeLock 用来控制只有一个线程可以从队列头部获取元素,putLock 用来控制同时只能有一个线程在队列尾部添加元素,故出队与入队可以同时进行。notEmpty 和 notFull 是对应两个锁的 Condition。
下面的代码看起来和 ArrayBlockingQueue 类似,实则思路完全不同。由于有两个锁,每个锁对应一个 Condition,需重点关注 Condition 的 await 和 signal 操作。
1 | public void put(E e) throws InterruptedException { |
1 | public E take() throws InterruptedException { |
PriorityBlockingQueue
带优先级的无界阻塞队列。
SynchronousQueue
描述
SynchronousQueue
是一个 不存储元素的阻塞队列,每一个 put 操作必须等待一个 take 操作,否则不能继续添加队列。其可以看作是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程,队列本身不存储任何元素,非常适合传递性场景。SynchronousQueue 吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue。
由于没有容量,所以对应 peek、contains、clear、isEmpty 等方法其实是无效的:clear 是不执行任何操作的,contains始终返回false,peek始终返回null,peek方法直接返回null。
SynchronousQueue 直接使用 CAS 实现线程的安全访问,队列的实现策略分为公平模式和非公平模式,默认情况下线程采用非公平模式访问队列,是否公平可以在构造函数中指定。
方法 | 描述 |
---|---|
void put(E o) | 向队列提交一个元素,阻塞直到其他线程take或者poll此元素. |
boolean offer(E o) | 向队列中提交一个元素,如果此时有其他线程正在被take阻塞(即其他线程已准备接收)或者”碰巧”有poll操作,那么将返回true,否则返回false |
E take() | 获取并删除一个元素,阻塞直到有其他线程offer/put |
boolean poll() | 获取并删除一个元素,如果此时有其他线程正在被put阻塞(即其他线程提交元素正等待被接收)或者”碰巧”有offer操作,那么将返回true,否则返回false |
E peek() | 总会返回null,硬编码 |
比如,先offer一个值(其返回值是false),3s后进行 take 操作,则会一直阻塞。传统阻塞队列如 ArrayBlocking 则会在 offer 时返回 true,take 时成功拿到值并返回。
实现
实现层面最关键的是E transfer(E e, boolean timed, long nanos)
方法,put 和 take 方法都会使用该方法。当参数 e 为非空时,表示当前值传递给一个消费者,若为空则表示当前操作需要请求一个数据。timed 参数决定是否存在 timeout 时间, nanos 决定了 timeout 的时长。如果返回值非空,则表示数据已经接受或者正常提供,如果为空,则表示失败(超时或者中断)。
由于大量 CAS 代码,很难进行深层次细节理解,这里简单说明实现概述。主要使用LockSupport
来控制线程,使用CAS
来控制 head 元素。
公平模式:
内部实现为 TransferQueue
,它有一个 head 和 tail 指针。生成新 TransferQueue 对象时时,head 和 tail 指向新生成的虚拟节点;线程 put1 执行 put()操作,由于当前没有配对的消费线程,创建节点,并阻塞进程(线程是 QNode 中的属性),tail 指向 put1;线程 put2 执行了 put() 操作,跟前面一样,执行阻塞操作,tail 指向 put2。当来了一个线程 take1,执行了 take 操作时,tail 指向的 put2 跟 take1 线程配对,但此时需要唤醒的是 put1线程(即队尾匹配队头出队
)。
1 | static final class QNode { |
非公平模式:
内部实现为TransferStack
,实现中用 head 指针指向栈顶。线程 put1 执行 put() 操作,由于当前没有配对的消费线程,所以 put1 线程会入栈,创建一个 node 为 DATA 的元素,并阻塞进程;线程 put2 再次执行 put() 操作,跟前面一样,put2 线程会建成节点并入栈;此时来了线程 take1,执行了 take 操作,这时候发现栈顶为 put2 线程,会创建 FULFILL 节点把 take1 线程入栈,并尝试设置 put2 为 take1 的匹配节点(tryMatch),设置成功会激活等待线程,两个节点弹出。
1 | static final class SNode { |