李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
Java
正文
16.NIO之多线程优化
Leefs
2022-06-04 PM
1309℃
0条
[TOC] ### 前言 之前说到的服务端程序都是在一个线程上进行的,这个线程不仅负责连接客户端发来的请求,同时还要处理读写事件,这样效率还是不够高。如今电脑都是多核处理器,这意味着可以同时进行多个线程,所以服务端应该充分利用这一点。 ### 一、概述 服务端线程可以建立多个线程,将这些线程分成两组: + 单线程配一个选择器(Boss),**专门处理 accept 事件** + 创建 cpu 核心数的线程(Worker),**每个线程配一个选择器,轮流处理 read 事件** **关系图** ![16.NIO之多线程优化01.jpg](https://lilinchao.com/usr/uploads/2022/06/1394560525.jpg) **说明** + Boss线程只负责Accept事件,Worker线程负责客户端与服务端之间的读写问题,他们都各自维护一个Selector负责监听通道的事件。 + 当Boss线程检测到有客户端的连接请求,就会把这个连接返回的`SocketChannel`注册到某一个Worker线程上。 + 当有读写事件发生时,其中一个Worker线程就会检测到事件,就会在该线程中进行处理,这样的设计做到了功能在线程上的分离。 ### 二、实现思路 + 创建**一个**负责处理Accept事件的Boss线程,与**多个**负责处理Read事件的Worker线程; + **Boss线程**执行的操作 - 接受并处理Accepet事件,当Accept事件发生后,调用Worker的register(SocketChannel socket)方法,让Worker去处理Read事件,其中需要**根据标识index去判断将任务分配给哪个Worker** ``` // 创建固定数量的Worker Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()]; // 用于负载均衡的原子整数 AtomicInteger index = new AtomicInteger(0); // 负载均衡,轮询分配Worker workers[index.getAndIncrement()% workers.length].register(socket); ``` - `register(SocketChannel socket)`方法会**通过同步队列完成Boss线程与Worker线程之间的通信**,让`SocketChannel`的注册任务被Worker线程执行。添加任务后需要调用`selector.wakeup()`来唤醒被阻塞的Selector ```java public void register(final SocketChannel socket) throws IOException { // 只启动一次 if (!started) { // 初始化操作 } // 向同步队列中添加SocketChannel的注册事件 // 在Worker线程中执行注册事件 queue.add(new Runnable() { @Override public void run() { try { socket.register(selector, SelectionKey.OP_READ); } catch (IOException e) { e.printStackTrace(); } } }); // 唤醒被阻塞的Selector selector.wakeup(); } ``` + **Worker线程执行**的操作 - **从同步队列中获取注册任务,并处理Read事件** ### 三、代码实现 + **服务端代码** ```java import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import static com.lilinchao.nio.bytebuffer_2.ByteBufferUtil.debugAll; /** * Created by lilinchao * Date 2022/6/4 * Description 多线程优化 -- 服务端 */ @Slf4j public class MultiThreadServer { public static void main(String[] args) throws IOException { Thread.currentThread().setName("boss"); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); // 负责轮询Accept事件的Selector Selector boss = Selector.open(); SelectionKey bossKey = ssc.register(boss, 0, null); bossKey.interestOps(SelectionKey.OP_ACCEPT); ssc.bind(new InetSocketAddress(8080)); //创建固定数量的worker = core 数 Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()]; for (int i=0;i
iterator = boss.selectedKeys().iterator(); while (iterator.hasNext()){ SelectionKey key = iterator.next(); iterator.remove(); if(key.isAcceptable()){ SocketChannel sc = ssc.accept(); sc.configureBlocking(false); log.debug("connected:{}",sc.getRemoteAddress()); // 2. 关联 selector (静态内部类可以访问到selector) log.debug("before register:{}",sc.getRemoteAddress()); // 负载均衡,轮询分配Worker workers[index.getAndIncrement() % workers.length].register(sc); log.debug("after register:{}",sc.getRemoteAddress()); } } } } static class Worker implements Runnable{ private Thread thread; private Selector selector; private String name; private volatile boolean start = false; //还未初始化 /** * 同步队列,用于Boss线程与Worker线程之间的通信 */ private ConcurrentLinkedQueue
queue = new ConcurrentLinkedQueue<>(); public Worker(String name) { this.name = name; } //初始化线程和Selector public void register(SocketChannel sc) throws IOException { //只启动一次 if(!this.start){ this.thread = new Thread(this,name); this.selector = Selector.open(); this.thread.start(); this.start = true; } //向队列添加任务,但这个任务并没有立刻执行 queue.add(() -> { try { sc.register(selector,SelectionKey.OP_READ,null); } catch (ClosedChannelException e) { e.printStackTrace(); } }); selector.wakeup(); //唤醒select方法 } @Override public void run() { while (true){ try { selector.select(); //阻塞 // 通过同步队列获得任务并运行 Runnable task = queue.poll(); if(task != null){ task.run(); //获得任务,执行注册 } Iterator
iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()){ SelectionKey key = iterator.next(); iterator.remove(); // Worker只负责Read事件 if(key.isReadable()){ ByteBuffer buffer = ByteBuffer.allocate(16); SocketChannel channel = (SocketChannel) key.channel(); log.debug("read...{}",channel.getRemoteAddress()); channel.read(buffer); buffer.flip(); debugAll(buffer); } } } catch (IOException e) { e.printStackTrace(); } } } } } ``` + **客户端代码** ```java import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; /** * Created by lilinchao * Date 2022/6/3 * Description 客户端 */ public class TestClient { public static void main(String[] args) throws IOException { SocketChannel sc = SocketChannel.open(); sc.connect(new InetSocketAddress("localhost", 8080)); sc.write(Charset.defaultCharset().encode("0123456789abcdef")); System.in.read(); } } ``` + **运行结果** ``` 13:03:57 [DEBUG] [boss] c.l.n.t.MultiThreadServer - connected:/127.0.0.1:52622 13:03:57 [DEBUG] [boss] c.l.n.t.MultiThreadServer - before register:/127.0.0.1:52622 13:03:57 [DEBUG] [boss] c.l.n.t.MultiThreadServer - after register:/127.0.0.1:52622 13:03:57 [DEBUG] [worker-0] c.l.n.t.MultiThreadServer - read.../127.0.0.1:52622 +--------+-------------------- all ------------------------+----------------+ position: [0], limit: [16] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 30 31 32 33 34 35 36 37 38 39 61 62 63 64 65 66 |0123456789abcdef| +--------+-------------------------------------------------+----------------+ ``` 在运行时,可以同时运行多个客户端程序,查看服务端的输出效果。 #### 问题:如何拿到 cpu 个数 > * `Runtime.getRuntime().availableProcessors()` 如果工作在 docker 容器下,因为容器不是物理隔离的,会拿到物理 cpu 个数,而不是容器申请时的个数 > * 这个问题直到 jdk 10 才修复,使用 jvm 参数 UseContainerSupport 配置, 默认开启
标签:
Netty
,
NIO
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/2115.html
上一篇
15.NIO Selector之处理write事件
下一篇
17.NIO之IO模型
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
Hive
算法
Scala
数据结构和算法
NIO
设计模式
数学
Stream流
锁
随笔
Git
Filter
Http
Hadoop
高并发
HDFS
Kibana
ajax
线程池
Redis
pytorch
MyBatis-Plus
持有对象
容器深入研究
国产数据库改造
Golang基础
Tomcat
Java工具类
散列
DataWarehouse
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