回顾 Java Lock 基础,详细介绍了
AbstractQueuedSynchronizer
的实现;并说明了Condition
的原理以及LockSupport
的底层作用。
1. Lock
Lock接口
1 | // 获取锁,当锁获得后,从该方法处返回 |
Lock与synchronized相比拥有的优势
- 可响应中断。synchronized 会在线程持有锁 A 后,如果尝试获取锁 B 失败,那么线程就进入阻塞状态,而且一旦发生死锁,就没有任何机会来唤醒阻塞的线程。
lockInterruptibly()
可响应中断信号,可以在锁的获取过程中中断当前线程,调用thread.interrupt()方法能够中断线程的等待过程。 - 支持超时。
tryLock(long time, TimeUnit unit)
方法在拿不到锁时会等待一定的时间,在时间期限之内如果还拿不到锁,就返回 false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回 true。 - 非阻塞获取锁。
tryLock()
方法会尝试获取锁,如果获取成功则返回true,如果获取失败则返回 false,也就说这个方法无论如何都会立即返回。
除此之外,synchronized 会隐式获取锁,且会自动释放锁,而 Lock 是显式的获取锁,手工释放(synchronized 固化了锁的获取和释放,而 Lock相对的提供了更好的拓展性);Lock可以绑定多个条件:Condition,await,signal。synchronized的wait,notify只可以实现一种条件。
2. AbstractQueuedSynchronizer
队列同步器AbstractQueuedSynchronizer
是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程排队、等待与唤醒等底层操作细节。同步器的主要使用方式是继承,子类被推荐为自定义同步组件的静态内部类。
a. 基础描述
双向队列
AQS 内部通过 head
和 tail
记录队首和队尾元素,队列元素的类型是 Node。在 Node 类型中:prev
和next
分别记录当前节点的前驱和后置节点;thread
变量用来存放进入AQS队列里的线程;waitStatus
记录当前线程的等待状态:CANCELLED 线程被取消、SIGNAL 线程需要被唤醒、CONDITION 线程在条件队列里面等待、PROPAGATE 释放共享资源时需要通知其他节点。
状态信息
在AQS 中维持了一个单一的状态信息state
,可以通过getState
、setState
、compareAndSetState
函数修改其值。
根据state 是否属于一个线程,操作state 的方式分为独占方式和共享方式。在独占方式下获取和释放资源使用的方法为 acquire / acquirelnterruptibly / release
。在共享方式下获取和释放资源的方法为: acquireShared / acquireSharedInterruptibly / releaseShared
。
对于ReentrantLock 的实现来说, state 可以用来表示当前线程获取锁的可重入次数;对于读写锁ReentrantReadWriteLock 来说, state 的高16位表示读状态,也就是获取该读锁的次数,低16 位表示获取到写锁的线程的可重入次数;对于semaphore 来说, state 用来表示当前可用信号的个数:对于CountDownlatch 来说,state 用来表示计数器当前的值。
Condition
AQS 有个内部类ConditionObject , 用来结合锁实现线程同步。ConditionObject 可以直接访问 AQS 对象内部的变量,比如 state 状态值和 AQS 队列。ConditionObject 是条件变量, 每个条件变量对应一个条件队列(单向链表队列),其用来存放调用条件变量的 await 方法后被阻塞的线程,这个条件队列的头、尾元素分别为自 firstWaiter 和 lastWaiter 。
b. AQS可重写的方法
方法名称 | 描述 |
---|---|
tryAcquire(int arg) | 独占获取同步状态,实现该方法需要查询当前状态,并判断同步状态是否符合预期状态,然后再进行CAS设置同步状态。 |
tryRelease(int arg) | 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态 |
tryAcquireShared(int arg) | 共享式获取同步状态,返回大于等于0的值,表示获取成功,反之失败 |
tryReleaseShared(int arg) | 共享式释放同步状态 |
isHeldExclusively() | 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占 |
AQS提供的模板方法:
c. AQS使用示例
示例来自AQS官方文档( non-reentrant mutual exclusion lock)。
1 | class Mutex implements Lock { |
d. 独占式与共享式
如果论述AQS完整实现比较麻烦的话,单独描述下独占式和共享式资源获取与释放,也是可以看清楚AQS的核心的。
独占式:如果一个线程获取到了资源,会标记是这个线程获取到了,其他线程再尝试操作 state 获取资源时会发现当前该资源不是自己持有的,就会在获取失败后被阻塞。比如独占锁ReentrantLock, 当一个线程获取了ReerrantLock 的锁后,AQS 内部会首先使用CAS 操作把 state 状态值从0变为1 ,然后设置当前锁的持有者为当前线程,当该线程再次获取锁时发现它就是锁的持有者,会把状态值从1变为2 ,也就是设置可重入次数,而当另外一个线程获取锁时发现自己并不是该锁的持有者就会被放入AQS 阻塞队列后挂起。
共享式:当多个线程去请求资源时通过 CAS 方式竞争获取资源,当一个线程获取到了资源后,另外一个线程再次去获取时如果当前资源还能满足它的需要,则当前线程只需要使用 CAS 方式进行获取即可。比如Semaphore 信号量, 当一个线程通过acquire方法获取信号量时,会首先看当前信号量个数是否满足需要, 不满足则把当前线程放入阻塞队列,如果满足则通过 CAS 获取信号量。
e. AQS实现
独占式资源获取与释放
(1)获取
1 | public final void acquire(int arg) { |
上述代码主要完成了同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等待的相关工作,其主要逻辑是:
- tryAcquire(int arg),该方法保证线程安全的获取同步状态,具体就是设置状态变量 state 的值,成功则直接返回;
- 如果同步状态获取失败,则构造同步节点(独占式Node.EXCLUSIVE)并通过addWaiter(Node node)方法将该节点加入到AQS阻塞队列的尾部;
- 最后调用acquireQueued方法,使用 LockSupport.park(this) 挂起自己,使得该节点以“死循环”的方式获取同步状态。
1 | private Node addWaiter(Node mode) { |
设置尾节点:加入队列的过程必须要保证线程安全,因此同步器提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect,Node update),它需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。
第一个线程想要获取锁时,直接返回,不会进入阻塞队列的逻辑。第二个线程获取锁时,在addWaiter中的enq方法中,会在阻塞队列头部塞入一个新的空Node(也可以成为“哨兵Node”),第二个线程封装为Node后,放在该空Node后面;之后第二个线程在acquireQueued方法中的shouldParkAfterFailedAcquire中返回false,但waitStatus置为了SIGNAL,再次循环进入shouldParkAfterFailedAcquire方法,由于waitStatus等于SIGNAL,返回true,故使用LockSupport阻塞第二个线程。
当第一个线程返回时,调用release方法,此时head还是之前的空Node,unparkSuccessor(head)会唤醒head的后继者,即第二个线程。第二个线程在acquireQueued方法中的tryAcquire方法中成功返回,会将head设为自己。
若在第二个线程之后,还有n个线程在队列中,在第二个线程release时,会唤醒head的后继者,即队列后续线程,以此类推。
(2)释放
同步器的release(int arg)方法可以释放同步状态)。
1 | public final boolean release(int arg) { |
设置首节点:首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点。设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功获取到同步状态,因此设置头节点的方法并不需要使用CAS来保证。
在unparkSuccessor
的方法中,会对节点的后续节点线程调用LockSupport.unpark
方法以激活后续线程。被激活的线程则使用tryAcquire 尝试,看当前状态变量state的值是否能满足自己的需要,满足则该线程被激活,然后继续向下运行,否则还是会被放入 AQS 队列并被挂起。
共享式资源获取与释放
(1)获取
当线程调用acquireShared
获取共享资源时,会首先使用tryAcquireShared
尝试获取资源, 具体是设置状态变量 state 的值,成功则直接返回(成功获取到同步状态并退出自旋的条件就是 tryAcquireShared 方法返回值大于等于0),失败则将当前线程封装为类型为 Node.SHARED 的 Node 节点后插入到 AQS 阻塞队列的尾部,并使用LockSupport.park(this)
方法挂起自己。
1 | public final void acquireShared(int arg) { |
(2)释放
线程调用releaseShared
时会尝试使用tryReleaseShared
操作释放资源,这里是设置状态变量 state 的值,然后在unparkSuccessor
中使用LockSupport.unpark
激活后续线程 。被激活的线程则使用tryAcquireShared
查看当前状态变量 state 的值是否能满足自己的需要,满足则继续向下运行,否则还是会被放入AQS 队列并被挂起。
1 | public final boolean releaseShared(int arg) { |
共享式前N个可以获取资源的线程直接返回, 不会进入阻塞队列的逻辑。后续线程无法获取到资源时,在doAcquireShared的第一行就是addWaiter方法,后续大体逻辑和独占式类似。
自定义同步组件 TwinsLock
TwinsLock: 只允许至多两个线程访问,超过两个线程的访问将被阻塞
1 | public class TwinsLock implements Lock { |
3. Condition
API
1 | void await() throws InterruptedException; |
- await() 方法会使当前线程等待,同时释放当前锁,当其他线程中使用signal()或者signalAll()方法时,线程会重新获得锁并继续执行。或者当线程被中断时,也能跳出等待。这和Object.wait()方法很相似;
- awaitUninterruptibly() 方法与 await() 方法基本相同,但是它并不会在等待过程中响应中断;
- singal() 用于唤醒一个在等待中的线程;
- singalAll() 方法会唤醒所有在等待中的线程,和Obejct.notify()方法很类似;
使用范式
1 | Lock lock = new ReentrantLock(); |
官方示例范式中 lock() 并没有放在 try {} 中,应该是因为不排除获取锁(比如自定义锁)的过程中产生异常后,却调用了unlock() 方法。
和Object.wait()和notify()方法一样,当线程使用Condition.await()时,要求线程持有相关的重入锁,在Condition.await()调用后,这个线程会释放这把锁。
在Condition.signal()方法调用时,也要求线程先获得相关的锁。在signal()方法调用后,系统会从当前Condition对象的等待队列中,唤醒一个线程。一旦线程被唤醒,它会重新尝试获得与之绑定的重入锁,一旦成功获取,就可以继续执行了。因此,在signal()方法调用之后,一般需要释放相关的锁,谦让给被唤醒的线程,让它可以继续执行。
ArrayBlockingQueue 中 Lock 及 Condition 的使用
1 | final Object[] items; |
Condition实现
lock.newCondition() 的作用其实是 new 了一个在AQS 内部声明的 ConditionObject 对象, ConditionObject 是AQS 的内部类,可以访问 AQS 内部的变量和方法。每个Condition对象都包含着一个队列(等待队列),用来存放调用条件变量的 await() 方法时被阻塞的线程。这个条件队列和 AQS 的同步队列不是一回事,该队列是Condition对象实现等待/通知功能的关键。
等待队列
一个 Condition 包含一个等待队列,Condition拥有首节点(firstWaiter)和尾节点(lastWaiter)。当前线程调用Condition.await()方法,将会以当前线程构造节点,将原有的尾节点 nextWaiter 指向它,并且更新尾节点即可。引用更新的过程并没有使用CAS保证,原因在于调用await()方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。
在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的Lock(更确切地说是同步器)拥有一个同步队列和多个等待队列。
await()
调用Condition的await()方法(或者以await开头的方法),会构造一个类型为Node.CONDITION 的node 节点,然后将该节点插入条件队列末尾并释放锁,同时当前线程也会被阻塞挂起。之后唤醒同步队列中的后继节点,然后当前线程会进入等待状态。
1 | public final void await() throws InterruptedException { |
signal()
signal() 方法会进行isHeldExclusively()检查,确认当前线程是获取锁的线程。接着会把条件队列里头节点从条件队列里面移除并放入 AQS 的同步队列里面,然后使用 LockSupport 唤醒节点中的线程。
l o ck() 方法获取锁) , 在内部会把条件队列里面队头的一个线程节点从条件队列里面移除并
被唤醒后的线程,将从 await() 方法中的while循环中退出(isOnSyncQueue方法返回true,节点已经在同步队列中),进而调用同步器的 acquireQueued() 方法加入到获取同步状态的竞争中。
1 | public final void signal() { |
Condition的signalAll()方法,相当于对等待队列中的每个节点均执行一次signal()方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程。
4. LockSupport
线程阻塞类工具,可以在线程内任何位置让线程阻塞。1. 和Thread.suspend()相比,它弥补了由于resume()在前发生,导致线程无法继续执行的情况。2. 和Object.wait()相比,它不需要先获得某个对象的锁。3. 也不会抛出InterruptedException异常。
LockSupport的静态方法park()可以阻塞当前线程,类似的还有parkNanos()、parkUntil()等方法,它们实现了一个限时的等待。 unpark(Thread thread)方法原来唤醒一个被阻塞的线程。
1 | public class LockSupportDemo { |
虽然无法保证unpark()方法发生在park()方法之后。但以上代码自始至终都可以正常的结束,不会因为park()方法而导致线程永久性的挂起。这是因为LockSupport类使用类似信号量的机制。它为每一个线程准备了一个许可,如果许可可用,那么park()函数会立即返回,并且消费这个许可(也就是将许可变为不可用),如果许可不可用,就会阻塞。而unpark()则使得一个许可变为可用(但是和信号量不同的是,许可不能累加,你不可能拥有超过一个许可,它永远只有一个)。这个特点使得:即使unpark()操作发生在park()之前,它也可以使下一次的park()操作立即返回。
如果使用park(Object)函数,还可以为当前线程设置一个阻塞对象。这个阻塞对象会出现在线程Dump中。这样在分析问题时,就更加方便了。
LockSupport.park()还能支持中断响应,如果其他线程调用了阻塞线程的interrupt()方法,设置了中断标志,则阻塞线程会返回。和其他接收中断的函数很不一样,LockSupport.park()不会抛出InterruptedException异常。它只是会默默的返回,我们可以从Thread.interrupted()等方法获得中断标记。
5. 使用锁的最佳实践
并发大师 Doug Lea《Java 并发编程:设计原则与模式》一书中,推荐的三个用锁的最佳实践:
- 永远只在更新对象的成员变量时加锁
- 永远只在访问可变的成员变量时加锁
- 永远不在调用其他对象的方法时加锁
关于最后一条同样是尽量要去遵守:调用其他对象的方法,实在是太不安全了,也许其他方法里面有线程 sleep() 的调用,也可能会有奇慢无比的 I/O 操作,这些都会严重影响性能。更可怕的是,其他类的方法可能也会加锁,然后双重加锁就可能导致死锁。
6. Case
Q:下面的代码是否存在死锁?
A:不出现死锁,但会出现活锁(主动将资源释放给他人使用,那么就会出现资源不断在两个线程中跳动,而没有一个线程可以同时拿到所有资源而正常执行)。
1 | class Account { |
Reference
- 《Java并发编程的艺术》
- 《Java并发编程之美》
- https://time.geekbang.org/column/article/87779