当前位置: 首页 > news >正文

Netty是怎么实现Java NIO多路复用的?(源码)

目录

  • NIO多路复用实现
  • 事件循环是什么?
  • 核心源码
    • (1)调用 NioEventLoopGroup 默认构造器
    • (2)指定 SelectorProvider
    • (3)创建 `Selector`
    • (4)创建单线程和队列
    • (5)单线程处理就绪IO事件

最近想再巩固一下NIO等多路复用的实现思路,本文通过Netty源码来进一步总结NIO多路复用的运用。

先上一组简单的NIO多路复用实现,

NIO多路复用实现

服务端通过selector组件轮询处理就绪IO事件,一个线程可以支持处理多个网络连接。

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

public class NIOServer {
    private static final int PORT = 8080;
    private static final int BUFFER_SIZE = 1024;

    public static void main(String[] args) {
        try {
            // 1. 创建Selector
            Selector selector = Selector.open();

            // 2. 创建ServerSocketChannel并绑定端口
            ServerSocketChannel serverChannel = ServerSocketChannel.open();
            serverChannel.socket().bind(new InetSocketAddress(PORT));
            serverChannel.configureBlocking(false); // 非阻塞模式

            // 3. 注册ServerSocketChannel到Selector,监听ACCEPT事件
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("服务器启动,监听端口: " + PORT);

            while (true) {
                // 4. 阻塞等待就绪的事件(有新连接、数据可读等)
                selector.select();

                // 5. 获取所有就绪的事件集合
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> iter = selectedKeys.iterator();

                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove(); // 必须手动移除,防止重复处理

                    if (key.isAcceptable()) {
                        // 6. 处理ACCEPT事件:接受客户端连接
                        handleAcceptMethod(key, selector);
                    } else if (key.isReadable()) {
                        // 7. 处理READ事件:读取客户端数据
                        handleReadMethod(key);
                    }
                }
            }
        } catch (IOException e) {
            ...
        }
    }

    ...
}

Netty 是基于Java NIO实现异步事件驱动的高性能网络应用框架,接下来结合Netty源码,看看在Netty的事件循环EventLoop中,是怎么实现NIO的多路复用的。

事件循环是什么?

(1)NioEventLoop事件循环核心组件,基于Java NIO实现。

职责:负责I/O事件处理,通过Selector轮询Channel的读写事件(如accept、read、write),触发对应Handler执行。

作用:

  • 每个NioEventLoop绑定一个独立线程,以串行无锁化方式处理所有注册到其上的Channel,确保线程安全。
  • 通过单线程处理多Channel的就绪事件,减少上下文切换,支撑高并发低延迟;

(2)NioEventLoopGroup 为事件循环组,主要是为NioEventLoop提供线程资源调度和事件循环管理;

职责:每个 NioEventLoopGroup 管理一组 NioEventLoop,统一管理组内所有NioEventLoop的启动、任务执行、资源释放,通过next()方法轮询分配 EventLoop,实现负载均衡。

其中,

  • Boss Group:服务端专用,负责监听 TCP 连接(accept事件),将新连接分配给 Worker Group。
  • Worker Group:处理已建立连接的 I/O 读写(read/write事件)及异步任务。

核心源码

(1)调用 NioEventLoopGroup 默认构造器

/**
 * boss 线程组,用于服务端接受客户端的连接
 */
private EventLoopGroup bossGroup = new NioEventLoopGroup();
/**
 * worker 线程组,用于服务端接受客户端的数据读写
 */
private EventLoopGroup workerGroup = new NioEventLoopGroup();

(2)指定 SelectorProvider

可以看到 Netty 帮我们传入了 SelectorProvider.provider(),用于后续 NIO 创建 SelectorChannel 等关键组件。

// io.netty.channel.nio.NioEventLoopGroup

// 传入 SelectorProvider.provider()
public NioEventLoopGroup(int nThreads, Executor executor) {
    this(nThreads, executor, SelectorProvider.provider());
}

// 默认拒绝策略,直接排除异常 - RejectedExecutionHandlers.reject()
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

// 真正创建 EventLoop - 单线程
// 会在下面父类 MultithreadEventExecutorGroup 构造器中被调用
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}

