李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
Java
正文
54.ReentrantReadWriteLock实现原理详解
Leefs
2022-11-28 PM
594℃
0条
[TOC] ### 一、ReentrantReadWriteLock简单流程 #### 1.1 独占获取锁简单流程 ![54.ReentrantReadWriteLock实现原理详解01.png](https://lilinchao.com/usr/uploads/2022/11/1460366531.png) **独占获取锁流程** 1. 独占锁获取(**writeLock**写锁),首先判断是否有线程获取了锁,**是否有线程获取了锁的判断**通过读写锁中通过32位int类型state可以获取,其中低16位表示读锁,高16表示写锁。 2. 有读锁:直接排队阻塞。 3. 有写锁:还需要判断写锁线程是否是自己,如果是自己就是**锁重入**了,如果不是自己说明已经有其他的线程获取锁正在执行,那么当前线程需要排队阻塞。 4. 无锁:直接获取锁,其他抢占的独占锁线程需要排队阻塞,当前线程执行完毕后释放锁通知下一个排队线程获取锁。 #### 1.2 共享获取锁简单流程 ![54.ReentrantReadWriteLock实现原理详解02.png](https://lilinchao.com/usr/uploads/2022/11/3554337000.png) **共享锁获取锁流程** 1. 独占锁获取(**readLock**读锁),首先判断是否有线程获取了锁。 2. 有读锁:当前线程发现此时**读锁状态被占用,说明有线程获取了读锁**。该线程通过cas自旋【死循环】获取到读锁为止。 3. 有写锁:还需要判断持有写锁的线程是否是自己,如果是自己而且此时获取的是读锁会获取锁成功,我们称为**锁降级**,如果不是自己说明此时有其他线程获取了写锁,那么当前线程需要排队阻塞。 4. 无锁:直接获取锁。 ### 二、读写锁原理 下面通过例子看一下ReentrantWriteReadLock是如何控制写锁和读锁的,这里只以非公平锁为例说明。 (1)当没有线程加读锁,也没有线程加写锁时,读写锁如下所示,state高位和低位计数都为0。 ![54.ReentrantReadWriteLock实现原理详解03.jpg](https://lilinchao.com/usr/uploads/2022/11/3399765807.jpg) (2)如果此时线程1想要加写锁,由于此时并没有线程加锁。因此加锁成功,修改state为`0_1`。 ```java public static class WriteLock implements Lock, java.io.Serializable { public void lock() { sync.acquire(1); } } ``` `acqurie()`为AQS中的方法,如下所示: ```java public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } ``` `tryAcquire()`定义如下: ```java protected final boolean tryAcquire(int acquires) { // 如果读锁计数为非零或写锁计数为非零,并且所有者是另一个线程,则失败。 // 如果计数饱和,则失败。只有在count不为零时,才可能发生这种情况。 // 否则,如果该线程是可重入获取或队列策略允许的话,则有资格进行锁定。 // 如果是这样,请更新状态并设置所有者。 Thread current = Thread.currentThread(); int c = getState(); // 获取写锁被所有线程获取的次数 int w = exclusiveCount(c); // 如果c!=0,表示写锁已经被线程获取 if (c != 0) { // 如果读锁也被获取,而且持锁线程不是当前线程 if ( w == 0 || current != getExclusiveOwnerThread() ) { // 那么当前线程获取锁失败 return false; } // 写锁计数超过低 16 位, 报异常 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 写锁重入, 获得写锁成功 setState(c + acquires); return true; } // 判断写锁是否该阻塞,或尝试更改写锁计数值 if ( writerShouldBlock() || !compareAndSetState(c, c + acquires)) { // 获得锁失败 return false; } // 获得锁成功 setExclusiveOwnerThread(current); return true; } static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } ``` **源码分析** > 1. 获取 state , 如果 state 不为 0 则判断是否为当前线程重入获取。 > 2. state 为 0 ,则当前线程 CAS 更新 state,获取锁。 > 3. 更新成功之后绑定当前线程。 > 4. 如果失败会继续调用 AQS 的 acquireQueued,将当前阻塞放在 AQS 队列中。AQS 会不断循环,等待上一个锁释放后,尝试获得锁。 ![54.ReentrantReadWriteLock实现原理详解15.png](https://lilinchao.com/usr/uploads/2022/11/2902330806.png) **如图所示** ![54.ReentrantReadWriteLock实现原理详解04.jpg](https://lilinchao.com/usr/uploads/2022/11/1139830814.jpg) (3)接着线程2想要加读锁,根据加锁规则,此时线程2应该被阻塞。它首先进入读锁的`acquireShared(1)`流程,接着进入`tryAqcquire()`尝试获取锁。因为t1已经加了写锁,方法返回-1表示加锁失败。 > 方法的返回值有三种情况: > > - -1:失败 > - 0:成功,但是不会继续唤醒后继节点 > - 整数:成功,数值表示后续需要唤醒的节点个数 ```java public static class ReadLock implements Lock, java.io.Serializable { public void lock() { sync.acquireShared(1); } } public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } ``` 其中尝试获取读锁的方法`tryAcquireShared()`定义如下: ```java protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); // 如果此时已有其他的线程持有写锁,并且持锁线程不是当前线程,则读锁获取失败 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; // 获取读状态位 int r = sharedCount(c); if ( // 读锁不该阻塞 !readerShouldBlock() && // 读锁计数小于最大值 r < MAX_COUNT && // 使用CAS尝试增加state中读锁计数 compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); } static int sharedCount(int c) { return c >>> SHARED_SHIFT; } ``` > 不断尝试 for (;;) 获取读锁, 执行过程中无阻塞 ```java final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; // 无限循环 for (;;) { int c = getState(); // 是否有写锁 if (exclusiveCount(c) != 0) { // 有写锁,但是不是当前线程,直接返回失败 if (getExclusiveOwnerThread() != current) return -1; } else if (readerShouldBlock()) { // 需要阻塞 // 没有写锁,确保没有重新获取读锁 if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { // 当前线程的读锁计数 ThreadLocal 中 if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); // 计数结束,remove 掉 if (rh.count == 0) readHolds.remove(); } } // 为 0 直接失败 if (rh.count == 0) return -1; } } // 到达上限 抛出异常 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // CAS 设置读锁 if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } } ``` **源码分析** > 1. 首先会一直循环 > 2. 有写锁,但是不是当前线程,直接返回失败。**但是,有写锁,如果是当前线程,是会继续执行的。** > 3. 设置或更新 firstReader、firstReaderHoldCount、 cachedHoldCounter。 当存在写锁(独占锁)时,方法会返回 -1 失败,后续会调用 AQS 的 doAcquireShared 方法,循环获取资源。doAcquireShared 方法会不断循环,尝试获取读锁,一旦获取到读锁,当前节点会立即唤醒后续节点,后续节点开始尝试获取读锁,依次传播。 ![54.ReentrantReadWriteLock实现原理详解14.png](https://lilinchao.com/usr/uploads/2022/11/990641087.png) **如图所示** ![54.ReentrantReadWriteLock实现原理详解05.jpg](https://lilinchao.com/usr/uploads/2022/11/2153152795.jpg) (4)由于线程2加锁被阻塞,此时会进入`doAcquireShared(1)`流程,也是调用`addWaiter()`添加节点,此时的节点被设置为**Node.SHARED**模式,而且线程2此时仍处于活跃状态。 ![54.ReentrantReadWriteLock实现原理详解06.jpg](https://lilinchao.com/usr/uploads/2022/11/841795380.jpg) ```java private void doAcquireShared(int arg) { // 将当前线程关联到一个 Node 对象上, 模式为共享模式 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { // 再一次尝试获取读锁 int r = tryAcquireShared(arg); // 成功 if (r >= 0) { // r 表示可用资源数, 在这里总是1,允许链式唤醒,继续唤醒下一个SHARED节点 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if ( // 是否在获取读锁失败时阻塞(前一个阶段 waitStatus == Node.SIGNAL) shouldParkAfterFailedAcquire(p, node) && // park 当前线程 parkAndCheckInterrupt() ) { interrupted = true; } } } finally { if (failed) cancelAcquire(node); } } ``` (5)此时,线程2会看他是不是哨兵节点后的第一个节点,如果是,继续调用tryAcquireShared(1)尝试加锁。 但由于线程1还没有释放写锁,加锁依然会失败。 线程2会在doAcquireShared()内的死循环中继续循环一次,将它的前驱节点的waitStatus改为-1。 然后再次调用tryAcquireShared(1)尝试加锁,如果还是失败,则在parkAndCheckInterrupt()处被阻塞,不再尝试加锁。 ![54.ReentrantReadWriteLock实现原理详解07.jpg](https://lilinchao.com/usr/uploads/2022/11/1784934313.jpg) (6)线程2加锁失败后,假设又有线程3想要加读锁,线程4想要加写锁。但是由于线程1仍然持有写锁,它们加锁也会失败,进入等待队列。 ![54.ReentrantReadWriteLock实现原理详解08.jpg](https://lilinchao.com/usr/uploads/2022/11/3083321653.jpg) ```java private void doReleaseShared() { // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark for (;;) { Node h = head; // 队列还有节点 if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 下一个节点 unpark 如果成功获取读锁 // 并且下下个节点还是 shared, 继续 doReleaseShared unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } ``` (7)千辛万苦,终于等到线程1释放锁,修改exclusiveOwnerThread为null,如下所示 ![54.ReentrantReadWriteLock实现原理详解09.jpg](https://lilinchao.com/usr/uploads/2022/11/172814837.jpg) (8)释放锁成功后调用`unparkSuccessor()`唤醒等待队列中可能的阻塞线程,此时,线程2在`doAcquireShared()`内`parkAndCheckInterrupt()`处恢复运行。 接着再执行一次循环,调用`tryAcquireShared()`让读状态计数加一。 ```java static final class NonfairSync extends Sync { public void unlock() { sync.release(1); } public final boolean release(int arg) { // 尝试释放写锁成功 if (tryRelease(arg)) { // 唤醒等待队列中的线程 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; // 因为可重入的原因, 写锁计数为 0, 才算释放成功 boolean free = exclusiveCount(nextc) == 0; if (free) { setExclusiveOwnerThread(null); } setState(nextc); return free; } } ``` 因此,此时state值为`1_0` ![54.ReentrantReadWriteLock实现原理详解10.jpg](https://lilinchao.com/usr/uploads/2022/11/1264987046.jpg) (9)线程2加锁成功后,调用`setHeadAndPropagate()`将本来所在的节点设置为头节点。 然后在方法内检查下一个节点的状态是否是SHARED,如果是,则调用`doReleaseShared()`将头节点的waitStatus修改为-1,并唤醒后继线程3,它会在`parkAndCheckInterrupt()`处恢复运行。 执行上面相同的操作,修改state为`2_0`,并将原本的节点设置为头节点,waitStatus修改为-1 ![54.ReentrantReadWriteLock实现原理详解11.jpg](https://lilinchao.com/usr/uploads/2022/11/2452502655.jpg) (10)不久之后,线程2和线程3相继执行结束,调用`releaseShared(1)`修改读状态计数值为0。 ```java static final class NonfairSync extends Sync { public void unlock() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int unused) { // ... 省略不重要的代码 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) { // 读锁的计数不会影响其它获取读锁线程, 但会影响其它获取写锁线程 // 计数为 0 才是真正释放 return nextc == 0; } } } private void doReleaseShared() { // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark // 如果 head.waitStatus == 0 ==> Node.PROPAGATE for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // 如果有其它线程也在释放读锁,那么需要将 waitStatus 先改为 0 // 防止 unparkSuccessor 被多次执行 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } // 如果已经是 0 了,改为 -3,用来解决传播性 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } } ``` (11)然后调用`doReleaseShared()`将头节点waitStatus修改为0,唤醒线程4。 ![54.ReentrantReadWriteLock实现原理详解12.jpg](https://lilinchao.com/usr/uploads/2022/11/2049091830.jpg) (12)线程4同样在`parkAndCheckInterrupt()`处恢复运行,并且发现自己是哨兵节点的后继,而且此时没有其他的线程竞争锁,则加锁成功,修改state为`0_1`。 ![54.ReentrantReadWriteLock实现原理详解13.jpg](https://lilinchao.com/usr/uploads/2022/11/2764159732.jpg) ### 三、锁降级 锁降级是指把持住写锁,再获取读锁,随后释放写锁的过程。 经过锁降级之后,写锁就会被降级为读锁。 之所以在释放写锁之前需要先获取读锁,是为了避免直接释放写锁后,其他线程对于数据的更新对当前线程不可见。 如果当前线程先获取读锁,那么想要获取写锁的线程就都会被阻塞,只有当前线程成功释放了写锁,其他竞争写锁的线程才能成功获取到。 *附参考原文链接地址* *https://blog.csdn.net/Forlogen/article/details/108434798*
标签:
并发编程
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/2650.html
上一篇
53.ReentrantReadWriteLock应用之缓存
下一篇
55.StampedLock介绍
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
43
标签云
Sentinel
Redis
队列
Kafka
Hadoop
MySQL
CentOS
容器深入研究
Spark
Thymeleaf
Java
RSA加解密
Elasticsearch
排序
Docker
Jenkins
查找
递归
MyBatisX
SpringBoot
Java编程思想
算法
持有对象
Eclipse
Azkaban
Golang基础
VUE
SpringCloudAlibaba
Java阻塞队列
nginx
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