当前位置: 首页 > news >正文

springboot netty 客户端网络编程入门与实战

Spring Boot3 Netty 客户端项目地址

https://gitee.com/supervol/loong-springboot-study

(记得给个start,感谢)

Netty 前提

        请先参考《springboot netty 服务端网络编程入门与实战》。

Netty 客户端概述

        Netty 客户端与服务端在核心组件上有诸多共通之处,但客户端更侧重于连接管理、消息发送、重连机制响应处理。在 Spring Boot 3 环境中,Netty 客户端需与 Spring 生命周期协同,同时应对网络波动、服务端下线等异常场景。

Netty 客户端核心

        与服务端相比,客户端组件更简洁,核心围绕 “连接建立” 和 “消息交互”:

组件作用说明客户端特有特性
Bootstrap客户端启动引导类,用于配置连接参数、绑定线程组和 Channel无需区分 Boss/Worker 线程组,通常使用一个 EventLoopGroup
EventLoopGroup管理 IO 线程,处理连接建立、消息读写等事件通常仅需一个线程组(服务端需两个),推荐线程数为 CPU 核心数
Channel网络连接的载体,客户端使用 NioSocketChannel(服务端用 NioServerSocketChannel需主动调用 connect() 建立连接,而非绑定端口
ChannelPipeline处理器链,负责编码发送消息、解码接收消息、处理业务逻辑编解码器需与服务端完全匹配(如服务端用 LengthFieldBasedFrameDecoder,客户端也需对应配置)
ChannelFuture异步操作结果的占位符,用于获取连接状态、消息发送结果客户端常用其监听连接成功 / 失

Netty 客户端示例

        请参考项目地址中 springboot-netty/springboot-netty-client 模块代码。

Netty 客户端整合

1. 依赖配置

        与服务端一致,需引入 Spring Boot 基础依赖和 Netty 核心依赖(版本由 Spring Boot 3 自动管理):

<dependencies><!-- Spring Boot 基础依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- Netty 核心依赖 --><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId></dependency><!-- 日志依赖 --><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId></dependency>
</dependencies>

2. 核心配置类

        封装客户端启动、连接、关闭逻辑,核心是通过 Bootstrap 配置连接参数,并管理 EventLoopGroup 生命周期:

package com.example.netty.client;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;@Slf4j
@Component
public class NettyClient {// 服务端地址和端口(从配置文件读取)@Value("${netty.server.host:localhost}")private String serverHost;@Value("${netty.server.port:8888}")private int serverPort;// 客户端线程组(仅需一个,处理连接和IO事件)private final EventLoopGroup group = new NioEventLoopGroup(4); // 4个线程// 与服务端的连接通道(全局唯一,需线程安全)private volatile Channel channel;/*** 初始化客户端并连接服务端*/public void start() {try {// 1. 创建 Bootstrap 并配置Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class) // 客户端 Channel 类型.remoteAddress(new InetSocketAddress(serverHost, serverPort)) // 服务端地址// TCP 参数配置(与服务端保持一致).option(ChannelOption.SO_KEEPALIVE, true) // 保持连接.option(ChannelOption.TCP_NODELAY, true) // 禁用 Nagle 算法.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) // 连接超时 3秒// 配置 Channel 初始化器(编解码器+业务处理器).handler(new NettyClientChannelInitializer());// 2. 异步连接服务端,并添加连接结果监听connect(bootstrap);} catch (Exception e) {log.error("Netty 客户端初始化失败", e);shutdown(); // 失败时关闭资源}}/*** 连接服务端(支持重连)*/private void connect(Bootstrap bootstrap) {ChannelFuture future = bootstrap.connect();// 监听连接结果future.addListener(f -> {if (f.isSuccess()) {// 连接成功,保存 Channel 实例channel = future.channel();log.info("成功连接到服务端:{}:{}", serverHost, serverPort);} else {// 连接失败,定时重试(1秒后重试)log.error("连接服务端失败,1秒后重试...", f.cause());future.channel().eventLoop().schedule(() -> connect(bootstrap), 1, TimeUnit.SECONDS);}});}/*** 发送消息到服务端(线程安全)* @param msg 消息内容* @return 是否发送成功*/public boolean sendMessage(String msg) {if (channel == null || !channel.isActive()) {log.warn("连接未建立,无法发送消息:{}", msg);return false;}// 异步发送消息,并添加监听ChannelFuture future = channel.writeAndFlush(msg);future.addListener(f -> {if (!f.isSuccess()) {log.error("消息发送失败:{}", msg, f.cause());}});return true;}/*** 关闭客户端(Spring 容器销毁时调用)*/@PreDestroypublic void shutdown() {if (channel != null && channel.isActive()) {channel.close(); // 关闭连接}group.shutdownGracefully(); // 优雅关闭线程组log.info("Netty 客户端已关闭");}//  getter 用于外部判断连接状态public boolean isConnected() {return channel != null && channel.isActive();}
}

3. 配置编解码器与处理器

        客户端的 ChannelPipeline 必须与服务端协议一致(否则会出现粘包拆包或解码失败)。以下示例使用 “长度字段协议”(与服务端对应):

package com.example.netty.client;import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;import java.util.concurrent.TimeUnit;public class NettyClientChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// 1. 粘包拆包处理(与服务端完全一致)pipeline.addLast(new LengthFieldBasedFrameDecoder(1024 * 1024, // 最大帧长度 1MB0,           // 长度字段偏移量4,           // 长度字段占4字节0,           // 长度调整为04            // 跳过长度字段本身));pipeline.addLast(new LengthFieldPrepender(4)); // 发送消息时添加长度字段// 2. 日志处理器(调试用)pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));// 3. 字符串编解码器(与服务端一致)pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));// 4. 心跳检测(30秒写空闲则发送心跳)pipeline.addLast(new IdleStateHandler(0, 30, 0, TimeUnit.SECONDS));// 5. 客户端业务处理器(处理服务端响应和连接事件)pipeline.addLast(new NettyClientBusinessHandler());}
}

