JohnShen's Blog.

[回顾并发基础] JUC中的并发队列

字数统计: 2.1k阅读时长: 8 min
2019/10/05 Share

SynchronousQueue: 一个不存储元素的阻塞队列;

JUC中的并发队列

阻塞队列可以用于多线程间的数据共享,支持阻塞的插入(队列慢时,队列开始阻塞插入元素的线程,直到队列不满)以及阻塞的移除(队列为空时,获取元素线程会等待队列变为非空)。

方法/处理方式 抛出异常 返回特殊值 阻塞 超时退出
插入 add offer(e) put offer(e, time, unit)
移除 remove poll() take poll(time, unit)
查看 element peek \ \

常见的阻塞队列包括:

Class 描述
ArrayBlockingQueue 由数组结构组成的有界阻塞队列
LinkedBlockingQueue 由链表结构组成的有界阻塞队列
PriorityBlockingQueue 支持优先级排序的无界阻塞队列
DelayQueue 使用优先级队列实现的无界阻塞队列
SynchronousQueue 不存储元素的阻塞队列
LinkedTransferQueue 由链表结构组成的无界阻塞队列
LinkedBlockingDeque 由链表结构组成的双向阻塞队列

ArrayBlockingQueue

数组实现,有界,阻塞由ReentrantLock实现,默认非公平。其代码是生产者消费者的典型实现。

其内部有一个数组 items 用来存放队列元素,putIndex 表示入队元素下标, takeIndex 表示出队下标, count 统计队列元素个数。且这些变量并没有使用 volatile 修饰,这也是因为访问这些变量都是在锁块内,加锁己经保证了锁块内变量的内存可见性了。另外, notEmpty 、notFull 条件变量用来进行出、入队的 Condition。

LinkedBlockingQueue

链表实现,无界,默认及最大长度为Integer.MAX_VALUE,但用户可以指定容量,所以一定长度上来讲,LinkedBlockingQueue是有界阻塞的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
/** Head of linked list. Invariant: head.item == null */
transient Node<E> head;
/** Tail of linked list. Invariant: last.next == null */
private transient Node<E> last;

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

LinkedBlockingQueue 使用单向链表实现,有分别代表队首和对尾的 Node,有一个记录队列元素个数的原子变量 count 和容量大小 capacity。此外,有两个 ReentrantLock,takeLock 用来控制只有一个线程可以从队列头部获取元素,putLock 用来控制同时只能有一个线程在队列尾部添加元素,故出队与入队可以同时进行。notEmpty 和 notFull 是对应两个锁的 Condition。

下面的代码看起来和 ArrayBlockingQueue 类似,实则思路完全不同。由于有两个锁,每个锁对应一个 Condition,需重点关注 Condition 的 await 和 signal 操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public void put(E e) throws InterruptedException {
// null元素抛空指针错误
if (e == null) throw new NullPointerException();
int c = -1;
// 构造节点,开始获取锁
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 队满则等待
while (count.get() == capacity) {
notFull.await();
}
// 进队列
enqueue(node);
// 递增计数
c = count.getAndIncrement();
// 若还可以继续放,则继续唤醒下一个
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// 队列里至少有一个元素了,通知非空
if (c == 0)
signalNotEmpty();
}

private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 获取锁
takeLock.lockInterruptibly();
try {
// 无元素,挂起
while (count.get() == 0) {
notEmpty.await();
}
// 出队 计数减一
x = dequeue();
c = count.getAndDecrement();
// 若还可以继续拿的,则继续唤醒下一个
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 队列里至少有一个元素被拿掉,队列必不满,通知非满
if (c == capacity)
signalNotFull();
return x;
}

private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}

PriorityBlockingQueue

带优先级的无界阻塞队列。

SynchronousQueue

描述

SynchronousQueue 是一个 不存储元素的阻塞队列,每一个 put 操作必须等待一个 take 操作,否则不能继续添加队列。其可以看作是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程,队列本身不存储任何元素,非常适合传递性场景。SynchronousQueue 吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue。

由于没有容量,所以对应 peek、contains、clear、isEmpty 等方法其实是无效的:clear 是不执行任何操作的,contains始终返回false,peek始终返回null,peek方法直接返回null。

