Netty是怎么实现Java NIO多路复用的?(源码)
目录
- NIO多路复用实现
- 事件循环是什么?
- 核心源码
- (1)调用 NioEventLoopGroup 默认构造器
- (2)指定 SelectorProvider
- (3)创建 `Selector`
- (4)创建单线程和队列
- (5)单线程处理就绪IO事件
最近想再巩固一下NIO等多路复用的实现思路,本文通过Netty源码来进一步总结NIO多路复用的运用。
先上一组简单的NIO多路复用实现,
NIO多路复用实现
服务端通过selector组件轮询处理就绪IO事件,一个线程可以支持处理多个网络连接。
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
public class NIOServer {
private static final int PORT = 8080;
private static final int BUFFER_SIZE = 1024;
public static void main(String[] args) {
try {
// 1. 创建Selector
Selector selector = Selector.open();
// 2. 创建ServerSocketChannel并绑定端口
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.socket().bind(new InetSocketAddress(PORT));
serverChannel.configureBlocking(false); // 非阻塞模式
// 3. 注册ServerSocketChannel到Selector,监听ACCEPT事件
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务器启动,监听端口: " + PORT);
while (true) {
// 4. 阻塞等待就绪的事件(有新连接、数据可读等)
selector.select();
// 5. 获取所有就绪的事件集合
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectedKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove(); // 必须手动移除,防止重复处理
if (key.isAcceptable()) {
// 6. 处理ACCEPT事件:接受客户端连接
handleAcceptMethod(key, selector);
} else if (key.isReadable()) {
// 7. 处理READ事件:读取客户端数据
handleReadMethod(key);
}
}
}
} catch (IOException e) {
...
}
}
...
}
Netty 是基于Java NIO实现异步事件驱动的高性能网络应用框架,接下来结合Netty源码,看看在Netty的事件循环EventLoop中,是怎么实现NIO的多路复用的。
事件循环是什么?
(1)NioEventLoop
是事件循环核心组件,基于Java NIO实现。
职责:负责I/O事件处理,通过Selector
轮询Channel的读写事件(如accept、read、write),触发对应Handler执行。
作用:
- 每个
NioEventLoop
绑定一个独立线程,以串行无锁化方式处理所有注册到其上的Channel
,确保线程安全。 - 通过单线程处理多
Channel
的就绪事件,减少上下文切换,支撑高并发低延迟;
(2)NioEventLoopGroup
为事件循环组,主要是为NioEventLoop
提供线程资源调度和事件循环管理;
职责:每个 NioEventLoopGroup
管理一组 NioEventLoop
,统一管理组内所有NioEventLoop
的启动、任务执行、资源释放,通过next()
方法轮询分配 EventLoop,实现负载均衡。
其中,
- Boss Group:服务端专用,负责监听 TCP 连接(
accept
事件),将新连接分配给 Worker Group。 - Worker Group:处理已建立连接的 I/O 读写(
read/write
事件)及异步任务。
核心源码
(1)调用 NioEventLoopGroup 默认构造器
/**
* boss 线程组,用于服务端接受客户端的连接
*/
private EventLoopGroup bossGroup = new NioEventLoopGroup();
/**
* worker 线程组,用于服务端接受客户端的数据读写
*/
private EventLoopGroup workerGroup = new NioEventLoopGroup();
(2)指定 SelectorProvider
可以看到 Netty 帮我们传入了 SelectorProvider.provider()
,用于后续 NIO 创建 Selector
、Channel
等关键组件。
// io.netty.channel.nio.NioEventLoopGroup
// 传入 SelectorProvider.provider()
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
// 默认拒绝策略,直接排除异常 - RejectedExecutionHandlers.reject()
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
// 真正创建 EventLoop - 单线程
// 会在下面父类 MultithreadEventExecutorGroup 构造器中被调用
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
super(…) 进入父类 MultithreadEventLoopGroup 构造器,默认创建 2倍CPU核心个数 的 EventLoop。
// io.netty.channel.MultithreadEventLoopGroup
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
...
}
// 默认 EventLoop个数为 2倍CPU核心数
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
// 构造器,开始初始化
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 新建 EventLoop 对象,在上述 NioEventLoopGroup 中实现了
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
(3)创建 Selector
回头来看 NioEventLoop
构造器,每个 NioEventLoop
都会创建并维护一个 Selector
。
Tips: Netty 对 Selector
做了优化, 即下面的 unwrappedSelector
,在初始化时,会调用 openSelector()
方法,强制替换 Selector
的 selectedKeys
为 SelectedSelectionKeySet
(内部是数组实现)。
// io.netty.channel.nio.NioEventLoop
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
// 创建了两个任务队列,普通任务队列(taskQueue) 和 尾部任务队列(tailTaskQueue)
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
final SelectorTuple selectorTuple = openSelector();
// 新建并维护 Selector 事件轮询器
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
// 默认使用 无锁高性能队列(如 MpscChunkedArrayQueue),特点是 多生产者单消费者(MPSC),避免多线程竞争。
private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.newMpscQueue() : PlatformDependent.newMpscQueue(maxPendingTasks);
}
// 普通任务队列(taskQueue):处理IO事件以及用户提交的任务 channel.eventLoop().execute(...)
// 尾部任务队列(tailTaskQueue):处理 低优先级或非紧急任务
(4)创建单线程和队列
接着构造,每个 NioEventLoop
会维护 任务队列以及一个线程用于任务执行和调度。
进入父类 -> SingleThreadEventLoop
构造器
// io.netty.channel.SingleThreadEventLoop
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue, RejectedExecutionHandler rejectedExecutionHandler) {
// 传入刚刚创建的 普通任务队列
super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
// 传入尾部任务队列
tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
}
进入父类 -> SingleThreadEventExecutor 构造器,实现了execute() 方法进行任务调度,
// io.netty.util.concurrent.SingleThreadEventExecutor
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, Queue<Runnable> taskQueue, RejectedExecutionHandler rejectedHandler) {
super(parent);
this.threadLock = new CountDownLatch(1);
this.shutdownHooks = new LinkedHashSet();
this.state = 1;
this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
this.executor = ThreadExecutorMap.apply(executor, this);
// 任务队列,用于接收任务
this.taskQueue = (Queue)ObjectUtil.checkNotNull(taskQueue, "taskQueue");
this.rejectedExecutionHandler = (RejectedExecutionHandler)ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
/**
核心执行任务逻辑
**/
private void execute(Runnable task, boolean immediate) {
// 检查是否在事件循环线程内
boolean inEventLoop = this.inEventLoop();
// 添加任务到 任务队列中
this.addTask(task);
// 若首次不在事件循环线程内,即首次提交任务,需要触发线程启动
if (!inEventLoop) {
// 启动线程
this.startThread();
if (this.isShutdown()) {
boolean reject = false;
try {
if (this.removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException var6) {
}
if (reject) {
reject();
}
}
}
if (!this.addTaskWakesUp && immediate) {
this.wakeup(inEventLoop);
}
}
/**
在这里 EventLoop 会启动一个线程, 用于执行任务
**/
private void startThread() {
if (this.state == 1 && STATE_UPDATER.compareAndSet(this, 1, 2)) {
boolean success = false;
try {
this.doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, 2, 1);
}
}
}
}
/**
this.executor 默认是 ThreadPerTaskExecutor,该执行器为每个任务创建一个新线程,因此每个 NioEventLoop 都会绑定一个独立线程
**/
private void doStartThread() {
assert this.thread == null;
// ThreadPerTaskExecutor 会创建一个新的线程
this.executor.execute(new Runnable() {
public void run() {
...
// 执行事件循环 - NioEventLoop 具体实现 run()
SingleThreadEventExecutor.this.run();
...
}
});
}
(5)单线程处理就绪IO事件
回到 NioEventLoop,查看 run() 方法处理就绪的I/O事件
// 主要框架如下,
protected void run() {
for (;;) { // 无限循环
// 1. 轮询 I/O 事件(strategy = Selector.select)
// 2. 处理 I/O 事件(selector.selectedKeys() --> processSelectedKeys)
// 3. 执行任务队列(runAllTasks)
}
}
源码如下,
// io.netty.channel.nio.NioEventLoop
@Override
protected void run() {
// 记录连续无有效事件时的 select 调用次数(用于检测空轮询)
int selectCnt = 0;
for (;;) {
try {
// (1) 超时等待 I/O事件就绪 -> select
int strategy;
try {
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
// 任务队列为空,超时等待 I/O 事件
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
// 出现异常,重置 select 计数
selectCnt = 0;
handleLoopException(e);
continue;
}
// (2) 执行IO事件以及执行任务
selectCnt++; // 递增 select 调用次数
cancelledKeys = 0;
needsToSelectAgain = false;
// 获取 I/O 时间占比(默认 50,即 50% 时间处理 I/O,50% 处理任务)
final int ioRatio = this.ioRatio;
boolean ranTasks;
// 根据 ioRatio 分配 I/O 处理与任务执行时间
if (ioRatio == 100) {// 全时处理 I/O,任务无时间限制
try {
if (strategy > 0) { // 有就绪I/O事件
processSelectedKeys();// 处理 I/O 事件(如 accept、read、write)
}
} finally {
// 执行所有任务(普通队列 + 尾部队列)
ranTasks = runAllTasks();
}
} else if (strategy > 0) { // 限时处理IO事件
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();// 处理 I/O 事件(如 accept、read、write)
} finally {
// 限时执行所有任务(普通队列 + 尾部队列)
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {// 无 I/O 事件(strategy == 0),仅执行任务
ranTasks = runAllTasks(0);
}
// (3) 检测空轮询和异常唤醒
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
// 正常情况:执行了任务或处理了 I/O 事件,重置计数
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // 异常唤醒(如未处理事件却唤醒)
selectCnt = 0; // 重置计数
}
} catch (CancelledKeyException e) {
// Harmless exception - log anyway
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Throwable t) {
handleLoopException(t);
}
// 处理一些关闭逻辑...
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
/**
查询就绪I/O事件,调用 select 方法
**/
private int select(long deadlineNanos) throws IOException {
if (deadlineNanos == NONE) {
return selector.select();
}
// Timeout will only be 0 if deadline is within 5 microsecs
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
/**
处理就绪的IO事件,selector.selectedKeys() 获取就绪事件
**/
private void processSelectedKeys() {
// selectedKeys 为null说明Netty启用了优化模式,通过反射替换 JDK 原生的 Selector 实现,将 selectedKeys 替换为高性能的数组结构(SelectedSelectionKeySet),实现高效添加和遍历,避免Hash冲突的开销,默认开启优化模式
if (selectedKeys != null) {
// 优化模式,SelectedSelectionKeySet selectedKeys 数组存储就绪事件
processSelectedKeysOptimized();
} else {
// 普通模式
processSelectedKeysPlain(selector.selectedKeys());
}
}
// 非优化模式下的处理IO事件
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
if (selectedKeys.isEmpty()) {
return;
}
// 遍历 selectedKeys 并执行相应Channel的事件
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {
break;
}
// run 方法已经设置 needsToSelectAgain = false
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
...
try {
int readyOps = k.readyOps();// 获取就绪的操作类型
// 处理连接就绪事件(客户端已建立连接)
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;// 移除 CONNECT 监听
k.interestOps(ops);
unsafe.finishConnect();
}
// 处理写就绪事件(数据可写)
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush(); // Socket 缓冲区可写,强制刷新待发送数据
}
// 处理读就绪或 Accept 事件(数据可读/新连接)
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read(); // 触发读取操作(如读取数据或 Accept 连接)
}
} catch (CancelledKeyException ignored) {
// Key 被取消,关闭 Channel
unsafe.close(unsafe.voidPromise());
}
}
/**
Netty是走的优化模式,主要在于遍历方式的区别,SelectedSelectionKeySet 是数组存储遍历高效,没有迭代器开销
**/
private void processSelectedKeysOptimized() {
// 便利数组
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a); // 同上
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
总结,NIO多路复用核心思想就是单线程管理多通道:通过一个Selector(选择器)监控多个通道(Channel)的I/O事件(如连接、读、写),由单线程轮询就绪事件,避免为每个连接创建独立线程,实现非阻塞高效并发。本质上用事件通知机制替代线程轮询,以最小线程开销处理海量连接。