4. 业务处理器

        客户端处理器需关注服务端消息接收、连接状态变化、心跳触发等场景:

package com.example.netty.client;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class NettyClientBusinessHandler extends SimpleChannelInboundHandler<String> {// 心跳消息内容(与服务端约定)private static final String HEARTBEAT_MSG = "client-heartbeat";/*** 接收服务端消息(自动释放资源)*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {log.info("收到服务端消息:{}", msg);// 业务逻辑:根据服务端消息类型处理(如响应指令、推送通知等)handleServerMessage(ctx, msg);}/*** 连接建立成功时触发*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.info("与服务端连接已建立:{}", ctx.channel().remoteAddress());// 连接成功后可发送认证消息(如token、客户端ID等)ctx.writeAndFlush("client-auth:123456");}/*** 连接断开时触发(触发重连)*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.warn("与服务端连接已断开,等待重连...");// 触发重连(实际由 NettyClient 中的监听逻辑处理)super.channelInactive(ctx);}/*** 空闲事件触发(如写空闲则发送心跳)*/@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;if (event.state() == IdleState.WRITER_IDLE) {log.debug("触发写空闲,发送心跳");ctx.writeAndFlush(HEARTBEAT_MSG); // 发送心跳}} else {super.userEventTriggered(ctx, evt);}}/*** 发生异常时触发(如连接被强制关闭)*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("与服务端通信异常", cause);ctx.close(); // 关闭连接,触发重连}/*** 处理服务端消息的业务逻辑*/private void handleServerMessage(ChannelHandlerContext ctx, String msg) {if (msg.startsWith("server-command:")) {// 处理服务端指令String command = msg.substring("server-command:".length());log.info("执行服务端指令:{}", command);// 示例:回复指令执行结果ctx.writeAndFlush("command-result:" + command + "-done");} else if (msg.equals("heartbeat-reply")) {// 处理心跳响应log.debug("收到服务端心跳响应");}}
}

