14-netty基础-手写rpc-提供方(服务端)-06
netty系列文章:
01-netty基础-socket |
02-netty基础-java四种IO模型 |
03-netty基础-多路复用select、poll、epoll |
04-netty基础-Reactor三种模型 |
05-netty基础-ByteBuf数据结构 |
06-netty基础-编码解码 |
07-netty基础-自定义编解码器 |
08-netty基础-自定义序列化和反序列化 |
09-netty基础-手写rpc-原理-01 |
10-netty基础-手写rpc-定义协议头-02 |
11-netty基础-手写rpc-支持多序列化协议-03 |
12-netty基础-手写rpc-编解码-04 |
13-netty基础-手写rpc-消费方生成代理-05 |
14-netty基础-手写rpc-提供方(服务端)-06 |
1 功能逻辑
在服务端启动的时候,在spring容器中bean已经被初始化好之后,拿到当前bean的信息,判断是否被BonnieRemoteService修饰,如果被修饰则获取到当前类下的所有Method,然后将这个Method信息缓存到容器中。以供后续rpc反射调用。
缓存容器使用Map, key:类的全路径+方法名称 value:Method即可
2、核心代码
spring容器中bean已经被初始化好,可以实现BeanPostProcessor接口中的postProcessAfterInitialization方法实现初始化后扩展功能
实现InitializingBean接口中afterPropertiesSet方法,将nettyServer的服务放在这块启动
2.1 收集BonnieRemoteService修饰的类
package com.bonnie.protocol.spring.service;import com.bonnie.protocol.annotation.BonnieRemoteService;
import com.bonnie.protocol.netty.NettyServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;import java.lang.reflect.Method;@Component
@Slf4j
public class SpringRpcProviderBean implements BeanPostProcessor, InitializingBean {private String serverAddress = "127.0.0.1";private Integer serverPort = 48081;// public SpringRpcProviderBean(String serverAddress, Integer serverPort) {
// this.serverAddress = serverAddress;
// this.serverPort = serverPort;
// }/*** bean初始化完成后,执行该逻辑* @param bean* @param beanName* @return* @throws BeansException*/@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {System.out.println("SpringRpcProviderBean===="+beanName);// 只要bean声明了BonnieRemoteService注解,则需要把该服务发布到网络上// 判断当前类上是否有注解BonnieRemoteServiceboolean flag = bean.getClass().isAnnotationPresent(BonnieRemoteService.class);if (flag) {Method[] declaredMethods = bean.getClass().getDeclaredMethods();for (Method method : declaredMethods) {String serviceName = bean.getClass().getInterfaces()[0].getName();String key = serviceName + "." + method.getName();BeanMethod beanMethod = new BeanMethod();beanMethod.setBean(bean);beanMethod.setMethod(method);// 缓存到map容器中Mediator.beanMethodMap.put(key, beanMethod);}}return bean;}@Overridepublic void afterPropertiesSet() throws Exception {log.info("启动Netty服务端======48081");new Thread(()->{new NettyServer(serverAddress, serverPort).startNettyServer();}).start();}
}
2.2 NettyServer实现
启动服务,设置编解码的方式
package com.bonnie.protocol.netty;import com.bonnie.protocol.code.BonnieDecoder;
import com.bonnie.protocol.code.BonnieEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class NettyServer {private String serverAddress;private Integer serverPort;public NettyServer(String serverAddress, Integer serverPort) {this.serverAddress = serverAddress;this.serverPort = serverPort;}public void startNettyServer() {log.info("begin start Netty server");NioEventLoopGroup boss = new NioEventLoopGroup();NioEventLoopGroup work = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(boss, work).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline()// 长度域解码器.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 12, 4, 0, 0)).addLast(new BonnieEncoder()).addLast(new BonnieDecoder()).addLast(new RpcServerHandler());}});try {ChannelFuture channelFuture = serverBootstrap.bind(this.serverAddress, this.serverPort).sync();log.info("Server started Success on serverAddress {} Port,{}",this.serverAddress, this.serverPort);channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {boss.shutdownGracefully();work.shutdownGracefully();}}}
2.3 NettyServer接收客户端请求数据
package com.bonnie.protocol.netty;import com.alibaba.fastjson.JSONObject;
import com.bonnie.protocol.core.Header;
import com.bonnie.protocol.core.RpcProtocol;
import com.bonnie.protocol.core.RpcRequest;
import com.bonnie.protocol.core.RpcResponse;
import com.bonnie.protocol.enums.ReqTypeEnum;
import com.bonnie.protocol.spring.service.Mediator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class RpcServerHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcRequest>> {private static final Logger log = LoggerFactory.getLogger(RpcServerHandler.class);/*** 服务端接收客户端消息* @param ctx* @param msg* @throws Exception*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcRequest> msg) throws Exception {long requestId = msg.getHeader().getRequestId();log.info("接收到客户端的消息: requestId {} {}", requestId, JSONObject.toJSONString(msg));// 构建返回消息ReqResponseRpcProtocol<RpcResponse> responseRpcProtocol = new RpcProtocol<>();Header header = msg.getHeader();// 设置返回的消息类型header.setReqType(ReqTypeEnum.RESPONSE.getCode());// 通过调用获取到方法的返回数据Object result = Mediator.getInstance().processor(msg.getContent());RpcResponse rpcResponse = new RpcResponse();rpcResponse.setMsg("success");rpcResponse.setData(result);responseRpcProtocol.setHeader(header);responseRpcProtocol.setContent(rpcResponse);// 数据写入到客户端ctx.writeAndFlush(responseRpcProtocol);}}