当前位置: 首页 > news >正文

Springboot集成Netty

集成Netty实现和设备的TCP通讯

:::success
根据本篇文章实现通用启动方式

高效的netty通讯

:::

定义父类Server

不管启动什么类型的长连接,都通过继承父类Server来启动

该父类中创建了统一的启动和停止服务的方法,子类只需要重写 initialize()方法即可

package cn.com.dyl.server;import io.netty.bootstrap.AbstractBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ExecutorService;/*** @author 荣家燕* @company 达友利科技(廊坊)有限公司* @create 2023/12/6 14:15* @describe*/@SuppressWarnings({"rawtypes", "AssignmentUsedAsCondition"})
@Slf4j
public abstract class Server {//判断是否启动protected boolean isRunning;//netty配置类protected final NettyConfig config;//负责处理连接protected EventLoopGroup bossGroup;//负责处理channel通道的I/O事件protected EventLoopGroup workerGroup;//多线程业务组protected ExecutorService businessGroup;/*** 构造方法** @param config 配置类*/protected Server(NettyConfig config) {this.config = config;}protected abstract AbstractBootstrap initialize();/*** 启动方法** @return 启动结果*/public synchronized boolean start() {if (isRunning) {log.warn("==={}已经启动,port:{}===", config.name, config.port);return isRunning;}AbstractBootstrap bootstrap = initialize();ChannelFuture future = bootstrap.bind(config.port).awaitUninterruptibly();future.channel().closeFuture().addListener(f -> {if (isRunning) {stop();}});if (future.cause() != null) {log.error("启动失败", future.cause());}if (isRunning = future.isSuccess()) {log.warn("==={}启动成功,port:{}===", config.name, config.port);}return isRunning;}/*** 暂停方法*/public synchronized void stop() {//赋值启动标志isRunning = false;//关闭bossGroup.shutdownGracefully();//释放资源if (workerGroup != null) {workerGroup.shutdownGracefully();}if (businessGroup != null) {businessGroup.shutdown();}//输出日志log.warn("==={}已经停止,port:{}===", config.name, config.port);}
}

定义netty的配置类

所有启动时所需要的处理器编解码器都在这里进行赋值

该类中通过定义启动时的启动标志来启动指定的子类服务

也可以给子类服务定义心跳检测机制

package cn.com.dyl.server;import cn.com.dyl.netty.Jtt1078Handler;
import cn.com.dyl.webServer.WebMessageHandler;
import io.netty.util.NettyRuntime;
import io.netty.util.internal.ObjectUtil;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;/*** @author 荣家燕* @company 达友利科技(廊坊)有限公司* @create 2023/12/6 14:15* @describe*/
public class NettyConfig {protected final int workerCore;//worker数量protected final int businessCore;//business数量protected final int readerIdleTime;//读卡器空闲时间protected final int writerIdleTime;//写入程序空闲时间protected final int allIdleTime;//所有空闲时间protected final Integer port;//端口号protected final Jtt1078Handler jtt1078Handler;//处理器protected final WebMessageHandler webMessageHandler;//web端处理器protected final Server server;//服务入口类protected final String name;//服务名称public NettyConfig(int workerGroup,int businessGroup,int readerIdleTime,int writerIdleTime,int allIdleTime,Integer port,Jtt1078Handler jtt1078Handler,WebMessageHandler webMessageHandler,String name,boolean enableHttp,boolean enableWeb) {ObjectUtil.checkNotNull(port, "port");ObjectUtil.checkPositive(port, "port");//检查是否严格为正int processors = NettyRuntime.availableProcessors();this.workerCore = workerGroup > 0 ? workerGroup : processors + 2;this.businessCore = businessGroup > 0 ? businessGroup : Math.max(1, processors >> 1);this.readerIdleTime = readerIdleTime;this.writerIdleTime = writerIdleTime;this.allIdleTime = allIdleTime;this.port = port;this.jtt1078Handler = jtt1078Handler;this.webMessageHandler = webMessageHandler;//http服务传输标志if (enableHttp) {this.name = name != null ? name : "mediaHttp";this.server = new NettyServer(this);} else if (enableWeb) {this.name = name != null ? name : "webServer";this.server = new AudioServer(this);} else {this.name = name != null ? name : "jtt1078";this.server = new MediaServer(this);}}public Server build() {return server;}public static NettyConfig.Builder custom() {return new Builder();}@NoArgsConstructor@Accessors(chain = true)@Datapublic static class Builder {private int workerCore;private int businessCore;private int readerIdleTime = 240;private int writerIdleTime = 0;private int allIdleTime = 0;private Integer port;private Jtt1078Handler jtt1078Handler;private WebMessageHandler webMessageHandler;//web端处理器private String name;private boolean enableHttp;private boolean enableWeb;public Builder setThreadGroup(int workerCore, int businessCore) {this.workerCore = workerCore;this.businessCore = businessCore;return this;}public Builder setIdleStateTime(int readerIdleTime, int writerIdleTime, int allIdleTime) {this.readerIdleTime = readerIdleTime;this.writerIdleTime = writerIdleTime;this.allIdleTime = allIdleTime;return this;}/*** 服务创建** @return 创建的服务*/public Server build() {return new NettyConfig(this.workerCore,this.businessCore,this.readerIdleTime,this.writerIdleTime,this.allIdleTime,this.port,this.jtt1078Handler,this.webMessageHandler,this.name,this.enableHttp,this.enableWeb).build();}}
}

