李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
Java
正文
50.AQS实现原理介绍
Leefs
2022-11-26 PM
1492℃
0条
[TOC] ### 前言 Java.util.concurrent(J.U.C)大大提高了并发性能,AQS是JUC的核心,**是阻塞式锁相关的同步器工具的框架,是一个主要用来构建锁和同步器的抽象类**。 ### 一、AQS介绍 AQS 全程为 AbstractQueuedSynchronizer,它提供了一个 FIFO 队列,可以看成是一个用来实现同步锁及其它涉及到同步功能的核心组件,常见的有,ReentrantLock、CountDownLatch等 AQS是一个抽象类,主要通过继承的方式来使用,它本身没有实现任何的同步接口,仅仅是定义了同步状态的获取以及释放的方法来提高自定义的同步组件。 **AQS 核心思想** - 如果被请求的共享**资源空闲**,则将**当前请求资源的线程设置为有效的工作线程**,并且将共享资源设置为锁定状态。 - 如果被请求的共享**资源被占用**,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将**暂时获取不到锁的线程加入到队列中**。 **AQS的功能** 从使用层面来说,AQS 的功能分为两种:独占和共享。 + **独占锁**:每次只有一个线程持有锁,如:ReentrantLock 就是以独占方式实现的互斥锁 + **共享锁**:允许多个线程同时获取锁 ,并发访问共享资源 , 如:ReentrantReadWriteLock **AQS 的内部实现** AQS依赖内部的一个FIFO双向队列来完成同步状态的管理,当前线程获取锁失败时,AQS会将当前线程以及等待状态等信息构造成为一个节点(Node对象)并将其加入AQS中,同时会阻塞当前线程,当锁被释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。 AQS中有一个头(head)节点和一个尾(tail)节点,中间每个节点(Node)都有一个prev和next指针指向前一个节点和后一个节点,如下图:  **Node对象组成** AQS中每一个节点就是一个Node对象,并且通过节点中的状态等信息来控制队列,Node对象是AbstractQueuedSynchronizer对象中的一个静态内部类,下面就是Node对象的源码: ```java static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; static final int CANCELLED = 1;//表示当前线程状态是取消的 static final int SIGNAL = -1;//表示当前线程正在等待锁 static final int CONDITION = -2;//Condition队列有使用到,暂时用不到 static final int PROPAGATE = -3;//CountDownLatch等工具中使用到,暂时用不到 volatile int waitStatus;//节点中线程的状态,默认为0 volatile Node prev;//当前节点的前一个节点 volatile Node next;//当前节点的后一个节点 volatile Thread thread;//当前节点封装的线程信息 Node nextWaiter;//Condition队列中的关系,暂时用不到 final boolean isShared() {//暂时用不到 return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException {//获取当前节点的上一个节点 Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { } Node(Thread thread, Node mode) {//构造一个节点:addWaiter方法中会使用,此时waitStatus默认等于0 this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { //构造一个节点:Condition中会使用 this.waitStatus = waitStatus; this.thread = thread; } } ``` **内部状态说明** | 状态 | 说明 | | --------------------------------- | ------------------------------------------------------------ | | `static final int CANCELLED = 1` | 表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。 | | `static final int SIGNAL = -1` | 表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。 | | `static final int CONDITION = -2` | 表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。 | | `static final int PROPAGATE = -3` | 共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。 | | `volatile int waitStatus` | 新结点入队时的默认状态0 | **负值表示结点处于有效等待状态,而正值表示结点已被取消。**所以源码中很多地方用>0、<0来判断结点的状态是否正常。 > 注意:Node对象并不是AQS才会使用,Condition队列以及其他工具中也会使用,所以有些状态和方法在这里是暂时用不上的本文就不会过多关注。 ### 二、CLH队列锁 CLH队列锁是一种基于链表的可扩展,高性能,公平的自旋锁,申请资源的线程仅仅在本地变量上自旋,它不断轮询前驱节点的状态,假设发现前驱释放了锁,就结束自旋。 当一个线程需要获取锁时,先创建一个 QNode ,将其中的 locked 设置 true 表示需要获取锁, myPred 表示对其前驱结点的引用。  **其完整流程如下所示**: 1. 假设线程A要获取资源,其先使自己成为队列的尾部,同时获取一个指向其前驱结点的引用 `myPred`,并不断在父节点引用上自旋判断。  2.当另一个线程B同样也需要获取锁时,上述的过程同样也要来一遍,如下所示 **(QNode-B)**:  3.当某个线程要释放锁时,就将当前节点的 `locked` 设置为 `false` 。 其后续节点因为不断在自旋,当判断到其前序节点 `locked` 为 `false` ,就表明其前序节点已经释放锁,其自身就可以获取到锁,并且释放当前前序节点引用,以便GC回收。  整个过程如上图所示,CLH队列锁的优点是空间复杂度低,如果有n个线程,L个锁,每个线程每次都只获取一个锁,那么其需要的存储空间 O(L+n) ,n个线程有n个 node,L 个锁有L个tail . AQS 中的 CLH队列锁实现方式与上述方式相比是一种变体的实现,相比普通 CLH队列锁 ,AQS 中的实现方式做了相关的优化,比如不会不断重试,而会在重试相关次数后将线程阻塞。等待之后的唤醒。 ### 三、AQS相关方法 #### 模板方法 在我们实现自定义的同步组件时,将会调用同步器提供的模板方法。 **相关方法如下**:  > 上述模板方法同步器提供的模板方法分为3类: > > 1. 独占式获取与释放同步状态 > 2. 共享式获取与释放 > 3. 同步状态和查询同步队列中的等待线程情况。 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源 #### 可重写的方法  #### 访问或修改同步状态的方法 在自定义的同步组件框架中,AQS 抽象方法在实现过程中免不了要对同步状态 state 进行更改,这时就需要同步器提供的3个方法来进行操作,因为他们能够保证状态的改变是安全的: + **getState()**:获取当前同步状态 + **setState(newState:Int)**:设置当前同步状态 + **compareAndSetState(expect:Int,update:Int)**:使用 CAS 设置当前状态,该方法能保证状态设置的原子性。 ### 四、自定义锁实现不可重入锁 **自定义同步器** ```java //独占锁,同步器类 class MySync extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire(int arg) { if (compareAndSetState(0,1)) { //加锁成功,并设置owner为当前线程 setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } @Override protected boolean tryRelease(int arg) { setExclusiveOwnerThread(null); setState(0); //state属性是volatile的,它具有写屏障可以保证它之前的操作对其他线程可见 return true; } //是否持有独占 @Override protected boolean isHeldExclusively() { return getState() == 1; } protected Condition newCondition(){ return new ConditionObject(); } } ``` **自定义锁** 有了自定义同步器,很容易复用 AQS ,实现一个功能完备的自定义锁 ```java final class MyLock implements Lock { private MySync sync = new MySync(); //加锁(不成功会进入等待队列等待) @Override public void lock() { sync.acquire(1); } //加锁,可打断 @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } //尝试加锁(一次) @Override public boolean tryLock() { return sync.tryAcquire(1); } //有时限的尝试加锁 @Override public boolean tryLock(long time, @NotNull TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(time)); } //解锁 @Override public void unlock() { sync.release(1); } //新建环境变量 @NotNull @Override public Condition newCondition() { return sync.newCondition(); } } ``` **测试代码** ```java public static void main(String[] args) { MyLock lock = new MyLock(); new Thread(() -> { lock.lock(); try { log.debug("locking..."); sleep(1); } finally { log.debug("unlocking..."); lock.unlock(); } },"t1").start(); new Thread(() -> { lock.lock(); try { log.debug("locking..."); } finally { log.debug("unlocking..."); lock.unlock(); } },"t2").start(); } ``` **运行结果** ``` 14:37:31.020 [t1] DEBUG c.ThreadPoolDemo10 - locking... 14:37:32.024 [t1] DEBUG c.ThreadPoolDemo10 - unlocking... 14:37:32.024 [t2] DEBUG c.ThreadPoolDemo10 - locking... 14:37:32.024 [t2] DEBUG c.ThreadPoolDemo10 - unlocking... ``` 从结果可以看出,成功使用MyLock对t1和t2加了锁,t2在t1解锁之后才继续执行。 同时实现的是不可重入锁,也就是同一个线程如果重复加锁也会发生阻塞,测试代码如下: ```java public static void main(String[] args) { MyLock lock = new MyLock(); new Thread(()->{ lock.lock(); log.debug("locking..."); lock.lock(); log.debug("relock..."); log.debug("unlocking..."); lock.unlock(); }, "t1").start(); } ``` **运行结果** ``` 14:43:21.980 [t1] DEBUG c.ThreadPoolDemo10 - locking... ``` 可以看到relock并没有被打印出来,也就是第二次`lock.lock();`并没加锁成功,而是同样被阻塞了。 *附参考文章链接* *https://developer.aliyun.com/article/1083799* *https://lichong.work/archives/*
标签:
并发编程
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/2610.html
上一篇
49.Fork&Join框架介绍
下一篇
51.ReentrantLock原理
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
6
标签云
工具
Tomcat
JVM
查找
Yarn
数据结构
FileBeat
Spark Core
Scala
Java
Ubuntu
SQL练习题
线程池
Jquery
容器深入研究
JavaScript
Docker
Git
Kafka
排序
递归
Java阻塞队列
数学
散列
MySQL
稀疏数组
微服务
Golang基础
Spark Streaming
Elasticsearch
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