super(…) 进入父类 MultithreadEventLoopGroup 构造器,默认创建 2倍CPU核心个数 的 EventLoop。

// io.netty.channel.MultithreadEventLoopGroup

private static final int DEFAULT_EVENT_LOOP_THREADS;

static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
        "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
    ...
}

// 默认 EventLoop个数为 2倍CPU核心数
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

// 构造器,开始初始化
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // 新建 EventLoop 对象,在上述 NioEventLoopGroup 中实现了
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    chooser = chooserFactory.newChooser(children);

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

(3)创建 Selector

回头来看 NioEventLoop 构造器,每个 NioEventLoop 都会创建并维护一个 Selector

Tips: Netty 对 Selector 做了优化, 即下面的 unwrappedSelector,在初始化时,会调用 openSelector() 方法,强制替换 SelectorselectedKeysSelectedSelectionKeySet(内部是数组实现)。

// io.netty.channel.nio.NioEventLoop

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
             EventLoopTaskQueueFactory queueFactory) {
    // 创建了两个任务队列,普通任务队列(taskQueue) 和 尾部任务队列(tailTaskQueue)
    super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
          rejectedExecutionHandler);
    this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
    final SelectorTuple selectorTuple = openSelector();
    // 新建并维护 Selector 事件轮询器
    this.selector = selectorTuple.selector;
    this.unwrappedSelector = selectorTuple.unwrappedSelector;
}

// 默认使用 无锁高性能队列(如 MpscChunkedArrayQueue),特点是 多生产者单消费者(MPSC),避免多线程竞争。
private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
    return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.newMpscQueue() : PlatformDependent.newMpscQueue(maxPendingTasks);
}
// 普通任务队列(taskQueue):处理IO事件以及用户提交的任务 channel.eventLoop().execute(...)
// 尾部任务队列(tailTaskQueue):处理 低优先级或非紧急任务

(4)创建单线程和队列

接着构造,每个 NioEventLoop 会维护 任务队列以及一个线程用于任务执行和调度。

进入父类 -> SingleThreadEventLoop 构造器

// io.netty.channel.SingleThreadEventLoop

protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue, RejectedExecutionHandler rejectedExecutionHandler) {
    // 传入刚刚创建的 普通任务队列
    super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
    // 传入尾部任务队列
    tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
}

进入父类 -> SingleThreadEventExecutor 构造器,实现了execute() 方法进行任务调度,

// io.netty.util.concurrent.SingleThreadEventExecutor

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, Queue<Runnable> taskQueue, RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.threadLock = new CountDownLatch(1);
    this.shutdownHooks = new LinkedHashSet();
    this.state = 1;
    this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
    this.executor = ThreadExecutorMap.apply(executor, this);
    // 任务队列,用于接收任务
    this.taskQueue = (Queue)ObjectUtil.checkNotNull(taskQueue, "taskQueue");
    this.rejectedExecutionHandler = (RejectedExecutionHandler)ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

