李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
Java
正文
42.并发编程之自定义线程池
Leefs
2022-11-16 PM
1074℃
0条
[TOC] ### 线程池模型架构 + 自定义线程池包括:Thread Pool(线程池)+ Blocking Queue(阻塞队列) ![42.并发编程之自定义线程池01.jpg](https://lilinchao.com/usr/uploads/2022/11/2985369599.jpg) **图例分析** 图中内容表示,三个消费线程或者说是核心线程 `t1`、`t2`、`t3` 通过poll方法从阻塞队列中执行任务,main线程不断地往阻塞队列中put任务task,如果核心线程处于忙碌状态,task就放进阻塞队列中。 ### 实现步骤 **步骤1:自定义拒绝策略接口** ```java @FunctionalInterface // 拒绝策略 interface RejectPolicy
{ void reject(BlockingQueue
queue, T task); } ``` **作用** 当核心线程都被占用,并且阻塞队列中的任务也满时,就会触发拒绝策略。 简单理解就是,任务实在太多了,分配给线程池的所有线程都参与进来处理任务,但还是处理不过来,同时存放任务的缓存队列空间也满了,剩下来的任务该如何进行处理呢,这个时候就涉及到拒绝策略。 **拒绝策略的方式**: + 死等 + 带超时等待 + 让调用者放弃任务执行 + 让调用者抛出异常 + 让调用者自己执行任务等等 **步骤2:自定义任务队列** ```java //自定义任务队列 @Slf4j(topic = "c.BlockingQueue") class BlockingQueue
{ // 1.任务队列 private Deque
queue = new ArrayDeque<>(); // 2.锁 (防止多个线程获取同一个任务) private ReentrantLock lock = new ReentrantLock(); // 3.生产者条件变量 (当阻塞队列满了以后,生产者线程等待) private Condition fullWaitSet = lock.newCondition(); // 4.消费者条件变量 (当阻塞队列为空以后,消费者线程等待) private Condition emptyWaitSet = lock.newCondition(); // 5.容量 private int capcity; public BlockingQueue(int capcity) { this.capcity = capcity; } // 带超时阻塞获取 public T poll(long timeout, TimeUnit unit){ lock.lock(); try { // 将timeout统一转换为 纳秒 long nanos = unit.toNanos(timeout); while (queue.isEmpty()) { try { // 返回值是剩余时间 if (nanos <= 0){ return null; } //返回值是:等待时间-经过的时间 nanos = emptyWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } //移除队列中的第一个元素 T t = queue.removeFirst(); //唤醒生产者线程继续生产 fullWaitSet.signal(); return t; } finally { lock.unlock(); } } // 阻塞获取 public T task() { lock.lock(); try { while (queue.isEmpty()) { try { //当任务队列为空,消费者就没有任务可以消费,那么就进入等待的状态 emptyWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } //此时任务队列不为空,取出任务队列当中队头的任务返回 T t = queue.removeFirst(); //当从任务队列当中取出一个任务的时候,任务队列就有空位了,就可以唤醒因为队列满了而等待的生产者 fullWaitSet.signal(); return t; } finally { lock.unlock(); } } // 阻塞添加 public void put(T task) { lock.lock(); try { // 队列已满 while (queue.size() == capcity) { try { log.debug("等待加入任务队列{}...",task); fullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("加入任务队列{}",task); //当有空位的时候,将新的任务放到队列的尾部 queue.addLast(task); //添加完新的元素之后,需要唤醒等待当中的消费者队列,因为有新的任务进队列 emptyWaitSet.signal(); } finally { lock.unlock(); } } // 带超时时间阻塞添加 public boolean offer(T task,long timeout, TimeUnit timeUnit) { lock.lock(); try { long nanos = timeUnit.toNanos(timeout); while (queue.size() == capcity) { try { if (nanos <= 0) { return false; } log.debug("等待加入任务队列 {} ...", task); nanos = fullWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("加入任务队列{}",task); // 向队列中添加元素 queue.addLast(task); // 唤醒消费者线程 emptyWaitSet.signal(); return true; } finally { lock.unlock(); } } // 获取阻塞队列的大小 public int size() { lock.lock(); try { return queue.size(); } finally { lock.unlock(); } } // 带自定义拒绝策略的的添加 public void tryPut(RejectPolicy
rejectPolicy,T task) { lock.lock(); try { // 判断队列是否满 if(queue.size() == capcity) { // 执行拒绝策略的方法 rejectPolicy.reject(this,task); } else { // 有空闲 log.debug("加入任务队列{}",task); queue.addLast(task); emptyWaitSet.signal(); } } finally { lock.unlock(); } } } ``` **说明** + **queue(队列)**:生产者创建的任务都放在queue中 ,`Deque`是一个双向链表,比`LinkedList`效率要高,当然这里也可以用`LinkedList`。 + **lock(锁)**:为了防止多个线程获取同一个任务,本次使用的是`ReentrantLock` 锁,目的是因为它可以提供两个条件变量(集合)`fullWaitSet` 和`emptyWaitSet` 。 + **fullWaitSet(生产者条件变量)**:当任务队列queue满的时候,生产者线程将要进入`fullWaitSet` 阻塞状态,不能再进行生产。 + **emptyWaitSet(消费者条件变量)**:当queue为空的时候,消费者线程(t1、t2、t3)就无任务可以进行消费任务,同理也应该阻塞,进入`emptyWaitSet` 。 + **capcity(任务容量)**:初始化创建任务队列的容量,与上面两个不一样,这个是放任务的,上面两个是放线程的。 + **BlockingQueue(int capcity)**:构造方法,用来初始化任务队列的容量。 + **poll(long timeout, TimeUnit unit)**:核心线程带有超时的获取任务的方法 如果任务是空的就阻塞,而且是带有超时的阻塞,如果获取任务成功,说明queue获取后就不是满的状态了,所以应该唤醒`fullWaitSet` 中阻塞的生产者线程,让生产者线程继续生产任务。 + **offer(T task, long timeout, TimeUnit timeUnit)** :生产者线程用于向队列queue中添加任务的方法 这个方法也是带有阻塞,如果queue是满的,就不应该添加任务,main线程就应该阻塞,这里也是使用了超时阻塞,原因和上面一样,不想让它阻塞太久,任务添加不进就不添加,一直阻塞势必消耗CPU资源。 + **size()**:获取当前任务的数量 + **tryPut(RejectPolicy rejectPolicy, T task)**:调用生产者提供的拒绝策略,它的调用时机是,核心线程用完了(t1、t2、t3都忙),如果任务队列满了,就执行拒绝策略,如果没满就放任务队列中。 **步骤3:自定义线程池** ```java @Slf4j(topic = "c.ThreadPool") class ThreadPool { // 任务队列 private BlockingQueue
taskQueue; // 线程集合 private HashSet
workers = new HashSet<>(); // 核心线程数 private int coreSize; // 获取任务时的超时时间 private long timeout; private TimeUnit timeUnit; private RejectPolicy
rejectPolicy; // 执行任务 public void execute(Runnable task) { // 当任务数没有超过 coreSize 时,直接交给 worker 对象执行 // 如果任务数超过 coreSize 时,加入任务队列暂存 synchronized (workers) { if(workers.size() < coreSize) { Worker worker = new Worker(task); // 交给工作线程 log.debug("新增 worker{}, {}", worker, task); workers.add(worker); // 将工作线程交给工作线程队列 worker.start(); // 启动 } else { // taskQueue.put(task); // 1) 死等 // 2) 带超时等待 // 3) 让调用者放弃任务执行 // 4) 让调用者抛出异常 // 5) 让调用者自己执行任务 //拒绝策略,即到底如何处理多余的任务,交由创建线程池的创建者选择 taskQueue.tryPut(rejectPolicy, task); } } } public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy
rejectPolicy) { this.coreSize = coreSize; //核心线程数 this.timeout = timeout; //获取任务的超时时间 this.timeUnit = timeUnit; //转换时间器 this.taskQueue = new BlockingQueue<>(queueCapcity); //阻塞队列 this.rejectPolicy = rejectPolicy; //拒绝策略,在构建线程池的时候定义 } // 工作线程 class Worker extends Thread{ // 任务 private Runnable task; public Worker(Runnable task) { this.task = task; } @Override public void run() { // 执行任务 // 1) 当 task 不为空,执行任务 // 2) 当 task 执行完毕,再接着从任务队列获取任务并执行 // while(task != null || (task = taskQueue.take()) != null) { while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) { try { log.debug("正在执行...{}", task); task.run(); } catch (Exception e) { e.printStackTrace(); } finally { task = null; } } //如果都没有任务了,那么该工作线程被移除 synchronized (workers) { log.debug("worker 被移除{}", this); workers.remove(this); } } } } ``` **说明** - **taskQueue**:任务队列,这个队列中有封装好的取任务和添加任务的方法,以及线程的阻塞队列等属性。 - **workers** :存放工作线程,也就是核心线程。 - **coreSize**:定义核心线程的数量。 - **timeout和timeUnit**:任务的超时时间,下面调用之前方法传参用 - **rejectPolicy**:传参用 - **execute**:线程池向外提供的执行方法 + `ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity,` `RejectPolicy rejectPolicy)`:创建线程池的初始化方法 + **class Worker**:线程实体,它的逻辑是先执行当前任务,如果当前任务执行结束后从任务队列中取任务执行。 **步骤4:测试** 按照定义的决策策略分别进行演示。 **(1)死等** > 初始化核心线程数是1,超时取任务的时间是1秒,任务队列容量是1,拒绝策略是死等 ```java @Slf4j(topic = "c.CustomThreadPoolDemo") public class CustomThreadPoolDemo { public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{ // 死等 queue.put(task); task.run(); }); for (int i = 0; i < 3; i++) { int j = i; threadPool.execute(() -> { try { Thread.sleep(1000000L); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("{}", j); }); } } } ``` **运行结果** ``` 16:15:56.232 [main] DEBUG c.ThreadPool - 新增 workerThread[Thread-0,5,main], com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:15:56.236 [main] DEBUG c.BlockingQueue - 加入任务队列com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f 16:15:56.236 [main] DEBUG c.BlockingQueue - 等待加入任务队列com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@61e717c2... 16:15:56.236 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e ``` **结果分析** - 首先创建了一个核心线程,开始执行第一个任务; - 由于任务执行周期长,核心线程一直处于忙碌状态,因此第二个任务到来时,放入任务队列等待核心线程进入空闲状态; - 当再来第三个任务时,此时任务队列已满,同时核心线程中的第一个任务也未执行完毕,此时主线程就进入到阻塞队列`fullWaitSet`一直死等,等待queue有位置。 **(2)带超时等待** > 每个任务的执行周期是1秒,拒绝策略是main线程等待1.5秒,如果还添加不进去任务,不再进行添加 ```java @Slf4j(topic = "c.CustomThreadPoolDemo") public class CustomThreadPoolDemo { public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{ // 1. 死等 // queue.put(task); // 2. 带超时等待 queue.offer(task, 1500, TimeUnit.MILLISECONDS); task.run(); }); for (int i = 0; i < 3; i++) { int j = i; threadPool.execute(() -> { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("{}", j); }); } } } ``` **运行结果** ``` 16:25:30.918 [main] DEBUG c.ThreadPool - 新增 workerThread[Thread-0,5,main], com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:25:30.922 [main] DEBUG c.BlockingQueue - 加入任务队列com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f 16:25:30.922 [main] DEBUG c.BlockingQueue - 等待加入任务队列 com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@61e717c2 ... 16:25:30.922 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:25:31.923 [Thread-0] DEBUG c.CustomThreadPoolDemo - 0 16:25:31.923 [main] DEBUG c.BlockingQueue - 加入任务队列com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@61e717c2 16:25:31.923 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f 16:25:32.924 [main] DEBUG c.CustomThreadPoolDemo - 2 16:25:32.925 [Thread-0] DEBUG c.CustomThreadPoolDemo - 1 16:25:32.925 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@61e717c2 16:25:33.925 [Thread-0] DEBUG c.CustomThreadPoolDemo - 2 16:25:34.927 [Thread-0] DEBUG c.ThreadPool - worker 被移除Thread[Thread-0,5,main] ``` 可以看到三个任务都被执行了。 > 如果将主线程的拒绝策略改成0.5秒呢? ``` 16:28:37.554 [main] DEBUG c.ThreadPool - 新增 workerThread[Thread-0,5,main], com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:28:37.557 [main] DEBUG c.BlockingQueue - 加入任务队列com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f 16:28:37.558 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:28:37.558 [main] DEBUG c.BlockingQueue - 等待加入任务队列 com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@61e717c2 ... 16:28:38.558 [Thread-0] DEBUG c.CustomThreadPoolDemo - 0 16:28:39.059 [main] DEBUG c.CustomThreadPoolDemo - 2 16:28:39.059 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f 16:28:40.059 [Thread-0] DEBUG c.CustomThreadPoolDemo - 1 16:28:41.060 [Thread-0] DEBUG c.ThreadPool - worker 被移除Thread[Thread-0,5,main] ``` 可以发现第三个任务没有执行。 **(3)让调用者放弃任务执行** > 直接让main线程打印一下任务,不执行任何添加操作 ```java @Slf4j(topic = "c.CustomThreadPoolDemo") public class CustomThreadPoolDemo { public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{ // 3. 让调用者放弃任务执行 log.debug("放弃{}", task); }); for (int i = 0; i < 3; i++) { int j = i; threadPool.execute(() -> { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("{}", j); }); } } } ``` **运行结果** ``` 16:34:53.012 [main] DEBUG c.ThreadPool - 新增 workerThread[Thread-0,5,main], com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:34:53.016 [main] DEBUG c.BlockingQueue - 加入任务队列com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f 16:34:53.016 [main] DEBUG c.CustomThreadPoolDemo - 放弃com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@61e717c2 16:34:53.016 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:34:54.018 [Thread-0] DEBUG c.CustomThreadPoolDemo - 0 16:34:54.018 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f 16:34:55.018 [Thread-0] DEBUG c.CustomThreadPoolDemo - 1 16:34:56.019 [Thread-0] DEBUG c.ThreadPool - worker 被移除Thread[Thread-0,5,main] ``` **(4)让调用者抛出异常** ```java @Slf4j(topic = "c.CustomThreadPoolDemo") public class CustomThreadPoolDemo { public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{ // 4. 让调用者抛出异常 throw new RuntimeException("任务执行失败 " + task); }); for (int i = 0; i < 4; i++) { int j = i; threadPool.execute(() -> { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("{}", j); }); } } } ``` **运行结果** ``` 16:46:30.854 [main] DEBUG c.ThreadPool - 新增 workerThread[Thread-0,5,main], com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:46:30.857 [main] DEBUG c.BlockingQueue - 加入任务队列com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f Exception in thread "main" java.lang.RuntimeException: 任务执行失败 com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@61e717c2 at com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo.lambda$main$0(CustomThreadPoolDemo.java:29) at com.lilinchao.concurrent.demo_05.BlockingQueue.tryPut(CustomThreadPoolDemo.java:270) at com.lilinchao.concurrent.demo_05.ThreadPool.execute(CustomThreadPoolDemo.java:78) at com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo.main(CustomThreadPoolDemo.java:35) 16:46:30.859 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:46:31.859 [Thread-0] DEBUG c.CustomThreadPoolDemo - 0 16:46:31.859 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f 16:46:32.860 [Thread-0] DEBUG c.CustomThreadPoolDemo - 1 16:46:33.861 [Thread-0] DEBUG c.ThreadPool - worker 被移除Thread[Thread-0,5,main] ``` **结果分析** 从结果可以看出,核心线程执行第一到来的任务,将第二个任务加入到阻塞队列中,当第三个任务再来时,直接抛出异常,同时不再尝试添加之后的任务。 (5)让调用者自己执行任务 ```java @Slf4j(topic = "c.CustomThreadPoolDemo") public class CustomThreadPoolDemo { public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{ // 5. 让调用者自己执行任务 task.run(); }); for (int i = 0; i < 4; i++) { int j = i; threadPool.execute(() -> { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("{}", j); }); } } } ``` **运行结果** ``` 16:52:05.328 [main] DEBUG c.ThreadPool - 新增 workerThread[Thread-0,5,main], com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:52:05.331 [main] DEBUG c.BlockingQueue - 加入任务队列com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f 16:52:05.332 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:52:06.332 [Thread-0] DEBUG c.CustomThreadPoolDemo - 0 16:52:06.332 [main] DEBUG c.CustomThreadPoolDemo - 2 16:52:07.333 [main] DEBUG c.CustomThreadPoolDemo - 3 16:52:07.333 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f 16:52:08.333 [Thread-0] DEBUG c.CustomThreadPoolDemo - 1 16:52:09.334 [Thread-0] DEBUG c.ThreadPool - worker 被移除Thread[Thread-0,5,main] ``` **结果分析** 从结果可以看出,后面再来的两个线程直接由主线程进行执行。
标签:
并发编程
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/2580.html
上一篇
41.并发编程之final详解
下一篇
43.ThreadPoolExecutor线程池状态和构造方法
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
队列
Hbase
Nacos
Spark SQL
VUE
前端
Shiro
随笔
SpringCloud
并发编程
ajax
Scala
Golang
Netty
GET和POST
排序
SQL练习题
MyBatis
Python
稀疏数组
Sentinel
FastDFS
BurpSuite
ClickHouse
RSA加解密
FileBeat
栈
国产数据库改造
DataWarehouse
序列化和反序列化
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