李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
Java
正文
56.Semaphore介绍
Leefs
2022-11-29 PM
659℃
0条
[TOC] ### 一、概述 Semaphore是向外分发资源的许可证,可以允许一个或多个任务同时访问资源。 + Semaphore通过构造参数来指定许可证的数量; + acquire方法阻塞式获取许可证; + release方法释放许可证。 可以将其比喻为地铁的安检,每当人流量高峰的时候,安检会先让几个人进去,然后拦住后面的人,待前面几人通过安检门后,会对后面的人用相同的方式放行。 #### 特点 + Semaphore(信号量)是一种计数器,用来保护一个或者多个共享资源的访问。 + 如果线程要访问一个资源就必须先获得信号量。 + 如果信号量内部计数器大于0,信号量减1,然后允许共享这个资源;否则,如果信号量的计数器等于0,信号量将会把线程置入休眠直至计数器大于0。 + 当信号量使用完时,必须释放。 ### 二、常用方法 **Semaphore常用方法说明**: | 方法 | 说明 | | ----------------------------------------- | ------------------------------------------------------------ | | `acquire()` | 从信号量获取一个许可,如果无可用许可前将一直阻塞等待 | | `acquire(int permits)` | 获取指定数目的许可,如果无可用许可前也将会一直阻塞等待 | | `acquireUninterruptibly()` | 获取一个许可,在获取到许可之前线程一直处于阻塞状态(忽略中断)。 | | `tryAcquire()` | 尝试获得许可,返回获取许可成功或失败,不阻塞线程。 | | `tryAcquire(long timeout, TimeUnit unit)` | 尝试获得许可,在超时时间内循环尝试获取,直到尝试获取成功或超时返回,不阻塞线程。 | | `release()` | 释放一个许可,唤醒一个获取许可不成功的阻塞线程。 | | `hasQueuedThreads()` | 等待队列里是否还存在等待线程。 | | `getQueueLength()` | 获取等待队列里阻塞的线程数。 | | `drainPermits()` | 清空许可把可用许可数置为0,返回清空许可的数量。 | | `availablePermits()` | 返回可用的许可数量。 | **基本使用** 信号量,用来限制能同时访问共享资源的线程上限。 ```java import lombok.extern.slf4j.Slf4j; import java.util.concurrent.Semaphore; import static com.lilinchao.concurrent.utils.Sleeper.sleep; /** * Created by lilinchao * Date 2022/11/29 * Description Semaphore示例 */ @Slf4j(topic = "c.SemaphoreDemo") public class SemaphoreDemo { public static void main(String[] args) { // 1. 创建 semaphore 对象 Semaphore semaphore = new Semaphore(3); // 2. 10个线程同时运行 for (int i = 0; i < 10; i++) { new Thread(() -> { // 3. 获取许可 try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } try { log.debug("running..."); sleep(1); log.debug("end..."); } finally { // 4. 释放许可 semaphore.release(); } }).start(); } } } ``` **运行结果** ``` 22:36:25.882 [Thread-2] DEBUG c.SemaphoreDemo - running... 22:36:25.882 [Thread-1] DEBUG c.SemaphoreDemo - running... 22:36:25.882 [Thread-0] DEBUG c.SemaphoreDemo - running... 22:36:26.886 [Thread-2] DEBUG c.SemaphoreDemo - end... 22:36:26.886 [Thread-1] DEBUG c.SemaphoreDemo - end... 22:36:26.886 [Thread-0] DEBUG c.SemaphoreDemo - end... 22:36:26.886 [Thread-3] DEBUG c.SemaphoreDemo - running... 22:36:26.886 [Thread-4] DEBUG c.SemaphoreDemo - running... 22:36:26.886 [Thread-5] DEBUG c.SemaphoreDemo - running... 22:36:27.886 [Thread-3] DEBUG c.SemaphoreDemo - end... 22:36:27.886 [Thread-4] DEBUG c.SemaphoreDemo - end... 22:36:27.886 [Thread-5] DEBUG c.SemaphoreDemo - end... 22:36:27.886 [Thread-6] DEBUG c.SemaphoreDemo - running... 22:36:27.886 [Thread-7] DEBUG c.SemaphoreDemo - running... 22:36:27.886 [Thread-8] DEBUG c.SemaphoreDemo - running... 22:36:28.887 [Thread-7] DEBUG c.SemaphoreDemo - end... 22:36:28.887 [Thread-6] DEBUG c.SemaphoreDemo - end... 22:36:28.887 [Thread-9] DEBUG c.SemaphoreDemo - running... 22:36:28.888 [Thread-8] DEBUG c.SemaphoreDemo - end... 22:36:29.887 [Thread-9] DEBUG c.SemaphoreDemo - end... ``` 从结果可以看出,每个时刻最多有三个线程在运行,Semaphore 起到效果 ### 三、Semaphore应用 - 使用 Semaphore 限流,在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可,当然它只适合限制单机线程数量,并且仅是限制线程数,而不是限制资源数(例如连接数,请对比 Tomcat LimitLatch 的实现) - 用 Semaphore 实现简单连接池,对比『享元模式』下的实现(用wait notify),性能和可读性显然更好, 注意下面的实现中线程数和数据库连接数是相等的 **代码示例** ```java import lombok.extern.slf4j.Slf4j; import java.sql.*; import java.util.Map; import java.util.Properties; import java.util.concurrent.Executor; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicIntegerArray; /** * Created by lilinchao * Date 2022/11/29 * Description Semaphore应用 */ public class SemaphorePoolDemo { public static void main(String[] args) { Pool pool = new Pool(2); for (int i = 0; i < 5; i++) { new Thread(() -> { Connection conn = pool.borrow(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } pool.free(conn); }).start(); } } } @Slf4j(topic = "c.Pool") class Pool { // 1. 连接池大小 private final int poolSize; // 2. 连接对象数组 private Connection[] connections; // 3. 连接状态数组 0 表示空闲, 1 表示繁忙 private AtomicIntegerArray states; private Semaphore semaphore; // 4. 构造方法初始化 public Pool(int poolSize) { this.poolSize = poolSize; // 让许可数与资源数一致 this.semaphore = new Semaphore(poolSize); this.connections = new Connection[poolSize]; this.states = new AtomicIntegerArray(new int[poolSize]); for (int i = 0; i < poolSize; i++) { connections[i] = new MockConnection("连接" + (i+1)); } } // 5. 借连接 public Connection borrow() {// t1, t2, t3 // 获取许可 try { semaphore.acquire(); // 没有许可的线程,在此等待 } catch (InterruptedException e) { e.printStackTrace(); } for (int i = 0; i < poolSize; i++) { // 获取空闲连接 if(states.get(i) == 0) { if (states.compareAndSet(i, 0, 1)) { log.debug("borrow {}", connections[i]); return connections[i]; } } } // 不会执行到这里 return null; } // 6. 归还连接 public void free(Connection conn) { for (int i = 0; i < poolSize; i++) { if (connections[i] == conn) { states.set(i, 0); log.debug("free {}", conn); semaphore.release(); break; } } } } class MockConnection implements Connection { private String name; public MockConnection(String name) { this.name = name; } @Override public String toString() { return "MockConnection{" + "name='" + name + '\'' + '}'; } @Override public Statement createStatement() throws SQLException { return null; } @Override public PreparedStatement prepareStatement(String sql) throws SQLException { return null; } @Override public CallableStatement prepareCall(String sql) throws SQLException { return null; } @Override public String nativeSQL(String sql) throws SQLException { return null; } @Override public void setAutoCommit(boolean autoCommit) throws SQLException { } @Override public boolean getAutoCommit() throws SQLException { return false; } @Override public void commit() throws SQLException { } @Override public void rollback() throws SQLException { } @Override public void close() throws SQLException { } @Override public boolean isClosed() throws SQLException { return false; } @Override public DatabaseMetaData getMetaData() throws SQLException { return null; } @Override public void setReadOnly(boolean readOnly) throws SQLException { } @Override public boolean isReadOnly() throws SQLException { return false; } @Override public void setCatalog(String catalog) throws SQLException { } @Override public String getCatalog() throws SQLException { return null; } @Override public void setTransactionIsolation(int level) throws SQLException { } @Override public int getTransactionIsolation() throws SQLException { return 0; } @Override public SQLWarning getWarnings() throws SQLException { return null; } @Override public void clearWarnings() throws SQLException { } @Override public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { return null; } @Override public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { return null; } @Override public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { return null; } @Override public Map
> getTypeMap() throws SQLException { return null; } @Override public void setTypeMap(Map
> map) throws SQLException { } @Override public void setHoldability(int holdability) throws SQLException { } @Override public int getHoldability() throws SQLException { return 0; } @Override public Savepoint setSavepoint() throws SQLException { return null; } @Override public Savepoint setSavepoint(String name) throws SQLException { return null; } @Override public void rollback(Savepoint savepoint) throws SQLException { } @Override public void releaseSavepoint(Savepoint savepoint) throws SQLException { } @Override public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { return null; } @Override public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { return null; } @Override public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { return null; } @Override public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { return null; } @Override public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { return null; } @Override public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { return null; } @Override public Clob createClob() throws SQLException { return null; } @Override public Blob createBlob() throws SQLException { return null; } @Override public NClob createNClob() throws SQLException { return null; } @Override public SQLXML createSQLXML() throws SQLException { return null; } @Override public boolean isValid(int timeout) throws SQLException { return false; } @Override public void setClientInfo(String name, String value) throws SQLClientInfoException { } @Override public void setClientInfo(Properties properties) throws SQLClientInfoException { } @Override public String getClientInfo(String name) throws SQLException { return null; } @Override public Properties getClientInfo() throws SQLException { return null; } @Override public Array createArrayOf(String typeName, Object[] elements) throws SQLException { return null; } @Override public Struct createStruct(String typeName, Object[] attributes) throws SQLException { return null; } @Override public void setSchema(String schema) throws SQLException { } @Override public String getSchema() throws SQLException { return null; } @Override public void abort(Executor executor) throws SQLException { } @Override public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { } @Override public int getNetworkTimeout() throws SQLException { return 0; } @Override public
T unwrap(Class
iface) throws SQLException { return null; } @Override public boolean isWrapperFor(Class> iface) throws SQLException { return false; } } ``` **运行结果** ``` 22:48:15.123 [Thread-0] DEBUG c.Pool - borrow MockConnection{name='连接1'} 22:48:15.123 [Thread-1] DEBUG c.Pool - borrow MockConnection{name='连接2'} 22:48:16.127 [Thread-0] DEBUG c.Pool - free MockConnection{name='连接1'} 22:48:16.127 [Thread-2] DEBUG c.Pool - borrow MockConnection{name='连接1'} 22:48:16.127 [Thread-1] DEBUG c.Pool - free MockConnection{name='连接2'} 22:48:16.127 [Thread-3] DEBUG c.Pool - borrow MockConnection{name='连接2'} 22:48:17.128 [Thread-2] DEBUG c.Pool - free MockConnection{name='连接1'} 22:48:17.128 [Thread-3] DEBUG c.Pool - free MockConnection{name='连接2'} 22:48:17.128 [Thread-4] DEBUG c.Pool - borrow MockConnection{name='连接1'} 22:48:18.128 [Thread-4] DEBUG c.Pool - free MockConnection{name='连接1'} ``` 从上面的结果可以看出每个时刻都只有两个线程在使用连接。 ### 四、Semaphore原理 #### 4.1 加解锁流程 Semaphore有点像一个停车场,permits就好像停车位数量,当线程获得了permits就像是获得了停车位,然后停车场显示空余车位数量减一 + 刚开始,permits(state)为3,这时5个线程来获取资源 ![56.Semaphore介绍01.jpg](https://lilinchao.com/usr/uploads/2022/11/3017559926.jpg) + 假设其中 Thread-1,Thread-2,Thread-4 CAS竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列 park 阻塞 ![56.Semaphore介绍02.jpg](https://lilinchao.com/usr/uploads/2022/11/3501510007.jpg) + 这时 Thread-4 释放了 permits,状态如下 ![56.Semaphore介绍03.jpg](https://lilinchao.com/usr/uploads/2022/11/2889234399.jpg) + 接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态 ![56.Semaphore介绍04.jpg](https://lilinchao.com/usr/uploads/2022/11/3854171891.jpg) #### 4.2 源码分析 ```java static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { // permits 即 state super(permits); } // Semaphore 方法, 方便阅读, 放在此处 public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } // AQS 继承过来的方法, 方便阅读, 放在此处 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } // 尝试获得共享锁 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } // Sync 继承过来的方法, 方便阅读, 放在此处 final int nonfairTryAcquireShared(int acquires) { for (; ; ) { int available = getState(); int remaining = available - acquires; if ( // 如果许可已经用完, 返回负数, 表示获取失败, 进入 doAcquireSharedInterruptibly remaining < 0 || // 如果 cas 重试成功, 返回正数, 表示获取成功 compareAndSetState(available, remaining) ) { return remaining; } } } // AQS 继承过来的方法, 方便阅读, 放在此处 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (; ; ) { final Node p = node.predecessor(); if (p == head) { // 再次尝试获取许可 int r = tryAcquireShared(arg); if (r >= 0) { // 成功后本线程出队(AQS), 所在 Node设置为 head // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark // 如果 head.waitStatus == 0 ==> Node.PROPAGATE // r 表示可用资源数, 为 0 则不会继续传播 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 不成功, 设置上一个节点 waitStatus = Node.SIGNAL, 下轮进入 park 阻塞 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } // Semaphore 方法, 方便阅读, 放在此处 public void release() { sync.releaseShared(1); } // AQS 继承过来的方法, 方便阅读, 放在此处 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } // Sync 继承过来的方法, 方便阅读, 放在此处 protected final boolean tryReleaseShared(int releases) { for (; ; ) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } } ``` ### 三、为什么要有PROPAGATE 早期有 bug + **releaseShared 方法** ```java public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } ``` **doAcquireShared 方法** ```java private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (; ; ) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { // 这里会有空档 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } ``` - **setHeadAndPropagate** ```java private void setHeadAndPropagate(Node node, int propagate) { setHead(node); // 有空闲资源 if (propagate > 0 && node.waitStatus != 0) { Node s = node.next; // 下一个 if (s == null || s.isShared()) unparkSuccessor(node); } } ``` 1. 假设存在某次循环里队队列里排队的结点状态为`head(-1)->t1(-1)->t2(-1)` 2. 假设存在将要释放的T3和T4,释放顺序为先T3后T4 ##### 正常流程 ![56.Semaphore介绍05.jpg](https://lilinchao.com/usr/uploads/2022/11/1384305073.jpg) ##### 产生BUG的情况 ![56.Semaphore介绍06.jpg](https://lilinchao.com/usr/uploads/2022/11/1616323815.jpg) 修复前版本执行流程 1. T3条用releaseShared(1),直接调用了unparkSuccessor(head),head的等待状态从-1变成0 2. T1由于T3释放信号量被唤醒,调用tryAcquireShared,假设返回值为0(获取锁成功,但没有剩余资源量) 3. T4调用releaseShared(1),此时head.waitStatus为0(此时读到的head和1为同一个head),不满足条件因此不调用unparkSuccessor(head) 4. T1获取信号量成功,调用setHeadAndPropagate时,因不满足propagate>0(2的返回值也就是propagate(剩余资源量)==0),从而不会唤醒后继结点,T2线程得不到唤醒 ##### bug修复后 ```java private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below // 设置自己为 head setHead(node); // propagate 表示有共享资源(例如共享读锁或信号量) // 原 head waitStatus == Node.SIGNAL 或 Node.PROPAGATE // 现在 head waitStatus == Node.SIGNAL 或 Node.PROPAGATE if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; // 如果是最后一个节点或者是等待共享读锁的节点 if (s == null || s.isShared()) { doReleaseShared(); } } } private void doReleaseShared() { // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark // 如果 head.waitStatus == 0 ==> Node.PROPAGATE for (; ; ) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } ``` ![56.Semaphore介绍07.jpg](https://lilinchao.com/usr/uploads/2022/11/3685643484.jpg) 1. T3调用releaseShared(),直接调用了unparkSuccessor(head),head的等待状态从-1变为0 2. T1由于T3释放信号量被唤醒,调用tryAcquireShared,假设返回值为0(获取锁成功,单没有剩余资源量) 3. T4调用releaseShared(),此时head.waitStatus为0(此时读到的head和1中为同一个head),调用doReleaseShared()将等待状态设置为PROPAGATE(-3) 4. T1获取信号量成功,调用setHeadAndPropagate时,读到h.waitStatus<0,从而调用doReleaseShared()唤醒T2 *附参考文章链接地址* *《黑马程序员之并发编程》*
标签:
并发编程
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/2659.html
上一篇
55.StampedLock介绍
下一篇
57.CountdownLatch介绍
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
Livy
Spark Streaming
Java编程思想
gorm
nginx
Golang
Java工具类
查找
线程池
Nacos
并发编程
Zookeeper
散列
NIO
排序
队列
Stream流
BurpSuite
Filter
HDFS
持有对象
Quartz
DataWarehouse
JavaWeb
Linux
CentOS
MySQL
国产数据库改造
微服务
FileBeat
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