/**
    核心执行任务逻辑
**/
private void execute(Runnable task, boolean immediate) {
    // 检查是否在事件循环线程内
    boolean inEventLoop = this.inEventLoop();
    // 添加任务到 任务队列中
    this.addTask(task);
    // 若首次不在事件循环线程内,即首次提交任务,需要触发线程启动
    if (!inEventLoop) {
        // 启动线程
        this.startThread();
        if (this.isShutdown()) {
            boolean reject = false;

            try {
                if (this.removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException var6) {
            }

            if (reject) {
                reject();
            }
        }
    }

    if (!this.addTaskWakesUp && immediate) {
        this.wakeup(inEventLoop);
    }

}

/**
    在这里 EventLoop 会启动一个线程, 用于执行任务
**/
private void startThread() {
    if (this.state == 1 && STATE_UPDATER.compareAndSet(this, 1, 2)) {
        boolean success = false;

        try {
            this.doStartThread();
            success = true;
        } finally {
            if (!success) {
                STATE_UPDATER.compareAndSet(this, 2, 1);
            }

        }
    }
}

/**
    this.executor 默认是 ThreadPerTaskExecutor,该执行器为每个任务创建一个新线程,因此每个 NioEventLoop 都会绑定一个独立线程
**/
private void doStartThread() {
    assert this.thread == null;

    // ThreadPerTaskExecutor 会创建一个新的线程
    this.executor.execute(new Runnable() {
        public void run() {
            ...
            // 执行事件循环 - NioEventLoop 具体实现 run()
            SingleThreadEventExecutor.this.run();                    
            ... 
        }
    });
}

(5)单线程处理就绪IO事件

回到 NioEventLoop,查看 run() 方法处理就绪的I/O事件

// 主要框架如下,
protected void run() {
    for (;;) { // 无限循环
        // 1. 轮询 I/O 事件(strategy = Selector.select)
        // 2. 处理 I/O 事件(selector.selectedKeys() --> processSelectedKeys)
        // 3. 执行任务队列(runAllTasks)
    }
}

源码如下,

// io.netty.channel.nio.NioEventLoop

@Override
protected void run() {
    // 记录连续无有效事件时的 select 调用次数(用于检测空轮询)
    int selectCnt = 0;
    for (;;) {
        try {
            // (1) 超时等待 I/O事件就绪 -> select
            int strategy;
            try {
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                case SelectStrategy.CONTINUE:
                    continue;

                case SelectStrategy.BUSY_WAIT:
                    // fall-through to SELECT since the busy-wait is not supported with NIO

                case SelectStrategy.SELECT:
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE; // nothing on the calendar
                    }
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        // 任务队列为空,超时等待 I/O 事件
                        if (!hasTasks()) {
                            strategy = select(curDeadlineNanos);
                        }
                    } finally {
                        // This update is just to help block unnecessary selector wakeups
                        // so use of lazySet is ok (no race condition)
                        nextWakeupNanos.lazySet(AWAKE);
                    }
                    // fall through
                default:
                }
            } catch (IOException e) {
                // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                // the selector and retry. https://github.com/netty/netty/issues/8566
                rebuildSelector0();
                // 出现异常,重置 select 计数
                selectCnt = 0;
                handleLoopException(e);
                continue;
            }

            // (2) 执行IO事件以及执行任务
            selectCnt++; // 递增 select 调用次数
            cancelledKeys = 0; 
            needsToSelectAgain = false; 
            // 获取 I/O 时间占比(默认 50,即 50% 时间处理 I/O,50% 处理任务)
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            // 根据 ioRatio 分配 I/O 处理与任务执行时间
            if (ioRatio == 100) {// 全时处理 I/O,任务无时间限制
                try {
                    if (strategy > 0) { // 有就绪I/O事件
                        processSelectedKeys();// 处理 I/O 事件(如 accept、read、write)
                    }
                } finally {
                    // 执行所有任务(普通队列 + 尾部队列)
                    ranTasks = runAllTasks();
                }
            } else if (strategy > 0) { // 限时处理IO事件
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();// 处理 I/O 事件(如 accept、read、write)
                } finally {
                    // 限时执行所有任务(普通队列 + 尾部队列)
                    final long ioTime = System.nanoTime() - ioStartTime;
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {// 无 I/O 事件(strategy == 0),仅执行任务
                ranTasks = runAllTasks(0); 
            }

            // (3) 检测空轮询和异常唤醒
            if (ranTasks || strategy > 0) {
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
                // 正常情况:执行了任务或处理了 I/O 事件,重置计数
                selectCnt = 0;
            } else if (unexpectedSelectorWakeup(selectCnt)) {  // 异常唤醒(如未处理事件却唤醒)
                selectCnt = 0; // 重置计数
            }
        } catch (CancelledKeyException e) {
            // Harmless exception - log anyway
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // 处理一些关闭逻辑...
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

/**
    查询就绪I/O事件,调用 select 方法 
**/
private int select(long deadlineNanos) throws IOException {
    if (deadlineNanos == NONE) {
        return selector.select();
    }
    // Timeout will only be 0 if deadline is within 5 microsecs
    long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
    return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}

/**
    处理就绪的IO事件,selector.selectedKeys() 获取就绪事件
**/
private void processSelectedKeys() {
    // selectedKeys 为null说明Netty启用了优化模式,通过反射替换 JDK 原生的 Selector 实现,将 selectedKeys 替换为高性能的数组结构(SelectedSelectionKeySet),实现高效添加和遍历,避免Hash冲突的开销,默认开启优化模式
    if (selectedKeys != null) {
        // 优化模式,SelectedSelectionKeySet selectedKeys 数组存储就绪事件
        processSelectedKeysOptimized();
    } else {
        // 普通模式 
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

// 非优化模式下的处理IO事件
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
    if (selectedKeys.isEmpty()) {
        return;
    }

    // 遍历 selectedKeys 并执行相应Channel的事件
    Iterator<SelectionKey> i = selectedKeys.iterator();
    for (;;) {
        final SelectionKey k = i.next();
        final Object a = k.attachment();
        i.remove();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (!i.hasNext()) {
            break;
        }

        // run 方法已经设置 needsToSelectAgain = false
        if (needsToSelectAgain) {
            selectAgain();
            selectedKeys = selector.selectedKeys();

            // Create the iterator again to avoid ConcurrentModificationException
            if (selectedKeys.isEmpty()) {
                break;
            } else {
                i = selectedKeys.iterator();
            }
        }
    }
}

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    ...

    try {
        int readyOps = k.readyOps();// 获取就绪的操作类型
        
        // 处理连接就绪事件(客户端已建立连接)
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;// 移除 CONNECT 监听
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // 处理写就绪事件(数据可写)
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush(); // Socket 缓冲区可写,强制刷新待发送数据
        }

        // 处理读就绪或 Accept 事件(数据可读/新连接)
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read(); // 触发读取操作(如读取数据或 Accept 连接)
        }
    } catch (CancelledKeyException ignored) {
        // Key 被取消,关闭 Channel
        unsafe.close(unsafe.voidPromise());
    }
}