5. 客户端启动器

        通过 CommandLineRunner 在 Spring Boot 启动后触发客户端连接:

package com.example.netty.client;import lombok.RequiredArgsConstructor;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;@Component
@RequiredArgsConstructor
public class NettyClientStarter implements CommandLineRunner {private final NettyClient nettyClient;@Overridepublic void run(String... args) throws Exception {// 启动 Netty 客户端(独立线程,避免阻塞 Spring 主线程)new Thread(nettyClient::start, "netty-client-thread").start();}
}

6. 配置文件

        在 application.yaml 中配置服务端地址、端口等参数:

netty:server:host: localhost  # 服务端IP地址port: 8888       # 服务端端口client:reconnect-interval: 1000  # 重连间隔(毫秒)heartbeat-interval: 30    # 心跳间隔(秒)

Netty 客户端高级

1. 重连机制优化

        基础重连逻辑(如上文 connect() 方法)可能存在 “无限重试” 问题,实际场景需优化:

  • 重试次数限制:超过最大次数后报警(如通过 Spring 事件通知)
  • 指数退避策略:重试间隔随失败次数增加而延长(如 1s→2s→4s→8s,上限 60s)
  • 网络检测:重连前检查服务端端口是否可达(避免无效重试)

        优化后的重连逻辑示例:

// 在 NettyClient 中添加重连参数
@Value("${netty.client.reconnect-interval:1000}")
private int baseReconnectInterval;
private int reconnectCount = 0;
private static final int MAX_RECONNECT_COUNT = 10; // 最大重试次数private void connect(Bootstrap bootstrap) {ChannelFuture future = bootstrap.connect();future.addListener(f -> {if (f.isSuccess()) {// 连接成功,重置重试计数reconnectCount = 0;channel = future.channel();log.info("成功连接到服务端:{}:{}", serverHost, serverPort);} else {reconnectCount++;// 超过最大次数,触发报警if (reconnectCount >= MAX_RECONNECT_COUNT) {log.error("已达最大重连次数({}次),请检查服务端状态", MAX_RECONNECT_COUNT);// 发送报警事件(如通过 Spring 的 ApplicationEventPublisher)return;}// 计算退避间隔(指数增长,最大60秒)long interval = Math.min(baseReconnectInterval * (1 << (reconnectCount - 1)), 60_000);log.error("第{}次连接失败,{}毫秒后重试...", reconnectCount, interval, f.cause());// 定时重试future.channel().eventLoop().schedule(() -> connect(bootstrap), interval, TimeUnit.MILLISECONDS);}});
}

2. 同步消息发送

        Netty 消息发送默认是异步的,若需 “发送消息后等待响应”(如 RPC 调用),可通过 Promise 实现同步阻塞:

import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;// 在 NettyClient 中添加同步发送方法
public String sendSyncMessage(String msg, long timeoutMillis) throws Exception {if (!isConnected()) {throw new IllegalStateException("连接未建立");}// 创建 Promise 用于等待响应Promise<String> promise = new DefaultPromise<>(channel.eventLoop());String requestId = "req-" + System.currentTimeMillis(); // 生成唯一请求ID// 存储请求(用于响应匹配)requestMap.put(requestId, promise); // 需定义线程安全的 Map<String, Promise>// 发送消息(格式:requestId|消息内容)channel.writeAndFlush(requestId + "|" + msg);// 等待响应(超时则抛出异常)return promise.get(timeoutMillis, TimeUnit.MILLISECONDS);
}// 在业务处理器中处理响应匹配
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {// 假设服务端响应格式:requestId|响应内容if (msg.contains("|")) {String[] parts = msg.split("\\|", 2);String requestId = parts[0];String response = parts[1];// 匹配并唤醒等待的 PromisePromise<String> promise = requestMap.remove(requestId);if (promise != null) {promise.setSuccess(response);}} else {// 处理非同步响应消息(如广播、通知)handleServerMessage(ctx, msg);}
}

