李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
Java
正文
48.Tomcat线程池简单介绍
Leefs
2022-11-23 PM
1524℃
0条
[TOC] ### 前言 tomcat的线程池扩展了jdk的executor,而且队列用的是自己的task queue,因此其策略与jdk的有所不同。 本篇将讨论一下tomcat线程池和jdk线程池的不同之处,以及tomcat为什么要重写jdk线程池的方法。 ### 一、Tomcat的请求处理过程 ![48.Tomcat线程池简单介绍02.png](https://lilinchao.com/usr/uploads/2022/11/503455772.png) 一个客户端请求到达Tomcat之后的处理流程如上图所示: - 当Tomcat启动后,Connector的接收器Acceptor会监听是否有客户端连接。 - 一旦监听到客户端连接,则将连接交给线程池Executor,开始执行请求响应任务。 - Http11Processor负责从客户端连接中读取Http报文并进行解析,解析后的报文封装成Request对象。 - Maper根据Http协议请求的URL值和Host属性匹配由哪个Host、哪个Context和哪个Wrapper容器来处理请求。 - CoyoteAdaptor负责将Connector组件和Engine容器连接起来,将Request对象和Response对象传递到Engine容器中。 - Engine容器的请求处理管道开始工作,管道里包括若干Valve,每个Valve都负责一些处理逻辑。 - Engine容器的请求处理管道工作完成后,再依次交给Host容器的处理管道、Context容器的处理管道和Wrapper容器的处理管道。最后将结果输出到客户端。 ### 二、tomcat在哪里用到了线程池 ![48.Tomcat线程池简单介绍01.png](https://lilinchao.com/usr/uploads/2022/11/2820302018.png) **说明** - LimitLatch用来限流,可以控制最大连接个数 - acceptor负责接收新的socket连接 - poller负责监听socket channel是否有可读的IO事件 - 一旦有可读的IO事件被监听到,就封装一个任务对象socketProcessor,提交给executor线程池处理 - executor线程池中的工作线程最终负责处理请求 ### 三、Tomcat线程池和JUC线程池流程 **JDK线程池策略:** 1. 当线程池中线程数量小于corePoolSize,每来一个任务,就会创建一个线程执行这个任务。 2. 当前线程池线程数量大于等于corePoolSize,则每来一个任务。会尝试将其添加到任务缓存队列中,若是添加成功,则该任务会等待线程将其取出去执行;若添加失败(一般来说任务缓存队列已满),则会尝试创建新的线程执行。 3. 当前线程池线程数量等于maximumPoolSize,则会采取任务拒绝策略进行处理。 **tomcat线程池策略:** 1. 当前线程数小于corePoolSize,则去创建工作线程; 2. 当前线程数大于corePoolSize,但小于maximumPoolSize,则去创建工作线程; 3. 当前线程数大于maximumPoolSize,则将任务放入到阻塞队列中,当阻塞队列满了之后,则调用拒绝策略丢弃任务; **Tomcat线程池对JUC线程池的增强** Tomcat是重写了JDK线程池(即改动的是少量的源码)实现的功能的增强。 主要流程还是JDK线程池流程,即先开启corePoolSize线程,然后在queue,最后在开启maximumPoolSize线程。 Tomcat重点改造的是queue的offer()。即在向queue放入任务时,若发现未达到最大线程数,那么offer()返回false,即放入队列失败。此时,便继续开启maximumPoolSize线程。 那么Tomcat为什么要重新改造JDK线程池呢? ### 四、Tomcat线程池和JUC线程池的区别 **使用线程池的任务有两种**: + IO密集型任务(如调用接口、查询数据库); + CPU密集型任务; **场景** + **JDK线程池**:当线程数达到corePoolSize后,任务首先被放到queue。发挥CPU多核的并行优势,减少多个线程导致的上下文切换。 适合的场景是:**CPU密集型任务** + **Tomcat线程池**:当大量请求达到时,接收的请求数量大于核心线程池的corePoolSize时,会继续创建worker线程去处理请求。而后续请求量变少时,只会销毁maximumPoolSize线程数。 适合的场景是:**IO密集型任务**。 JDK的线程池`ThreadPoolExecutor`主要目的解决的便是CPU密集型任务的并发处理。但是Tomcat若使用原生的JDK线程池,一旦接收的请求数量大于线程池的核心线程数,这些请求就会被放到队列中,等待核心线程处理。这样会降低请求的总体处理速度,所以Tomcat并没有使用JDK原生线程池的策略。 ### 五、源码分析 Tomcat 线程池扩展了 ThreadPoolExecutor,行为稍有不同 + 如果总线程数达到 maximumPoolSize + 这时不会立刻抛 RejectedExecutionException 异常 + 而是再次尝试将任务放入队列,如果还失败,才抛出 RejectedExecutionException 异常 **拒绝策略** ```java // ThreadPoolExecutor.class private static class RejectHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, java.util.concurrent.ThreadPoolExecutor executor) { // 直接抛出错误 throw new RejectedExecutionException(); } } ``` **任务执行** ```java @Override public void execute(Runnable command) { execute(command,0,TimeUnit.MILLISECONDS); } public void execute(Runnable command, long timeout, TimeUnit unit) { // 提交计数加一 submittedCount.incrementAndGet(); try { // 此处调用 java.util.concurrent.ThreadPoolExecutor 中的 execute(...) 方法 super.execute(command); } catch (RejectedExecutionException rx) { // 如果调用父类中的方法执行错误,会尝试将任务再一次放入到等待队列里 if (super.getQueue() instanceof TaskQueue) { final TaskQueue queue = (TaskQueue)super.getQueue(); try { // 此处尝试放入等待队列 // 如果也失败了,就回滚提交计数,并抛出异常 if (!queue.force(command, timeout, unit)) { submittedCount.decrementAndGet(); throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull")); } } catch (InterruptedException x) { submittedCount.decrementAndGet(); throw new RejectedExecutionException(x); } } else { submittedCount.decrementAndGet(); throw rx; } } } @Override protected void afterExecute(Runnable r, Throwable t) { // 计数加一 submittedCount.decrementAndGet(); // 如果没有报错,那么此处尝试关闭多余的线程 // 抛出错误的方式停止线程 if (t == null) { stopCurrentThreadIfNeeded(); } } /** * 判断是否需要关闭线程 **/ protected void stopCurrentThreadIfNeeded() { // 如果线程存活时间超过了 delay 值,那么此处会抛出一个错误,使线程停止 if (currentThreadShouldBeStopped()) { long lastTime = lastTimeThreadKilledItself.longValue(); if (lastTime + threadRenewalDelay < System.currentTimeMillis()) { if (lastTimeThreadKilledItself.compareAndSet(lastTime, System.currentTimeMillis() + 1)) { final String msg = sm.getString( "threadPoolExecutor.threadStoppedToAvoidPotentialLeak", Thread.currentThread().getName()); throw new StopPooledThreadException(msg); } } } } protected boolean currentThreadShouldBeStopped() { // 如果当前线程并非工作线程,或者不存在线程存活 delay 值,那么此处返回 false // 如果当前线程是工作线程,且设置了 delay 时间,且当前线程的存活时间已经超过了设置值,那么此处返回 true if (threadRenewalDelay >= 0 && Thread.currentThread() instanceof TaskThread) { TaskThread currentTaskThread = (TaskThread) Thread.currentThread(); if (currentTaskThread.getCreationTime() < this.lastContextStoppedTime.longValue()) { return true; } } return false; } ``` 从execute 方法可以看出,当提交线程的时候,如果被线程池拒绝了,Tomcat 的线程池,还会再次尝试,调用 force() 方法"强行"的尝试向阻塞队列中添加任务。 **TaskQueue** TaskQueue 是 Tomcat 中对任务队列的增强和封装 ```java public class TaskQueue extends LinkedBlockingQueue
{ // 序列编码 private static final long serialVersionUID = 1L; // 字符串管理类 protected static final StringManager sm = StringManager .getManager("org.apache.tomcat.util.threads.res"); // 任务队列关联的线程池 private transient volatile ThreadPoolExecutor parent = null; // 不太清楚是做什么用的一个容量计数 private Integer forcedRemainingCapacity = null; // 其它方法暂时忽略 // ... ``` **添加、获取任务的相关方法** ```java // 不带超时的添加任务方法 public boolean force(Runnable o) { // 关联线程池不可为空 if (parent == null || parent.isShutdown()) throw new RejectedExecutionException(sm.getString("taskQueue.notRunning")); // 调用 LinkedBlockingQueue 的 offer(...) 方法添加任务 return super.offer(o); } // 带超时的添加任务方法 public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException { if (parent == null || parent.isShutdown()) throw new RejectedExecutionException(sm.getString("taskQueue.notRunning")); return super.offer(o,timeout,unit); } // 本质上是调用父类的 offer(...) 方法 // 非阻塞添加任务的方法 @Override public boolean offer(Runnable o) { if (parent == null) return super.offer(o); if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o); if (parent.getSubmittedCount() <= (parent.getPoolSize())) return super.offer(o); // 这种情况下线程池可以直接消费任务,无需放入任务队列等待 if (parent.getPoolSize() < parent.getMaximumPoolSize()) return false; return super.offer(o); } // 带超时的阻塞方式获取任务 @Override public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { // 获取一个任务,如果获取到的为空,则停止当前线程 // 能获取到就返回任务 Runnable runnable = super.poll(timeout, unit); if (runnable == null && parent != null) { parent.stopCurrentThreadIfNeeded(); } return runnable; } // 阻塞方式获取任务 @Override public Runnable take() throws InterruptedException { // 线程池存在的情况下,会使用限时的方式去获取任务 if (parent != null && parent.currentThreadShouldBeStopped()) { return poll(parent.getKeepAliveTime(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); } return super.take(); } ``` > Connector配置 (连接器配置) | 配置项 | 默认值 | 说明 | | ------------------- | ------ | -------------------------------------- | | acceptorThreadCount | 1 | acceptor 线程数量 | | pollerThreadCount | 1 | poller 线程数量 | | minSpareThreads | 10 | 核心线程数,即 corePoolSize | | maxThreads | 200 | 最大线程数,即 maximumPoolSize | | executor | - | Executor 名称,用来引用下面的 Executor | > Executor 线程配置 | 配置项 | 默认值 | 说明 | | ----------------------- | ----------------- | ----------------------------------------- | | threadPriority | 5 | 线程优先级 | | daemon | true | 是否守护线程 | | minSpareThreads | 25 | 核心线程数,即 corePoolSize | | maxThreads | 200 | 最大线程数,即 maximumPoolSize | | maxIdleTime | 60000 | 线程生存时间,单位是毫秒,默认值即 1 分钟 | | maxQueueSize | Integer.MAX_VALUE | 队列长度 | | prestartminSpareThreads | false | 核心线程是否在服务器启动时启动 | *附参考文章链接* *https://www.jianshu.com/p/63bfba7c66c7*
标签:
并发编程
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/2596.html
上一篇
47.任务调度线程池介绍
下一篇
49.Fork&Join框架介绍
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
ClickHouse
Filter
Elasticsearch
Jenkins
DataWarehouse
栈
SpringCloud
Java编程思想
GET和POST
Yarn
Git
MyBatis
Azkaban
JavaWeb
Hive
Sentinel
gorm
nginx
Eclipse
VUE
Docker
JavaSE
CentOS
Flume
Livy
Ubuntu
并发编程
Shiro
持有对象
链表
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