/**
    Netty是走的优化模式,主要在于遍历方式的区别,SelectedSelectionKeySet 是数组存储遍历高效,没有迭代器开销
**/
private void processSelectedKeysOptimized() {
    // 便利数组
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        // null out entry in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        selectedKeys.keys[i] = null;

        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a); // 同上
        }  else {
        @SuppressWarnings("unchecked")
        NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
        processSelectedKey(k, task);
    }

    if (needsToSelectAgain) {
        // null out entries in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        selectedKeys.reset(i + 1);

        selectAgain();
        i = -1;
    }
}

总结,NIO多路复用核心思想就是单线程管理多通道:通过一个Selector(选择器)监控多个通道(Channel)的I/O事件(如连接、读、写),由单线程轮询就绪事件,避免为每个连接创建独立线程,实现非阻塞高效并发。本质上用事件通知机制替代线程轮询,以最小线程开销处理海量连接。

相关文章:

  • 【愚公系列】《Python网络爬虫从入门到精通》037-文件的存取
  • 微软具身智能感知交互多面手!Magma:基于基础模型的多模态AI智能体
  • 初识SQL
  • 在 macOS 使用 .pem 私钥免密登录腾讯云服务器
  • java高级(IO流多线程)
  • 【愚公系列】《Python网络爬虫从入门到精通》036-DataFrame日期数据处理
  • GPT-4.5实际性能评测:实际探索
  • 3475相或为k
  • 1-PostgreSQL 简介
  • huggingface下载模型到本地缓存环境变量配置详解
  • 《每天读一个JDK源码》之HashMap解读
  • Python线程池知多少
  • 【Qt】ffmpeg照片提取、视频播放▲
  • 【Java 基础(人话版)】Java SE vs Java EE
  • 请解释 Node.js 中的网络模块(http、https),如何创建 HTTP服务器?
  • ESP32+Mixly+温湿度传感器DHT11
  • LangChain项目实战1——基于公司制度RAG回答机器人
  • PHP的学习
  • 如何通过 LlamaIndex 将数据导入 Elasticsearch
  • DAY09 Map接口、斗地主案例(有序版本)、冒泡排序
  • 推销什么企业做网站和app6/北京seo全网营销
  • 南宁网站开发推广/网站优化包括哪些内容
  • 有哪些企业网站/危机舆情公关公司
  • p2p网站建设cms/中国互联网公司排名
  • 南宁微信网站制作/东方网络律师团队
  • 搜索引擎网站建设/seo外链在线工具