李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
Java
正文
04.Netty源码分析之启动流程分析
Leefs
2022-06-24 PM
1466℃
0条
[TOC] ### 一、NIO启动流程 Netty底层是NIO,从对NIO的组件封装开始进行分析。本次对Netty的分析主要是在源码中找到下方NIO的方法,来看看netty中对下面的代码是怎样进行处理的 ```java //1 netty 中使用 NioEventLoopGroup (简称 nio boss 线程)来封装线程和 selector //可以同时监听多个channel上的读和写事件 Selector selector = Selector.open(); //2 创建 NioServerSocketChannel,同时会初始化它关联的 handler,以及为原生 ssc 存储 config NioServerSocketChannel attachment = new NioServerSocketChannel(); //3 创建 NioServerSocketChannel 时,创建了 JDK 原生的 ServerSocketChannel ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //设置为非阻塞模式 serverSocketChannel.configureBlocking(false); //4 启动 nio boss 线程执行接下来的操作 //5 注册(仅关联 selector 和 NioServerSocketChannel),0表示未关注事件 //selector一旦发生了事件,如何找到Nio相关的类去处理?答案是通过attachment,其将JDK原生的ssc和NioSsc联系起来。 SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment); //6 head -> 初始化器 -> ServerBootstrapAcceptor -> tail,初始化器是一次性的,只为添加 acceptor //7 绑定端口,监听端口8080 serverSocketChannel.bind(new InetSocketAddress(8080)); //8 ssc关注可连接事件,触发 channel active 事件,selectionKey 关注 op_accept 事件 selectionKey.interestOps(SelectionKey.OP_ACCEPT); ``` ### 二、Netty启动流程 #### 2.1 Netty服务端的启动代码 ```java import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LoggingHandler; /** * @author lilinchao * @date 2022/6/24 * @description 服务端 **/ public class TestSourceServer { public static void main(String[] args) { new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer
() { @Override protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(new LoggingHandler()); } }).bind(8080); } } ``` 通过该代码跟进源码,对服务端启动过程进行分析 #### 2.2 入口bind 选择器Selector的创建是在`NioEventloopGroup`中完成的。 `NioServerSocketChannel`与`ServerSocketChannel`的创建,`ServerSocketChannel`注册到Selector中以及绑定操作都是由bind方法完成的。所以服务器启动的入口便是`io.netty.bootstrap.ServerBootstrap.bind`。 ```java public ChannelFuture bind(SocketAddress localAddress) { validate(); return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress")); } ``` > 关键代码doBind ```java private ChannelFuture doBind(final SocketAddress localAddress) { // 1. 执行初始化和注册 regFuture 会由 initAndRegister 设置其是否完成,从而回调 3.2 处代码 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } // 2. 因为是 initAndRegister 异步执行,需要分两种情况来看,调试时也需要通过 suspend 断点类型加以区分 // 2.1 如果已经完成 if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); // 3.1 立刻调用 doBind0 doBind0(regFuture, channel, localAddress, promise); return promise; } // 2.2 还没有完成 else { final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); // 3.2 回调 doBind0 regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // 处理异常... promise.setFailure(cause); } else { promise.registered(); // 3. 由注册线程去执行 doBind0 doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } } ``` 其中有两个重要的方法:`initAndRegister()`和`doBind0(regFuture,channel,localAddress,promise)` - **initAndRegister**主要负责NioServerSocketChannel和ServerSocketChannel的创建(主线程中完成)与ServerSocketChannel注册(NIO线程中完成)工作 `init`就对应原生NIO中的 : `ServerSocketChannel ssc = ServerSocketChannel.open();` `register`就对应原生NIO中的: `SelectionKey selectionKey = ssc.register(selector,0,nettySsc);` - **doBind0**则负责连接的创建工作,对应原生NIO中的 :`ssc.bind(new InetSocketAddress(8080,backlog));` > 关键代码 `io.netty.bootstrap.AbstractBootstrap#initAndRegister` ```java final ChannelFuture initAndRegister() { Channel channel = null; try { // 这里才是真正初始化了一个channel, 通过 ServerBootstrap 的通 道工厂反射创建一个 NioServerSocketChannel channel = channelFactory.newChannel(); // 1.1 初始化 - 做的事就是添加一个初始化器 ChannelInitializer init(channel); } catch (Throwable t) { // 处理异常... return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } // 1.2 注册 - 做的事就是将原生 channel 注册到 selector 上 ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { // 处理异常... } return regFuture; } ``` > 关键代码 `io.netty.bootstrap.ServerBootstrap#init` ```java // 这里 channel 实际上是 NioServerSocketChannel void init(Channel channel) throws Exception { final Map
, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map
, Object> attrs = attrs0(); synchronized (attrs) { for (Entry
, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey
key = (AttributeKey
) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry
, Object>[] currentChildOptions; final Entry
, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); } // 为 NioServerSocketChannel 添加初始化器 p.addLast(new ChannelInitializer
() { // register之后才调用该方法,可以在此添加断点通过debug的方式查看何时被调用 @Override public void initChannel(final Channel ch) throws Exception { // 创建handler并加入到pipeline中 final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } // 初始化器的职责是将 ServerBootstrapAcceptor 加入至 NioServerSocketChannel ch.eventLoop().execute(new Runnable() { @Override public void run() { // 添加新的handler,在发生Accept事件后建立连接 pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); } ``` **分析** 从源码中可以看出, init 的方法的核心作用在和 ChannelPipeline 相关。 从 NioServerSocketChannel **的初始化过程中**,我们知道,pipeline 是一个双向链表,并且,他本身就初始化了 head 和 tail,这里调用了他的 addLast 方法,也就是将整个 handler 插入到 tail 的前面,因为 tail 永远会在后面,需要做一些系统的固定工作。 #### 2.3 Register init执行完毕后,便执行`ChannelFuture regFuture = config().group().register(channel)`操作 该方法最终调用的是`promise.channel().unsafe().register(this, promise)`方法 ```java public final void register(EventLoop eventLoop, final ChannelPromise promise) { // 一些检查,略... // 获取EventLoop AbstractChannel.this.eventLoop = eventLoop; //用于判断当前线程是不是eventLoop线程中的线程,也就是判断当前线程是否为NIO线程 if (eventLoop.inEventLoop()) { register0(promise); } else { try { // 首次执行 execute 方法时,会启动 nio 线程,之后注册等操作在 nio 线程上执行 // 因为只有一个 NioServerSocketChannel 因此,也只会有一个 boss nio 线程 // 这行代码完成的事实是 main -> nio boss 线程的切换 eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { // 日志记录... closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } } ``` > `io.netty.channel.AbstractChannel.AbstractUnsafe#register0` ```java private void register0(ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; // 1.2.1 原生的 nio channel 绑定到 selector 上,注意此时没有注册 selector 关注事件,附件为 NioServerSocketChannel doRegister(); neverRegistered = false; registered = true; // 1.2.2 执行 NioServerSocketChannel 初始化器的 initChannel pipeline.invokeHandlerAddedIfNeeded(); // 回调 3.2 io.netty.bootstrap.AbstractBootstrap#doBind0 safeSetSuccess(promise); pipeline.fireChannelRegistered(); // 对应 server socket channel 还未绑定,isActive 为 false if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } ``` 后续`ChannelFuture regFuture = initAndRegister();`中的`regFuture`就是`safeSetSuccess`设置的`promise`,可以通过debug进行标识以及查看对象值来确定。 一般来说,真正执行操作的方法名前都会去加`do`,spring和Alibaba的源码中都是如此。 > 关键代码 `io.netty.channel.AbstractNioChannel#doRegister` ```java @Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { // javaChannel()即为ServerSocketChannel // eventLoop().unwrappedSelector()获取eventLoop中的Selector // this为NIOServerSocketChannel,作为附件 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { eventLoop().selectNow(); selected = true; } else { throw e; } } } } ``` **回调initChannel** ```java @Override public void initChannel(final Channel ch) { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } // 添加新任务,任务负责添加handler // 该handler负责发生Accepet事件后建立连接 ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } ``` **Register主要完成了以下三个操作** - 完成了主线程到NIO的**线程切换** - 通过`eventLoop.inEventLoop()`进行线程判断,判断当前线程是否为NIO线程 - 切换的方式为让eventLoop执行register的操作 - **register的操作在NIO线程中完成** - **调用doRegister方法** - **将ServerSocketChannel注册到EventLoop的Selector中** - 此时还未关注事件 - 添加NioServerSocketChannel附件 - 通过`invokeHandlerAddedIfNeeded`调用init中的`initChannel`方法 - initChannel方法主要创建了 两个handler - 一个handler负责设置配置 - 一个handler负责发生Accept事件后建立连接 #### 2.4 doBind0 **绑定端口** 在`doRegister`和`invokeHandlerAddedIfNeeded`操作中的完成后,会调用`safeSetSuccess(promise)`方法,向Promise中设置执行成功的结果。此时`doBind`方法中由`initAndRegister`返回的ChannelFuture对象regFuture便会由NIO线程异步执行doBind0绑定操作 ```java // initAndRegister为异步方法,会返回ChannelFuture对象 final ChannelFuture regFuture = initAndRegister(); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { promise.setFailure(cause); } else { promise.registered(); // 如果没有异常,则执行绑定操作 doBind0(regFuture, channel, localAddress, promise); } } }); ``` **doBind0最底层调用的是ServerSocketChannel的bind方法** NioServerSocketChannel.doBind方法,通过该方法,绑定了对应的端口 ```java @SuppressJava6Requirement(reason = "Usage guarded by java version check") @Override protected void doBind(SocketAddress localAddress) throws Exception { // 判断java版本是不是大于7 if (PlatformDependent.javaVersion() >= 7) { // 调用ServerSocketChannel的bind方法,绑定端口 javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } } ``` **关注事件** 在绑定端口操作完成后,会判断各种所有初始化操作是否已经完成,若完成,则会添加ServerSocketChannel感兴趣的事件 ```java if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); } ``` 最终在`AbstractNioChannel.doBeginRead`方法中,会添加ServerSocketChannel添加Accept事件 ```java @Override protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; final int interestOps = selectionKey.interestOps(); // 如果ServerSocketChannel没有关注Accept事件 if ((interestOps & readInterestOp) == 0) { // 则让其关注Accepet事件 // readInterestOp 取值是 16 // 在 NioServerSocketChannel 创建时初始化 selectionKey.interestOps(interestOps | readInterestOp); } } ``` **注意**:此处设置interestOps时使用的方法,**避免覆盖关注的其他事件** - 首先获取Channel所有感兴趣的事件 ```java final int interestOps = selectionKey.interestOps(); ``` - 然后再设置其感兴趣的事件 ```java selectionKey.interestOps(interestOps | readInterestOp); ``` *附参考文章链接* *https://nyimac.gitee.io/2021/04/25/Netty%E5%9F%BA%E7%A1%80/* *https://blog.csdn.net/weixin_45296116/article/details/123227825*
标签:
Netty
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/2182.html
上一篇
03.Netty搭建RPC框架
下一篇
01.Golang介绍
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
设计模式
Spark SQL
数据结构
Jenkins
链表
随笔
国产数据库改造
Python
JavaWeb
算法
机器学习
Ubuntu
Java
Flume
递归
Filter
Spark RDD
Stream流
Http
持有对象
nginx
FastDFS
Hive
工具
MyBatisX
Spark Core
哈希表
BurpSuite
SpringCloudAlibaba
NIO
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