SynchronousQueue 直接使用 CAS 实现线程的安全访问,队列的实现策略分为公平模式和非公平模式,默认情况下线程采用非公平模式访问队列,是否公平可以在构造函数中指定。

方法 描述
void put(E o) 向队列提交一个元素,阻塞直到其他线程take或者poll此元素.
boolean offer(E o) 向队列中提交一个元素,如果此时有其他线程正在被take阻塞(即其他线程已准备接收)或者”碰巧”有poll操作,那么将返回true,否则返回false
E take() 获取并删除一个元素,阻塞直到有其他线程offer/put
boolean poll() 获取并删除一个元素,如果此时有其他线程正在被put阻塞(即其他线程提交元素正等待被接收)或者”碰巧”有offer操作,那么将返回true,否则返回false
E peek() 总会返回null,硬编码

比如,先offer一个值(其返回值是false),3s后进行 take 操作,则会一直阻塞。传统阻塞队列如 ArrayBlocking 则会在 offer 时返回 true,take 时成功拿到值并返回。

实现

实现层面最关键的是E transfer(E e, boolean timed, long nanos)方法,put 和 take 方法都会使用该方法。当参数 e 为非空时,表示当前值传递给一个消费者,若为空则表示当前操作需要请求一个数据。timed 参数决定是否存在 timeout 时间, nanos 决定了 timeout 的时长。如果返回值非空,则表示数据已经接受或者正常提供,如果为空,则表示失败(超时或者中断)。

由于大量 CAS 代码,很难进行深层次细节理解,这里简单说明实现概述。主要使用LockSupport来控制线程,使用CAS来控制 head 元素。

公平模式

内部实现为 TransferQueue,它有一个 head 和 tail 指针。生成新 TransferQueue 对象时时,head 和 tail 指向新生成的虚拟节点;线程 put1 执行 put()操作,由于当前没有配对的消费线程,创建节点,并阻塞进程(线程是 QNode 中的属性),tail 指向 put1;线程 put2 执行了 put() 操作,跟前面一样,执行阻塞操作,tail 指向 put2。当来了一个线程 take1,执行了 take 操作时,tail 指向的 put2 跟 take1 线程配对,但此时需要唤醒的是 put1线程(即队尾匹配队头出队)。

1
2
3
4
5
6
static final class QNode {
volatile QNode next; // next node in queue
volatile Object item; // CAS'ed to or from null
volatile Thread waiter; // to control park/unpark
final boolean isData;
}

非公平模式

内部实现为TransferStack,实现中用 head 指针指向栈顶。线程 put1 执行 put() 操作,由于当前没有配对的消费线程,所以 put1 线程会入栈,创建一个 node 为 DATA 的元素,并阻塞进程;线程 put2 再次执行 put() 操作,跟前面一样,put2 线程会建成节点并入栈;此时来了线程 take1,执行了 take 操作,这时候发现栈顶为 put2 线程,会创建 FULFILL 节点把 take1 线程入栈,并尝试设置 put2 为 take1 的匹配节点(tryMatch),设置成功会激活等待线程,两个节点弹出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static final class SNode {
volatile SNode next; // next node in stack
volatile SNode match; // the node matched to this
volatile Thread waiter; // to control park/unpark
Object item; // data; or null for REQUESTs
int mode;
}

/* Modes for SNodes, ORed together in node fields */
/** Node represents an unfulfilled consumer */
static final int REQUEST = 0;
/** Node represents an unfulfilled producer */
static final int DATA = 1;
/** Node is fulfilling another unfulfilled DATA or REQUEST */
static final int FULFILLING = 2;

Reference

https://www.iteye.com/blog/shift-alt-ctrl-1840385

https://zhuanlan.zhihu.com/p/29227508

CATALOG
  1. 1. JUC中的并发队列
    1. 1.1. ArrayBlockingQueue
    2. 1.2. LinkedBlockingQueue
    3. 1.3. PriorityBlockingQueue
    4. 1.4. SynchronousQueue
      1. 1.4.1. 描述
      2. 1.4.2. 实现
  2. 2. Reference