Netty编程模型介绍
目录
一 Netty 简介与核心概念
二 服务端(Server)开发结构
1 ServerBootstrap
2 EventLoopGroup
3 ChannelInitializer 与 ChannelPipeline
4 常用 Handler
5 配置与启动流程
三 客户端(Client)开发结构
1 Bootstrap
2 EventLoopGroup
3 ChannelInitializer 与 Pipeline
4 配置与连接流程
四 Server 与 Client 的区别对比
五 通信流程概览
六 聊天室示例
1 项目结构
2 服务端代码详解
2.1 Netty server
2.2 Server handler
3 客户端代码详解
3.1 Netty client
3.1 Client handler
4 公共代码
4.1 工具类
4.2 编解码器
4.3 传输协议
5 运行与测试
七 常见配置与优化建议
在高性能网络通信领域,Netty 以其高吞吐、低延迟、极佳的可扩展性,成为 Java 生态中广泛使用的异步事件驱动框架。对于初学者而言,Netty 的底层细节与丰富组件可能显得复杂,但在生产环境中,我们并不需要一次性掌握所有原理。本文聚焦于代码结构与开发流程,帮助读者在短时间内完成基于 Netty 的服务端与客户端编码,并理解两者如何协同工作。文章后会以“聊天室”为例写一个demo,示例完整、可直接运行,即使对 Netty 完全陌生,也能借此快速构建起自己的网络应用骨架。
-
Netty应用非常广泛,典型Netty应用包括
-
Apache Flink
-
Apache Spark
-
Apache DolphinScheduler
-
Hera
-
Vert.x
-
一 Netty 简介与核心概念
-
异步、事件驱动 Netty 的核心思想是基于 Reactor 模式,通过非阻塞 I/O(NIO)与事件回调,实现高并发、少线程上下文切换的通信模型。
-
Bootstrap 核心启动类
-
服务端使用
ServerBootstrap
-
客户端使用
Bootstrap
-
-
EventLoopGroup 管理一组
EventLoop
(事件循环线程),负责接收、读写、触发各种 I/O 事件。通常至少包含两个组:-
服务端:
bossGroup
(接收连接)、workerGroup
(处理读写) -
客户端:单个
group
即可
-
-
Channel 与 ChannelPipeline
-
Channel
:抽象的网络连接(TCP 连接、UDP 通道等)。 -
ChannelPipeline
:一条由多个ChannelHandler
(处理器)组成的拦截链,用于对进出数据进行编解码、处理等。
-
-
ChannelHandler
-
ChannelInboundHandler
:处理入站(接收)事件,如数据读取、连接建立等。 -
ChannelOutboundHandler
:处理出站(写出)事件,如数据编码、发送等。
-
以上组件共同构成了 Netty 编程模型的主体。接下来分别介绍服务端与客户端的典型代码结构。
二 服务端(Server)开发结构
1 ServerBootstrap
EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChatServerInitializer());ChannelFuture future = serverBootstrap.bind(8000).sync();System.out.println("Chat server started at port 8000");future.channel().closeFuture().sync(); } finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully(); }
-
.group(bossGroup, workerGroup)
:指定两个线程组,一个接收连接,一个处理读写。 -
.channel(NioServerSocketChannel.class)
:指定底层通道类型为 NIO。 -
.childHandler(...)
:为每个新连接初始化ChannelPipeline
。
2 EventLoopGroup
-
bossGroup:负责监听客户端连接请求,将连接注册给 workerGroup。
-
workerGroup:负责对已建立连接的数据读写与业务处理。
3 ChannelInitializer 与 ChannelPipeline
ChatServerInitializer
负责设置管道,示例:
public class ChatServerInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("decoder",new PacketDecoder());pipeline.addLast("encoder",new PacketEncoder());pipeline.addLast(new LoginRequestHandler());pipeline.addLast(new ChatMessageHandler());pipeline.addLast(new ExceptionHandler());} }
-
PacketDecoder/PacketEncoder:负责将字节流与业务对象(POJO)相互转换。
-
LoginRequestHandler:处理登录请求,将用户与 Channel 建立映射。
-
ChatMessageHandler:接收聊天消息并转发给其他客户端。
-
ExceptionHandler:捕获并记录管道中抛出的异常,防止连接因未捕获异常关闭。
4 常用 Handler
-
SimpleChannelInboundHandler<T>
泛型指定处理的消息类型,框架在调用后会自动释放 ByteBuf。 -
自定义心跳、空闲检测 可在管道中添加
IdleStateHandler
,并配合ChannelInboundHandler
处理超时。
5 配置与启动流程
-
定义协议:设计消息包格式,如魔数、版本、序列化算法、指令类型、长度、数据体等。
-
编写编解码器:根据协议实现
ByteToMessageDecoder
与MessageToByteEncoder
。 -
业务 Handler:分层处理不同请求,保持单一职责。
-
ServerBootstrap 启动:指定监听端口、线程组、管道初始化器。
-
优雅退出:
shutdownGracefully()
可确保已接任务执行完成后再退出。
三 客户端(Client)开发结构
1 Bootstrap
EventLoopGroup workerGroup = new NioEventLoopGroup(); try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChatClientInitializer());ChannelFuture future = bootstrap.connect(host, port).sync();Channel channel = future.channel();// 发送登录请求channel.writeAndFlush(loginRequestPacket);// 阻塞关闭channel.closeFuture().sync(); } finally {group.shutdownGracefully(); }
2 EventLoopGroup
客户端只需一个线程组,负责连接、读写。
3 ChannelInitializer 与 Pipeline
public class ChatClientInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("decoder",new PacketDecoder());pipeline.addLast("encoder",new PacketEncoder());pipeline.addLast(new LoginResponseHandler());pipeline.addLast(new ChatMessageResponseHandler());pipeline.addLast(new ExceptionHandler());} }
4 配置与连接流程
-
设计与服务端一致的协议。
-
编解码器复用,协议保持统一。
-
连接建立:
bootstrap.connect()
,并使用sync()
等待连接完成。 -
发送与接收:通过
channel.writeAndFlush()
发送消息,由ChannelInboundHandler
处理响应。 -
关闭与清理:
closeFuture().sync()
阻塞等待对端关闭,随后优雅停机。
四 Server 与 Client 的区别对比
Server | Client | |
---|---|---|
Bootstrap | ServerBootstrap | Bootstrap |
Channel | NioServerSocketChannel | NioSocketChannel |
ThreadGroup | 两组:bossGroup(接收)+ workerGroup(处理) | 一组:workerGroup(连接 + 处理) |
绑定/连接 | .bind(port) | .connect(host, port) |
Pipeline | childHandler() | handler() |
用途 | 被动接受客户端请求,管理多个连接 | 主动发起与服务端的连接,通常一对一或少量连接 |
五 通信流程概览
-
服务端启动:
ServerBootstrap
创建ServerSocketChannel
并监听指定端口,bossGroup
接收连接并注册给workerGroup
。 -
客户端连接:
Bootstrap.connect()
和远端建立 TCP 连接,生成SocketChannel
,并初始化管道。 -
协议握手:客户端发送登录/认证包,服务端响应。
-
数据读写:
-
出站:
channel.writeAndFlush(msg)
,数据经过ChannelPipeline
的出站Encoder
,并由底层 NIO 写出。 -
入站:底层读取 TCP 字节,分派给入站
Decoder
,再传递给业务Handler
。
-
-
异常与超时:通过
ExceptionHandler
与IdleStateHandler
进行捕获和处理。 -
关闭:任意一端调用
channel.close()
,触发closeFuture
,并释放资源;最后调用shutdownGracefully()
停止线程组。
六 聊天室示例
下面以一个简洁的“命令行聊天室”为例,展示完整服务端与客户端代码结构,帮助读者“照抄即用”。
1 项目结构
2 服务端代码详解
2.1 Netty server
// 1 Netty Server启动类 package com.caesar.chat.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; public class ChatServer {public static void main(String[] args) throws InterruptedException {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChatServerInitializer());ChannelFuture future = serverBootstrap.bind(8000).sync();System.out.println("Chat server started at port 8000");future.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}} } // 2 Server端处理链 package com.caesar.chat.server; import com.caesar.chat.codec.PacketDecoder; import com.caesar.chat.codec.PacketEncoder; import com.caesar.chat.handler.ChatMessageHandler; import com.caesar.chat.handler.ExceptionHandler; import com.caesar.chat.handler.LoginRequestHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.channel.ChannelPipeline; public class ChatServerInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("decoder",new PacketDecoder());pipeline.addLast("encoder",new PacketEncoder());pipeline.addLast(new LoginRequestHandler());pipeline.addLast(new ChatMessageHandler());pipeline.addLast(new ExceptionHandler());} }
2.2 Server handler
// 1 登陆处理器 package com.caesar.chat.handler; import com.caesar.chat.SessionUtil; import com.caesar.chat.message.LoginRequestPacket; import com.caesar.chat.message.LoginResponsePacket; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket msg) {System.out.println(String.format("LoginRequestHandler => Client request Login,Info: username=%s, password=%s",msg.getUsername(),msg.getPassword()));SessionUtil.bindSession(ctx.channel());LoginResponsePacket responsePacket = new LoginResponsePacket(true, "Login success");ctx.channel().writeAndFlush(responsePacket);} } // 2 消息处理器 package com.caesar.chat.handler; import com.caesar.chat.SessionUtil; import com.caesar.chat.message.ChatMessagePacket; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class ChatMessageHandler extends SimpleChannelInboundHandler<ChatMessagePacket> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ChatMessagePacket msg) {msg.setFromUser(ctx.channel().remoteAddress().toString());for (Channel channel : SessionUtil.getAllChannels()) {if (channel != ctx.channel()) {channel.writeAndFlush(msg);}}} } // 3 异常处理器 package com.caesar.chat.handler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class ExceptionHandler extends ChannelInboundHandlerAdapter {@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();} }
3 客户端代码详解
3.1 Netty client
// 1 客户端A package com.caesar.chat.client; import com.caesar.chat.message.ChatMessagePacket; import com.caesar.chat.message.LoginRequestPacket; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import java.util.Scanner; public class ChatClientA {public static void main(String[] args) throws Exception {EventLoopGroup workerGroup = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChatClientInitializer()); Channel channel = bootstrap.connect("localhost", 8000).sync().channel(); // 登录消息LoginRequestPacket loginPacket = new LoginRequestPacket();loginPacket.setUsername("gawyn");loginPacket.setPassword("123456");channel.writeAndFlush(loginPacket); // 发送消息Scanner scanner = new Scanner(System.in);while (scanner.hasNextLine()) {String line = scanner.nextLine();channel.writeAndFlush(new ChatMessagePacket(line));}} finally {workerGroup.shutdownGracefully();}} } // 2 客户端B package com.caesar.chat.client; import com.caesar.chat.message.ChatMessagePacket; import com.caesar.chat.message.LoginRequestPacket; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import java.util.Scanner; public class ChatClientB {public static void main(String[] args) throws Exception {EventLoopGroup workerGroup = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChatClientInitializer()); Channel channel = bootstrap.connect("localhost", 8000).sync().channel(); // 登录消息LoginRequestPacket loginPacket = new LoginRequestPacket();loginPacket.setUsername("caesar");loginPacket.setPassword("123456");channel.writeAndFlush(loginPacket); // 发送消息Scanner scanner = new Scanner(System.in);while (scanner.hasNextLine()) {String line = scanner.nextLine();channel.writeAndFlush(new ChatMessagePacket(line));}} finally {workerGroup.shutdownGracefully();}} } // 3 客户单处理链 package com.caesar.chat.client; import com.caesar.chat.codec.PacketDecoder; import com.caesar.chat.codec.PacketEncoder; import com.caesar.chat.handler.ChatMessageResponseHandler; import com.caesar.chat.handler.ExceptionHandler; import com.caesar.chat.handler.LoginResponseHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; public class ChatClientInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("decoder",new PacketDecoder());pipeline.addLast("encoder",new PacketEncoder());pipeline.addLast(new LoginResponseHandler());pipeline.addLast(new ChatMessageResponseHandler());pipeline.addLast(new ExceptionHandler());} }
3.1 Client handler
// 1 登陆处理器 package com.caesar.chat.handler; import com.caesar.chat.message.LoginResponsePacket; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class LoginResponseHandler extends SimpleChannelInboundHandler<LoginResponsePacket> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, LoginResponsePacket msg) {System.out.println(String.format("LoginResponseHandler => Reciver Master Message: success=%s, reason=%s",msg.isSuccess(),msg.getReason()));System.out.println(msg.isSuccess() ? "Login success" : "Login failed: " + msg.getReason());} } // 2 消息处理器 package com.caesar.chat.handler; import com.caesar.chat.message.ChatMessagePacket; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class ChatMessageResponseHandler extends SimpleChannelInboundHandler<ChatMessagePacket> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ChatMessagePacket msg) {System.out.println(msg.getFromUser() + ": " + msg.getMessage());} } // 3 异常处理器,同Server package com.caesar.chat.handler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class ExceptionHandler extends ChannelInboundHandlerAdapter {@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();} }
4 公共代码
4.1 工具类
package com.caesar.chat; public interface Command {Byte LOGIN_REQUEST = 1;Byte LOGIN_RESPONSE = 2;Byte MESSAGE_REQUEST = 3;Byte MESSAGE_RESPONSE = 4; } package com.caesar.chat; import io.netty.channel.Channel; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; public class SessionUtil {private static final Map<Channel,String> channels = new ConcurrentHashMap<>(); public static void bindSession(Channel channel) {channels.put(channel,"login-in");} public static void unbindSession(Channel channel) {channels.remove(channel);} public static Set<Channel> getAllChannels() {return channels.keySet();} }
4.2 编解码器
package com.caesar.chat.codec; import com.caesar.chat.Command; import com.caesar.chat.message.ChatMessagePacket; import com.caesar.chat.message.LoginRequestPacket; import com.caesar.chat.message.LoginResponsePacket; import com.caesar.chat.message.Packet; import com.alibaba.fastjson.JSON; public class PacketCodec {public static final PacketCodec INSTANCE = new PacketCodec(); public byte[] encode(Packet packet) {return JSON.toJSONBytes(packet);} public Packet decode(byte command, byte[] bytes) { if(command == Command.LOGIN_REQUEST){return JSON.parseObject(bytes, LoginRequestPacket.class);}if(command == Command.LOGIN_RESPONSE){return JSON.parseObject(bytes, LoginResponsePacket.class);}if(command == Command.MESSAGE_REQUEST){return JSON.parseObject(bytes, ChatMessagePacket.class);}if(command == Command.MESSAGE_RESPONSE){return JSON.parseObject(bytes, ChatMessagePacket.class);} return null; } } package com.caesar.chat.codec; import com.caesar.chat.message.Packet; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; public class PacketDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {if (in.readableBytes() < 10) return; // 不够基础长度 in.markReaderIndex(); int magic = in.readInt();if (magic != 0x12345678) {ctx.close();return;} in.readByte(); // versionbyte command = in.readByte();int length = in.readInt(); if (in.readableBytes() < length) {in.resetReaderIndex();return;} byte[] bytes = new byte[length];in.readBytes(bytes); Packet packet = PacketCodec.INSTANCE.decode(command, bytes);if (packet != null) {out.add(packet);}} } package com.caesar.chat.codec; import com.caesar.chat.message.Packet; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; public class PacketEncoder extends MessageToByteEncoder<Packet> {@Overrideprotected void encode(ChannelHandlerContext ctx, Packet packet, ByteBuf out) throws Exception {byte[] data = PacketCodec.INSTANCE.encode(packet); out.writeInt(0x12345678); // magicout.writeByte(1); // versionout.writeByte(packet.getCommand());out.writeInt(data.length);out.writeBytes(data);} }
4.3 传输协议
package com.caesar.chat.message; public abstract class Packet {public abstract Byte getCommand(); } package com.caesar.chat.message; import com.caesar.chat.Command; public class ChatMessagePacket extends Packet {private String fromUser;private String message; public ChatMessagePacket() {}public ChatMessagePacket(String message) { this.message = message; } public String getFromUser() { return fromUser; }public void setFromUser(String fromUser) { this.fromUser = fromUser; } public String getMessage() { return message; }public void setMessage(String message) { this.message = message; } @Overridepublic Byte getCommand() {return Command.MESSAGE_REQUEST;} } package com.caesar.chat.message; import com.caesar.chat.Command; public class LoginRequestPacket extends Packet {private String username;private String password; // getters and setterspublic String getUsername() { return username; }public void setUsername(String username) { this.username = username; }public String getPassword() { return password; }public void setPassword(String password) { this.password = password; } @Overridepublic Byte getCommand() {return Command.LOGIN_REQUEST;} } package com.caesar.chat.message; import com.caesar.chat.Command; public class LoginResponsePacket extends Packet {private boolean success;private String reason; public LoginResponsePacket() {}public LoginResponsePacket(boolean success, String reason) {this.success = success;this.reason = reason;} public void setSuccess(boolean success) {this.success = success;} public void setReason(String reason) {this.reason = reason;} public boolean isSuccess() { return success; }public String getReason() { return reason; } @Overridepublic Byte getCommand() {return Command.LOGIN_RESPONSE;} }
5 运行与测试
-
启动服务端:运行
ChatServer.main
,控制台显示 “ChatServer 启动,端口:8000”。 -
启动客户端:可在多台或多进程中运行
ChatClient.main
,输入用户名、密码后登录。 -
聊天测试:输入文本并回车,即可在所有客户端之间广播消息。
七 常见配置与优化建议
-
线程数
-
根据硬件与业务量,调整
NioEventLoopGroup
线程数,避免线程过多导致上下文切换。
-
-
TCP 参数
-
SO_BACKLOG
控制半连接队列长度; -
SO_KEEPALIVE
启用心跳; -
TCP_NODELAY
关闭 Nagle 算法,减小延迟。
-
-
内存管理
-
使用
PooledByteBufAllocator
复用内存块,减少 GC 开销。
-
-
IdleStateHandler
-
检测空闲连接,及时清理。
-
-
序列化方案
-
可选 JSON、Protobuf、Hessian 等,根据性能与可维护性取舍。
-
-
SSL/TLS
-
若需安全通信,使用 Netty 的
SslHandler
添加到管道最前端。
-
本文从代码结构与开发流程出发,详细介绍了 Netty 服务端与客户端的骨架搭建、核心组件的职责分工,以及二者在启动、连接、通信、关闭阶段的协作方式。最后通过“命令行聊天室”示例,提供了可直接运行的生产级代码,让读者在不深入底层原理的情况下,也能快速上手 Netty 开发。