3. SSL/TLS 加密通信

        为客户端与服务端通信添加加密层,需使用 SslHandler

  1. 生成证书:通过 keytool 生成客户端信任库(信任服务端证书)
  2. 配置 SslContext
// 在 ChannelInitializer 中添加 SSL 处理器(需在编解码器之前)
@Override
protected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// 1. SSL 加密(最外层,先解密后解码,最后加密发送)SslContext sslContext = SslContext.newClientContext(SslProvider.JDK,NettyClient.class.getResourceAsStream("/client-truststore.jks"),"truststore-password" // 信任库密码);pipeline.addLast(sslContext.newHandler(ch.alloc(), serverHost, serverPort));// 后续添加粘包拆包处理器、编解码器等(与之前一致)// ...
}

4. 消息发送线程安全

        客户端可能被多个业务线程调用 sendMessage(),需确保线程安全:

  • Channel 本身是线程安全的,可被多线程并发调用 writeAndFlush()
  • 避免在业务线程中执行阻塞操作(如 sync()),防止线程池耗尽
  • 建议通过 Channel.eventLoop().execute() 将消息发送逻辑提交到 Netty 线程执行
// 线程安全的消息发送示例
public boolean sendMessageSafe(String msg) {if (!isConnected()) {log.warn("连接未建立,无法发送消息");return false;}// 提交到 Netty 线程执行,避免业务线程阻塞channel.eventLoop().execute(() -> {ChannelFuture future = channel.writeAndFlush(msg);future.addListener(f -> {if (!f.isSuccess()) {log.error("消息发送失败", f.cause());}});});return true;
}

Netty 客户端注意

1. 协议设计一致性

        客户端与服务端必须使用完全一致的协议,不一致会导致解码失败,表现为 “消息乱码” 或 “连接被强制关闭”:

  • 粘包拆包方案(如长度字段的字节数、偏移量)
  • 编解码方式(如字符串编码用 UTF-8,Protobuf 版本一致)
  • 消息格式(如请求 ID、消息类型、业务字段的顺序)

2. 资源释放与内存管理

  • 客户端关闭时必须调用 EventLoopGroup.shutdownGracefully(),否则会导致线程泄漏
  • 使用 PooledByteBufAllocator 优化内存分配(Netty 默认启用)
  • 避免在 channelRead0() 中长时间阻塞(应提交到业务线程池)

3. 连接状态监控

        通过 Spring 监控客户端状态,例如:

  • 暴露 isConnected() 方法给监控系统(如 Prometheus)
  • 连接状态变化时发送 Spring 事件(如 ClientConnectedEventClientDisconnectedEvent
  • 集成健康检查端点(/actuator/health):
@Component
public class NettyClientHealthIndicator implements HealthIndicator {private final NettyClient nettyClient;@Overridepublic Health health() {if (nettyClient.isConnected()) {return Health.up().withDetail("server", nettyClient.getServerHost() + ":" + nettyClient.getServerPort()).build();} else {return Health.down().withDetail("reason", "未连接到服务端").build();}}
}

4. 日志与调试

  • 开发环境启用 LoggingHandlerLogLevel.DEBUG),查看消息收发细节
  • 生产环境降低日志级别(LogLevel.INFO),避免性能损耗
  • 记录关键事件(连接成功 / 失败、消息发送失败、心跳超时)

总结

        Spring Boot 3 整合 Netty 客户端的核心是连接管理协议一致性。通过本文的实现方案,可构建一个具备重连机制、心跳检测、同步 / 异步消息发送能力的健壮客户端。关键要点包括:

  1. 客户端与服务端的编解码器、粘包拆包方案必须严格一致;
  2. 重连机制需结合指数退避和次数限制,避免无效重试;
  3. 通过 ChannelFuture 和 Promise 处理异步操作,区分同步 / 异步消息场景;
  4. 纳入 Spring 生命周期管理,确保资源优雅释放和状态可监控。

        在实际应用中,还需根据业务场景(如物联网设备通信、RPC 调用)优化协议设计和性能参数,进一步提升客户端的稳定性和效率。


