当前位置: 首页 > news >正文

WEPollSelectorImpl

WEPollSelectorImpl

基于 Windows wepoll 的 Selector 实现。

WEPollSelectorImpl提供了带参数的构造函数WEPollSelectorImpl(SelectorProvider),创建WEPollSelectorImpl时需要指定SelectorProvider对象,这里的SelectorProvider对象为WEPollSelectorProvider。构造函数内对相关对象进行初始化。

WEPollSelectorImpl(SelectorProvider sp) throws IOException {super(sp);this.eph = WEPoll.create();// 在一次调用epoll_wait中轮询的最大事件数this.pollArrayAddress = WEPoll.allocatePollArray(NUM_EPOLLEVENTS);// wakeup support(唤醒的支持)try {this.pipe = new PipeImpl(sp, /* AF_UNIX */ true, /*buffering*/ false);} catch (IOException ioe) {WEPoll.freePollArray(pollArrayAddress);WEPoll.close(eph);throw ioe;}this.fd0Val = pipe.source().getFDVal();this.fd1Val = pipe.sink().getFDVal();// register one end of the pipe for wakeupsWEPoll.ctl(eph, EPOLL_CTL_ADD, fd0Val, WEPoll.EPOLLIN);
}
  • WEPoll.create():返回一个HANDLE,而不是一个文件描述符。在JNI中被转化为java可识别的long类型。用来表示一个新的 epoll 实例或epoll端口。
  • WEPoll.allocatePollArray(NUM_EPOLLEVENTS):此方法会分配一个NUM_EPOLLEVENTS*SIZEOF_EPOLLEVENT大小的堆外内存,就64位操作系统来说为256*16=4096字节(4KB),SIZEOF_EPOLLEVENT为结构体epoll_event的大小。然后返回分配内存的起始地址(long类型)。
  • fd0Val:连接到loopback address上的端口的SocketChannelImpl对象(客户端SocketChannel.open()调用)的文件描述符。
  • fd1Val:接受对已绑定到loopback address上的端口的SocketChannelImpl对象的 socket连接(服务端accept()调用)的文件描述符。
  • WEPoll.ctl(eph, EPOLL_CTL_ADD, fd0Val, WEPoll.EPOLLIN):控制 epoll 端口监控哪些socket事件。这里初始的时候监听的是:客户端socket有可用的传入数据,或者已准备好接受有到来的连接。

processUpdateQueue方法

在队列updateKeys为空时,内部的相关逻辑是不执行的,直到往队列updateKeys末尾添加了元素。updateKeys不为空时,这时内部会执行一个while循环一直调用updateKeys.pollFirst()方法,取出第一个元素后然后并从队列删除它,直到没有元素可取,退出while循环。

那在什么时候往updateKeys队列添加第一个元素?目前是在调用AbstractChannel.safeSetSuccess(ChannelPromise)方法时内部逻辑会往updateKeys队列末尾添加第一个元素。

由于此方法是doSelect(Consumer<SelectionKey>, long)方法的内部逻辑,doSelect方法一直被NioEventLoop.run()方法循环调用,在循环调用到第二次时updateKeys队列才存在第一个元素,然后执行while循环内部逻辑:

