李林超博客
03. Building an RPC Framework with Netty
Leefs
2022-06-23 PM
[TOC]

### 1. Overview

The purpose of RPC is to make calls between services in a distributed or microservice system (remote calls) as simple as local calls, so that the caller never sees the remote-call logic.

### 2. Goal

> The client sends a request to the server to invoke the `sayHello` method of the `HelloService` interface, and the server returns the result to the client.

+ **HelloService interface**

```java
public interface HelloService {
    String sayHello(String name);
}
```

+ **HelloServiceImpl implementation class**

```java
public class HelloServiceImpl implements HelloService {
    @Override
    public String sayHello(String msg) {
        // Uncomment to simulate an exception on the server side
        // int i = 1 / 0;
        return "你好, " + msg;
    }
}
```

### 3. Implementation

To keep things simple, this code adds RPC request and response messages on top of the existing chat application.

+ **Modify Message**

```java
@Data
public abstract class Message implements Serializable {

    // existing code omitted

    public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
    public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;

    static {
        // ...
        messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
        messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
    }
}
```

+ **Add the ServicesFactory class**

```java
import cn.itcast.config.Config;

import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Server-side service lookup.
 * Purpose: find the concrete implementation for a given interface.
 */
public class ServicesFactory {

    static Properties properties;
    // interface class -> singleton implementation instance
    static Map<Class<?>, Object> map = new ConcurrentHashMap<>();

    static {
        try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
            properties = new Properties();
            properties.load(in);
            Set<String> names = properties.stringPropertyNames();
            for (String name : names) {
                // Keys ending in "Service" map an interface name to its implementation class name
                if (name.endsWith("Service")) {
                    Class<?> interfaceClass = Class.forName(name);
                    Class<?> instanceClass = Class.forName(properties.getProperty(name));
                    // Class.newInstance() is deprecated since Java 9; use the no-arg constructor instead
                    map.put(interfaceClass, instanceClass.getDeclaredConstructor().newInstance());
                }
            }
        } catch (IOException | ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    @SuppressWarnings("unchecked")
    public static <T> T getService(Class<T> interfaceClass) {
        return (T) map.get(interfaceClass);
    }
}
```

+ **application.properties configuration**

```properties
serializer.algorithm=Json
cn.itcast.server.service.HelloService=cn.itcast.server.service.HelloServiceImpl
```

+ **RPC request message RpcRequestMessage**

```java
import lombok.Getter;
import lombok.ToString;

/**
 * Created by lilinchao
 * Date 2022/6/21
 * Description RPC request message
 */
@Getter
@ToString(callSuper = true)
public class RpcRequestMessage extends Message {

    /**
     * Fully qualified name of the interface to call; the server uses it to find the implementation
     */
    private String interfaceName;
    /**
     * Name of the method to call on the interface
     */
    private String methodName;
    /**
     * Return type of the method
     */
    private Class<?> returnType;
    /**
     * Parameter types of the method
     */
    private Class[] parameterTypes;
    /**
     * Argument values of the method
     */
    private Object[] parameterValue;

    public RpcRequestMessage(int sequenceId, String interfaceName, String methodName,
                             Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {
        super.setSequenceId(sequenceId);
        this.interfaceName = interfaceName;
        this.methodName = methodName;
        this.returnType = returnType;
        this.parameterTypes = parameterTypes;
        this.parameterValue = parameterValue;
    }

    @Override
    public int getMessageType() {
        return RPC_MESSAGE_TYPE_REQUEST;
    }
}
```

To call a method remotely, the caller must supply the **following five pieces of information**:

- The fully qualified name of the class (interface) that declares the method
- The method name
- The method's return type
- The method's parameter types
- The method's argument values

+ **RPC response message RpcResponseMessage**

```java
import lombok.Data;
import lombok.ToString;

/**
 * Created by lilinchao
 * Date 2022/6/21
 * Description RPC response message
 */
@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message {
    /**
     * Return value
     */
    private Object returnValue;
    /**
     * Exception value
     */
    private Exception exceptionValue;

    @Override
    public int getMessageType() {
        return RPC_MESSAGE_TYPE_RESPONSE;
    }
}
```

The response message only needs to carry the **return value and the exception value**.

+ **Server code RpcServer**

```java
import cn.itcast.protocol.MessageCodecSharable;
import cn.itcast.protocol.ProcotolFrameDecoder;
import cn.itcast.server.handler.RpcRequestMessageHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

/**
 * Created by lilinchao
 * Date 2022/6/21
 * Description RPC server
 */
@Slf4j
public class RpcServer {
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();

        // RPC request message handler
        RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.group(boss, worker);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProcotolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(MESSAGE_CODEC);
                    ch.pipeline().addLast(RPC_HANDLER);
                }
            });
            Channel channel = serverBootstrap.bind(8080).sync().channel();
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("server error", e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}
```

+ **RPC client code**

```java
import cn.itcast.message.RpcRequestMessage;
import cn.itcast.protocol.MessageCodecSharable;
import cn.itcast.protocol.ProcotolFrameDecoder;
import cn.itcast.server.handler.RpcResponseMessageHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

/**
 * Created by lilinchao
 * Date 2022/6/21
 * Description RPC client
 */
@Slf4j
public class RpcClient {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();

        // RPC response message handler (implemented below)
        RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(group);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProcotolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(MESSAGE_CODEC);
                    ch.pipeline().addLast(RPC_HANDLER);
                }
            });
            Channel channel = bootstrap.connect("localhost", 8080).sync().channel();

            channel.writeAndFlush(new RpcRequestMessage(
                    1,
                    "cn.itcast.server.service.HelloService",
                    "sayHello",
                    String.class,
                    new Class[]{String.class},
                    new Object[]{"张三"}
            )).addListener(promise -> {
                // Log the cause if the write itself failed (for example, a serialization error)
                if (!promise.isSuccess()) {
                    Throwable cause = promise.cause();
                    log.error("error", cause);
                }
            });

            channel.closeFuture().sync();
        } catch (Exception e) {
            log.error("client error", e);
        } finally {
            group.shutdownGracefully();
        }
    }
}
```

+ **RpcResponseMessageHandler**

```java
@ChannelHandler.Sharable
@Slf4j
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
        log.debug("{}", msg);
    }
}
```

+ **RpcRequestMessageHandler**

```java
import cn.itcast.message.RpcRequestMessage;
import cn.itcast.message.RpcResponseMessage;
import cn.itcast.server.service.HelloService;
import cn.itcast.server.service.ServicesFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

/**
 * Created by lilinchao
 * Date 2022/6/21
 * Description Server-side handler
 */
@Slf4j
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {
        RpcResponseMessage response = new RpcResponseMessage();
        // Echo the sequence id so the client can match the response to its request
        response.setSequenceId(message.getSequenceId());
        try {
            // Look up the actual implementation object
            HelloService service = (HelloService) ServicesFactory.getService(Class.forName(message.getInterfaceName()));
            // Resolve the target method via reflection
            Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());
            // Invoke the method
            Object invoke = method.invoke(service, message.getParameterValue());
            // Call succeeded
            response.setReturnValue(invoke);
        } catch (Exception e) {
            e.printStackTrace();
            // getCause() is null for exceptions such as NoSuchMethodException, so fall back to e itself
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            String msg = cause.getMessage();
            response.setExceptionValue(new Exception("远程调用出错:" + msg));
        }
        // Send the result back
        ctx.writeAndFlush(response);
    }

    public static void main(String[] args) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
        RpcRequestMessage message = new RpcRequestMessage(
                1,
                "cn.itcast.server.service.HelloService",
                "sayHello",
                String.class,
                new Class[]{String.class},
                new Object[]{"张三"}
        );
        /*HelloService service = (HelloService)ServicesFactory.getService(Class.forName(message.getInterfaceName()));
        Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());
        Object invoke = method.invoke(service, message.getParameterValue());
        System.out.println(invoke);*/
    }
}
```

The remote method call is implemented mainly through reflection.

**The steps are roughly as follows:**

- The **request message carries everything needed to identify the method to call** and its arguments
- The **fully qualified interface name is used to look up the implementation object in the `ServicesFactory` map** (the instances are created when the factory is initialized)
- The `Method` is obtained via reflection and invoked, and its **return value is put into the response message**
- Any **exception is caught and put into the response message** instead

**Start the server, then the client.**

### 4. Improvements

+ **Add SequenceIdGenerator**

```java
import java.util.concurrent.atomic.AtomicInteger;

public abstract class SequenceIdGenerator {
    private static final AtomicInteger id = new AtomicInteger();

    public static int nextId() {
        return id.incrementAndGet();
    }
}
```

+ **Improve the client**

```java
import cn.itcast.message.RpcRequestMessage;
import cn.itcast.protocol.MessageCodecSharable;
import cn.itcast.protocol.ProcotolFrameDecoder;
import cn.itcast.protocol.SequenceIdGenerator;
import cn.itcast.server.handler.RpcResponseMessageHandler;
import cn.itcast.server.service.HelloService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultPromise;
import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.Proxy;

/**
 * Created by lilinchao
 * Date 2022/6/22
 * Description 1.0
 */
@Slf4j
public class RpcClientManager {

    public static void main(String[] args) {
        HelloService service = getProxyService(HelloService.class);
        System.out.println(service.sayHello("zhangsan"));
//        System.out.println(service.sayHello("lisi"));
//        System.out.println(service.sayHello("wangwu"));
    }

    /**
     * Uses the proxy pattern to build and send the request message for us.
     * @param serviceClass the service interface to proxy
     * @param <T> the service interface type
     * @return a proxy object implementing the service interface
     */
    @SuppressWarnings("unchecked")
    public static <T> T getProxyService(Class<T> serviceClass) {
        ClassLoader loader = serviceClass.getClassLoader();
        Class<?>[] interfaces = new Class[]{serviceClass};
        // Create the proxy object with a JDK dynamic proxy, e.g. for sayHello("张三")
        Object o = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> {
            // 1. Convert the method call into a message object
            int sequenceId = SequenceIdGenerator.nextId();
            RpcRequestMessage msg = new RpcRequestMessage(
                    sequenceId,
                    serviceClass.getName(),
                    method.getName(),
                    method.getReturnType(),
                    method.getParameterTypes(),
                    args
            );

            // 2. Prepare an empty Promise to receive the result; the event loop passed in
            //    is the thread on which listeners are notified. Register it BEFORE sending,
            //    so that the response cannot arrive before the Promise is in the map.
            DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
            RpcResponseMessageHandler.PROMISES.put(sequenceId, promise);

            // 3. Send the message object
            getChannel().writeAndFlush(msg);

//            promise.addListener(future -> {
//                // runs on the event loop thread
//            });

            // 4. Wait for the promise to complete
            promise.await();
            if (promise.isSuccess()) {
                // Call succeeded
                return promise.getNow();
            } else {
                // Call failed
                throw new RuntimeException(promise.cause());
            }
        });
        return (T) o;
    }

    // volatile is required for double-checked locking to be safe
    private static volatile Channel channel = null;
    private static final Object LOCK = new Object();

    // Returns the single shared channel
    public static Channel getChannel() {
        if (channel != null) {
            return channel;
        }
        synchronized (LOCK) { // t2
            if (channel != null) { // t1
                return channel;
            }
            initChannel();
            return channel;
        }
    }

    // Initializes the channel
    private static void initChannel() {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.group(group);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ProcotolFrameDecoder());
                ch.pipeline().addLast(LOGGING_HANDLER);
                ch.pipeline().addLast(MESSAGE_CODEC);
                ch.pipeline().addLast(RPC_HANDLER);
            }
        });
        try {
            channel = bootstrap.connect("localhost", 8080).sync().channel();
            // Shut the group down asynchronously once the channel closes
            channel.closeFuture().addListener(future -> {
                group.shutdownGracefully();
            });
        } catch (Exception e) {
            log.error("client error", e);
        }
    }
}
```

**Obtaining the Channel**

- Connecting and obtaining the Channel are encapsulated in the `initChannel` method; when the connection closes, a listener registered with `addListener` **shuts down the group asynchronously**
- The Channel is created and obtained using the **singleton pattern** (double-checked locking)

**Calling the remote method**

- To keep method calls concise, **creating and sending the `RpcRequestMessage` is handled by a JDK dynamic proxy**
- The caller simply invokes methods on the returned proxy object; **the argument to `getProxyService` is the `Class` object of the service interface**

**Obtaining the return value of the remote call**

- The method is called on the main thread, while the result is handled on an NIO thread (in `RpcResponseMessageHandler`). **Passing the return value between threads requires a Promise**
- `RpcResponseMessageHandler` holds a Map
  - The key is the **SequenceId**
  - The value is the corresponding **Promise**
- The proxy on the **main thread** creates a Promise, puts it into the Map in `RpcResponseMessageHandler`, and sends the `RpcRequestMessage` to the server. **It then calls `await` to wait until a result is placed into the Promise**. Once the Promise completes, the proxy checks whether it succeeded and either returns the result or throws an exception

+ **Improve RpcResponseMessageHandler**

```java
import cn.itcast.message.RpcResponseMessage;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Promise;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Created by lilinchao
 * Date 2022/6/21
 * Description 1.0
 */
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {

    // sequence id -> promise that will receive the result
    public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
        log.debug("{}", msg);
        // Take the still-empty promise out of the map
        Promise<Object> promise = PROMISES.remove(msg.getSequenceId());
        if (promise != null) {
            Object returnValue = msg.getReturnValue();
            Exception exceptionValue = msg.getExceptionValue();
            if (exceptionValue != null) {
                promise.setFailure(exceptionValue);
            } else {
                promise.setSuccess(returnValue);
            }
        }
    }
}
```

The **NIO thread** uses the SequenceId to **fetch and remove** the corresponding Promise, then completes it according to the contents of the `RpcResponseMessage`:

- If there is **no exception** (`exceptionValue` is null), it calls `promise.setSuccess(returnValue)` to store the return value
- If there **is an exception**, it calls `promise.setFailure(exception)` to store the exception

**Client output**

```
16:45:42 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0xad16e128, L:/127.0.0.1:53451 - R:localhost/127.0.0.1:8080] ACTIVE
16:45:42 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0xad16e128, L:/127.0.0.1:53451 - R:localhost/127.0.0.1:8080] WRITE: 228B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 01 01 65 00 00 00 01 ff 00 00 00 d4 |......e.........|
|00000010| 7b 22 69 6e 74 65 72 66 61 63 65 4e 61 6d 65 22 |{"interfaceName"|
|00000020| 3a 22 63 6e 2e 69 74 63 61 73 74 2e 73 65 72 76 |:"cn.itcast.serv|
|00000030| 65 72 2e 73 65 72 76 69 63 65 2e 48 65 6c 6c 6f |er.service.Hello|
|00000040| 53 65 72 76 69 63 65 22 2c 22 6d 65 74 68 6f 64 |Service","method|
|00000050| 4e 61 6d 65 22 3a 22 73 61 79 48 65 6c 6c 6f 22 |Name":"sayHello"|
|00000060| 2c 22 72 65 74 75 72 6e 54 79 70 65 22 3a 22 6a |,"returnType":"j|
|00000070| 61 76 61 2e 6c 61 6e 67 2e 53 74 72 69 6e 67 22 |ava.lang.String"|
|00000080| 2c 22 70 61 72 61 6d 65 74 65 72 54 79 70 65 73 |,"parameterTypes|
|00000090| 22 3a 5b 22 6a 61 76 61 2e 6c 61 6e 67 2e 53 74 |":["java.lang.St|
|000000a0| 72 69 6e 67 22 5d 2c 22 70 61 72 61 6d 65 74 65 |ring"],"paramete|
|000000b0| 72 56 61 6c 75 65 22 3a 5b 22 7a 68 61 6e 67 73 |rValue":["zhangs|
|000000c0| 61 6e 22 5d 2c 22 73 65 71 75 65 6e 63 65 49 64 |an"],"sequenceId|
|000000d0| 22 3a 31 2c 22 6d 65 73 73 61 67 65 54 79 70 65 |":1,"messageType|
|000000e0| 22 3a 30 7d                                     |":0}            |
+--------+-------------------------------------------------+----------------+
16:45:42 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0xad16e128, L:/127.0.0.1:53451 - R:localhost/127.0.0.1:8080] FLUSH
16:45:43 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0xad16e128, L:/127.0.0.1:53451 - R:localhost/127.0.0.1:8080] READ: 81B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 01 01 66 00 00 00 01 ff 00 00 00 41 |......f........A|
|00000010| 7b 22 72 65 74 75 72 6e 56 61 6c 75 65 22 3a 22 |{"returnValue":"|
|00000020| e4 bd a0 e5 a5 bd 2c 20 7a 68 61 6e 67 73 61 6e |......, zhangsan|
|00000030| 22 2c 22 73 65 71 75 65 6e 63 65 49 64 22 3a 31 |","sequenceId":1|
|00000040| 2c 22 6d 65 73 73 61 67 65 54 79 70 65 22 3a 30 |,"messageType":0|
|00000050| 7d                                              |}               |
+--------+-------------------------------------------------+----------------+
16:45:43 [DEBUG] [nioEventLoopGroup-2-1] c.i.s.h.RpcResponseMessageHandler - RpcResponseMessage(super=Message(sequenceId=1, messageType=102), returnValue=你好, zhangsan, exceptionValue=null)
16:45:43 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0xad16e128, L:/127.0.0.1:53451 - R:localhost/127.0.0.1:8080] READ COMPLETE
你好, zhangsan
```

*References:*

*《黑马程序员Netty教程》 (Heima Programmer Netty tutorial)*

[*《Netty之RPC框架》 (Netty: RPC framework)*](https://nyimac.gitee.io/2021/04/25/Netty%E5%9F%BA%E7%A1%80/#3%E3%80%81RPC%E6%A1%86%E6%9E%B6)
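*Supplementary sketch.* The core ideas of this article (a JDK dynamic proxy that turns a method call into a request, reflective dispatch on the "server", and matching responses to callers by sequence id) can be tried out without Netty. The following is only a minimal in-process sketch under stated assumptions, not the code from the article: `InProcessRpcSketch`, `Request`, `Response`, `PENDING`, `SERVICES`, `handle` and `onResponse` are hypothetical names, a single-thread executor stands in for the network and the NIO thread, and `CompletableFuture` is swapped in for Netty's `Promise`.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class InProcessRpcSketch {
    interface HelloService { String sayHello(String name); }

    static class HelloServiceImpl implements HelloService {
        @Override public String sayHello(String name) { return "Hello, " + name; }
    }

    // Simplified stand-ins for RpcRequestMessage / RpcResponseMessage (hypothetical)
    static final class Request {
        final int sequenceId; final String interfaceName; final String methodName;
        final Class<?>[] parameterTypes; final Object[] args;
        Request(int id, String iface, String method, Class<?>[] types, Object[] args) {
            this.sequenceId = id; this.interfaceName = iface; this.methodName = method;
            this.parameterTypes = types; this.args = args;
        }
    }
    static final class Response {
        final int sequenceId; final Object returnValue; final String error;
        Response(int id, Object value, String error) {
            this.sequenceId = id; this.returnValue = value; this.error = error;
        }
    }

    static final AtomicInteger IDS = new AtomicInteger();
    // sequence id -> future awaited by the calling thread (plays the role of PROMISES)
    static final Map<Integer, CompletableFuture<Object>> PENDING = new ConcurrentHashMap<>();
    // interface name -> implementation instance (plays the role of ServicesFactory)
    static final Map<String, Object> SERVICES = new ConcurrentHashMap<>();
    // Daemon thread standing in for the network plus the NIO event loop
    static final ExecutorService SERVER = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "fake-server");
        t.setDaemon(true);
        return t;
    });

    static { SERVICES.put(HelloService.class.getName(), new HelloServiceImpl()); }

    // "Server side": find the implementation, invoke it reflectively, build a response
    static Response handle(Request req) {
        try {
            Object service = SERVICES.get(req.interfaceName);
            if (service == null) throw new IllegalStateException("no service " + req.interfaceName);
            Method m = service.getClass().getMethod(req.methodName, req.parameterTypes);
            return new Response(req.sequenceId, m.invoke(service, req.args), null);
        } catch (Exception e) {
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            return new Response(req.sequenceId, null, "remote call failed: " + cause.getMessage());
        }
    }

    // "Response handler": remove the pending future for this id and complete it
    static void onResponse(Response resp) {
        CompletableFuture<Object> future = PENDING.remove(resp.sequenceId);
        if (future == null) return;
        if (resp.error != null) future.completeExceptionally(new RuntimeException(resp.error));
        else future.complete(resp.returnValue);
    }

    @SuppressWarnings("unchecked")
    static <T> T proxy(Class<T> iface) {
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface},
            (p, method, args) -> {
                int id = IDS.incrementAndGet();
                CompletableFuture<Object> future = new CompletableFuture<>();
                PENDING.put(id, future); // register before "sending"
                Request req = new Request(id, iface.getName(), method.getName(),
                        method.getParameterTypes(), args);
                SERVER.submit(() -> onResponse(handle(req))); // stands in for writeAndFlush
                try {
                    return future.get(3, TimeUnit.SECONDS); // timeout value is an assumption
                } catch (ExecutionException e) {
                    throw new RuntimeException(e.getCause().getMessage());
                } finally {
                    PENDING.remove(id); // no-op on success, cleans up after a timeout
                }
            });
    }

    public static void main(String[] args) {
        HelloService service = proxy(HelloService.class);
        System.out.println(service.sayHello("zhangsan")); // prints "Hello, zhangsan"
    }
}
```

The calling thread blocks on the future while the `fake-server` thread completes it, which mirrors the main-thread and NIO-thread split described in section 4. The future is registered before the request is handed off so that a fast response can never find the map empty.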
Tags:
Netty
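Unless otherwise stated, all articles on this blog are the author's original work.
If you repost this article, please credit the source: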
https://lilinchao.com/archives/2181.html