Springboot集成Netty
集成Netty实现和设备的TCP通讯
:::success
根据本篇文章实现通用启动方式
高效的netty通讯
:::
定义父类Server
不管启动什么类型的长连接,都通过继承父类Server来启动
该父类中创建了统一的启动和停止服务的方法,子类只需要重写 initialize()方法即可
package cn.com.dyl.server;import io.netty.bootstrap.AbstractBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ExecutorService;/*** @author 荣家燕* @company 达友利科技(廊坊)有限公司* @create 2023/12/6 14:15* @describe*/@SuppressWarnings({"rawtypes", "AssignmentUsedAsCondition"})
@Slf4j
public abstract class Server {//判断是否启动protected boolean isRunning;//netty配置类protected final NettyConfig config;//负责处理连接protected EventLoopGroup bossGroup;//负责处理channel通道的I/O事件protected EventLoopGroup workerGroup;//多线程业务组protected ExecutorService businessGroup;/*** 构造方法** @param config 配置类*/protected Server(NettyConfig config) {this.config = config;}protected abstract AbstractBootstrap initialize();/*** 启动方法** @return 启动结果*/public synchronized boolean start() {if (isRunning) {log.warn("==={}已经启动,port:{}===", config.name, config.port);return isRunning;}AbstractBootstrap bootstrap = initialize();ChannelFuture future = bootstrap.bind(config.port).awaitUninterruptibly();future.channel().closeFuture().addListener(f -> {if (isRunning) {stop();}});if (future.cause() != null) {log.error("启动失败", future.cause());}if (isRunning = future.isSuccess()) {log.warn("==={}启动成功,port:{}===", config.name, config.port);}return isRunning;}/*** 暂停方法*/public synchronized void stop() {//赋值启动标志isRunning = false;//关闭bossGroup.shutdownGracefully();//释放资源if (workerGroup != null) {workerGroup.shutdownGracefully();}if (businessGroup != null) {businessGroup.shutdown();}//输出日志log.warn("==={}已经停止,port:{}===", config.name, config.port);}
}
定义netty的配置类
所有启动时所需要的处理器编解码器都在这里进行赋值
该类通过启动标志(enableHttp、enableWeb)来决定启动哪一个子类服务
也可以给子类服务定义心跳检测机制
package cn.com.dyl.server;import cn.com.dyl.netty.Jtt1078Handler;
import cn.com.dyl.webServer.WebMessageHandler;
import io.netty.util.NettyRuntime;
import io.netty.util.internal.ObjectUtil;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;/*** @author 荣家燕* @company 达友利科技(廊坊)有限公司* @create 2023/12/6 14:15* @describe*/
public class NettyConfig {protected final int workerCore;//worker数量protected final int businessCore;//business数量protected final int readerIdleTime;//读卡器空闲时间protected final int writerIdleTime;//写入程序空闲时间protected final int allIdleTime;//所有空闲时间protected final Integer port;//端口号protected final Jtt1078Handler jtt1078Handler;//处理器protected final WebMessageHandler webMessageHandler;//web端处理器protected final Server server;//服务入口类protected final String name;//服务名称public NettyConfig(int workerGroup,int businessGroup,int readerIdleTime,int writerIdleTime,int allIdleTime,Integer port,Jtt1078Handler jtt1078Handler,WebMessageHandler webMessageHandler,String name,boolean enableHttp,boolean enableWeb) {ObjectUtil.checkNotNull(port, "port");ObjectUtil.checkPositive(port, "port");//检查是否严格为正int processors = NettyRuntime.availableProcessors();this.workerCore = workerGroup > 0 ? workerGroup : processors + 2;this.businessCore = businessGroup > 0 ? businessGroup : Math.max(1, processors >> 1);this.readerIdleTime = readerIdleTime;this.writerIdleTime = writerIdleTime;this.allIdleTime = allIdleTime;this.port = port;this.jtt1078Handler = jtt1078Handler;this.webMessageHandler = webMessageHandler;//http服务传输标志if (enableHttp) {this.name = name != null ? name : "mediaHttp";this.server = new NettyServer(this);} else if (enableWeb) {this.name = name != null ? name : "webServer";this.server = new AudioServer(this);} else {this.name = name != null ? 
name : "jtt1078";this.server = new MediaServer(this);}}public Server build() {return server;}public static NettyConfig.Builder custom() {return new Builder();}@NoArgsConstructor@Accessors(chain = true)@Datapublic static class Builder {private int workerCore;private int businessCore;private int readerIdleTime = 240;private int writerIdleTime = 0;private int allIdleTime = 0;private Integer port;private Jtt1078Handler jtt1078Handler;private WebMessageHandler webMessageHandler;//web端处理器private String name;private boolean enableHttp;private boolean enableWeb;public Builder setThreadGroup(int workerCore, int businessCore) {this.workerCore = workerCore;this.businessCore = businessCore;return this;}public Builder setIdleStateTime(int readerIdleTime, int writerIdleTime, int allIdleTime) {this.readerIdleTime = readerIdleTime;this.writerIdleTime = writerIdleTime;this.allIdleTime = allIdleTime;return this;}/*** 服务创建** @return 创建的服务*/public Server build() {return new NettyConfig(this.workerCore,this.businessCore,this.readerIdleTime,this.writerIdleTime,this.allIdleTime,this.port,this.jtt1078Handler,this.webMessageHandler,this.name,this.enableHttp,this.enableWeb).build();}}
}
定义子类启动服务
基于netty的TCP协议,用于接收终端设备的数据
在该类中可以定义编解码器和处理器等
package cn.com.dyl.server;import cn.com.dyl.netty.HandlerWrapper;
import cn.com.dyl.netty.Jtt1078MessageDecoder;
import io.netty.bootstrap.AbstractBootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.DefaultThreadFactory;import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;/*** @author 荣家燕* @company 达友利科技(廊坊)有限公司* @create 2023/12/6 14:21* @describe*/
@SuppressWarnings({"rawtypes", "NullableProblems"})
public class MediaServer extends Server {/*** 构造方法** @param config 配置类*/public MediaServer(NettyConfig config) {super(config);}/*** 重写初始化方法** @return 返回初始化后的数据*/@Overrideprotected AbstractBootstrap initialize() {//负责处理连接bossGroup = new NioEventLoopGroup(1,new DefaultThreadFactory(config.name,Thread.MAX_PRIORITY));//负责处理channel通道的I/O事件workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(),new DefaultThreadFactory(config.name,Thread.MAX_PRIORITY));//增加业务逻辑处理线程组DefaultEventExecutorGroup defaultEventExecutorGroup = new DefaultEventExecutorGroup(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {private final AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "MediaServerCodecThread_" + this.threadIndex.incrementAndGet());}});//线程数量if (config.businessCore > 0) {businessGroup = new ThreadPoolExecutor(config.businessCore,config.businessCore,1L,TimeUnit.SECONDS,new LinkedBlockingQueue<>(),new DefaultThreadFactory(config.name + "-B",true,Thread.NORM_PRIORITY));}//和bossGroup一一对应return new ServerBootstrap()//设置核心线程组和业务工作线程组.group(bossGroup, workerGroup)//设置nio类型的channel.channel(NioServerSocketChannel.class)//快速复用端口,避免端口冲突,原理tcp连接需要2ML时间单位回收,这个配置加快进度.option(NioChannelOption.SO_REUSEADDR, true).option(NioChannelOption.SO_BACKLOG, 102400)//设置低延迟.childOption(NioChannelOption.TCP_NODELAY, true).childHandler(new ChannelInitializer<SocketChannel>() {private final HandlerWrapper handler = new HandlerWrapper(config.jtt1078Handler);@Overrideprotected void initChannel(SocketChannel channel) {channel.pipeline().addLast(new Jtt1078MessageDecoder()).addLast(defaultEventExecutorGroup, "handler", handler);}});}
}
定义启动配置文件
在该类中定义启动的入口
package cn.com.dyl.config;import cn.com.dyl.netty.Jtt1078Handler;
import cn.com.dyl.server.NettyConfig;
import cn.com.dyl.server.Server;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;/*** @author 荣家燕* @company 达友利科技(廊坊)有限公司* @create 2023/12/6 14:14* @describe*/
@Order(Integer.MIN_VALUE)
@Configuration
@ConditionalOnProperty(value = "server.enable", havingValue = "true")
public class JTConfig {private final Jtt1078Handler jtt1078Handler;public JTConfig(Jtt1078Handler jtt1078Handler) {this.jtt1078Handler = jtt1078Handler;}@ConditionalOnProperty(value = "server.media.port")@Bean(initMethod = "start", destroyMethod = "stop")public Server jtmediaServer(@Value("${server.media.port}") int port) {return NettyConfig.custom().setPort(port).setName("1078音视频服务").setJtt1078Handler(jtt1078Handler).build();}
集成Netty实现和前端进行WebSocket通讯
定义websocket子类
package cn.com.dyl.server;import cn.com.dyl.netty.HandlerWrapper;
import cn.com.dyl.webServer.HeartBeatHandler;
import cn.com.dyl.webServer.WebMessageDecoder;
import io.netty.bootstrap.AbstractBootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.DefaultThreadFactory;import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;/*** @author 荣家燕* @company 达友利科技(廊坊)有限公司* @create 2024/1/23 10:20* @describe*/
@SuppressWarnings("rawtypes")
public class AudioServer extends Server {/*** 构造方法** @param config 配置类*/protected AudioServer(NettyConfig config) {super(config);}@Overrideprotected AbstractBootstrap initialize() {//负责处理连接bossGroup = new NioEventLoopGroup(1,new DefaultThreadFactory(config.name,Thread.MAX_PRIORITY));//负责处理channel通道的I/O事件workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(),new DefaultThreadFactory(config.name,Thread.MAX_PRIORITY));//增强也认为逻辑处理线程组DefaultEventExecutorGroup defaultEventExecutorGroup = new DefaultEventExecutorGroup(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {private final AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "AudioServerCodecThread_" + this.threadIndex.incrementAndGet());}});//线程数量if (config.businessCore > 0) {businessGroup = new ThreadPoolExecutor(config.businessCore,config.businessCore,1L,TimeUnit.SECONDS,new LinkedBlockingQueue<>(),new DefaultThreadFactory(config.name + "-B",true,Thread.NORM_PRIORITY));}return new ServerBootstrap()//设置核心线程组和业务工作线程组.group(bossGroup, workerGroup)//设置nio类型的channel.channel(NioServerSocketChannel.class)//通过TCP/IP方式进行传输//快速复用端口,避免端口冲入,原理tcp连接需要2ML时间单位回收,这个;配置加快进度.option(NioChannelOption.SO_REUSEADDR, true).option(NioChannelOption.SO_BACKLOG, 102400)//设置低延迟.childOption(NioChannelOption.TCP_NODELAY, true).childHandler(new ChannelInitializer<SocketChannel>() {private final HandlerWrapper handler = new HandlerWrapper(config.webMessageHandler);@Overrideprotected void initChannel(SocketChannel channel) {channel.pipeline().addLast(new HttpServerCodec()).addLast(new ChunkedWriteHandler()).addLast(new HttpObjectAggregator(1024*64)).addLast(new IdleStateHandler(config.readerIdleTime, config.writerIdleTime, config.allIdleTime))//自定义的空闲状态监测的handler.addLast(new HeartBeatHandler()).addLast(new WebMessageDecoder()).addLast(defaultEventExecutorGroup, "handler", handler).addLast(new WebSocketServerProtocolHandler("/audio"));}});}
}
websocket的重点
WebSocket的重点在于:首次请求连接时使用的是HTTP方式(握手请求),握手成功之后才升级为WebSocket连接
对于http和websocket进行判断并分开处理
package cn.com.dyl.netty;import cn.com.dyl.threads.SessionManager;
import cn.com.dyl.utils.Packet;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;import java.time.LocalDateTime;import static io.netty.handler.codec.http.HttpUtil.isKeepAlive;
import static io.netty.handler.codec.http.HttpUtil.setContentLength;/*** @author 荣家燕* @company 达友利科技(廊坊)有限公司* @create 2023/12/6 15:20* @describe*/
@ChannelHandler.Sharable
@AllArgsConstructor
@Slf4j
public class HandlerWrapper extends ChannelInboundHandlerAdapter {//基础消息处理器private final JttMessageHandler handler;private static WebSocketServerHandshaker handShaker;//握手构造工厂//基础消息处理器@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {Channel nettyChannel = ctx.channel();//获取netty的网络传输通道//判断是否是websocketif (msg instanceof WebSocketFrame) {WebSocketFrame frame = (WebSocketFrame) msg;handleWebSocketFrame(nettyChannel, frame);//判断是否是http请求(websocket建立连接是http请求,需要握手响应)} else if (msg instanceof FullHttpRequest) {FullHttpRequest httpRequest = (FullHttpRequest) msg;handleHttpRequest(nettyChannel, httpRequest);//如果都不是那就符合1078 的音视频数据} else {Packet packet = (Packet) msg;handler.channelRead(nettyChannel, packet);}}/*** 终端连接过来后的处理** @param ctx 上下文*/@Overridepublic void channelActive(ChannelHandlerContext ctx) {log.info("<<<<< web服务连接{}", ctx.channel().remoteAddress());}/*** 用来监听客户端是否断开** @param ctx 通信管道* @throws Exception 异常*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {super.channelInactive(ctx);handler.release(ctx.channel());//如果断开就释放}/*** netty异常处理机制(完成异常处理后要释放并关闭所有的流)** @param ctx 通信管道* @param cause 异常数据*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();handler.release(ctx.channel());ctx.close();}/*** 进行心跳检测,用户超时长时间未操作则会触发** @param ctx netty通道* @param evt 心跳机制*/@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) {if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {IdleStateEvent event = (IdleStateEvent) evt;if (event.state() == IdleState.READER_IDLE) {//客户端读超时String tag = SessionManager.get(ctx.channel(), "tag");log.info("web服务心跳超时: {}", tag);handler.release(ctx.channel());//释放}}}private void handleHttpRequest(Channel channel, FullHttpRequest request) {//如果Http解码失败,返回http异常if (!request.decoderResult().isSuccess() || (!"websocket".equals(request.headers().get("Upgrade")))) {sendHttpResponse(channel, request, new 
DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));return;}//构造握手响应返回,本机测试WebSocketServerHandshakerFactory handShakerFactory = new WebSocketServerHandshakerFactory("ws://localhost:1011/audio", null, false);handShaker = handShakerFactory.newHandshaker(request);if (handShaker == null) {WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(channel);} else {handShaker.handshake(channel, request);}}/*** websocket处理** @param channel 通道* @param frame 数据*/private void handleWebSocketFrame(Channel channel, WebSocketFrame frame) {//判断是否是关闭链路的指令if (frame instanceof CloseWebSocketFrame) {handShaker.close(channel, (CloseWebSocketFrame) frame.retain());return;}//判断是否是Ping消息if (frame instanceof PingWebSocketFrame) {channel.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));return;}//本例程仅支持文本消息,不支持二进制消息if (frame instanceof TextWebSocketFrame) {//返回应答消息String request = ((TextWebSocketFrame) frame).text();channel.writeAndFlush(new TextWebSocketFrame(request + ",欢迎使用Netty WebSocket服务,现在时刻:" + LocalDateTime.now()));}//如果是二进制消息if (frame instanceof BinaryWebSocketFrame){ByteBuf content = frame.content();log.info("收到二进制消息"+content);}}/*** 发送http响应** @param channel http通道* @param req http请求* @param res http响应*/private static void sendHttpResponse(Channel channel, FullHttpRequest req, FullHttpResponse res) {//返回应答给客户端if (res.status().code() != 200) {ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);res.content().writeBytes(buf);buf.release();setContentLength(res, res.content().readableBytes());}//如果非keep-Alive,关闭连接ChannelFuture future = channel.writeAndFlush(res);if (!isKeepAlive(req) || res.status().code() != 200) {future.addListener(ChannelFutureListener.CLOSE);}}
}