李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
Java
正文
14.NIO消息边界问题处理
Leefs
2022-06-03 PM
857℃
0条
[TOC] ### 一、消息边界问题的产生 #### 1.1 服务端代码 ```java import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Iterator; /** * Created by lilinchao * Date 2022/6/3 * Description 消息边界问题 服务端 */ @Slf4j public class Server { public static void main(String[] args) throws IOException { //1.创建selector,管理多个channel Selector selector = Selector.open(); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); //2. 建立channel和selector之间的联系(注册) SelectionKey sscKey = ssc.register(selector, 0, null); sscKey.interestOps(SelectionKey.OP_ACCEPT); ssc.bind(new InetSocketAddress(8080)); while (true){ //3. selector.select()方法,没有事件就阻塞,有了事件发送了就恢复运行继续向下处理 selector.select(); //4. 处理事件,selectionKeys拿到所有发生的可读可写的事件 Iterator
iterator = selector.selectedKeys().iterator(); //多个key的时候,accept和read方法都会触发事件,所以要区分事件类型 while (iterator.hasNext()){ SelectionKey key = iterator.next(); //处理key的时候要从selectKeys中删除,否则会报错 iterator.remove(); //5.区分事件类型 if(key.isAcceptable()){ //拿到触发事件的channel ServerSocketChannel channel = (ServerSocketChannel)key.channel(); SocketChannel sc = channel.accept(); //设置为非阻塞 sc.configureBlocking(false); //scKey管sc的channel SelectionKey scKey = sc.register(selector, 0, null); //scKey关注读事件,也就是说客户端的通道关注可读事件 scKey.interestOps(SelectionKey.OP_READ); }else if(key.isReadable()){ //客户端关闭之后也会引发read事件,这时需要从key中remove掉,否则拿不到channel,报错 try { SocketChannel channel = (SocketChannel)key.channel(); //将缓冲区大小设置为4 ByteBuffer buffer1 = ByteBuffer.allocate(4); //客户端正常断开,read返回值是-1 int read = channel.read(buffer1); if(read == -1){ //正常断开 key.channel(); } buffer1.flip(); System.out.println(Charset.defaultCharset().decode(buffer1)); } catch (IOException e) { e.printStackTrace(); key.cancel();//客户端断开,需要将key取消(从selector的key集合中真正删除) } } } } } } ``` #### 1.2 客户端代码 ```java import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.channels.SocketChannel; /** * Created by lilinchao * Date 2022/6/3 * Description 客户端 */ public class Client { public static void main(String[] args) throws IOException { SocketChannel sc = SocketChannel.open(); sc.connect(new InetSocketAddress("localhost", 8080)); SocketAddress localAddress = sc.getLocalAddress(); System.out.println("waiting..."); } } ``` **运行程序** (1)运行服务端代码 (2)通过Debug模式运行客户端代码 (3)通过客户端向服务端发送如下请求 ```java sc.write(Charset.defaultCharset().encode("中国")); ``` **服务端输出结果** ![14.NIO消息边界问题处理01.jpg](https://lilinchao.com/usr/uploads/2022/06/2258012529.jpg) 从输出结果可以看到,**国**字出现了乱码。 **问题分析** 因为在服务端代码中设置的接收客户端数据的缓冲区大小是4个字节,在UTF-8编码中,一个汉字占三个字节,也就是服务端在接收客户端发送到的消息时,只接收到了中字的三个字节和国字的第一个字节就进行了打印输出,导致国字出现了半包问题,产生了乱码。 ### 二、消息边界问题分析 ![14.NIO消息边界问题处理02.png](https://lilinchao.com/usr/uploads/2022/06/720319548.png) **分析** + **时刻1**:ByteBufeer较小,但是发送过来的消息比较大,一次处理不完; + **时刻2**:ByteBufeer较大,消息比较小。会出现半包现象 + **时刻3**:ButeBuffer可以一次性接收客户端发送过来的多条消息。此时会出现黏包现象 **解决思路** (1)**固定消息长度**,数据包大小一样,服务器按预定长度读取,当发送的数据较少时,需要将数据进行填充,直到长度与消息规定长度一致。缺点是浪费带宽 (2)**按分隔符拆分**,缺点是效率低,需要一个一个字符去匹配分隔符 (3)**TLV 格式,即 Type 类型、Length 长度、Value 数据**(也就是在消息开头**用一些空间存放后面数据的长度**),如HTTP请求头中的Content-Type与**Content-Length**。类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量 ![14.NIO消息边界问题处理03.png](https://lilinchao.com/usr/uploads/2022/06/3010171246.png) ### 三、解决消息边界问题 本示例将按照第二种方式,**按分隔符拆分**来解决消息边界问题。 #### 3.1 附件与扩容 Channel的register方法还有**第三个参数**:`附件`,可以向其中放入一个Object类型的对象,该对象会与登记的Channel以及其对应的`SelectionKey`绑定,可以从`SelectionKey`获取到对应通道的附件 ```java public final SelectionKey register(Selector sel, int ops, Object att) ``` 可通过SelectionKey的**attachment()方法获得附件** ```java ByteBuffer buffer = (ByteBuffer) key.attachment(); ``` 需要在Accept事件发生后,将通道注册到Selector中时,**对每个通道添加一个ByteBuffer附件**,让每个通道发生读事件时都使用自己的通道,避免与其他通道发生冲突而导致问题 ```java // 设置为非阻塞模式,同时将连接的通道也注册到选择器中,同时设置附件 socketChannel.configureBlocking(false); ByteBuffer buffer = ByteBuffer.allocate(16); // 添加通道对应的Buffer附件 socketChannel.register(selector, SelectionKey.OP_READ, buffer); ``` 当Channel中的数据大于缓冲区时,需要对缓冲区进行**扩容**操作。此代码中的扩容的判定方法:**Channel调用compact方法后,position与limit相等,说明缓冲区中的数据并未被读取(容量太小),此时创建新的缓冲区,其大小扩大为两倍。同时还要将旧缓冲区中的数据拷贝到新的缓冲区中,同时调用SelectionKey的attach方法将新的缓冲区作为新的附件放入SelectionKey中** ```java // 如果缓冲区太小,就进行扩容 if (buffer.position() == buffer.limit()) { ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2); // 将旧buffer中的内容放入新的buffer中 newBuffer.put(buffer); // 将新buffer作为附件放到key中 key.attach(newBuffer); } ``` #### 3.2 完整代码 + **需求** > 将服务端缓冲区大小设置成16,客户端向服务端发送数据21个字节的数据`0123456789abcdef3333\n` > > + `\n`为消息的分隔符,占一个字节大小 + **过程分析** ![14.NIO消息边界问题处理04.jpg](https://lilinchao.com/usr/uploads/2022/06/1694622757.jpg) + **服务端代码** ```java import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import static com.lilinchao.nio.bytebuffer_2.ByteBufferUtil.debugAll; /** * Created by lilinchao * Date 2022/6/3 * Description 服务端 */ @Slf4j public class MessageBorderServer { public static void main(String[] args) throws IOException { // 1. 创建 selector, 管理多个 channel Selector selector = Selector.open(); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); // 2. 建立 selector 和 channel 的联系(注册) // SelectionKey 就是将来事件发生后,通过它可以知道事件和哪个channel的事件 SelectionKey sscKey = ssc.register(selector, 0, null); // key 只关注 accept 事件 sscKey.interestOps(SelectionKey.OP_ACCEPT); log.debug("sscKey:{}", sscKey); ssc.bind(new InetSocketAddress(8080)); while (true) { // 3. select 方法, 没有事件发生,线程阻塞,有事件,线程才会恢复运行 // select 在事件未处理时,它不会阻塞, 事件发生后要么处理,要么取消,不能置之不理 selector.select(); // 4. 处理事件, selectedKeys 内部包含了所有发生的事件 Iterator
iter = selector.selectedKeys().iterator(); // accept, read while (iter.hasNext()) { SelectionKey key = iter.next(); // 处理key 时,要从 selectedKeys 集合中删除,否则下次处理就会有问题 iter.remove(); log.debug("key: {}", key); // 5. 区分事件类型 if (key.isAcceptable()) { // 如果是 accept ServerSocketChannel channel = (ServerSocketChannel) key.channel(); SocketChannel sc = channel.accept(); sc.configureBlocking(false); ByteBuffer buffer = ByteBuffer.allocate(16); // attachment // 将一个 byteBuffer 作为附件关联到 selectionKey 上 SelectionKey scKey = sc.register(selector, 0, buffer); scKey.interestOps(SelectionKey.OP_READ); log.debug("{}", sc); log.debug("scKey:{}", scKey); } else if (key.isReadable()) { // 如果是 read try { SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件的channel // 获取 selectionKey 上关联的附件 ByteBuffer buffer = (ByteBuffer) key.attachment(); int read = channel.read(buffer); // 如果是正常断开,read 的方法的返回值是 -1 if(read == -1) { key.cancel(); } else { split(buffer); // 需要扩容 if (buffer.position() == buffer.limit()) { ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2); buffer.flip(); newBuffer.put(buffer); // 0123456789abcdef3333\n key.attach(newBuffer); } } } catch (IOException e) { e.printStackTrace(); key.cancel(); // 因为客户端断开了,因此需要将 key 取消(从 selector 的 keys 集合中真正删除 key) } } } } } private static void split(ByteBuffer source) { source.flip(); for (int i = 0; i < source.limit(); i++) { // 找到一条完整消息 if (source.get(i) == '\n') { int length = i + 1 - source.position(); // 把这条完整消息存入新的 ByteBuffer ByteBuffer target = ByteBuffer.allocate(length); // 从 source 读,向 target 写 for (int j = 0; j < length; j++) { target.put(source.get()); } debugAll(target); } } source.compact(); // 0123456789abcdef position 16 limit 16 } } ``` + **客户端代码** ```java import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; /** * Created by lilinchao * Date 2022/6/3 * Description 1.0 */ public class MessageBorderClient { public static void main(String[] args) throws IOException { SocketChannel sc = SocketChannel.open(); sc.connect(new InetSocketAddress("localhost", 8080)); sc.write(Charset.defaultCharset().encode("0123456789abcdef3333\n")); System.in.read(); } } ``` + **输出结果** ``` 11:50:04 [DEBUG] [main] c.l.n.b.MessageBorderServer - sscKey:sun.nio.ch.SelectionKeyImpl@7dc36524 11:50:32 [DEBUG] [main] c.l.n.b.MessageBorderServer - key: sun.nio.ch.SelectionKeyImpl@7dc36524 11:50:32 [DEBUG] [main] c.l.n.b.MessageBorderServer - java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:51861] 11:50:32 [DEBUG] [main] c.l.n.b.MessageBorderServer - scKey:sun.nio.ch.SelectionKeyImpl@27f674d 11:50:32 [DEBUG] [main] c.l.n.b.MessageBorderServer - key: sun.nio.ch.SelectionKeyImpl@27f674d 11:50:32 [DEBUG] [main] c.l.n.b.MessageBorderServer - key: sun.nio.ch.SelectionKeyImpl@27f674d +--------+-------------------- all ------------------------+----------------+ position: [21], limit: [21] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 30 31 32 33 34 35 36 37 38 39 61 62 63 64 65 66 |0123456789abcdef| |00000010| 33 33 33 33 0a |3333. | +--------+-------------------------------------------------+----------------+ ``` ### 四、bytebuffer大小分配 * 每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用,因此需要为每个 channel 维护一个独立的 ByteBuffer * ByteBuffer 不能太大,比如一个 ByteBuffer 1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的 ByteBuffer * 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能,参考实现 [http://tutorials.jenkov.com/java-performance/resizable-array.html](http://tutorials.jenkov.com/java-performance/resizable-array.html) * 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗 *附参考文章:* *《黑马程序员Netty教程》*
标签:
Netty
,
NIO
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/2112.html
上一篇
13.Selector处理accept和read事件
下一篇
15.NIO Selector之处理write事件
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
ClickHouse
并发编程
Elastisearch
递归
Filter
散列
查找
Map
排序
哈希表
Scala
VUE
MyBatis-Plus
Spark Streaming
Kafka
gorm
Typora
Thymeleaf
Jenkins
Spring
LeetCode刷题
Java
Kibana
持有对象
序列化和反序列化
Jquery
Hbase
Nacos
并发线程
Quartz
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