【Netty+WebSocket详解】WebSocket全双工通信与Netty的高效结合与实战
一、 Netty网络框架、WebSocket协议基础
1.1 Netty网络框架介绍
1.2 WebSocket简介
1.3 WebSocket握手流程
二、为什么选择Netty+WebSocket?
三、Netty+WebSocket与Spring WebSocket
3.1 架构层级对比
3.2 核心组件差异
3.3 协议支持深度
3.4 性能基准测试
3.5 开发效率对比
3.6 典型应用场景
3.7 集成生态对比编辑
四、基于Netty+WebSocket构建高性能实时通信系统
4.1 技术选型
4.2 pom.xml核心依赖
4.3 Netty服务的启动类(服务器)
4.4 WebSocket初始化器
4.5 心跳处理类
4.6 消息处理器
4.7 Vue2 WebSocket连接核心代码
五、总结
一、 Netty网络框架、WebSocket协议基础
1.1 Netty网络框架介绍
参考我的另一篇文章:
1.2 WebSocket简介
WebSocket是一种在单个TCP连接上进行全双工通信的协议,解决了HTTP协议的半双工和轮询效率低的问题。其核心特点:
全双工:客户端和服务端可同时发送数据。
持久连接:一次握手后保持长连接。
低延迟:无需频繁建立连接。
1.3 WebSocket握手流程
客户端发起HTTP请求:包含Upgrade:websocket头。
服务端响应101状态码:表示协议切换成功。
数据帧传输:后续通信通过二进制帧(Frame)进行。
握手请求示例:
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==
Sec-WebSocket-Version: 13
握手响应示例:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: HSmrc0sMlYUkAGmm5OPpG2HaGWk=
二、为什么选择Netty+WebSocket?
在实时通信(IM)场景中,低延迟、高并发、长连接是核心需求。Netty作为高性能网络框架,结合WebSocket协议,能够完美满足这些要求:
全双工通信:WebSocket支持服务端主动推送,告别HTTP轮询
低延迟:Netty的Reactor线程模型+零拷贝技术,轻松支撑10万级并发
协议灵活:可扩展支持自定义二进制协议
本文将带你从零开始实现一个完整的IM系统,涵盖消息收发、用户在线状态管理、心跳检测等核心功能,并提供可运行的完整代码。
三、Netty+WebSocket与Spring WebSocket
3.1 架构层级对比
3.2 核心组件差异
Netty核心模块
// 典型Netty WebSocket配置
pipeline.addLast(new HttpServerCodec()); // HTTP编解码
pipeline.addLast(new HttpObjectAggregator(65536)); // 聚合HTTP请求
pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); // 协议升级
pipeline.addLast(new CustomFrameHandler()); // 自定义业务处理
Spring核心模块
// 典型Spring配置
@Configuration
@EnableWebSocket
public class Config implements WebSocketConfigurer {void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {registry.addHandler(myHandler(), "/ws");}
}
3.3 协议支持深度
3.4 性能基准测试
3.5 开发效率对比
3.6 典型应用场景
Netty更适合:
游戏服务器(需要自定义二进制协议)
金融级交易系统(微秒级延迟要求)
IoT设备接入(海量长连接管理)
Spring WebSocket更适合:
企业级消息通知系统
实时协作应用(如在线文档)
需要快速集成Spring Security的场景
3.7 集成生态对比
四、基于Netty+WebSocket构建高性能实时通信系统
4.1 技术选型
Java 8+、Netty 4.x、WebSocket协议、Maven
4.2 pom.xml核心依赖
<dependencies><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.86.Final</version></dependency>
</dependencies>
4.3 Netty服务的启动类(服务器)
/*** ChatServer: Netty 服务的启动类(服务器)*/
public class ChatServer {public static void main(String[] args) throws Exception {// 定义主从线程组// 定义主线程池,用于接受客户端的连接,但是不做任何处理,比如老板会谈业务,拉到业务就会交给下面的员工去做了EventLoopGroup bossGroup = new NioEventLoopGroup();// 定义从线程池,处理主线程池交过来的任务,公司业务员开展业务,完成老板交代的任务EventLoopGroup workerGroup = new NioEventLoopGroup();try {// 构建Netty服务器ServerBootstrap server = new ServerBootstrap(); // 服务的启动类server.group(bossGroup, workerGroup) // 把主从线程池组放入到启动类中.channel(NioServerSocketChannel.class) // 设置Nio的双向通道.childHandler(new WSServerInitializer()); // 设置处理器,用于处理workerGroup// 启动server,并且绑定端口号875,同时启动方式为"同步"ChannelFuture channelFuture = server.bind(875).sync();// 请求:http://127.0.0.1:875// 监听关闭的channelchannelFuture.channel().closeFuture().sync();} finally {// 优雅的关闭线程池组bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}}
4.4 WebSocket初始化器
/*** 初始化器,channel注册后,会执行里面的相应的初始化方法*/
public class WSServerInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel channel) throws Exception {// 通过SocketChannel获得对应的管道ChannelPipeline pipeline = channel.pipeline();/*** 通过管道,添加handler处理器*/// HttpServerCodec 是由netty自己提供的助手类,此处可以理解为管道中的拦截器// 当请求到服务端,我们需要进行做解码,相应到客户端做编码// websocket 基于http协议,所以需要有http的编解码器pipeline.addLast(new HttpServerCodec());// 添加对大数据流的支持pipeline.addLast(new ChunkedWriteHandler());// 对httpMessage进行聚合,聚合成为FullHttpRequest或FullHttpResponse// 几乎在netty的编程中,都会使用到此handlerpipeline.addLast(new HttpObjectAggregator(1024 * 64));// ==================== 以上是用于支持http协议相关的handler ====================// ==================== 增加心跳支持 start ====================// 针对客户端,如果在5小时没有向服务端发送读写心跳(ALL),则主动断开连接// 如果是读空闲或者写空间,不做任何处理(参数分别对应读空闲、写空闲、读写空闲秒数)pipeline.addLast(new IdleStateHandler(8,10,300 * 60));pipeline.addLast(new HeartBeatHandler());// ==================== 增加心跳支持 end ====================// ==================== 以下是用于支持websocket ====================/*** WebSocket 服务器处理的协议,用于指定给客户端连接的时候访问路由: /ws* 此Handler会帮我们处理一些比较复杂的繁重的操作* 会处理一些握手操作:handShaking(close,ping,pong)ping + pong = 心跳* 对于WebSocket来说,数据都是以frames(帧)进行传输的,不同的数据类型所对应的frames(帧)也都不同*/pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));// 添加自定义的助手类pipeline.addLast(new ChatHandler());}
}
4.5 心跳处理类
/*** 创建心跳助手类*/
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {// 判断evt是否是IdleStateEvent(空闲事件状态,包含 读空闲/写空闲/读写空闲)if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent)evt;if (event.state() == IdleState.READER_IDLE) {// System.out.println("进入读空闲...");} else if (event.state() == IdleState.WRITER_IDLE) {// System.out.println("进入写空闲...");} else if (event.state() == IdleState.ALL_IDLE) {Channel channel = ctx.channel();// 关闭无用的channel,以防资源浪费channel.close();}}}
}
4.6 消息处理器
/*** 创建自定义助手类*/
// SimpleChannelInboundHandler: 对于请求来说,相当于入站(入境)
// TextWebSocketFrame: 用于为websocket专门处理的文本数据对象,Frame是数据(消息)的载体
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {// 存储所有连接的Channelprivate static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);@Overridepublic void channelActive(ChannelHandlerContext ctx) {clients.add(ctx.channel());System.out.println("客户端连接: " + ctx.channel().id());}@Overridepublic void channelInactive(ChannelHandlerContext ctx) {clients.remove(ctx.channel());System.out.println("客户端断开: " + ctx.channel().id());}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) {String msg = frame.text();System.out.println("收到消息: " + msg);// 方案1:广播给所有客户端clients.writeAndFlush(new TextWebSocketFrame("[广播] " + msg));// 方案2:转发给特定客户端(示例逻辑)if (msg.startsWith("@")) {forwardToTargetClient(msg);}}private void forwardToTargetClient(String msg) {String targetId = msg.substring(1, msg.indexOf(" "));String content = msg.substring(msg.indexOf(" ") + 1);clients.stream().filter(ch -> ch.id().asShortText().equals(targetId)).findFirst().ifPresent(ch -> ch.writeAndFlush(new TextWebSocketFrame("[私信] " + content)));}/*** 发生异常并且捕获,移除channel*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {clients.remove(ctx.channel());ctx.close();cause.printStackTrace();}
}
4.7 Vue2 WebSocket连接核心代码
// src/utils/websocket.js
export default class SocketService {static instance = nullstatic get Instance() {if (!this.instance) {this.instance = new SocketService()}return this.instance}// 连接状态标记connected = falsews = null// 初始化连接connect() {if (!this.connected) {this.ws = new WebSocket('ws://localhost:875')this.ws.onopen = () => {this.connected = trueconsole.log('Netty连接成功')}this.ws.onmessage = (msg) => {console.log('收到消息:', msg.data)// 这里可以触发Vuex的action处理消息}this.ws.onclose = () => {this.connected = falseconsole.log('Netty连接关闭')}this.ws.onerror = (err) => {console.error('连接错误:', err)}}}// 发送消息send(data) {if (this.connected) {this.ws.send(JSON.stringify(data))} else {console.error('尚未建立连接')}}
}
<template><div><button @click="connect">连接Netty</button><button @click="sendTestMsg">发送测试消息</button><div v-for="(msg, index) in messages" :key="index">{{ msg }}</div></div>
</template><script>
import SocketService from '@/utils/websocket'export default {data() {return {messages: []}},methods: {connect() {SocketService.Instance.connect()},sendTestMsg() {SocketService.Instance.send({type: 'test',content: 'Hello Netty!',timestamp: Date.now()})}},mounted() {// 可选:组件加载时自动连接// this.connect()}
}
</script>
4.8 Spring WebSocket实现简单通信系统
参考我的文章:【Spring WebSocket详解】Spring WebSocket从入门到实战-CSDN博客
五、总结
本文通过Netty框架实现了基于WebSocket协议的简单通信系统,覆盖了从基础握手、消息处理到心跳检测的全流程。实际开发中还需考虑安全性(如WSS加密)、跨域处理、消息持久化等高级功能。如果你对上文有什么其他理解或者问题欢迎评论区留言讨论!