李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
Java
正文
02.Netty进阶之粘包与半包解决方案
Leefs
2022-06-14 PM
1182℃
0条
[TOC] ### 一、解决方案 1. **短链接**:发一个包建立一次连接,这样连接建立到连接断开之间就是消息的边界,缺点效率太低 2. **定长解码器**:每一条消息采用固定长度,缺点浪费空间 3. **行解码器**:每一条消息采用分隔符,例如 \n,缺点需要转义 4. **LTC解码器**:每一条消息分为 head 和 body,head 中包含 body 的长 ### 二、示例 #### 2.1 短链接 **客户端每次向服务器发送数据以后,就与服务器断开连接,此时的消息边界为连接建立到连接断开**。这时便无需使用滑动窗口等技术来缓冲数据,则不会发生粘包现象。但如果一次性数据发送过多,接收方无法一次性容纳所有数据,还是会发生半包现象,所以**短链接无法解决半包现象** + **思路** > 服务端:一次接收最大缓冲区大小为16字节大小的数据 > > 客户端:一次发送的字节数大于16字节,一次数据发送完成后断开于服务端的连接,循环发送十次消息 + **服务端代码** ```java import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.AdaptiveRecvByteBufAllocator; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j; /** * @author lilinchao * @date 2022/6/14 * @description 短链接 服务端 **/ @Slf4j public class Server01 { void start(){ NioEventLoopGroup boss = new NioEventLoopGroup(); NioEventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.channel(NioServerSocketChannel.class); //调整 netty 的接收缓冲区(byteBuf) //AdaptiveRecvByteBufAllocator:参数1:最小容量 参数2:初始容量 参数3:最大容量 serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,new AdaptiveRecvByteBufAllocator(16,16,16)); serverBootstrap.group(boss,worker); serverBootstrap.childHandler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); } }); ChannelFuture channelFuture = serverBootstrap.bind(8080).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("server error",e); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } public static void main(String[] args) { new Server01().start(); } } ``` + **客户端代码** ```java import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j; /** * @author lilinchao * @date 2022/6/14 * @description 短链接 客户端 * 发送完成一次连接断开一次 **/ @Slf4j public class Client01 { public static void main(String[] args) { //发送十次消息 for (int i = 0; i < 10; i++) { send(); } System.out.println("finish"); } private static void send(){ NioEventLoopGroup worker = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.group(worker); bootstrap.handler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ //会在连接channel建立成功后,触发active事件 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf buf = ctx.alloc().buffer(16); buf.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}); ctx.writeAndFlush(buf); //发送一次消息就直接断开连接 ctx.channel().close(); } }); } }); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync(); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { log.error("client error",e); } finally { worker.shutdownGracefully(); } } } ``` **运行结果** ``` 21:18:08 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3341a31, L:/127.0.0.1:8080 - R:/127.0.0.1:49486] REGISTERED 21:18:08 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3341a31, L:/127.0.0.1:8080 - R:/127.0.0.1:49486] ACTIVE 21:18:08 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3341a31, L:/127.0.0.1:8080 - R:/127.0.0.1:49486] READ: 16B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| +--------+-------------------------------------------------+----------------+ 21:18:08 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3341a31, L:/127.0.0.1:8080 - R:/127.0.0.1:49486] READ COMPLETE 21:18:08 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3341a31, L:/127.0.0.1:8080 - R:/127.0.0.1:49486] READ: 2B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 10 11 |.. | +--------+-------------------------------------------------+----------------+ 21:18:08 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3341a31, L:/127.0.0.1:8080 - R:/127.0.0.1:49486] READ COMPLETE 21:18:08 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3341a31, L:/127.0.0.1:8080 - R:/127.0.0.1:49486] READ COMPLETE 21:18:08 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3341a31, L:/127.0.0.1:8080 ! R:/127.0.0.1:49486] INACTIVE 21:18:08 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3341a31, L:/127.0.0.1:8080 ! R:/127.0.0.1:49486] UNREGISTERED ``` 只复制一次,客户端从连接到发送完消息以后断开的过程。 从结果可以很清晰的看到,没有发生粘包现象,但是半包问题未得到解决。 #### 2.2 定长解码器 客户端于服务器**约定一个最大长度,保证客户端每次发送的数据长度都不会大于该长度**。若发送数据长度不足则需要**补齐**至该长度。 服务器接收数据时,**将接收到的数据按照约定的最大长度进行拆分**,即使发送过程中产生了粘包,也可以通过**定长解码器**将数据正确地进行拆分。 `Netty`中提供了一个`FixedLengthFrameDecoder`(固定长度解析器),是一个特殊的handler,专门用来进行解码。 + **思路** > 客户端:给每个发送的数据封装成定长的长度(多余的使用分隔符`_`,统一规定)最后统一通过一个ByteBuf发送出去;(会产生粘包) > > 服务端:通过`FixedLengthFrameDecoder`进行固定长度解析,每次解析到定长的Bytebuf进行处理。 + **服务端代码** ```java import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.AdaptiveRecvByteBufAllocator; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.FixedLengthFrameDecoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j; @Slf4j public class Server2 { void start() { NioEventLoopGroup boss = new NioEventLoopGroup(); NioEventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.channel(NioServerSocketChannel.class); // 调整系统的接收缓冲区(滑动窗口) // serverBootstrap.option(ChannelOption.SO_RCVBUF, 10); // 调整 netty 的接收缓冲区(byteBuf) serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16, 16, 16)); serverBootstrap.group(boss, worker); serverBootstrap.childHandler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel ch) throws Exception { //对数据进行定长解码 ch.pipeline().addLast(new FixedLengthFrameDecoder(10)); ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); } }); ChannelFuture channelFuture = serverBootstrap.bind(8080).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("server error", e); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } public static void main(String[] args) { new Server2().start(); } } ``` + **客户端代码** ```java import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j; import java.util.Arrays; import java.util.Random; @Slf4j public class Client2 { public static void main(String[] args) { send(); System.out.println("finish"); } //将数据填充到十个字节,不够的部分使用'_' public static byte[] fill10Bytes(char c, int len) { byte[] bytes = new byte[10]; Arrays.fill(bytes, (byte) '_'); for (int i = 0; i < len; i++) { bytes[i] = (byte) c; } System.out.println(new String(bytes)); return bytes; } private static void send() { NioEventLoopGroup worker = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.group(worker); bootstrap.handler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { // 会在连接 channel 建立成功后,会触发 active 事件 @Override public void channelActive(ChannelHandlerContext ctx) { ByteBuf buf = ctx.alloc().buffer(); char c = '0'; Random r = new Random(); for (int i = 0; i < 10; i++) { byte[] bytes = fill10Bytes(c, r.nextInt(10) + 1); c++; buf.writeBytes(bytes); } ctx.writeAndFlush(buf); } }); } }); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("client error", e); } finally { worker.shutdownGracefully(); } } } ``` 缺点是,数据包的大小不好把握 * 长度定的太大,浪费 * 长度定的太小,对某些数据包又显得不够 #### 2.3 行解码器 行解码器的是**通过分隔符对数据进行拆分**来解决粘包半包问题的 **在Netty中提供了两个解码器:** + **LineBasedFrameDecoder**:指定以换行符作为分隔符。\n或者\r\n,使用它的时候,会有一个最大长度限制,若是超过了最大长度还没有找到换行符就会抛出一个异常。 + **DelimiterBasedFrameDecoder**:可以自定义符号来作为分隔符,在构造方法中有最大长度和一个Bytebuf类型的分隔符。 + **服务端代码** ```java import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.AdaptiveRecvByteBufAllocator; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.FixedLengthFrameDecoder; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j; @Slf4j public class Server3 { void start() { NioEventLoopGroup boss = new NioEventLoopGroup(); NioEventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.channel(NioServerSocketChannel.class); // 调整系统的接收缓冲区(滑动窗口) // serverBootstrap.option(ChannelOption.SO_RCVBUF, 10); // 调整 netty 的接收缓冲区(byteBuf) serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16, 16, 16)); serverBootstrap.group(boss, worker); serverBootstrap.childHandler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel ch) throws Exception { //指定以换行符作为分隔符,限制最大字节长度为1024 ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); } }); ChannelFuture channelFuture = serverBootstrap.bind(8080).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("server error", e); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } public static void main(String[] args) { new Server3().start(); } } ``` + **客户端代码** ```java import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j; import java.util.Random; @Slf4j public class Client3 { public static void main(String[] args) { send(); System.out.println("finish"); } public static StringBuilder makeString(char c, int len) { StringBuilder sb = new StringBuilder(len + 2); for (int i = 0; i < len; i++) { sb.append(c); } sb.append("\n"); return sb; } private static void send() { NioEventLoopGroup worker = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.group(worker); bootstrap.handler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { // 会在连接 channel 建立成功后,会触发 active 事件 @Override public void channelActive(ChannelHandlerContext ctx) { ByteBuf buf = ctx.alloc().buffer(); char c = '0'; Random r = new Random(); for (int i = 0; i < 10; i++) { StringBuilder sb = makeString(c, r.nextInt(256) + 1); c++; buf.writeBytes(sb.toString().getBytes()); } ctx.writeAndFlush(buf); } }); } }); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("client error", e); } finally { worker.shutdownGracefully(); } } } ``` 缺点,处理字符数据比较合适,但如果内容本身包含了分隔符(字节数据常常会有此情况),那么就会解析错误。 同时,效率比较低,需要一个一个字节去匹配消息的边界。 *附参考文章链接:* *https://nyimac.gitee.io/2021/04/25/Netty%E5%9F%BA%E7%A1%80*
标签:
Netty
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/2165.html
上一篇
01.Netty进阶之粘包与半包现象分析
下一篇
03.Netty进阶之长度域解码器
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
Ubuntu
Spark Core
JavaWEB项目搭建
SpringBoot
Tomcat
国产数据库改造
前端
Golang基础
BurpSuite
线程池
JavaScript
Python
人工智能
HDFS
Linux
JavaWeb
稀疏数组
ClickHouse
Kibana
SpringCloud
Golang
Nacos
机器学习
哈希表
JVM
序列化和反序列化
字符串
二叉树
Typora
Docker
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