李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
Java
正文
54.ReentrantReadWriteLock实现原理详解
Leefs
2022-11-28 PM
1059℃
0条
[TOC] ### 一、ReentrantReadWriteLock简单流程 #### 1.1 独占获取锁简单流程 ![54.ReentrantReadWriteLock实现原理详解01.png](https://lilinchao.com/usr/uploads/2022/11/1460366531.png) **独占获取锁流程** 1. 独占锁获取(**writeLock**写锁),首先判断是否有线程获取了锁,**是否有线程获取了锁的判断**通过读写锁中通过32位int类型state可以获取,其中低16位表示读锁,高16表示写锁。 2. 有读锁:直接排队阻塞。 3. 有写锁:还需要判断写锁线程是否是自己,如果是自己就是**锁重入**了,如果不是自己说明已经有其他的线程获取锁正在执行,那么当前线程需要排队阻塞。 4. 无锁:直接获取锁,其他抢占的独占锁线程需要排队阻塞,当前线程执行完毕后释放锁通知下一个排队线程获取锁。 #### 1.2 共享获取锁简单流程 ![54.ReentrantReadWriteLock实现原理详解02.png](https://lilinchao.com/usr/uploads/2022/11/3554337000.png) **共享锁获取锁流程** 1. 独占锁获取(**readLock**读锁),首先判断是否有线程获取了锁。 2. 有读锁:当前线程发现此时**读锁状态被占用,说明有线程获取了读锁**。该线程通过cas自旋【死循环】获取到读锁为止。 3. 有写锁:还需要判断持有写锁的线程是否是自己,如果是自己而且此时获取的是读锁会获取锁成功,我们称为**锁降级**,如果不是自己说明此时有其他线程获取了写锁,那么当前线程需要排队阻塞。 4. 无锁:直接获取锁。 ### 二、读写锁原理 下面通过例子看一下ReentrantWriteReadLock是如何控制写锁和读锁的,这里只以非公平锁为例说明。 (1)当没有线程加读锁,也没有线程加写锁时,读写锁如下所示,state高位和低位计数都为0。 ![54.ReentrantReadWriteLock实现原理详解03.jpg](https://lilinchao.com/usr/uploads/2022/11/3399765807.jpg) (2)如果此时线程1想要加写锁,由于此时并没有线程加锁。因此加锁成功,修改state为`0_1`。 ```java public static class WriteLock implements Lock, java.io.Serializable { public void lock() { sync.acquire(1); } } ``` `acqurie()`为AQS中的方法,如下所示: ```java public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } ``` `tryAcquire()`定义如下: ```java protected final boolean tryAcquire(int acquires) { // 如果读锁计数为非零或写锁计数为非零,并且所有者是另一个线程,则失败。 // 如果计数饱和,则失败。只有在count不为零时,才可能发生这种情况。 // 否则,如果该线程是可重入获取或队列策略允许的话,则有资格进行锁定。 // 如果是这样,请更新状态并设置所有者。 Thread current = Thread.currentThread(); int c = getState(); // 获取写锁被所有线程获取的次数 int w = exclusiveCount(c); // 如果c!=0,表示写锁已经被线程获取 if (c != 0) { // 如果读锁也被获取,而且持锁线程不是当前线程 if ( w == 0 || current != getExclusiveOwnerThread() ) { // 那么当前线程获取锁失败 return false; } // 写锁计数超过低 16 位, 报异常 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 写锁重入, 获得写锁成功 setState(c + acquires); return true; } // 判断写锁是否该阻塞,或尝试更改写锁计数值 if ( writerShouldBlock() || !compareAndSetState(c, c + acquires)) { // 获得锁失败 return false; } // 获得锁成功 setExclusiveOwnerThread(current); return true; } static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } ``` **源码分析** > 1. 获取 state , 如果 state 不为 0 则判断是否为当前线程重入获取。 > 2. state 为 0 ,则当前线程 CAS 更新 state,获取锁。 > 3. 更新成功之后绑定当前线程。 > 4. 如果失败会继续调用 AQS 的 acquireQueued,将当前阻塞放在 AQS 队列中。AQS 会不断循环,等待上一个锁释放后,尝试获得锁。 ![54.ReentrantReadWriteLock实现原理详解15.png](https://lilinchao.com/usr/uploads/2022/11/2902330806.png) **如图所示** ![54.ReentrantReadWriteLock实现原理详解04.jpg](https://lilinchao.com/usr/uploads/2022/11/1139830814.jpg) (3)接着线程2想要加读锁,根据加锁规则,此时线程2应该被阻塞。它首先进入读锁的`acquireShared(1)`流程,接着进入`tryAqcquire()`尝试获取锁。因为t1已经加了写锁,方法返回-1表示加锁失败。 > 方法的返回值有三种情况: > > - -1:失败 > - 0:成功,但是不会继续唤醒后继节点 > - 整数:成功,数值表示后续需要唤醒的节点个数 ```java public static class ReadLock implements Lock, java.io.Serializable { public void lock() { sync.acquireShared(1); } } public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } ``` 其中尝试获取读锁的方法`tryAcquireShared()`定义如下: ```java protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); // 如果此时已有其他的线程持有写锁,并且持锁线程不是当前线程,则读锁获取失败 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; // 获取读状态位 int r = sharedCount(c); if ( // 读锁不该阻塞 !readerShouldBlock() && // 读锁计数小于最大值 r < MAX_COUNT && // 使用CAS尝试增加state中读锁计数 compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); } static int sharedCount(int c) { return c >>> SHARED_SHIFT; } ``` > 不断尝试 for (;;) 获取读锁, 执行过程中无阻塞 ```java final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; // 无限循环 for (;;) { int c = getState(); // 是否有写锁 if (exclusiveCount(c) != 0) { // 有写锁,但是不是当前线程,直接返回失败 if (getExclusiveOwnerThread() != current) return -1; } else if (readerShouldBlock()) { // 需要阻塞 // 没有写锁,确保没有重新获取读锁 if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { // 当前线程的读锁计数 ThreadLocal 中 if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); // 计数结束,remove 掉 if (rh.count == 0) readHolds.remove(); } } // 为 0 直接失败 if (rh.count == 0) return -1; } } // 到达上限 抛出异常 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // CAS 设置读锁 if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } } ``` **源码分析** > 1. 首先会一直循环 > 2. 有写锁,但是不是当前线程,直接返回失败。**但是,有写锁,如果是当前线程,是会继续执行的。** > 3. 设置或更新 firstReader、firstReaderHoldCount、 cachedHoldCounter。 当存在写锁(独占锁)时,方法会返回 -1 失败,后续会调用 AQS 的 doAcquireShared 方法,循环获取资源。doAcquireShared 方法会不断循环,尝试获取读锁,一旦获取到读锁,当前节点会立即唤醒后续节点,后续节点开始尝试获取读锁,依次传播。 ![54.ReentrantReadWriteLock实现原理详解14.png](https://lilinchao.com/usr/uploads/2022/11/990641087.png) **如图所示** ![54.ReentrantReadWriteLock实现原理详解05.jpg](https://lilinchao.com/usr/uploads/2022/11/2153152795.jpg) (4)由于线程2加锁被阻塞,此时会进入`doAcquireShared(1)`流程,也是调用`addWaiter()`添加节点,此时的节点被设置为**Node.SHARED**模式,而且线程2此时仍处于活跃状态。 ![54.ReentrantReadWriteLock实现原理详解06.jpg](https://lilinchao.com/usr/uploads/2022/11/841795380.jpg) ```java private void doAcquireShared(int arg) { // 将当前线程关联到一个 Node 对象上, 模式为共享模式 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { // 再一次尝试获取读锁 int r = tryAcquireShared(arg); // 成功 if (r >= 0) { // r 表示可用资源数, 在这里总是1,允许链式唤醒,继续唤醒下一个SHARED节点 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if ( // 是否在获取读锁失败时阻塞(前一个阶段 waitStatus == Node.SIGNAL) shouldParkAfterFailedAcquire(p, node) && // park 当前线程 parkAndCheckInterrupt() ) { interrupted = true; } } } finally { if (failed) cancelAcquire(node); } } ``` (5)此时,线程2会看他是不是哨兵节点后的第一个节点,如果是,继续调用tryAcquireShared(1)尝试加锁。 但由于线程1还没有释放写锁,加锁依然会失败。 线程2会在doAcquireShared()内的死循环中继续循环一次,将它的前驱节点的waitStatus改为-1。 然后再次调用tryAcquireShared(1)尝试加锁,如果还是失败,则在parkAndCheckInterrupt()处被阻塞,不再尝试加锁。 ![54.ReentrantReadWriteLock实现原理详解07.jpg](https://lilinchao.com/usr/uploads/2022/11/1784934313.jpg) (6)线程2加锁失败后,假设又有线程3想要加读锁,线程4想要加写锁。但是由于线程1仍然持有写锁,它们加锁也会失败,进入等待队列。 ![54.ReentrantReadWriteLock实现原理详解08.jpg](https://lilinchao.com/usr/uploads/2022/11/3083321653.jpg) ```java private void doReleaseShared() { // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark for (;;) { Node h = head; // 队列还有节点 if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 下一个节点 unpark 如果成功获取读锁 // 并且下下个节点还是 shared, 继续 doReleaseShared unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } ``` (7)千辛万苦,终于等到线程1释放锁,修改exclusiveOwnerThread为null,如下所示 ![54.ReentrantReadWriteLock实现原理详解09.jpg](https://lilinchao.com/usr/uploads/2022/11/172814837.jpg) (8)释放锁成功后调用`unparkSuccessor()`唤醒等待队列中可能的阻塞线程,此时,线程2在`doAcquireShared()`内`parkAndCheckInterrupt()`处恢复运行。 接着再执行一次循环,调用`tryAcquireShared()`让读状态计数加一。 ```java static final class NonfairSync extends Sync { public void unlock() { sync.release(1); } public final boolean release(int arg) { // 尝试释放写锁成功 if (tryRelease(arg)) { // 唤醒等待队列中的线程 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; // 因为可重入的原因, 写锁计数为 0, 才算释放成功 boolean free = exclusiveCount(nextc) == 0; if (free) { setExclusiveOwnerThread(null); } setState(nextc); return free; } } ``` 因此,此时state值为`1_0` ![54.ReentrantReadWriteLock实现原理详解10.jpg](https://lilinchao.com/usr/uploads/2022/11/1264987046.jpg) (9)线程2加锁成功后,调用`setHeadAndPropagate()`将本来所在的节点设置为头节点。 然后在方法内检查下一个节点的状态是否是SHARED,如果是,则调用`doReleaseShared()`将头节点的waitStatus修改为-1,并唤醒后继线程3,它会在`parkAndCheckInterrupt()`处恢复运行。 执行上面相同的操作,修改state为`2_0`,并将原本的节点设置为头节点,waitStatus修改为-1 ![54.ReentrantReadWriteLock实现原理详解11.jpg](https://lilinchao.com/usr/uploads/2022/11/2452502655.jpg) (10)不久之后,线程2和线程3相继执行结束,调用`releaseShared(1)`修改读状态计数值为0。 ```java static final class NonfairSync extends Sync { public void unlock() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int unused) { // ... 省略不重要的代码 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) { // 读锁的计数不会影响其它获取读锁线程, 但会影响其它获取写锁线程 // 计数为 0 才是真正释放 return nextc == 0; } } } private void doReleaseShared() { // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark // 如果 head.waitStatus == 0 ==> Node.PROPAGATE for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // 如果有其它线程也在释放读锁,那么需要将 waitStatus 先改为 0 // 防止 unparkSuccessor 被多次执行 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } // 如果已经是 0 了,改为 -3,用来解决传播性 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } } ``` (11)然后调用`doReleaseShared()`将头节点waitStatus修改为0,唤醒线程4。 ![54.ReentrantReadWriteLock实现原理详解12.jpg](https://lilinchao.com/usr/uploads/2022/11/2049091830.jpg) (12)线程4同样在`parkAndCheckInterrupt()`处恢复运行,并且发现自己是哨兵节点的后继,而且此时没有其他的线程竞争锁,则加锁成功,修改state为`0_1`。 ![54.ReentrantReadWriteLock实现原理详解13.jpg](https://lilinchao.com/usr/uploads/2022/11/2764159732.jpg) ### 三、锁降级 锁降级是指把持住写锁,再获取读锁,随后释放写锁的过程。 经过锁降级之后,写锁就会被降级为读锁。 之所以在释放写锁之前需要先获取读锁,是为了避免直接释放写锁后,其他线程对于数据的更新对当前线程不可见。 如果当前线程先获取读锁,那么想要获取写锁的线程就都会被阻塞,只有当前线程成功释放了写锁,其他竞争写锁的线程才能成功获取到。 *附参考原文链接地址* *https://blog.csdn.net/Forlogen/article/details/108434798*
标签:
并发编程
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/2650.html
上一篇
53.ReentrantReadWriteLock应用之缓存
下一篇
55.StampedLock介绍
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
队列
序列化和反序列化
Spark Streaming
Kafka
nginx
VUE
Spark RDD
Scala
机器学习
查找
Azkaban
Hbase
工具
栈
正则表达式
Shiro
Elastisearch
SQL练习题
Eclipse
Stream流
Jenkins
BurpSuite
Redis
并发线程
SpringCloudAlibaba
CentOS
Git
数据结构
FastDFS
Spark Core
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