JohnShen's Blog.

[回顾并发基础] ReentrantLock & ReentrantReadWriteLock & StampedLock

字数统计: 2.9k阅读时长: 11 min
2019/09/22 Share

ReentrantLock

描述

ReentrantLock,支持重进入的锁,表示该锁能够支持一个线程对资源的重复加锁。该锁还支持公平和非公平选择。如果是公平锁,唤醒阻塞队列中节点的策略就是谁等待的时间长就唤醒谁;如果是非公平锁,则不提供这个公平保证,默认是非公平的。(synchronized关键字隐式的支持重进入,且是非公平的。)

AQS 的 state 状态值表示线程获取该锁的可重入次数, 在默认情况下, state的值为 0 表示当前锁没有被任何线程持有。当一个线程第一次获取该锁时会尝试使用CAS设置state 的值为 1 ,如果 CAS 成功则记录该锁的持有者为当前线程。在该线程没有释放锁的情况下第二次获取该锁后,状态值被设置为2 , 这就是可重入次数。在该线程释放该锁时,会尝试使用CAS 让状态值减1, 如果减1 后状态值为0,则当前线程释放该锁。

Lock的可见性保证可以使用ReentrantLock来举例。ReentrantLock 内部持有一个 volatile 的成员变量 state,获取锁的时候,会读写 state 的值;解锁的时候,也会读写 state 的值。可以通过 volatile 变量规则、顺序性规则以及传递性规则推导出 前一个线程在加锁时对变量的修改 Happens-Before 于另一个线程的加锁操作。

实现

(1)获取锁

ReentrantLock 的 lock() 方法直接调用 sync.lock() 方法。ReentrantLock 中有继承 AQS 的抽象类Sync,内部有NonfairSyncFairSync分别代表非公平实现和公平实现。在AQS中,我们知道tryAcquire方法是要自己实现的。

先以非公平锁开始,调用 Lock 的线程通过 CAS 设置状态值为1成功后,则表示当前线程获取到了锁, 然后 setExclusiveOwnerThread 设置该锁持有者是当前线程。后续线程会调用 acquire方法,其内部既先调用tryAcquire,从而调用nonfairTryAcquire,这个方法就是获取锁的关键:

  • 当 state 为 0 时,代表锁空闲,尝试 CAS 获取后再设置持有者线程为当前线程,便成功返回。

  • 当 state 非 0 时,即锁已被占用,若持有线程不是当前线程便直接返回失败,后放入 AQS 阻塞队列;若是当前线程,则状态值加一。需要注意的是,nextc < 0 是指可重入次数溢出了。

这里的非公平性体现在何处呢?若线程A持有锁后,线程B持有失败,进入阻塞队列。此时线程A释放锁的同时,线程C到来,其尝试 CAS 成功后,便获取到了锁。或者刚释放锁的线程A想要再次获取锁时,其成功几率也会非常大。

公平锁的tryAcquirenonfairTryAcquire几乎一致,只是多了一个 !hasQueuedPredecessors() ,该方法判断当前线程节点是否有前驱节点。使用公平锁时,任何线程一进来不会让你 CAS 成功就获取到锁,而是进入 tryAcquire中进行判断队列中是否有线程在等待,若没有等待的线程且CAS成功,则代表获取到锁,设置持有者线程后便返回 true。公平锁保证了锁的获取按照FIFO原则,而代价是进行大量的线程切换。非公平性锁虽然可能造成线程“饥饿”,但极少的线程切换,保证了其更大的吞吐量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// NonfairSync 实现
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// AQS 中的 acquire 方法
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
// 代码实现在 Sync 抽象类中,具体实现见下
return nonfairTryAcquire(acquires);
}
}

final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// FairSync 实现
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
// AQS 中的 acquire 方法
acquire(1);
}
// 与 nonfairTryAcquire 的唯一区别在于多了 !hasQueuedPredecessors() 判断
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

(2)释放锁

ReentrantLock.unlock() 调用的是:sync.release(1),方法内会调用tryRelease方法(实现是在Sync中,即公平和非公平都是同一种实现)。该方法中,当前线程非持有者线程立即报错。当检查 state 若减去 release 值为 0 后,则清空持有者线程, 正式设置 state 为 0。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// in AQS 
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

// in ReentrantLock
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

ReentrantReadWriteLock

读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升,能够简化读写交互场景的编程方式。

ReentrantReadWriteLock支持公平性/非公平性选择、重进入、锁降级。样例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class ReadWriteLockTest {

static Map<String, Object> map = new HashMap<String, Object>();
static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
static Lock r = rwl.readLock();
static Lock w = rwl.writeLock();
// 获取一个key对应的value
public static final Object get(String key) {
r.lock();
try {
return map.get(key);
} finally {
r.unlock();
}
}
// 设置key对应的value,并返回旧的value
public static final Object put(String key, Object value) {
w.lock();
try {
return map.put(key, value);
} finally {
w.unlock();
}
}
// 清空所有的内容
public static final void clear() {
w.lock();
try {
map.clear();
} finally {
w.unlock();
}
}
}

关于读写状态

