使用Netty解析WebSocket协议
首先创建一个自定义的WebSocket处理器
往这个处理器添加三个方法:建立连接、读取消息和断开连接。先上代码:
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ConcurrentHashMap;/*** @author: * @Desc:自定义WebSocket处理器* WebSocketFrame是Netty提供的WebSocket协议的消息帧,包含了WebSocket协议的消息类型和消息内容。* @create: 2025-10-01 10:36**/
@Slf4j
public class WebSocketInboundHandler extends SimpleChannelInboundHandler<WebSocketFrame> {// 存放WebSocket连接的Channel,用于向客户端发送消息private static ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();/*** 接收到消息时触发,处理WebSocket消息* @param channelHandlerContext* @param webSocketFrame* @throws Exception*/@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, WebSocketFrame webSocketFrame) throws Exception {// 判断WebSocketFrame类型,如果是TextWebSocketFrame,则进行处理if (webSocketFrame instanceof TextWebSocketFrame) {//转为字符串类型String text = ((TextWebSocketFrame) webSocketFrame).text();log.info("WebSocket收到消息:{}", text);}}/*** 处理WebSocket连接建立,当有WebSocket客户端连接时触发* ChannelHandlerContext是Netty提供的上下文对象,通过它可以获取到Channel和ChannelPipeline相关信息* @throws Exception*/@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {//通过Context获取连接进来的ChannelChannel channel = ctx.channel();//获取连接进来的客户端idString id = channel.id().asLongText();log.info("有新客户端连接进来,id为:{}", id);//将Channel存入Map中channelMap.put(id, channel);}/*** 处理WebSocket连接断开,当有WebSocket客户端断开连接时触发* @param ctx* @throws Exception*/@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {//获取连接断开的客户端idString id = ctx.channel().id().asLongText();log.info("客户端连接断开,id为:{}", id);//从Map中移除ChannelchannelMap.remove(id);}
}
下面对这3个方法进行逐一解读
1、建立连接
触发时机:当有新的客户端连接进来时触发。
示例代码只是简单地获取客户端id,然后保存到ConcurrentHashMap(确保线程安全)。这里使用ChannelHandlerContext来获取Channel上下文。ChannelHandlerContext是Netty提供的上下文对象,通过它可以获取到Channel和ChannelPipeline相关信息。
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {//通过Context获取连接进来的ChannelChannel channel = ctx.channel();//获取连接进来的客户端idString id = channel.id().asLongText();log.info("有新客户端连接进来,id为:{}", id);//将Channel存入Map中channelMap.put(id, channel);
}
2、断开连接
触发时机:当有WebSocket客户端断开连接时触发。
方法所起的作用,当有客户端断开连接时,将该客户端从ConcurrentHashMap移除。
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {//获取连接断开的客户端idString id = ctx.channel().id().asLongText();log.info("客户端连接断开,id为:{}", id);//从Map中移除ChannelchannelMap.remove(id);
}
3、读取消息
触发时机:接收到新消息时触发。
下面是文本消息的处理,通过WebSocketFrame获取消息。WebSocketFrame是Netty提供的WebSocket协议的消息帧,包含了WebSocket协议的消息类型和消息内容。
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, WebSocketFrame webSocketFrame) throws Exception {// 判断WebSocketFrame类型,如果是TextWebSocketFrame,则进行处理if (webSocketFrame instanceof TextWebSocketFrame) {//转为字符串类型String text = ((TextWebSocketFrame) webSocketFrame).text();log.info("WebSocket收到消息:{}", text);}
}
然后创建一个自定义通道初始化器
引入上面自定义的WebSocket处理器
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;/*** @author: * @Desc:自定义通道初始化器* @create: 2025-10-01 10:04**/
public class MyChannelInit extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {//取出channelPipelineChannelPipeline pipeline = socketChannel.pipeline();pipeline//添加处理HTTP GET请求.addLast(new HttpServerCodec())//添加处理HTTP POST请求,聚合Http消息,防止粘包.addLast(new HttpObjectAggregator(1048576))//添加处理WebSocket请求.addLast(new WebSocketServerProtocolHandler("/ws"))//添加自定义处理器.addLast(new WebSocketInboundHandler());}
}
上面代码中依次添加了HTTP GET请求,POST请求,WebSocket请求,自定义的WebSocket处理器,使用了Netty来进行解析。Netty内置了很多编码解码器,这里列举2个,简单解析一下:
1、HttpServerCodec:HTTP解码器,只能处理HTTP get请求,不能处理post请求;
2、HttpObjectAggregator:可处理Http POST请求,用于将多个Http消息聚合为完整的HTTP请求,它能够将HttpMessage和HttpContent聚合为FullHttpRequest或FullHttpResponse。
小tips:Netty中以Codec为后缀的类,一般都是编码解码器,既可以编码也可以解码。