李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
Java
正文
51.ReentrantLock原理
Leefs
2022-11-26 PM
546℃
0条
[TOC] ### 前言 **ReentrantLock关系类图**: ![51.ReentrantLock原理01.jpg](https://lilinchao.com/usr/uploads/2022/11/306958388.jpg) ReentrabtLock实现了LOCK接口,里面维护了一个sync同步器,Sync是一个抽象类,有两种实现FairSync和NonfairSync分别对应着公平锁和非公平锁两种实现。 ### 一、非公平锁实现原理 #### 1.1 示例 **代码示例** ```java import lombok.extern.slf4j.Slf4j; import java.util.concurrent.locks.ReentrantLock; import static com.lilinchao.concurrent.utils.Sleeper.sleep; /** * Created by lilinchao * Date 2022/11/26 * Description ReentrantLock非公平锁 */ @Slf4j(topic = "c.ReentrantLockUnfairLock") public class ReentrantLockUnfairLock { public static void main(String[] args) { // 无参构造函数,默认创建非公平锁模式 ReentrantLock lock = new ReentrantLock(); for (int i = 0; i < 10; i++){ final int num = i; new Thread(() -> { lock.lock(); try { log.debug("线程 {} 获取到锁",num); sleep(1); } finally { // finally中解锁 lock.unlock(); log.debug("线程 {} 释放锁成功",num); } }).start(); } } } ``` **运行结果** ``` 20:42:23.238 [Thread-0] DEBUG c.ReentrantLockUnfairLock - 线程 0 获取到锁 20:42:24.244 [Thread-1] DEBUG c.ReentrantLockUnfairLock - 线程 1 获取到锁 20:42:24.244 [Thread-0] DEBUG c.ReentrantLockUnfairLock - 线程 0 释放锁成功 20:42:25.244 [Thread-1] DEBUG c.ReentrantLockUnfairLock - 线程 1 释放锁成功 20:42:25.244 [Thread-2] DEBUG c.ReentrantLockUnfairLock - 线程 2 获取到锁 20:42:26.245 [Thread-2] DEBUG c.ReentrantLockUnfairLock - 线程 2 释放锁成功 20:42:26.245 [Thread-4] DEBUG c.ReentrantLockUnfairLock - 线程 4 获取到锁 20:42:27.246 [Thread-4] DEBUG c.ReentrantLockUnfairLock - 线程 4 释放锁成功 20:42:27.246 [Thread-3] DEBUG c.ReentrantLockUnfairLock - 线程 3 获取到锁 20:42:28.247 [Thread-3] DEBUG c.ReentrantLockUnfairLock - 线程 3 释放锁成功 20:42:28.247 [Thread-5] DEBUG c.ReentrantLockUnfairLock - 线程 5 获取到锁 20:42:29.248 [Thread-5] DEBUG c.ReentrantLockUnfairLock - 线程 5 释放锁成功 ............... ``` **结果说明** - 默认构造函数创建的是非公平锁 - 从运行结果可以看出,线程4优先于线程3获取到锁 #### 1.2 加锁原理 **1. 构造函数创建锁对象** ```java public ReentrantLock() { sync = new NonfairSync(); } ``` - 默认构造函数,创建了NonfairSync,非公平锁同步器,它是继承自AQS。 **2. 第一个线程加锁时,不存在竞争** ```java // ReentrantLock.NonfairSync#lock final void lock() { // 用 cas 尝试(仅尝试一次)将 state 从 0 改为 1, 如果成功表示【获得了独占锁】 if (compareAndSetState(0, 1)) // 设置当前线程为独占线程 setExclusiveOwnerThread(Thread.currentThread()); else acquire(1);//失败进入 } ``` ![51.ReentrantLock原理02.jpg](https://lilinchao.com/usr/uploads/2022/11/3811120731.jpg) - cas修改state从0到1,获取锁 - 设置锁对象的线程为当前线程 **3. 第二个线程申请加锁时,出现锁竞争** ![51.ReentrantLock原理03.jpg](https://lilinchao.com/usr/uploads/2022/11/389117185.jpg) - Thread-1 执行,CAS 尝试将 state 由 0 改为 1,结果失败(第一次),进入 acquire 逻辑 ```java // AbstractQueuedSynchronizer#acquire public final void acquire(int arg) { // tryAcquire 尝试获取锁失败时, 会调用 addWaiter 将当前线程封装成node入队,acquireQueued 阻塞当前线程, // acquireQueued 返回 true 表示挂起过程中线程被中断唤醒过,false 表示未被中断过 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 如果线程被中断了逻辑来到这,完成一次真正的打断效果 selfInterrupt(); } ``` - 调用tryAcquire方法尝试获取锁,这里由子类NonfairSync实现。 - 如果tryAcquire获取锁失败,通过addWaiter方法将当前线程封装成节点,入队 - acquireQueued方法会将当前线程阻塞 ```java // ReentrantLock.NonfairSync#tryAcquire protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } // 抢占成功返回 true,抢占失败返回 false final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); // state 值 int c = getState(); // 条件成立说明当前处于【无锁状态】 if (c == 0) { //如果还没有获得锁,尝试用cas获得,这里体现非公平性: 不去检查 AQS 队列是否有阻塞线程直接获取锁 if (compareAndSetState(0, acquires)) { // 获取锁成功设置当前线程为独占锁线程。 setExclusiveOwnerThread(current); return true; } } // 这部分是重入锁的原理 // 如果已经有线程获得了锁, 独占锁线程还是当前线程, 表示【发生了锁重入】 else if (current == getExclusiveOwnerThread()) { // 更新锁重入的值 int nextc = c + acquires; // 越界判断,当重入的深度很深时,会导致 nextc < 0,int值达到最大之后再 + 1 变负数 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); // 更新 state 的值,这里不使用 cas 是因为当前线程正在持有锁,所以这里的操作相当于在一个管程内 setState(nextc); return true; } // 获取失败 return false; } ``` - 正是这个方法体现了非公平锁,在nonfairTryAcquire如果发现state=0,无锁的情况,它会忽略队列中等待的线程,优先获取一次锁,相当于"插队"。 **4. 第二个线程tryAcquire申请锁失败,通过执行addWaiter方法加入到队列中** ![51.ReentrantLock原理04.jpg](https://lilinchao.com/usr/uploads/2022/11/1716706491.jpg) - 图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态 - Node 的创建是懒惰的,其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程 ```java // AbstractQueuedSynchronizer#addWaiter,返回当前线程的 node 节点 private Node addWaiter(Node mode) { // 将当前线程关联到一个 Node 对象上, 模式为独占模式 Node node = new Node(Thread.currentThread(), mode); Node pred = tail; // 快速入队,如果 tail 不为 null,说明存在阻塞队列 if (pred != null) { // 将当前节点的前驱节点指向 尾节点 node.prev = pred; // 通过 cas 将 Node 对象加入 AQS 队列,成为尾节点,【尾插法】 if (compareAndSetTail(pred, node)) { pred.next = node;// 双向链表 return node; } } // 初始时队列为空,或者 CAS 失败进入这里 enq(node); return node; } ``` ```java // AbstractQueuedSynchronizer#enq private Node enq(final Node node) { // 自旋入队,必须入队成功才结束循环 for (;;) { Node t = tail; // 说明当前锁被占用,且当前线程可能是【第一个获取锁失败】的线程,【还没有建立队列】 if (t == null) { // 设置一个【哑元节点】,头尾指针都指向该节点 if (compareAndSetHead(new Node())) tail = head; } else { // 自旋到这,普通入队方式,首先赋值尾节点的前驱节点【尾插法】 node.prev = t; // 【在设置完尾节点后,才更新的原始尾节点的后继节点,所以此时从前往后遍历会丢失尾节点】 if (compareAndSetTail(t, node)) { //【此时 t.next = null,并且这里已经 CAS 结束,线程并不是安全的】 t.next = node; return t; // 返回当前 node 的前驱节点 } } } } ``` **5. 第二个线程加入队列后,现在要做的是想办法阻塞线程,不让它执行,就看acquireQueued的了** ![51.ReentrantLock原理05.jpg](https://lilinchao.com/usr/uploads/2022/11/20249822.jpg) - 图中黄色三角表示该 Node 的 waitStatus 状态,0 为默认正常状态, 但是-1状态表示它肩负唤醒下一个节点的线程。 - 灰色表示线程阻塞了。 ```java inal boolean acquireQueued(final Node node, int arg) { // true 表示当前线程抢占锁失败,false 表示成功 boolean failed = true; try { // 中断标记,表示当前线程是否被中断 boolean interrupted = false; for (;;) { // 获得当前线程节点的前驱节点 final Node p = node.predecessor(); // 前驱节点是 head, FIFO 队列的特性表示轮到当前线程可以去获取锁 if (p == head && tryAcquire(arg)) { // 获取成功, 设置当前线程自己的 node 为 head setHead(node); p.next = null; // help GC // 表示抢占锁成功 failed = false; // 返回当前线程是否被中断 return interrupted; } // 判断是否应当 park,返回 false 后需要新一轮的循环,返回 true 进入条件二阻塞线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 条件二返回结果是当前线程是否被打断,没有被打断返回 false 不进入这里的逻辑 // 【就算被打断了,也会继续循环,并不会返回】 interrupted = true; } } finally { // 【可打断模式下才会进入该逻辑】 if (failed) cancelAcquire(node); } } ``` - acquireQueued 会在一个自旋中不断尝试获得锁,失败后进入 park 阻塞 - 如果当前线程是在 head 节点后,也就是第一个节点,又会直接多一次机会 tryAcquire 尝试获取锁,如果还是被占用,会返回失败 ```java private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 表示前置节点是个可以唤醒当前节点的节点,返回 true if (ws == Node.SIGNAL) return true; // 前置节点的状态处于取消状态,需要【删除前面所有取消的节点】, 返回到外层循环重试 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); // 获取到非取消的节点,连接上当前节点 pred.next = node; // 默认情况下 node 的 waitStatus 是 0,进入这里的逻辑 } else { // 【设置上一个节点状态为 Node.SIGNAL】,返回外层循环重试 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } // 返回不应该 park,再次尝试一次 return false; } ``` + shouldParkAfterFailedAcquire发现前驱节点等待状态是-1, 返回true,表示需要阻塞。 + shouldParkAfterFailedAcquire发现前驱节点等待状态大于0,说明是无效节点,会进行清理。 + shouldParkAfterFailedAcquire发现前驱节点等待状态等于0,将前驱 node 的 waitStatus 改为 -1,返回 false。 ```java private final boolean parkAndCheckInterrupt() { // 阻塞当前线程,如果打断标记已经是 true, 则 park 会失效 LockSupport.park(this); // 判断当前线程是否被打断,清除打断标记 return Thread.interrupted(); } ``` - 通过不断自旋尝试获取锁,最终前驱节点的等待状态为-1的时候,进行阻塞当前线程。 - 通过调用LockSupport.park方法进行阻塞。 **6. 多个线程尝试获取锁,竞争失败后,最终形成下面的图形** ![51.ReentrantLock原理06.jpg](https://lilinchao.com/usr/uploads/2022/11/1293538742.jpg) #### 1.3 释放锁原理 **1. 第一个线程通过调用unlock方法释放锁** ```java public void unlock() { sync.release(1); } ``` + 最终调用的是同步器的release方法 ![51.ReentrantLock原理07.jpg](https://lilinchao.com/usr/uploads/2022/11/1131184844.jpg) - 设置锁定的线程exclusiveOwnerThread为null - 设置锁的state为0 ```java // AbstractQueuedSynchronizer#release public final boolean release(int arg) { // 尝试释放锁,tryRelease 返回 true 表示当前线程已经【完全释放锁,重入的释放了】 if (tryRelease(arg)) { // 队列头节点 Node h = head; // 头节点什么时候是空?没有发生锁竞争,没有竞争线程创建哑元节点 // 条件成立说明阻塞队列有等待线程,需要唤醒 head 节点后面的线程 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } ``` - 进入 tryRelease,设置 exclusiveOwnerThread 为 null,state = 0 - 当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor, 唤醒阻塞的线程 **2. 线程一通过调用tryRelease方法释放锁,该类的实现是在子类中** ```java // ReentrantLock.Sync#tryRelease protected final boolean tryRelease(int releases) { // 减去释放的值,可能重入 int c = getState() - releases; // 如果当前线程不是持有锁的线程直接报错 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // 是否已经完全释放锁 boolean free = false; // 支持锁重入, 只有 state 减为 0, 才完全释放锁成功 if (c == 0) { free = true; setExclusiveOwnerThread(null); } // 当前线程就是持有锁线程,所以可以直接更新锁,不需要使用 CAS setState(c); return free; } ``` - 修改锁资源的state **3. 唤醒队列中第一个线程Thread1** ```java private void unparkSuccessor(Node node) { // 当前节点的状态 int ws = node.waitStatus; if (ws < 0) // 【尝试重置状态为 0】,因为当前节点要完成对后续节点的唤醒任务了,不需要 -1 了 compareAndSetWaitStatus(node, ws, 0); // 找到需要 unpark 的节点,当前节点的下一个 Node s = node.next; // 已取消的节点不能唤醒,需要找到距离头节点最近的非取消的节点 if (s == null || s.waitStatus > 0) { s = null; // AQS 队列【从后至前】找需要 unpark 的节点,直到 t == 当前的 node 为止,找不到就不唤醒了 for (Node t = tail; t != null && t != node; t = t.prev) // 说明当前线程状态需要被唤醒 if (t.waitStatus <= 0) // 置换引用 s = t; } // 【找到合适的可以被唤醒的 node,则唤醒线程】 if (s != null) LockSupport.unpark(s.thread); } ``` - 从后往前找到队列中距离 head 最近的一个没取消的 Node,unpark 恢复其运行,本例中即为 Thread-1 - thread1活了,开始重新去获取锁,也就是前面acquireQueued中的流程 **为什么这里查找唤醒的节点是从后往前,而不是从前往后呢?** enq 方法中,节点是尾插法,首先赋值的是尾节点的前驱节点,此时前驱节点的 next 并没有指向尾节点,从前遍历会丢失尾节点。 **4. Thread1恢复执行流程** ![51.ReentrantLock原理08.jpg](https://lilinchao.com/usr/uploads/2022/11/3696786460.jpg) + 唤醒的Thread-1 线程会从 park 位置开始执行,如果加锁成功(没有竞争),设置了exclusiveOwnerThread为Thread-1, state=1 + head 指向刚刚 Thread-1 所在的 Node,该 Node 会清空 Thread + 原本的 head 因为从链表断开,而可被垃圾回收 **5. 另一种可能,突然来了Thread-4来竞争,体现非公平锁** 如果这时有其它线程来竞争锁,例如这时有 Thread-4 来了并抢占了锁,很有可能抢占成功。 ![51.ReentrantLock原理09.jpg](https://lilinchao.com/usr/uploads/2022/11/2710460771.jpg) - Thread-4 被设置为 exclusiveOwnerThread,state = 1 - Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞 ### 二、可重入原理 同一个线程, 锁重入, 会对state进行自增;释放锁的时候, state进行自减。当state自减为0的时候,此时线程才会将锁释放成功, 才会进一步去唤醒其他线程来竞争锁。 ```java static final class NonfairSync extends Sync { // ... // Sync 继承过来的方法, 方便阅读, 放在此处 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入 else if (current == getExclusiveOwnerThread()) { // state++ int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } // Sync 继承过来的方法, 方便阅读, 放在此处 protected final boolean tryRelease(int releases) { // state-- int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 支持锁重入, 只有 state 减为 0, 才释放成功 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } } ``` ### 三、可打断原理 + **不可打断模式** 在此模式下,即使它被打断,仍会驻留在 AQS 队列中,并将打断信号存储在一个interrupt变量中。 一直要等到获得锁后方能得知自己被打断了,并且调用`selfInterrupt`方法打断自己。 **不可打断源码** ```java // Sync 继承自 AQS static final class NonfairSync extends Sync { // ... private final boolean parkAndCheckInterrupt() { // 如果打断标记已经是 true, 则 park 会失效 // 被park阻塞的线程, 可以被别的线程调用它的interrupt方法打断该park阻塞 LockSupport.park(this); // interrupted 会清除打断标记; 下次park仍然可以阻塞 return Thread.interrupted(); } final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; failed = false; // 还是需要获得锁后, 才能返回打断状态 return interrupted; } if ( shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt() ) { // 如果是因为 interrupt 被唤醒, 返回打断状态为 true interrupted = true; } } } finally { if (failed) cancelAcquire(node); } } public final void acquire(int arg) { if ( !tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg) ) { // 如果打断状态为 true selfInterrupt(); } } //响应打断标记,打断自己 static void selfInterrupt() { // 重新产生一次中断,这时候线程是如果正常运行的状态,那么不是出于sleep等状态,interrupt方法就不会报错 Thread.currentThread().interrupt(); } } ``` + **可打断模式** 此模式下即使线程在等待队列中等待,一旦被打断,就会立刻抛出打断异常。 **可打断源码** ```java static final class NonfairSync extends Sync { public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 如果没有获得到锁, 进入 ㈠ if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } // ㈠ 可打断的获取锁流程 private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } // 这里一直在判断是否被打断 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { // 在 park 过程中如果被 interrupt 会进入这里 // 这时候抛出异常, 而不会再次进入 for (;;) throw new InterruptedException(); } } } finally { if (failed) cancelAcquire(node); } } } ``` ### 四、公平锁实现原理 **公平锁是如何实现的**: - 简而言之,公平与非公平的区别在于,公平锁中的tryAcquire方法被重写了 - 新来的线程即便得知了锁的state为0,也要先判断等待队列中是否还有线程等待,只有当队列没有线程等待时,才获得锁。 **公平锁的源码** ```java static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; // 上锁 final void lock() { acquire(1); } // AQS 继承过来的方法, 方便阅读, 放在此处 public final void acquire(int arg) { if ( !tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg) ) { selfInterrupt(); } } // 与非公平锁主要区别在于 tryAcquire 方法的实现 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 这里就是主要区别!!! // 公平锁:先检查 AQS 队列中是否有前驱节点, 没有才去竞争 // 非公平锁:不会检查,直接竞争,可能就会导致当前线程和AQS队列中的线程进行竞争 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } // ㈠ AQS 继承过来的方法, 方便阅读, 放在此处 public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; // h != t 时表示队列中有 Node return h != t && ( // (s = h.next) == null 表示队列中还有没有老二 (s = h.next) == null || // 或者队列中老二线程不是此线程 s.thread != Thread.currentThread() ); } } ``` ### 五、条件变量实现原理 每个条件变量其实就对应着一个等待队列,其实现类是 ConditionObject #### 5.1 await 流程 **1. 线程0(Thread-0)一开始获取锁,exclusiveOwnerThread字段是Thread-0, 如下图中的深蓝色节点** ![51.ReentrantLock原理10.jpg](https://lilinchao.com/usr/uploads/2022/11/3631463801.jpg) **2. Thread-0调用await方法,Thread-0封装成Node进入ConditionObject的队列,因为此时只有一个节点,所有firstWaiter和lastWaiter都指向Thread-0,会释放锁资源,NofairSync中的state会变成0,同时exclusiveOwnerThread设置为null** ![51.ReentrantLock原理11.jpg](https://lilinchao.com/usr/uploads/2022/11/93731636.jpg) **3. 线程1(Thread-1)被唤醒,重新获取锁,如下图的深蓝色节点所示** ![51.ReentrantLock原理12.jpg](https://lilinchao.com/usr/uploads/2022/11/2947861580.jpg) **4. Thread-0被park阻塞,如下图灰色节点所示** ![51.ReentrantLock原理13.jpg](https://lilinchao.com/usr/uploads/2022/11/1043304559.jpg) ##### 源码如下 - 下面是await()方法的整体流程,其中`LockSupport.park(this)`进行阻塞当前线程,后续唤醒,也会在这个程序点恢复执行。 ```java public final void await() throws InterruptedException { // 判断当前线程是否是中断状态,是就直接给个中断异常 if (Thread.interrupted()) throw new InterruptedException(); // 将调用 await 的线程包装成 Node,添加到条件队列并返回 Node node = addConditionWaiter(); // 完全释放节点持有的锁,因为其他线程唤醒当前线程的前提是【持有锁】 int savedState = fullyRelease(node); // 设置打断模式为没有被打断,状态码为 0 int interruptMode = 0; // 如果该节点还没有转移至 AQS 阻塞队列, park 阻塞,等待进入阻塞队列 while (!isOnSyncQueue(node)) { // 阻塞当前线程,待会 LockSupport.park(this); // 如果被打断,退出等待队列,对应的 node 【也会被迁移到阻塞队列】尾部,状态设置为 0 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 逻辑到这说明当前线程退出等待队列,进入【阻塞队列】 // 尝试枪锁,释放了多少锁就【重新获取多少锁】,获取锁成功判断打断模式 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // node 在条件队列时 如果被外部线程中断唤醒,会加入到阻塞队列,但是并未设 nextWaiter = null if (node.nextWaiter != null) // 清理条件队列内所有已取消的 Node unlinkCancelledWaiters(); // 条件成立说明挂起期间发生过中断 if (interruptMode != 0) // 应用打断模式 reportInterruptAfterWait(interruptMode); } ``` - 将线程封装成Node, 加入到ConditionObject队列尾部,此时节点的等待状态时-2。 ```java private Node addConditionWaiter() { // 获取当前条件队列的尾节点的引用,保存到局部变量 t 中 Node t = lastWaiter; // 当前队列中不是空,并且节点的状态不是 CONDITION(-2),说明当前节点发生了中断 if (t != null && t.waitStatus != Node.CONDITION) { // 清理条件队列内所有已取消的 Node unlinkCancelledWaiters(); // 清理完成重新获取 尾节点 的引用 t = lastWaiter; } // 创建一个关联当前线程的新 node, 设置状态为 CONDITION(-2),添加至队列尾部 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; // 空队列直接放在队首【不用CAS因为执行线程是持锁线程,并发安全】 else t.nextWaiter = node; // 非空队列队尾追加 lastWaiter = node; // 更新队尾的引用 return node; } ``` + 清理条件队列中的cancel类型的节点,比如中断、超时等会导致节点转换为Cancel ```java // 清理条件队列内所有已取消(不是CONDITION)的 node,【链表删除的逻辑】 private void unlinkCancelledWaiters() { // 从头节点开始遍历【FIFO】 Node t = firstWaiter; // 指向正常的 CONDITION 节点 Node trail = null; // 等待队列不空 while (t != null) { // 获取当前节点的后继节点 Node next = t.nextWaiter; // 判断 t 节点是不是 CONDITION 节点,条件队列内不是 CONDITION 就不是正常的 if (t.waitStatus != Node.CONDITION) { // 不是正常节点,需要 t 与下一个节点断开 t.nextWaiter = null; // 条件成立说明遍历到的节点还未碰到过正常节点 if (trail == null) // 更新 firstWaiter 指针为下个节点 firstWaiter = next; else // 让上一个正常节点指向 当前取消节点的 下一个节点,【删除非正常的节点】 trail.nextWaiter = next; // t 是尾节点了,更新 lastWaiter 指向最后一个正常节点 if (next == null) lastWaiter = trail; } else { // trail 指向的是正常节点 trail = t; } // 把 t.next 赋值给 t,循环遍历 t = next; } } ``` + fullyRelease方法将r让Thread-0释放锁, 这个时候Thread-1就会去竞争锁 ```java // 线程可能重入,需要将 state 全部释放 final int fullyRelease(Node node) { // 完全释放锁是否成功,false 代表成功 boolean failed = true; try { // 获取当前线程所持有的 state 值总数 int savedState = getState(); // release -> tryRelease 解锁重入锁 if (release(savedState)) { // 释放成功 failed = false; // 返回解锁的深度 return savedState; } else { // 解锁失败抛出异常 throw new IllegalMonitorStateException(); } } finally { // 没有释放成功,将当前 node 设置为取消状态 if (failed) node.waitStatus = Node.CANCELLED; } } ``` + 判断节点是否在AQS阻塞对列中,不在条件对列中 ```java final boolean isOnSyncQueue(Node node) { // node 的状态是 CONDITION,signal 方法是先修改状态再迁移,所以前驱节点为空证明还【没有完成迁移】 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 说明当前节点已经成功入队到阻塞队列,且当前节点后面已经有其它 node,因为条件队列的 next 指针为 null if (node.next != null) return true; // 说明【可能在阻塞队列,但是是尾节点】 // 从阻塞队列的尾节点开始向前【遍历查找 node】,如果查找到返回 true,查找不到返回 false return findNodeFromTail(node); } ``` #### 5.2 signal过程 **1. 假设 Thread-1 要来唤醒 Thread-0** ![51.ReentrantLock原理14.jpg](https://lilinchao.com/usr/uploads/2022/11/4072558641.jpg) **2. 进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即 Thread-0 所在 Node** ![51.ReentrantLock原理15.jpg](https://lilinchao.com/usr/uploads/2022/11/2916267225.jpg) **3. 执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的 waitStatus 改为 0,Thread-3 的 waitStatus 改为 -1** ![51.ReentrantLock原理16.jpg](https://lilinchao.com/usr/uploads/2022/11/241914807.jpg) ##### 源码如下 - signal()方法是唤醒的入口方法 ```java public final void signal() { // 判断调用 signal 方法的线程是否是独占锁持有线程 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 获取条件队列中第一个 Node Node first = firstWaiter; // 不为空就将第该节点【迁移到阻塞队列】 if (first != null) doSignal(first); } ``` - 调用doSignal()方法唤醒节点 ```java // 唤醒 - 【将没取消的第一个节点转移至 AQS 队列尾部】 private void doSignal(Node first) { do { // 成立说明当前节点的下一个节点是 null,当前节点是尾节点了,队列中只有当前一个节点了 if ((firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; // 将等待队列中的 Node 转移至 AQS 队列,不成功且还有节点则继续循环 } while (!transferForSignal(first) && (first = firstWaiter) != null); } // signalAll() 会调用这个函数,唤醒所有的节点 private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; // 唤醒所有的节点,都放到阻塞队列中 } while (first != null); } ``` - 调用transferForSignal()方法,先将节点的 waitStatus 改为 0,然后加入 AQS 阻塞队列尾部,将 Thread-3 的 waitStatus 改为 -1。 ```java // 如果节点状态是取消, 返回 false 表示转移失败, 否则转移成功 final boolean transferForSignal(Node node) { // CAS 修改当前节点的状态,修改为 0,因为当前节点马上要迁移到阻塞队列了 // 如果状态已经不是 CONDITION, 说明线程被取消(await 释放全部锁失败)或者被中断(可打断 cancelAcquire) if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) // 返回函数调用处继续寻找下一个节点 return false; // 【先改状态,再进行迁移】 // 将当前 node 入阻塞队列,p 是当前节点在阻塞队列的【前驱节点】 Node p = enq(node); int ws = p.waitStatus; // 如果前驱节点被取消或者不能设置状态为 Node.SIGNAL,就 unpark 取消当前节点线程的阻塞状态, // 让 thread-0 线程竞争锁,重新同步状态 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } ``` *附参考原文链接* *《黑马程序员之并发编程》*
标签:
并发编程
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/2627.html
上一篇
50.AQS实现原理介绍
下一篇
52.ReentrantReadWriteLock介绍
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
栈
Filter
Shiro
Scala
字符串
Map
算法
Java编程思想
并发线程
递归
正则表达式
Git
MySQL
Spark Core
Spring
Typora
设计模式
Hadoop
SpringBoot
Flume
Kafka
Livy
HDFS
MyBatisX
Hive
数据结构和算法
FastDFS
Golang基础
Beego
LeetCode刷题
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