读写锁同样依赖自定义同步器来实现同步功能,而读写状态就是其同步器的同步状态。读写锁的自定义同步器需要在同步状态(一个整型变量)上维护多个读线程和一个写线程的状态。如果在一个整型变量上维护多种状态,就一定需要“按位切割使用”这个变量,读写锁将变量切分成了两个部分,高16位表示读,低16位表示写

写锁的获取与释放

写锁是一个支持重进入的排它锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经被获取(读状态不为0)或者该线程不是已经获取写锁的线程,则当前线程进入等待状态。如果存在读锁,则写锁不能被获取,原因在于:读写锁要确保写锁的操作对读锁可见,如果允许读锁在已被获取的情况下对写锁的获取,那么正在运行的其他读线程就无法感知到当前写线程的操作。写锁的释放与ReentrantLock的释放过程基本类似,等待的读写线程能够继续访问读写锁,同时前次写线程的修改对后续读写线程可见。

读锁的获取与释放

读锁是一个支持重进入的共享锁,它能够被多个线程同时获取。读锁总会被成功地获取,而所做的也只是(线程安全的)增加读状态。如果其他线程已经获取了写锁,则当前线程获取读锁失败,进入等待状态。如果当前线程获取了写锁或者写锁未被获取,则当前线程(线程安全,依靠CAS保证)增加读状态,成功获取读锁。读锁的每次释放(线程安全,可能有多个读线程同时释放读锁)均减少读状态,减少的值是(1<<16)。

锁降级

锁降级是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。

锁降级示例:因为数据不常变化,所以多个线程可以并发地进行数据处理,当数据变更后,如果当前线程感知到数据变化,则进行数据的准备工作,同时其他处理线程被阻塞,直到当前线程完成数据的准备工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void processData() {
readLock.lock();
if (!update) {
// 必须先释放读锁
readLock.unlock();
// 锁降级从写锁获取到开始
writeLock.lock();
try {
if (!update) {
// 准备数据的流程(略)
update = true;
}
readLock.lock();
} finally {
writeLock.unlock();
}
// 锁降级完成,写锁降级为读锁
}
try {
// 使用数据的流程(略)
} finally {
readLock.unlock();
}
}

当数据发生变更后,update变量(布尔类型且volatile修饰)被设置为false,此时所有访问processData()方法的线程都能够感知到变化,但只有一个线程能够获取到写锁,其他线程会被阻塞在读锁和写锁的lock()方法上。当前线程获取写锁完成数据准备之后,再获取读锁,随后释放写锁,完成锁降级。

锁降级中读锁的获取是否必要呢?答案是必要的。主要是为了保证数据的可见性,如果当前线程不获取读锁而是直接释放写锁,假设此刻另一个线程(记作线程T)获取了写锁并修改了数据,那么当前线程无法感知线程T的数据更新。如果当前线程获取读锁,即遵循锁降级的步骤,则线程T将会被阻塞,直到当前线程使用数据并释放读锁之后,线程T才能获取写锁进行数据更新。

RentrantReadWriteLock不支持锁升级(把持读锁、获取写锁,最后释放读锁的过程)。目的也是保证数据可见性,如果读锁已被多个线程获取,其中任意线程成功获取了写锁并更新了数据,则其更新对其他获取到读锁的线程是不可见的。

示例

18个读线程 + 2个写线程,读写操作都是耗时1s,最终耗时3s。换成 ReentrantLock 则为 20s。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class ReadWriteLockDemo {
private static Lock lock = new ReentrantLock();
private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private static Lock readLock = readWriteLock.readLock();
private static Lock writeLock = readWriteLock.writeLock();
/** 用于计时 */
private static CountDownLatch latch = new CountDownLatch(20);

public Object handleRead(Lock lock) throws InterruptedException {
try {
lock.lock();
System.out.println("start read");
Thread.sleep(1000);
return null;
} finally {
latch.countDown();
lock.unlock();
}
}

public void handleWrite(Lock lock) throws InterruptedException {
try {
lock.lock();
System.out.println("start write");
Thread.sleep(1000);
} finally {
latch.countDown();
lock.unlock();
}
}

public static void main(String[] args) throws InterruptedException {
final ReadWriteLockDemo demo = new ReadWriteLockDemo();
Runnable readRunnale = () -> {
try {
demo.handleRead(readLock);
// demo.handleRead(lock);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Runnable writeRunnale = () -> {
try {
demo.handleWrite(writeLock);
// demo.handleWrite(lock);
} catch (InterruptedException e) {
e.printStackTrace();
}
};

Instant i1 = Instant.now();

for (int i = 0; i < 18; i++) {
new Thread(readRunnale).start();
}
for (int i = 18; i < 20; i++) {
new Thread(writeRunnale).start();
}
latch.await();
Instant i2 = Instant.now();
System.out.println(Duration.between(i1, i2).toMillis());
}
}
CATALOG
  1. 1. ReentrantLock
    1. 1.1. 描述
    2. 1.2. 实现
      1. 1.2.1. (1)获取锁
      2. 1.2.2. (2)释放锁
  2. 2. ReentrantReadWriteLock
    1. 2.1. 关于读写状态
    2. 2.2. 写锁的获取与释放
    3. 2.3. 读锁的获取与释放
    4. 2.4. 锁降级
    5. 2.5. 示例