文章转载自:

http://qdcjP9ou.gynLc.cn
http://GKbHCJN4.gynLc.cn
http://uLJtp7Dc.gynLc.cn
http://HMid5Wfg.gynLc.cn
http://rr6QwVFI.gynLc.cn
http://Dy1imHQY.gynLc.cn
http://w5jwiaov.gynLc.cn
http://HXc5xcsj.gynLc.cn
http://Pqd9Xown.gynLc.cn
http://CJA1FwEU.gynLc.cn
http://gvsHwl6c.gynLc.cn
http://7LGt7Cfc.gynLc.cn
http://NTqAkdma.gynLc.cn
http://LNCoZJKe.gynLc.cn
http://AJerzyge.gynLc.cn
http://yFLFsKND.gynLc.cn
http://XbP02m1X.gynLc.cn
http://xRzuXDF0.gynLc.cn
http://F3CT9lVM.gynLc.cn
http://drhqI0PU.gynLc.cn
http://BdotrZCL.gynLc.cn
http://BKKWo0cP.gynLc.cn
http://f4JlyqqD.gynLc.cn
http://oZa2D50K.gynLc.cn
http://diDuwJoU.gynLc.cn
http://Dg5SdQ4j.gynLc.cn
http://2rCum4oe.gynLc.cn
http://dH65gKjN.gynLc.cn
http://feZY3LVI.gynLc.cn
http://4d6cupqs.gynLc.cn
http://www.dtcms.com/a/385888.html

相关文章:

  • TCP/IP模型
  • 智慧用电安全管理系统的核心优势
  • flutter结合NestedScrollView+TabBar实现嵌套滚动
  • 基于定制开发开源AI智能名片S2B2C商城小程序的社群团购线上平台搭建研究
  • DEDECMS 小程序插件简介 2.0全新上线
  • 详解 Spring Boot 单元测试:@SpringBootTest 与 JUnit 依赖配置及环境注入
  • JMeter元件简介与JMeter测试计划
  • 陪诊小程序:让医疗关怀触手可及
  • n*n矩阵方程组Ax=b,使用Eigen矩阵库常用解法介绍
  • IvorySQL 4.6:DocumentDB+FerretDB 实现 MongoDB 兼容部署指南
  • UART,IIC,SPI总线(通信协议)
  • 记录一次小程序请求报错:600001
  • 光谱相机的新兴领域应用
  • GO学习记录十——发包
  • OpenLayers数据源集成 -- 章节十六:XML图层详解:OpenStreetMap数据的动态加载与智能样式渲染方案
  • vector 模拟实现 4 大痛点解析:从 memcpy 到模板嵌套的实战方案
  • tuple/dict/list 这三个数据类型在取值时候的区别
  • 用Python实现自动化的Web测试(Selenium)
  • Spring Boot 2.5.0 集成 Elasticsearch 7.12.0 实现 CRUD 完整指南(Windows 环境)
  • 第九章:使用Jmeter+Ant+Jenkins实现接口自动化测试持续集成
  • 使用IP的好处
  • 育碧确定《AC影》3月20日发售并分享系列游戏首发数据
  • 容器热升级机制在云服务器零停机部署中的实施规范
  • 贪心算法应用:时间序列分段(PAA)问题详解
  • 微信小程序开发教程(十五)
  • 语音DDS系统架构与实现方案:车机与手机语音助手的差异分析
  • 手机群控平台的工作效率
  • DBAPI免费版对比apiSQL免费版
  • node.js在vscode中npm等出现的一个问题
  • node.js学习笔记:中间件