定义子类启动服务

基于netty的TCP协议,用于接收终端设备的数据

在该类中可以定义编解码器和处理器等

package cn.com.dyl.server;import cn.com.dyl.netty.HandlerWrapper;
import cn.com.dyl.netty.Jtt1078MessageDecoder;
import io.netty.bootstrap.AbstractBootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.DefaultThreadFactory;import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;/*** @author 荣家燕* @company 达友利科技(廊坊)有限公司* @create 2023/12/6 14:21* @describe*/
@SuppressWarnings({"rawtypes", "NullableProblems"})
public class MediaServer extends Server {/*** 构造方法** @param config 配置类*/public MediaServer(NettyConfig config) {super(config);}/*** 重写初始化方法** @return 返回初始化后的数据*/@Overrideprotected AbstractBootstrap initialize() {//负责处理连接bossGroup = new NioEventLoopGroup(1,new DefaultThreadFactory(config.name,Thread.MAX_PRIORITY));//负责处理channel通道的I/O事件workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(),new DefaultThreadFactory(config.name,Thread.MAX_PRIORITY));//增加业务逻辑处理线程组DefaultEventExecutorGroup defaultEventExecutorGroup = new DefaultEventExecutorGroup(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {private final AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "MediaServerCodecThread_" + this.threadIndex.incrementAndGet());}});//线程数量if (config.businessCore > 0) {businessGroup = new ThreadPoolExecutor(config.businessCore,config.businessCore,1L,TimeUnit.SECONDS,new LinkedBlockingQueue<>(),new DefaultThreadFactory(config.name + "-B",true,Thread.NORM_PRIORITY));}//和bossGroup一一对应return new ServerBootstrap()//设置核心线程组和业务工作线程组.group(bossGroup, workerGroup)//设置nio类型的channel.channel(NioServerSocketChannel.class)//快速复用端口,避免端口冲突,原理tcp连接需要2ML时间单位回收,这个配置加快进度.option(NioChannelOption.SO_REUSEADDR, true).option(NioChannelOption.SO_BACKLOG, 102400)//设置低延迟.childOption(NioChannelOption.TCP_NODELAY, true).childHandler(new ChannelInitializer<SocketChannel>() {private final HandlerWrapper handler = new HandlerWrapper(config.jtt1078Handler);@Overrideprotected void initChannel(SocketChannel channel) {channel.pipeline().addLast(new Jtt1078MessageDecoder()).addLast(defaultEventExecutorGroup, "handler", handler);}});}
}

定义启动配置文件

再该类中定义启动的入口

package cn.com.dyl.config;import cn.com.dyl.netty.Jtt1078Handler;
import cn.com.dyl.server.NettyConfig;
import cn.com.dyl.server.Server;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;/*** @author 荣家燕* @company 达友利科技(廊坊)有限公司* @create 2023/12/6 14:14* @describe*/
@Order(Integer.MIN_VALUE)
@Configuration
@ConditionalOnProperty(value = "server.enable", havingValue = "true")
public class JTConfig {private final Jtt1078Handler jtt1078Handler;public JTConfig(Jtt1078Handler jtt1078Handler) {this.jtt1078Handler = jtt1078Handler;}@ConditionalOnProperty(value = "server.media.port")@Bean(initMethod = "start", destroyMethod = "stop")public Server jtmediaServer(@Value("${server.media.port}") int port) {return NettyConfig.custom().setPort(port).setName("1078音视频服务").setJtt1078Handler(jtt1078Handler).build();}

集成Netty实现和前端进行webScoket通讯

定义websocket子类

package cn.com.dyl.server;import cn.com.dyl.netty.HandlerWrapper;
import cn.com.dyl.webServer.HeartBeatHandler;
import cn.com.dyl.webServer.WebMessageDecoder;
import io.netty.bootstrap.AbstractBootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.DefaultThreadFactory;import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;/*** @author 荣家燕* @company 达友利科技(廊坊)有限公司* @create 2024/1/23 10:20* @describe*/
@SuppressWarnings("rawtypes")
public class AudioServer extends Server {/*** 构造方法** @param config 配置类*/protected AudioServer(NettyConfig config) {super(config);}@Overrideprotected AbstractBootstrap initialize() {//负责处理连接bossGroup = new NioEventLoopGroup(1,new DefaultThreadFactory(config.name,Thread.MAX_PRIORITY));//负责处理channel通道的I/O事件workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(),new DefaultThreadFactory(config.name,Thread.MAX_PRIORITY));//增强也认为逻辑处理线程组DefaultEventExecutorGroup defaultEventExecutorGroup = new DefaultEventExecutorGroup(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {private final AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "AudioServerCodecThread_" + this.threadIndex.incrementAndGet());}});//线程数量if (config.businessCore > 0) {businessGroup = new ThreadPoolExecutor(config.businessCore,config.businessCore,1L,TimeUnit.SECONDS,new LinkedBlockingQueue<>(),new DefaultThreadFactory(config.name + "-B",true,Thread.NORM_PRIORITY));}return new ServerBootstrap()//设置核心线程组和业务工作线程组.group(bossGroup, workerGroup)//设置nio类型的channel.channel(NioServerSocketChannel.class)//通过TCP/IP方式进行传输//快速复用端口,避免端口冲入,原理tcp连接需要2ML时间单位回收,这个;配置加快进度.option(NioChannelOption.SO_REUSEADDR, true).option(NioChannelOption.SO_BACKLOG, 102400)//设置低延迟.childOption(NioChannelOption.TCP_NODELAY, true).childHandler(new ChannelInitializer<SocketChannel>() {private final HandlerWrapper handler = new HandlerWrapper(config.webMessageHandler);@Overrideprotected void initChannel(SocketChannel channel) {channel.pipeline().addLast(new HttpServerCodec()).addLast(new ChunkedWriteHandler()).addLast(new HttpObjectAggregator(1024*64)).addLast(new IdleStateHandler(config.readerIdleTime, config.writerIdleTime, config.allIdleTime))//自定义的空闲状态监测的handler.addLast(new HeartBeatHandler()).addLast(new WebMessageDecoder()).addLast(defaultEventExecutorGroup, "handler", handler).addLast(new WebSocketServerProtocolHandler("/audio"));}});}
}

websocket的重点

websocket的重点在于首次请求连接时使用的http方式,而之后才是webscoket连接

对于http和websocket进行判断并分开处理

package cn.com.dyl.netty;import cn.com.dyl.threads.SessionManager;
import cn.com.dyl.utils.Packet;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;import java.time.LocalDateTime;import static io.netty.handler.codec.http.HttpUtil.isKeepAlive;
import static io.netty.handler.codec.http.HttpUtil.setContentLength;/*** @author 荣家燕* @company 达友利科技(廊坊)有限公司* @create 2023/12/6 15:20* @describe*/
@ChannelHandler.Sharable
@AllArgsConstructor
@Slf4j
public class HandlerWrapper extends ChannelInboundHandlerAdapter {//基础消息处理器private final JttMessageHandler handler;private static WebSocketServerHandshaker handShaker;//握手构造工厂//基础消息处理器@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {Channel nettyChannel = ctx.channel();//获取netty的网络传输通道//判断是否是websocketif (msg instanceof WebSocketFrame) {WebSocketFrame frame = (WebSocketFrame) msg;handleWebSocketFrame(nettyChannel, frame);//判断是否是http请求(websocket建立连接是http请求,需要握手响应)} else if (msg instanceof FullHttpRequest) {FullHttpRequest httpRequest = (FullHttpRequest) msg;handleHttpRequest(nettyChannel, httpRequest);//如果都不是那就符合1078 的音视频数据} else {Packet packet = (Packet) msg;handler.channelRead(nettyChannel, packet);}}/*** 终端连接过来后的处理** @param ctx 上下文*/@Overridepublic void channelActive(ChannelHandlerContext ctx) {log.info("<<<<< web服务连接{}", ctx.channel().remoteAddress());}/*** 用来监听客户端是否断开** @param ctx 通信管道* @throws Exception 异常*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {super.channelInactive(ctx);handler.release(ctx.channel());//如果断开就释放}/*** netty异常处理机制(完成异常处理后要释放并关闭所有的流)** @param ctx   通信管道* @param cause 异常数据*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();handler.release(ctx.channel());ctx.close();}/*** 进行心跳检测,用户超时长时间未操作则会触发** @param ctx netty通道* @param evt 心跳机制*/@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) {if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {IdleStateEvent event = (IdleStateEvent) evt;if (event.state() == IdleState.READER_IDLE) {//客户端读超时String tag = SessionManager.get(ctx.channel(), "tag");log.info("web服务心跳超时: {}", tag);handler.release(ctx.channel());//释放}}}private void handleHttpRequest(Channel channel, FullHttpRequest request) {//如果Http解码失败,返回http异常if (!request.decoderResult().isSuccess() || (!"websocket".equals(request.headers().get("Upgrade")))) {sendHttpResponse(channel, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));return;}//构造握手响应返回,本机测试WebSocketServerHandshakerFactory handShakerFactory = new WebSocketServerHandshakerFactory("ws://localhost:1011/audio", null, false);handShaker = handShakerFactory.newHandshaker(request);if (handShaker == null) {WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(channel);} else {handShaker.handshake(channel, request);}}/*** websocket处理** @param channel 通道* @param frame   数据*/private void handleWebSocketFrame(Channel channel, WebSocketFrame frame) {//判断是否是关闭链路的指令if (frame instanceof CloseWebSocketFrame) {handShaker.close(channel, (CloseWebSocketFrame) frame.retain());return;}//判断是否是Ping消息if (frame instanceof PingWebSocketFrame) {channel.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));return;}//本例程仅支持文本消息,不支持二进制消息if (frame instanceof TextWebSocketFrame) {//返回应答消息String request = ((TextWebSocketFrame) frame).text();channel.writeAndFlush(new TextWebSocketFrame(request + ",欢迎使用Netty WebSocket服务,现在时刻:" + LocalDateTime.now()));}//如果是二进制消息if (frame instanceof BinaryWebSocketFrame){ByteBuf content = frame.content();log.info("收到二进制消息"+content);}}/*** 发送http响应** @param channel http通道* @param req     http请求* @param res     http响应*/private static void sendHttpResponse(Channel channel, FullHttpRequest req, FullHttpResponse res) {//返回应答给客户端if (res.status().code() != 200) {ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);res.content().writeBytes(buf);buf.release();setContentLength(res, res.content().readableBytes());}//如果非keep-Alive,关闭连接ChannelFuture future = channel.writeAndFlush(res);if (!isKeepAlive(req) || res.status().code() != 200) {future.addListener(ChannelFutureListener.CLOSE);}}
}

文章转载自:

http://xeNRKhi0.LfLsq.cn
http://XXstpSXS.LfLsq.cn
http://Tp82aOvJ.LfLsq.cn
http://F7TB6cHf.LfLsq.cn
http://jBET9OgI.LfLsq.cn
http://2oZwtyv0.LfLsq.cn
http://jTy4Gpa0.LfLsq.cn
http://gsZynUCv.LfLsq.cn
http://1dZ4KcFN.LfLsq.cn
http://Lm9lC2dP.LfLsq.cn
http://VQfZsX7D.LfLsq.cn
http://PXU1ANG8.LfLsq.cn
http://L4MI2jpx.LfLsq.cn
http://qymgjfZI.LfLsq.cn
http://3P2ERO8q.LfLsq.cn
http://4za9kwuq.LfLsq.cn
http://pi6DgLgc.LfLsq.cn
http://1yo5YRyG.LfLsq.cn
http://fXaqTmEs.LfLsq.cn
http://LDeVdGaY.LfLsq.cn
http://ZPg54LkO.LfLsq.cn
http://zZAMJLGZ.LfLsq.cn
http://v6PrNbQ9.LfLsq.cn
http://qxeLS9DN.LfLsq.cn
http://ynoK4biX.LfLsq.cn
http://vVwQQ5qw.LfLsq.cn
http://Elt0UFqP.LfLsq.cn
http://L9F4V4t2.LfLsq.cn
http://QZy9yFnq.LfLsq.cn
http://Z64eS09K.LfLsq.cn
http://www.dtcms.com/a/367441.html

相关文章:

