ReentrantLock 描述 ReentrantLock,支持重进入的锁,表示该锁能够支持一个线程对资源的重复加锁 。该锁还支持公平和非公平选择 。如果是公平锁,唤醒阻塞队列中节点的策略就是谁等待的时间长就唤醒谁;如果是非公平锁,则不提供这个公平保证,默认是非公平的。(synchronized关键字隐式的支持重进入,且是非公平的。)
AQS 的 state 状态值表示线程获取该锁的可重入次数, 在默认情况下, state的值为 0 表示当前锁没有被任何线程持有。当一个线程第一次获取该锁时会尝试使用CAS设置state 的值为 1 ,如果 CAS 成功则记录该锁的持有者为当前线程。在该线程没有释放锁的情况下第二次获取该锁后,状态值被设置为2 , 这就是可重入次数。在该线程释放该锁时,会尝试使用CAS 让状态值减1, 如果减1 后状态值为0,则当前线程释放该锁。
Lock的可见性保证可以使用ReentrantLock来举例。ReentrantLock 内部持有一个 volatile 的成员变量 state,获取锁的时候,会读写 state 的值;解锁的时候,也会读写 state 的值。可以通过 volatile 变量规则、顺序性规则以及传递性规则推导出 前一个线程在加锁时对变量的修改 Happens-Before 于另一个线程的加锁操作。
实现 (1)获取锁 ReentrantLock 的 lock() 方法直接调用 sync.lock() 方法。ReentrantLock 中有继承 AQS 的抽象类Sync
,内部有NonfairSync
,FairSync
分别代表非公平实现和公平实现。在AQS中,我们知道tryAcquire
方法是要自己实现的。
先以非公平锁开始,调用 Lock 的线程通过 CAS 设置状态值为1成功后,则表示当前线程获取到了锁, 然后 setExclusiveOwnerThread
设置该锁持有者是当前线程。后续线程会调用 acquire
方法,其内部既先调用tryAcquire
,从而调用nonfairTryAcquire
,这个方法就是获取锁的关键:
这里的非公平性体现在何处呢?若线程A持有锁后,线程B持有失败,进入阻塞队列。此时线程A释放锁的同时,线程C到来,其尝试 CAS 成功后,便获取到了锁。或者刚释放锁的线程A想要再次获取锁时,其成功几率也会非常大。
公平锁的tryAcquire
和nonfairTryAcquire
几乎一致,只是多了一个 !hasQueuedPredecessors()
,该方法判断当前线程节点是否有前驱节点。使用公平锁时,任何线程一进来不会让你 CAS 成功就获取到锁,而是进入 tryAcquire
中进行判断队列中是否有线程在等待,若没有等待的线程且CAS成功,则代表获取到锁,设置持有者线程后便返回 true。公平锁保证了锁的获取按照FIFO原则,而代价是进行大量的线程切换。非公平性锁虽然可能造成线程“饥饿”,但极少的线程切换,保证了其更大的吞吐量。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L ; final void lock () { if (compareAndSetState(0 , 1 )) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1 ); } protected final boolean tryAcquire (int acquires) { return nonfairTryAcquire(acquires); } } final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L ; final void lock () { acquire(1 ); } protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } } public final boolean hasQueuedPredecessors () { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
(2)释放锁 ReentrantLock.unlock() 调用的是:sync.release(1)
,方法内会调用tryRelease
方法(实现是在Sync
中,即公平和非公平都是同一种实现)。该方法中,当前线程非持有者线程立即报错。当检查 state 若减去 release 值为 0 后,则清空持有者线程, 正式设置 state 为 0。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; }
ReentrantReadWriteLock 读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升,能够简化读写交互场景的编程方式。
ReentrantReadWriteLock支持公平性/非公平性选择、重进入、锁降级。样例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class ReadWriteLockTest { static Map<String, Object> map = new HashMap<String, Object>(); static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); static Lock r = rwl.readLock(); static Lock w = rwl.writeLock(); public static final Object get (String key) { r.lock(); try { return map.get(key); } finally { r.unlock(); } } public static final Object put (String key, Object value) { w.lock(); try { return map.put(key, value); } finally { w.unlock(); } } public static final void clear () { w.lock(); try { map.clear(); } finally { w.unlock(); } } }
关于读写状态 读写锁同样依赖自定义同步器来实现同步功能,而读写状态就是其同步器的同步状态。读写锁的自定义同步器需要在同步状态(一个整型变量)上维护多个读线程和一个写线程的状态。如果在一个整型变量上维护多种状态,就一定需要“按位切割使用”这个变量,读写锁将变量切分成了两个部分,高16位表示读,低16位表示写
写锁的获取与释放 写锁是一个支持重进入的排它锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经被获取(读状态不为0)或者该线程不是已经获取写锁的线程,则当前线程进入等待状态。如果存在读锁,则写锁不能被获取,原因在于:读写锁要确保写锁的操作对读锁可见,如果允许读锁在已被获取的情况下对写锁的获取,那么正在运行的其他读线程就无法感知到当前写线程的操作。写锁的释放与ReentrantLock的释放过程基本类似,等待的读写线程能够继续访问读写锁,同时前次写线程的修改对后续读写线程可见。
读锁的获取与释放 读锁是一个支持重进入的共享锁,它能够被多个线程同时获取。读锁总会被成功地获取,而所做的也只是(线程安全的)增加读状态。如果其他线程已经获取了写锁,则当前线程获取读锁失败,进入等待状态。如果当前线程获取了写锁或者写锁未被获取,则当前线程(线程安全,依靠CAS保证)增加读状态,成功获取读锁。读锁的每次释放(线程安全,可能有多个读线程同时释放读锁)均减少读状态,减少的值是(1<<16)。
锁降级 锁降级是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。
锁降级示例:因为数据不常变化,所以多个线程可以并发地进行数据处理,当数据变更后,如果当前线程感知到数据变化,则进行数据的准备工作,同时其他处理线程被阻塞,直到当前线程完成数据的准备工作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public void processData () { readLock.lock(); if (!update) { readLock.unlock(); writeLock.lock(); try { if (!update) { update = true ; } readLock.lock(); } finally { writeLock.unlock(); } } try { } finally { readLock.unlock(); } }
当数据发生变更后,update变量(布尔类型且volatile修饰)被设置为false,此时所有访问processData()方法的线程都能够感知到变化,但只有一个线程能够获取到写锁,其他线程会被阻塞在读锁和写锁的lock()方法上。当前线程获取写锁完成数据准备之后,再获取读锁,随后释放写锁,完成锁降级。
锁降级中读锁的获取是否必要呢?答案是必要的。主要是为了保证数据的可见性,如果当前线程不获取读锁而是直接释放写锁,假设此刻另一个线程(记作线程T)获取了写锁并修改了数据,那么当前线程无法感知线程T的数据更新。如果当前线程获取读锁,即遵循锁降级的步骤,则线程T将会被阻塞,直到当前线程使用数据并释放读锁之后,线程T才能获取写锁进行数据更新。
RentrantReadWriteLock不支持锁升级(把持读锁、获取写锁,最后释放读锁的过程)。目的也是保证数据可见性,如果读锁已被多个线程获取,其中任意线程成功获取了写锁并更新了数据,则其更新对其他获取到读锁的线程是不可见的。
示例 18个读线程 + 2个写线程,读写操作都是耗时1s,最终耗时3s。换成 ReentrantLock 则为 20s。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 public class ReadWriteLockDemo { private static Lock lock = new ReentrantLock(); private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private static Lock readLock = readWriteLock.readLock(); private static Lock writeLock = readWriteLock.writeLock(); private static CountDownLatch latch = new CountDownLatch(20 ); public Object handleRead (Lock lock) throws InterruptedException { try { lock.lock(); System.out.println("start read" ); Thread.sleep(1000 ); return null ; } finally { latch.countDown(); lock.unlock(); } } public void handleWrite (Lock lock) throws InterruptedException { try { lock.lock(); System.out.println("start write" ); Thread.sleep(1000 ); } finally { latch.countDown(); lock.unlock(); } } public static void main (String[] args) throws InterruptedException { final ReadWriteLockDemo demo = new ReadWriteLockDemo(); Runnable readRunnale = () -> { try { demo.handleRead(readLock); } catch (InterruptedException e) { e.printStackTrace(); } }; Runnable writeRunnale = () -> { try { demo.handleWrite(writeLock); } catch (InterruptedException e) { e.printStackTrace(); } }; Instant i1 = Instant.now(); for (int i = 0 ; i < 18 ; i++) { new Thread(readRunnale).start(); } for (int i = 18 ; i < 20 ; i++) { new Thread(writeRunnale).start(); } latch.await(); Instant i2 = Instant.now(); System.out.println(Duration.between(i1, i2).toMillis()); } }