当前位置: 首页 > news >正文

基于Netty构建Websocket服务端

除了构建TCP和UDP服务器和客户端,Netty还可以用于构建WebSocket服务器。WebSocket是一种基于TCP协议的双向通信协议,可以在Web浏览器和Web服务器之间建立实时通信通道。下面是一个简单的示例,演示如何使用Netty构建一个WebSocket服务器。
项目目录:
在这里插入图片描述
引入pom依赖:

 <dependency>
     <groupId>io.netty</groupId>
     <artifactId>netty-all</artifactId>
     <version>4.1.69.Final</version>
 </dependency>
 <dependency>
     <groupId>org.projectlombok</groupId>
     <artifactId>lombok</artifactId>
     <optional>true</optional>
 </dependency>

编写SocketServer:

package com.lzq.websocket.config;

import com.lzq.websocket.handlers.WebSocketHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.TimeUnit;

@Slf4j
@Configuration
public class WebSocketConfig implements CommandLineRunner {

    private static final Integer PORT = 8888;

    @Override
    public void run(String... args) throws Exception {
        new WebSocketConfig().start();
    }

    public void start() {
        // 创建EventLoopGroup
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);
        try {

            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new HttpServerCodec());
                            // 最大数据长度
                            pipeline.addLast(new HttpObjectAggregator(65536));
                            // 添加接收websocket请求的url匹配路径
                            pipeline.addLast(new WebSocketServerProtocolHandler("/websocket"));
                            // 10秒内收不到消息强制断开连接
                            // pipeline.addLast(new ReadTimeoutHandler(10, TimeUnit.SECONDS));
                            pipeline.addLast(new WebSocketHandler());
                        }
                    });

            ChannelFuture future = serverBootstrap.bind(PORT).sync();
            log.info("websocket server started, port={}", PORT);
            // 处理 channel 的关闭,sync 方法作用是同步等待 channel 关闭
            // 阻塞
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            log.error("websocket server exception", e);
            throw new RuntimeException(e);
        } finally {
            log.info("websocket server close");
            // 关闭EventLoopGroup
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

编写WebSocketHandler:

package com.lzq.websocket.handlers;

import com.lzq.websocket.config.NettyConfig;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;

@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {

    private WebSocketServerHandshaker webSocketServerHandshaker;
    private static final String WEB_SOCKET_URL = "ws://127.0.0.1:8888/websocket";

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 创建连接时执行
        NettyConfig.group.add(ctx.channel());
        log.info("client channel active, id={}", ctx.channel().id().toString());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // 关闭连接时执行
        NettyConfig.group.remove(ctx.channel());
        log.info("client channel disconnected, id={}", ctx.channel().id().toString());
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // 服务端接收客户端发送过来的数据结束之后调用
        ctx.flush();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
            WebSocketServerProtocolHandler.HandshakeComplete handshake = (WebSocketServerProtocolHandler.HandshakeComplete) evt;
            log.info("client channel connected, id={}, url={}", ctx.channel().id().toString(), handshake.requestUri());
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            // 处理客户端http握手请求
            handlerHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            // 处理websocket连接业务
            handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    /**
     * 处理websocket连接业务
     *
     * @param ctx
     * @param frame
     */
    private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
        log.info("handlerWebSocketFrame>>>>class={}", frame.getClass().getName());
        // 判断是否是关闭websocket的指令
        if (frame instanceof CloseWebSocketFrame) {
            webSocketServerHandshaker.close(ctx.channel(), ((CloseWebSocketFrame) frame).retain());
            return;
        }
        // 判断是否是ping消息
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        if (!(frame instanceof TextWebSocketFrame)) {
            throw new RuntimeException("不支持消息类型:" + frame.getClass().getName());
        }
        String text = ((TextWebSocketFrame) frame).text();
        if ("ping".equals(text)) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        log.info("WebSocket message received: {}", text);
        /**
         * 可通过客户传输的text,设计处理策略:
         * 如:text={"type": "messageHandler", "userId": "111"}
         * 服务端根据type,采用策略模式,自行派发处理
         *
         * 注意:这里不需要使用线程池,因为Netty 采用 Reactor线程模型(目前使用的是主从Reactor模型),
         * Handler已经是线程处理,每个用户的请求是线程隔离的
         */
        // 返回WebSocket响应
        ctx.writeAndFlush(new TextWebSocketFrame("server return:" + text));
        /*// 群发
        TextWebSocketFrame twsf = new TextWebSocketFrame(new Date().toString()
                + ctx.channel().id()
                + " : "
                + text);
        NettyConfig.group.writeAndFlush(twsf);*/
    }

    /**
     * 处理客户端http握手请求
     *
     * @param ctx
     * @param request
     */
    private void handlerHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
        log.info("handlerHttpRequest>>>>class={}", request.getClass().getName());
        // 判断是否采用WebSocket协议
        if (!request.getDecoderResult().isSuccess() || !("websocket".equals(request.headers().get("Upgrade")))) {
            sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(WEB_SOCKET_URL, null, false);
        webSocketServerHandshaker = wsFactory.newHandshaker(request);
        if (webSocketServerHandshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            webSocketServerHandshaker.handshake(ctx.channel(), request);
        }
    }

    private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, DefaultFullHttpResponse response) {
        if (response.getStatus().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(response.getStatus().toString(), StandardCharsets.UTF_8);
            response.content().writeBytes(buf);
            buf.release();
        }
        // 服务端向客户端发送数据
        ChannelFuture f = ctx.channel().writeAndFlush(response);
        if (response.getStatus().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 非正常断开时调用
        log.error("client channel execute exception, id={}", ctx.channel().id().toString(), cause);
        ctx.close();
    }
}

NettyConfig:

package com.lzq.websocket.config;

import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

public class NettyConfig {
    /**
     * 存储接入的客户端的channel对象
     */
    public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}

使用Apifox测试:
在这里插入图片描述

相关文章:

  • postman使用-03发送请求
  • 离线AI聊天清华大模型(ChatGLM3)本地搭建
  • 在CentOS 7上使用普通用户`minio`安装和配置MinIO
  • EasyExcel导出
  • golang并发安全-sync.map
  • 模型量化 | Pytorch的模型量化基础
  • 7. 结构型模式 - 代理模式
  • 贪心算法的运用
  • 算法的四大思想之一:动态规划
  • Nginx附-实战之负载均衡时怎么让请求不转发到一台正在启动的tomcat服务器上
  • MYSQL高级SQL语句
  • 嵌入式单片机的存储区域与堆和栈
  • 云原生机器学习平台cube-studio开源项目及代码简要介绍
  • 【QT】非常简单的登录界面实现
  • 同城配送小程序解决方案
  • 为什么react call api in cDidMount
  • SQL手工注入漏洞测试(MySQL数据库)
  • 【MySQL】:超详细MySQL完整安装和配置教程
  • 最新技术整理3款开源免费直播推流工具,实现实时视频推流、视频拉流,目标端可以是服务器、云平台、移动设备等(附源码)
  • C#与php自定义数据流传输
  • “80后”南京大学天文与空间科学学院教授施勇加盟西湖大学
  • 玉林一河段出现十年最大洪水,一村民被冲走遇难
  • 国际博物馆日|航海博物馆:穿梭于海洋神话与造船工艺间
  • 恒生银行回应裁员传闻:受影响的员工数目占银行核心业务员工总数约1%
  • 人民日报民生观:转人工客服,怎么这么难?
  • 体坛联播|C罗儿子完成国家队首秀,德约结束与穆雷合作