当前位置: 首页 > news >正文

SpringBoot集成Netty实现Ws和Tcp通信

        本教程将指导你如何在 Spring Boot 项目中集成 Netty,实现 WebSocket 和 TCP 通信。以下是详细的步骤和代码示例。  

环境准备

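在你的 pom.xml 中添加 Netty 依赖(注意:5.0.0.Alpha2 属于已停止维护的 Alpha 分支,下文处理器中的 messageReceived 方法是该分支特有的 API;若改用 4.1.x 稳定版,需将其改为 channelRead0):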

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>5.0.0.Alpha2</version>
</dependency>

Ws通信具体模块

1.初始服务端代码

import com.leyting.handler.MsgHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;


/**
 * Starts a Netty WebSocket server when the Spring Boot application runs its
 * {@link ApplicationRunner} callbacks.
 */
@Slf4j
@Component
public class Init implements ApplicationRunner {

    /**
     * Binds a WebSocket server to {@code port} and blocks the calling thread until the
     * server channel is closed. Both event loop groups are shut down before returning.
     *
     * @param port TCP port to listen on
     * @throws RuntimeException wrapping any failure raised while binding or waiting; if the
     *         calling thread was interrupted, its interrupt flag is restored first
     */
    public static void serverStart(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        try {
            serverBootstrap
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            // HTTP request decoding / response encoding for the upgrade handshake.
                            pipeline.addLast(new HttpServerCodec());
                            pipeline.addLast(new ChunkedWriteHandler());
                            // Reader/writer/all-idle timeouts of 12 days each.
                            // NOTE(review): the MsgHandler added below does not override
                            // userEventTriggered, so idle events appear to go unhandled — confirm.
                            pipeline.addLast(new IdleStateHandler(12, 12, 12, TimeUnit.DAYS));
                            // Aggregate HTTP message parts, up to 64 KiB of content.
                            pipeline.addLast(new HttpObjectAggregator(1024 * 64));
                            pipeline.addLast(new WebSocketServerProtocolHandler("/websocket"));
                            pipeline.addLast(new MsgHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(port);
            // Register the listener before waiting: sync() rethrows a bind failure, so a
            // listener added after it could never observe the failure case.
            channelFuture.addListener((ChannelFutureListener) channelFuture1 -> {
                if (channelFuture1.isSuccess()) {
                    log.info("Websocket启动成功,端口:{}", port);
                } else {
                    log.warn("Websocket启动失败,端口:{}", port, channelFuture1.cause());
                }
            });
            channelFuture.sync();
            // Block until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Starts the WebSocket server on port 7309.
     *
     * <p>{@link #serverStart(int)} blocks until the server channel closes, so this method
     * does not return while the server is running.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) {
        serverStart(7309);
    }
}

2.信息处理器

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
public class MsgHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    public final static Map<ChannelId, Channel> CHANNEL = new ConcurrentHashMap<>();
    private final static ChannelGroup channelGroups = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void handlerAdded(ChannelHandlerContext ctx){
        channelGroups.add(ctx.channel());
        SimpleDateFormat format = new SimpleDateFormat("yyyy年MM月dd日 HH:mm:ss");
        ChannelId id = ctx.channel().id();
        CHANNEL.put(id, ctx.channel());
        log.info("客服端:{} 上线了!",id);
        ctx.channel().writeAndFlush(new TextWebSocketFrame(format.format(new Date()) + " 欢迎你的上线"));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
        cause.printStackTrace();
        ChannelId id = ctx.channel().id();
        CHANNEL.remove(id);
        channelGroups.remove(ctx.channel());
        log.info("客服端:{} 异常断开!",id);
        ctx.close();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx){
        channelGroups.remove(ctx.channel());
        log.info("客服端:{} 断开连接!",ctx.channel().id());
        CHANNEL.remove(ctx.channel().id());
        ctx.close();
    }


    @Override
    protected void messageReceived(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {
        if (!CHANNEL.containsKey(ctx.channel().id())) { CHANNEL.put(ctx.channel().id(), ctx.channel());}
        String msg = textWebSocketFrame.text();
        log.info("客服端:{} 发送消息:{}", ctx.channel().id(), msg );
        ctx.channel().writeAndFlush(new TextWebSocketFrame("服务端收到您发送的信息:" + msg));
    }
}

3.测试用例 

测试案例
Ws测试用例

WebSocket 测试网站:http://wstool.js.org/ 。按上文配置启动后,本地连接地址为 ws://localhost:7309/websocket。

Tcp通信具体模块

1.初始服务端代码



import com.leyting.handler.MsgHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class Init implements ApplicationRunner {

    public static void serverStart(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        try {
            serverBootstrap
                    .group(bossGroup,workerGroup)
                    // 添加通道设置非阻塞
                    .channel(NioServerSocketChannel.class)
                    // 服务端可连接队列数量
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // 开启长连接
                    .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                    // 流程处理
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new MsgHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            channelFuture.addListener((ChannelFutureListener) channelFuture1 -> {
                if (channelFuture1.isSuccess()) {
                    log.info("TcpServer启动成功,端口:{}", port);
                }else {
                    log.error("TcpServer启动失败,端口:{}", port);
                }
            });
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }finally {
            bossGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    @Override
    public void run(ApplicationArguments args)  {
        serverStart(7311);
    }
}

2.信息处理器代码

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
public class MsgHandler extends ChannelInboundHandlerAdapter  {

    public final static Map<ChannelId, Channel> CHANNEL = new ConcurrentHashMap<>();
    private final static ChannelGroup channelGroups = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void handlerAdded(ChannelHandlerContext ctx)  {
        channelGroups.add(ctx.channel());
        SimpleDateFormat format = new SimpleDateFormat("yyyy年MM月dd日 HH:mm:ss");
        ctx.channel().writeAndFlush(new TextWebSocketFrame(format.format(new Date()) + " 欢迎你的上线"));
        ChannelId id = ctx.channel().id();
        CHANNEL.put(id, ctx.channel());
        log.info("客服端:{} 上线了!",id);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        channelGroups.remove(ctx.channel());
        log.info("客服端:{} 异常!",ctx.channel().id());
        CHANNEL.remove(ctx.channel().id());
        ctx.close();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx)  {
        channelGroups.remove(ctx.channel());
        log.info("客服端:{} 断开连接!",ctx.channel().id());
        CHANNEL.remove(ctx.channel().id());
        ctx.close();

    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof ByteBuf byteBuf) {
            // 将 ByteBuf 转换为字符串
            String message = byteBuf.toString(CharsetUtil.UTF_8);
            log.info("客服端:{} 发送消息:{}", ctx.channel().id(), message);
            ctx.channel().writeAndFlush(Unpooled.copiedBuffer("服务端收到您发送的信息:" + message, CharsetUtil.UTF_8));
        } else {
            log.info("客服端:{} 发送未知类型的消息:{}", ctx.channel().id(), msg);
        }
    }


}

相关文章:

  • 初探WebAssembly
  • 什么是组态软件?
  • Kotlin 类委托与属性委托
  • 图论-岛屿数量
  • 什么是分布式和微服务?
  • 第一章:6.差分+前缀和(一个区域整体添加一个数)
  • EVOAGENT: Towards Automatic Multi-Agent Generation via Evolutionary Algorithms
  • yolo初体验
  • 【Kubernets】K8S亲和性配置相关说明
  • (链表 删除链表的倒数第N个结点)leetcode 19
  • 【Elasticsearch】自定义内置的索引生命周期管理(ILM)策略。
  • 博客系统测试报告
  • 17. LangChain实战项目2——易速鲜花宣传文案批量生成并导出
  • 探秘基带算法:从原理到5G时代的通信变革【十】基带算法应用与对比
  • 【图像处理与OpenCV:技术栈、应用和实现】
  • 防火墙旁挂组网双机热备负载均衡
  • Storm 踩坑之路
  • Animate Anyone本地部署教程:AI动画生成技术的革命性突破
  • 网卡驱动接收数据----软中断处理数据----socket接收数据
  • 太阳同步轨道的进动速度解析
  • 印巴战火LIVE丨“快速接近战争状态”?印度袭击巴军事基地,巴启动反制军事行动
  • 中国金茂新任命三名副总裁,撤销区域公司
  • 中方是否认同俄方关于新纳粹主义观点?外交部:联大曾多次通过相关决议
  • 深圳下调公积金利率,209万纯公积金贷款总利息减少9.94万
  • 陕西澄城打造“中国樱桃第一县”:从黄土高原走向海外,年产值超30亿
  • 首家股份行旗下AIC来了,兴银金融资产投资有限公司获批筹建