【15】Strongswan watcher explained, part 2
The watcher's core function is watch():
(1) If count is 0 there are no fds to watch: the watcher state is set to WATCHER_STOPPED and the function returns JOB_REQUEUE_NONE. This takes us back to the process_job() return point described in "【11】Strongswan processor explained, part 1", where the watcher job is destroyed; a new watch job is only queued again the next time add() is called. See the sketch after (2).
(2) The watcher state machine transitions from WATCHER_QUEUED to WATCHER_RUNNING.
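The first two steps correspond to the entry of watch(). A minimal sketch of that entry logic (an illustration based on the fields used in the code quoted below, not the verbatim strongSwan source):
/* at the top of watch() */
this->mutex->lock(this->mutex);
count = 0;
for (entry = this->fds; entry; entry = entry->next)
{
    count++;
}
if (count == 0)
{
    /* nothing to watch: stop; the job is not requeued, so process_job()
     * destroys it, and add() queues a new watch job later */
    this->state = WATCHER_STOPPED;
    this->mutex->unlock(this->mutex);
    return JOB_REQUEUE_NONE;
}
if (this->state == WATCHER_QUEUED)
{
    this->state = WATCHER_RUNNING;
}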
(3) The poll() mechanism is used:
struct pollfd {
int fd; // file descriptor to poll
short events; // events to watch for
short revents; // events that actually occurred
};
// Allocate one pollfd per watched fd, plus one extra slot for the read end of the notify pipe
pfd = alloca(sizeof(*pfd) * (count + 1));
// Watch for read events on the notify pipe
pfd[0].fd = this->notify[0];
pfd[0].events = POLLIN;
// Iterate over all watched fds (other than the notify pipe) and fill in their pfd entries
count = 1;
for (entry = this->fds; entry; entry = entry->next)
{
// in_callback is incremented in notify() and decremented in notify_end(): while an event is still being handled asynchronously by the processor, the fd is not added to pfd again
if (!entry->in_callback)
{
pfd[count].fd = entry->fd;
pfd[count].events = 0;
reset_event_log(eventbuf, eventpos);
if (entry->events & WATCHER_READ)
{
log_event(eventpos, 'r');
pfd[count].events |= POLLIN;
}
if (entry->events & WATCHER_WRITE)
{
log_event(eventpos, 'w');
pfd[count].events |= POLLOUT;
}
end_event_log(eventpos);
log_fd(logpos, loglen, entry->fd, eventbuf);
count++;
}
}
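Why does the notify pipe occupy pfd[0]? This is the classic self-pipe pattern: whenever add()/remove() changes the fd set, a byte is written to the pipe so a blocking poll() wakes up and the pfd array can be rebuilt. A small standalone demo of the pattern (illustration only, not strongSwan code; the notify[] name just mirrors the watcher field):
#include <fcntl.h>
#include <poll.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
    int notify[2];
    struct pollfd pfd[1];
    char buf[64];
    ssize_t len;

    if (pipe(notify) != 0)
    {
        return 1;
    }
    /* non-blocking read end, so draining it never blocks */
    fcntl(notify[0], F_SETFL, O_NONBLOCK);

    pfd[0].fd = notify[0];
    pfd[0].events = POLLIN;

    /* something (add() in the watcher) writes a byte to the pipe ... */
    write(notify[1], "x", 1);

    /* ... and poll() returns immediately instead of blocking forever */
    if (poll(pfd, 1, -1) > 0 && (pfd[0].revents & POLLIN))
    {
        /* drain all pending notifications */
        while ((len = read(notify[0], buf, sizeof(buf))) > 0);
        printf("woken up, rebuilding fd set\n");
    }
    close(notify[0]);
    close(notify[1]);
    return 0;
}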
(4) Call poll():
thread_cleanup_push((void*)activate_all, this);
old = thread_cancelability(TRUE);
res = poll(pfd, count, -1);
if (res == -1 && errno == EINTR)
{
/* LinuxThreads interrupts poll(), but does not make it a
* cancellation point. Manually test if we got canceled. */
thread_cancellation_point();
}
thread_cancelability(old);
// With FALSE the pushed activate_all handler is popped without being executed
thread_cleanup_pop(FALSE);
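poll() is a cancellation point, so a cleanup handler is pushed around it: if the watcher thread gets cancelled while blocked in poll(), the handler must reactivate all entries and reset the state so nothing stays disabled. A sketch of what such an activate_all() handler could look like, consistent with the list structure used in this post (not necessarily the exact source):
static void activate_all(private_watcher_t *this)
{
    entry_t *entry;

    /* reactivate all entries and wake any threads waiting on the condvar */
    this->mutex->lock(this->mutex);
    for (entry = this->fds; entry; entry = entry->next)
    {
        entry->in_callback = 0;
    }
    this->state = WATCHER_STOPPED;
    this->condvar->broadcast(this->condvar);
    this->mutex->unlock(this->mutex);
}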
......
// revents holds the events that actually occurred; it is 0 if none did
if (pfd[0].revents & POLLIN)
{
while (TRUE)
{
// The pipe is O_NONBLOCK: when there is no data left to read, read() returns -1 with errno EAGAIN and we break out of the loop;
// while there is data, it is drained into buf
len = read(this->notify[0], buf, sizeof(buf));
if (len == -1)
{
if (errno != EAGAIN && errno != EWOULDBLOCK)
{
DBG1(DBG_JOB, "reading watcher notify failed: %s",
strerror(errno));
}
break;
}
}
this->pending = FALSE;
DBG2(DBG_JOB, "watcher got notification, rebuilding");
break;
}
......
for (entry = this->fds; entry; entry = entry->next)
{
// Skip entries that are still in a callback; they were not added to pfd above
if (entry->in_callback)
{
continue;
}
reset_event_log(eventbuf, eventpos);
// Look up the events that actually occurred on entry->fd in pfd: may be 0 (nothing happened), a regular event, or an error
revents = find_revents(pfd, count, entry->fd);
if (revents & POLLERR)
{
log_event(eventpos, 'e');
}
if (revents & POLLIN)
{
log_event(eventpos, 'r');
}
if (revents & POLLOUT)
{
log_event(eventpos, 'w');
}
// Read event on the fd, e.g. a socket received data
if (entry->events & WATCHER_READ &&
revents & (POLLIN | POLLERR | POLLHUP | POLLNVAL))
{
// notify() only queues the entry on the internal jobs list here; the jobs are handed to the processor's job queue later, in (5)
notify(this, entry, WATCHER_READ);
}
// Write event on the fd, e.g. a socket became ready to send data
if (entry->events & WATCHER_WRITE &&
revents & (POLLOUT | POLLERR | POLLHUP | POLLNVAL))
{
notify(this, entry, WATCHER_WRITE);
}
end_event_log(eventpos);
log_fd(logpos, loglen, entry->fd, eventbuf);
}
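find_revents() itself is not shown in the excerpt; a helper with the behaviour described above could look like this (a sketch, possibly differing from the actual source in details):
static inline int find_revents(struct pollfd *pfd, int count, int fd)
{
    int i;

    /* scan the pollfd array for the given fd and return its revents,
     * or 0 if the fd was not polled at all */
    for (i = 0; i < count; i++)
    {
        if (pfd[i].fd == fd)
        {
            return pfd[i].revents;
        }
    }
    return 0;
}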
The notify() function:
/**
 * Execute the callback for a registered FD
 */
static void notify(private_watcher_t *this, entry_t *entry,
watcher_event_t event)
{
notify_data_t *data;
/* get a copy of the entry for the async job, but with the specific event */
INIT(data,
.entry = entry,
.fd = entry->fd,
.event = event,
.cb = entry->cb,
.data = entry->data,
.keep = TRUE,
.this = this,
);
/* deactivate the entry, so we can poll() the other FDs even if the async processing has not handled this event yet */
// In other words, this fd is only polled for this event again once the processor has finished handling it asynchronously
entry->in_callback++;
// notify_async() is where the event is finally handled: it invokes the callback registered for this event
this->jobs->insert_last(this->jobs,
callback_job_create_with_prio((void*)notify_async, data,
(void*)notify_end, (callback_job_cancel_t)return_false,
JOB_PRIO_CRITICAL));
}
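notify_data_t is not shown in the excerpt; based on the INIT() initializer above, its fields would roughly be (a sketch, field names taken from the initializer):
typedef struct {
    /* entry the callback is executed for */
    entry_t *entry;
    /* file descriptor the event occurred on */
    int fd;
    /* event that triggered the notification */
    watcher_event_t event;
    /* registered callback */
    watcher_cb_t cb;
    /* user data passed to the callback */
    void *data;
    /* callback return value: keep the fd registered? */
    bool keep;
    /* back-reference to the watcher */
    private_watcher_t *this;
} notify_data_t;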
/**
 * Execute the callback for a registered FD, asynchronously
 */
static job_requeue_t notify_async(notify_data_t *data)
{
thread_cleanup_push((void*)unregister, data);
data->keep = data->cb(data->data, data->fd, data->event);
thread_cleanup_pop(FALSE);
// Runs only once; notify_end() is invoked when the processor destroys this job in process_job(), see (6)
return JOB_REQUEUE_NONE;
}
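To tie this together, here is a hypothetical consumer registering a read callback with the watcher (assuming the public watcher_t add() interface taking fd, events, callback and user data; on_readable is a made-up callback). Its return value becomes data->keep above and decides in notify_end(), see (6), whether the fd stays registered:
static bool on_readable(void *user, int fd, watcher_event_t event)
{
    char buf[512];
    ssize_t len;

    len = recv(fd, buf, sizeof(buf), MSG_DONTWAIT);
    /* TRUE keeps the fd registered for WATCHER_READ,
     * FALSE makes notify_end() remove the event */
    return len > 0;
}

/* somewhere during initialisation: */
lib->watcher->add(lib->watcher, fd, WATCHER_READ, on_readable, NULL);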
(5) When the watcher detected events on the watched fds:
if (this->jobs->get_count(this->jobs))
{
while (this->jobs->remove_first(this->jobs,
(void**)&job) == SUCCESS)
{
lib->processor->execute_job(lib->processor, job);
}
/* we temporarily disable a notified FD, rebuild FDSET */
break;
}
(6) notify_end():
/**
 * Clean up notification data, reactivate the FD
 */
static void notify_end(notify_data_t *data)
{
private_watcher_t *this = data->this;
entry_t *entry, *prev = NULL;
watcher_event_t updated = 0;
bool removed = FALSE;
/* reactivate the disabled entry */
this->mutex->lock(this->mutex);
// Walk the watcher's fds list; once the entry is found, check the callback's return value (data->keep, set in notify_async() via data->keep = data->cb(...)): if keep is FALSE the event is removed, and when no events remain the entry is dropped from fds entirely. Note that in_callback is decremented so the fd can be polled again
for (entry = this->fds; entry; prev = entry, entry = entry->next)
{
if (entry == data->entry)
{
if (!data->keep)
{
// Only the notified event is cleared from entry->events; if the entry was registered for both read and write, the other event stays registered (that is what the "updated" log below reports)
entry->events &= ~data->event;
updated = entry->events;
if (!entry->events)
{
remove_entry(this, entry, prev);
removed = TRUE;
break;
}
}
entry->in_callback--;
break;
}
}
this->condvar->broadcast(this->condvar);
update_and_unlock(this);
if (removed)
{
DBG3(DBG_JOB, "removed fd %d[%s%s] from watcher after callback", data->fd,
data->event & WATCHER_READ ? "r" : "",
data->event & WATCHER_WRITE ? "w" : "");
}
else if (updated)
{
DBG3(DBG_JOB, "updated fd %d[%s%s] to %d[%s%s] after callback", data->fd,
(updated | data->event) & WATCHER_READ ? "r" : "",
(updated | data->event) & WATCHER_WRITE ? "w" : "", data->fd,
updated & WATCHER_READ ? "r" : "",
updated & WATCHER_WRITE ? "w" : "");
}
free(data);
}
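remove_entry() is not shown either; consistent with the singly linked fds list used throughout this version, it would roughly unlink and free the entry (a sketch; the real function may do additional bookkeeping, e.g. for remove()):
static void remove_entry(private_watcher_t *this, entry_t *entry, entry_t *prev)
{
    /* prev == NULL means entry is the head of the list */
    if (prev)
    {
        prev->next = entry->next;
    }
    else
    {
        this->fds = entry->next;
    }
    free(entry);
}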