WEPollSelectorImpl
WEPollSelectorImpl
基于 Windows wepoll 的 Selector 实现。
WEPollSelectorImpl提供了带参数的构造函数WEPollSelectorImpl(SelectorProvider)
,创建WEPollSelectorImpl时需要指定SelectorProvider对象,这里的SelectorProvider对象为WEPollSelectorProvider。构造函数内对相关对象进行初始化。
WEPollSelectorImpl(SelectorProvider sp) throws IOException {super(sp);this.eph = WEPoll.create();// 在一次调用epoll_wait中轮询的最大事件数this.pollArrayAddress = WEPoll.allocatePollArray(NUM_EPOLLEVENTS);// wakeup support(唤醒的支持)try {this.pipe = new PipeImpl(sp, /* AF_UNIX */ true, /*buffering*/ false);} catch (IOException ioe) {WEPoll.freePollArray(pollArrayAddress);WEPoll.close(eph);throw ioe;}this.fd0Val = pipe.source().getFDVal();this.fd1Val = pipe.sink().getFDVal();// register one end of the pipe for wakeupsWEPoll.ctl(eph, EPOLL_CTL_ADD, fd0Val, WEPoll.EPOLLIN);
}
- WEPoll.create():返回一个
HANDLE
,而不是一个文件描述符。在JNI中被转化为java可识别的long类型。用来表示一个新的 epoll 实例或epoll端口。 - WEPoll.allocatePollArray(NUM_EPOLLEVENTS):此方法会分配一个
NUM_EPOLLEVENTS*SIZEOF_EPOLLEVENT
大小的堆外内存,就64位操作系统来说为256*16=4096字节(4KB),SIZEOF_EPOLLEVENT为结构体epoll_event的大小。然后返回分配内存的起始地址(long类型)。 - fd0Val:连接到loopback address上的端口的SocketChannelImpl对象(客户端
SocketChannel.open()
调用)的文件描述符。 - fd1Val:接受对已绑定到loopback address上的端口的SocketChannelImpl对象的 socket连接(服务端
accept()
调用)的文件描述符。 - WEPoll.ctl(eph, EPOLL_CTL_ADD, fd0Val, WEPoll.EPOLLIN):控制 epoll 端口监控哪些socket事件。这里初始的时候监听的是:客户端socket有可用的传入数据,或者已准备好接受有到来的连接。
processUpdateQueue方法
在队列updateKeys为空时,内部的相关逻辑是不执行的,直到往队列updateKeys末尾添加了元素。updateKeys不为空时,这时内部会执行一个while循环一直调用updateKeys.pollFirst()
方法,取出第一个元素后然后并从队列删除它,直到没有元素可取,退出while循环。
那在什么时候往updateKeys队列添加第一个元素?目前是在调用AbstractChannel.safeSetSuccess(ChannelPromise)
方法时内部逻辑会往updateKeys队列末尾添加第一个元素。
由于此方法是doSelect(Consumer<SelectionKey>, long)
方法的内部逻辑,doSelect方法一直被NioEventLoop.run()
方法循环调用,在循环调用到第二次时updateKeys队列才存在第一个元素,然后执行while循环内部逻辑:
private void processUpdateQueue() {assert Thread.holdsLock(this);synchronized (updateLock) {SelectionKeyImpl ski;while ((ski = updateKeys.pollFirst()) != null) {if (ski.isValid()) {int fd = ski.getFDVal();// add to fdToKey if neededSelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);assert (previous == null) || (previous == ski);int newOps = ski.translateInterestOps();int registeredOps = ski.registeredEvents();if (newOps != registeredOps) {if (newOps == 0) {// remove from epollWEPoll.ctl(eph, EPOLL_CTL_DEL, fd, 0);} else {int events = toEPollEvents(newOps);if (registeredOps == 0) {// add to epollWEPoll.ctl(eph, EPOLL_CTL_ADD, fd, events);} else {// modify eventsWEPoll.ctl(eph, EPOLL_CTL_MOD, fd, events);}}ski.registeredEvents(newOps);}}}}
}
这时fdToKey
Map集合为空,会把{key:fd, value:ski} 添加到fdToKey
中并返回null
,调用translateInterestOps()
方法表示把interestOps=16
兴趣集(事件掩码)转化为本地 poll 事件集(掩码),返回Net.POLLIN
,(Net.POLLIN它是poll.h中定义的事件宏)。调用registeredEvents()
方法返回0
。调用toEPollEvents(newOps)
方法将本地 poll 事件集映射为epoll 事件集(掩码)(EPOLLPRI|EPOLLIN=3)。最后调用 WEPoll.ctl(eph, EPOLL_CTL_ADD, fd, events)
方法用来把events
赋值给结构体epoll_event成员events,把fd
文件描述符赋值给结构体epoll_event.data.sock成员。具体对应下面逻辑:
typedef union epoll_data {void* ptr;int fd;uint32_t u32;uint64_t u64;SOCKET sock; /* Windows specific */HANDLE hnd; /* Windows specific */
} epoll_data_t;struct epoll_event {uint32_t events; /* Epoll events and flags */epoll_data_t data; /* User data variable */
};Java_sun_nio_ch_WEPoll_ctl(JNIEnv *env, jclass clazz, jlong h,jint opcode, jlong s, jint events)
{struct epoll_event event;int res;SOCKET socket = (SOCKET) jlong_to_ptr(s);event.events = (uint32_t) events;event.data.sock = socket;res = epoll_ctl(jlong_to_ptr(h), opcode, socket, &event);return (res == 0) ? 0 : errno;
}
最后调用registeredEvents(int)
方法把ski.registeredEvents
属性由0
修改为Net.POLLIN
。
doSelect方法
action参数:一般为 null
,除非有自定义SelectionKey
相关方法实现。
timeout参数:一般为-1
或者0
。当为0
时执行到WEPoll.wait方法时不会阻塞,直接返回在等待的事件,为-1
时,执行到WEPoll.wait方法时会无限期阻塞,直到有报告的事件,例如当在阻塞时,当socket服务端接收到socket客户端的连接时则停止阻塞,返回在等待的事件。
protected int doSelect(Consumer<SelectionKey> action, long timeout)throws IOException
{assert Thread.holdsLock(this);// epoll_wait timeout is intint to = (int) Math.min(timeout, Integer.MAX_VALUE);boolean blocking = (to != 0);int numEntries;processUpdateQueue();processDeregisterQueue();try {begin(blocking);long comp = Blocker.begin(blocking);try {numEntries = WEPoll.wait(eph, pollArrayAddress, NUM_EPOLLEVENTS, to);} finally {Blocker.end(comp);}} finally {end(blocking);}processDeregisterQueue();return processEvents(numEntries, action);
}
processEvents方法
numEntries参数:事件数。
action参数:一般为 null
,除非有自定义SelectionKey
相关方法实现。
private int processEvents(int numEntries, Consumer<SelectionKey> action)throws IOException
{assert Thread.holdsLock(this);boolean interrupted = false;int numKeysUpdated = 0;for (int i = 0; i < numEntries; i++) {long event = WEPoll.getEvent(pollArrayAddress, i);int fd = WEPoll.getDescriptor(event);if (fd == fd0Val) {interrupted = true;} else {SelectionKeyImpl ski = fdToKey.get(fd);if (ski != null) {int events = WEPoll.getEvents(event);if ((events & WEPoll.EPOLLPRI) != 0) {// 读取并丢弃socket上的紧急数据(MSG_OOB)。Net.discardOOB(ski.getFD());}int rOps = toReadyOps(events);numKeysUpdated += processReadyEvents(rOps, ski, action);}}}if (interrupted) {clearInterrupt();}return numKeysUpdated;
}
假设在第二次阻塞停止,开始调用processEvents(int, Consumer<SelectionKey>)
方法:
- WEPoll.getEvent(pollArrayAddress, i)方法:返回第i个结构体epoll_event内存的起始地址
- WEPoll.getDescriptor(event)方法:用于计算结构体epoll_event成员data相对于结构体起始地址的字节偏移量。
- WEPoll.getEvents(event)方法:用于计算结构体epoll_event成员events相对于结构体起始地址的字节偏移量,返回epoll事件掩码。
- toReadyOps(events)方法:将epoll 事件掩码(1)转化为本都 poll 事件掩码(
Net.POLLIN
) - processReadyEvents(rOps, ski, action)方法:设置
ski.readyOps
为SelectionKey.OP_ACCEPT
(16),最后将ski对象添加到ServerSocketChannelImpl.selectedKeys
Set集合里,numKeysUpdated加1
。