  • 系统代理开启时,钉钉页面加载失败
  • 基于STM32的除臭杀菌等多功能智能健康鞋柜设计
  • 在 PyCharm 里怎么“点鼠标”完成指令同样的运行操作
  • 学习PaddlePaddle--环境配置-PyCharm + Conda​
  • 彻底搞懂面向对象分析(OOA)
  • 遇享会—金湾读书会—第四期—你好,陌生人——20250823
  • Drawdb与cpolar:数据库设计的远程协作解决方案
  • 【CS32L015C8T6】配置单片机时基TimeBase(内附完整代码及注释)
  • 深度剖析 DC - DC 转换器在新能源汽车中的关键应用
  • 【RNN-LSTM-GRU】第二篇 序列模型原理深度剖析:从RNN到LSTM与GRU
  • Scikit-learn Python机器学习 - 特征预处理 - 归一化 (Normalization):MinMaxScaler
  • [光学原理与应用-386]:ZEMAX -1064nm SESAM光纤种子源设计,需要学习哪些光学理论和工程知识?
  • @Autowired原理(四)
  • Mongo的增删改查
  • 裸签、Attach、Detach及其验签方式
  • 「数据获取」中国科技统计年鉴(1991-2024)Excel
  • 无人机防风技术难点解析
  • 【Unity知识分享】Unity接入dll调用Window系统接口
  • 异地多活架构:从“机房炸了”到“用户无感”的逆袭之路
  • 【系统架构设计(16)】软件架构设计二:软件架构风格:构建系统的设计模式与选择指南
  • 树形组件,支持搜索展示,自定义展示,支持vue2,vue3,小程序等等
  • 去中心化投票系统开发教程
  • Eclipse 常用搜索功能汇总
  • go面试题-什么是用户态和内核态
  • C++语言编程规范-常量
  • windows线程注入
  • LeetCode 48 - 旋转图像算法详解(全网最优雅的Java算法
  • ResNet(残差网络)-彻底改变深度神经网络的训练方式
  • Docker多阶段构建Maven项目
  • 山姆·奥特曼 (Sam Altman) 分享提高工作效率的方法