private void processUpdateQueue() {assert Thread.holdsLock(this);synchronized (updateLock) {SelectionKeyImpl ski;while ((ski = updateKeys.pollFirst()) != null) {if (ski.isValid()) {int fd = ski.getFDVal();// add to fdToKey if neededSelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);assert (previous == null) || (previous == ski);int newOps = ski.translateInterestOps();int registeredOps = ski.registeredEvents();if (newOps != registeredOps) {if (newOps == 0) {// remove from epollWEPoll.ctl(eph, EPOLL_CTL_DEL, fd, 0);} else {int events = toEPollEvents(newOps);if (registeredOps == 0) {// add to epollWEPoll.ctl(eph, EPOLL_CTL_ADD, fd, events);} else {// modify eventsWEPoll.ctl(eph, EPOLL_CTL_MOD, fd, events);}}ski.registeredEvents(newOps);}}}}
}

这时fdToKeyMap集合为空,会把{key:fd, value:ski} 添加到fdToKey中并返回null,调用translateInterestOps()方法表示把interestOps=16兴趣集(事件掩码)转化为本地 poll 事件集(掩码),返回Net.POLLIN,(Net.POLLIN它是poll.h中定义的事件宏)。调用registeredEvents()方法返回0。调用toEPollEvents(newOps)方法将本地 poll 事件集映射为epoll 事件集(掩码)(EPOLLPRI|EPOLLIN=3)。最后调用 WEPoll.ctl(eph, EPOLL_CTL_ADD, fd, events)方法用来把events赋值给结构体epoll_event成员events,把fd文件描述符赋值给结构体epoll_event.data.sock成员。具体对应下面逻辑:

typedef union epoll_data {void* ptr;int fd;uint32_t u32;uint64_t u64;SOCKET sock; /* Windows specific */HANDLE hnd;  /* Windows specific */
} epoll_data_t;struct epoll_event {uint32_t events;   /* Epoll events and flags */epoll_data_t data; /* User data variable */
};Java_sun_nio_ch_WEPoll_ctl(JNIEnv *env, jclass clazz, jlong h,jint opcode, jlong s, jint events)
{struct epoll_event event;int res;SOCKET socket = (SOCKET) jlong_to_ptr(s);event.events = (uint32_t) events;event.data.sock = socket;res = epoll_ctl(jlong_to_ptr(h), opcode, socket, &event);return (res == 0) ? 0 : errno;
}

最后调用registeredEvents(int)方法把ski.registeredEvents属性由0修改为Net.POLLIN

doSelect方法

action参数:一般为 null,除非有自定义SelectionKey相关方法实现。
timeout参数:一般为-1或者0。当为0时执行到WEPoll.wait方法时不会阻塞,直接返回在等待的事件,为-1时,执行到WEPoll.wait方法时会无限期阻塞,直到有报告的事件,例如当在阻塞时,当socket服务端接收到socket客户端的连接时则停止阻塞,返回在等待的事件。

protected int doSelect(Consumer<SelectionKey> action, long timeout)throws IOException
{assert Thread.holdsLock(this);// epoll_wait timeout is intint to = (int) Math.min(timeout, Integer.MAX_VALUE);boolean blocking = (to != 0);int numEntries;processUpdateQueue();processDeregisterQueue();try {begin(blocking);long comp = Blocker.begin(blocking);try {numEntries = WEPoll.wait(eph, pollArrayAddress, NUM_EPOLLEVENTS, to);} finally {Blocker.end(comp);}} finally {end(blocking);}processDeregisterQueue();return processEvents(numEntries, action);
}

processEvents方法

numEntries参数:事件数。
action参数:一般为 null,除非有自定义SelectionKey相关方法实现。

private int processEvents(int numEntries, Consumer<SelectionKey> action)throws IOException
{assert Thread.holdsLock(this);boolean interrupted = false;int numKeysUpdated = 0;for (int i = 0; i < numEntries; i++) {long event = WEPoll.getEvent(pollArrayAddress, i);int fd = WEPoll.getDescriptor(event);if (fd == fd0Val) {interrupted = true;} else {SelectionKeyImpl ski = fdToKey.get(fd);if (ski != null) {int events = WEPoll.getEvents(event);if ((events & WEPoll.EPOLLPRI) != 0) {// 读取并丢弃socket上的紧急数据(MSG_OOB)。Net.discardOOB(ski.getFD());}int rOps = toReadyOps(events);numKeysUpdated += processReadyEvents(rOps, ski, action);}}}if (interrupted) {clearInterrupt();}return numKeysUpdated;
}

假设在第二次阻塞停止,开始调用processEvents(int, Consumer<SelectionKey>)方法:

  • WEPoll.getEvent(pollArrayAddress, i)方法:返回第i个结构体epoll_event内存的起始地址
  • WEPoll.getDescriptor(event)方法:用于计算结构体epoll_event成员data相对于结构体起始地址的字节偏移量。
  • WEPoll.getEvents(event)方法:用于计算结构体epoll_event成员events相对于结构体起始地址的字节偏移量,返回epoll事件掩码。
  • toReadyOps(events)方法:将epoll 事件掩码(1)转化为本都 poll 事件掩码(Net.POLLIN
  • processReadyEvents(rOps, ski, action)方法:设置ski.readyOps SelectionKey.OP_ACCEPT(16),最后将ski对象添加到ServerSocketChannelImpl.selectedKeys Set集合里,numKeysUpdated加1
http://www.dtcms.com/a/305150.html

相关文章:

  • AI工作流赋能,业务的超级加速器
  • mybatis-plus代码生成器
  • 主数据管理系统能代替数据中台吗?
  • ESP32学习-1.第一个程序helloworld
  • OTA | xmodem ymodem文件传输协议收发的C语言实现
  • FlowLong工作流
  • OI 杂讲
  • ASDIP Concrete(混凝土结构设计软件) v6.0.0.2 免费版
  • 光环云 × 零一万物在上海WAIC联合发布“法律智算综合云服务”,以专业Agent助力法律普惠发展
  • debug redis里面的lua脚本
  • JSON在java中的使用
  • c++之链表
  • 技术干货 | 矢网DTF测量技术:透视线缆、天线与波导内部缺陷的“射频X光”(二)
  • 人工智能赋能社会治理:深度解析与未来展望
  • 移位运算以及定点数的加减法操作
  • 深入解析 Spring SpEL:SpelExpressionParser 的使用与实践
  • Python游戏开发:Pygame全面指南与实战
  • JAVA存储原生json字符串到redis,去除@class,实现原生命令操作教程
  • 从传统到智能:Midscene.js 如何用 AI 颠覆自动化测试!
  • 【Lua】题目小练4
  • 深入解析RocksDB的MVCC和LSM Tree level
  • 基于springboot/java/VUE的旅游管理系统/旅游网站的设计与实现
  • USB Type-C PD协议一文通
  • mangoDB面试题及详细答案 117道(026-050)
  • CVE-2021-1675
  • 【C语言进阶】题目练习
  • docker部署zingerbee/netop 轻量级网络流量监控工具
  • 河南萌新联赛2025第(二)场:河南农业大学(补题)
  • 高端医疗超声AFE模拟前端应用
  • 机器学习之线性回归——小白教学