[Basic] 03. QEMU Task Model Overview
Contents
1. Basic Concepts
1.1. Synchronous & Asynchronous
1.2. Blocking & Non-blocking
1.3. Concurrency & Parallelism
1.4. Coroutines
1.5. sigsetjmp() & siglongjmp()
2. GLib - Main Event Loop
2.1. Event Context - GMainContext
2.2. Event Source - GSource
2.3. Main Event Loop - GMainLoop
2.4. Event Handling - GSourceFuncs
2.5. Event Callbacks - GSourceCallbackFuncs
3. Coroutines in QEMU
3.1. Creating a Coroutine - qemu_coroutine_create()
3.2. Entering a Coroutine - qemu_coroutine_enter()
3.3. Yielding Control - qemu_coroutine_yield()
4. AIO in QEMU
4.1. Creating the AIO Instance - qemu_aio_context
4.2. Registering FDs to Monitor - aio_set_fd_handler()
5. BH in QEMU
5.1. Active AIO Notification - aio_notify()
5.2. Passive AIO Notification - aio_context_notifier_cb()
5.3. Inserting a BH into an AIO Context - aio_bh_schedule_oneshot()
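1. Basic Concepts
This section covers synchronous vs. asynchronous, blocking vs. non-blocking, concurrency vs. parallelism, and coroutines.
1.1. Synchronous & Asynchronous
Synchronous and asynchronous describe how an application interacts with the kernel with respect to the result of an operation: whether the application waits for the result before it continues executing.
- Synchronous: after issuing an I/O request, the application must wait for, or poll, the kernel until the I/O operation has returned a result before it can continue;
- Asynchronous: after issuing an I/O request, the application keeps executing; when the kernel completes the I/O operation, it notifies the application or invokes the application's callback;
1.2. Blocking & Non-blocking
Blocking and non-blocking describe how an application's call into a kernel I/O operation behaves.
- Blocking: the call returns to user space only after the I/O operation has fully completed;
- Non-blocking: the call returns a status value immediately, without waiting for the I/O operation to fully complete;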
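The difference between a blocking and a non-blocking call can be seen on a plain POSIX pipe. The following is a minimal sketch, not QEMU code; the helper names set_nonblocking(), try_read() and demo_nonblocking_pipe() are made up for illustration. With O_NONBLOCK set, read() on an empty pipe returns at once with EAGAIN instead of waiting.

```c
#include <errno.h>
#include <fcntl.h>
#include <stddef.h>
#include <unistd.h>

/* Switch fd to non-blocking mode; returns 0 on success, -1 on error. */
int set_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0) {
        return -1;
    }
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/*
 * Try to read without waiting.
 * Returns the byte count (> 0), 0 on end of file,
 * -1 if no data is available yet, -2 on any other error.
 */
long try_read(int fd, char *buf, size_t len)
{
    ssize_t n = read(fd, buf, len);
    if (n >= 0) {
        return (long)n;
    }
    if (errno == EAGAIN || errno == EWOULDBLOCK) {
        return -1;
    }
    return -2;
}

/* Returns 1 if the empty pipe reported "would block" and later delivered data. */
int demo_nonblocking_pipe(void)
{
    int fds[2];
    char buf[8];
    int ok = 0;

    if (pipe(fds) != 0) {
        return 0;
    }
    if (set_nonblocking(fds[0]) == 0) {
        long first = try_read(fds[0], buf, sizeof(buf));      /* pipe is empty */
        if (write(fds[1], "hi", 2) == 2) {
            long second = try_read(fds[0], buf, sizeof(buf)); /* data is ready */
            ok = (first == -1 && second == 2);
        }
    }
    close(fds[0]);
    close(fds[1]);
    return ok;
}
```

Without the set_nonblocking() call, the first read() would block until another thread or process wrote to the pipe.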
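1.3. Concurrency & Parallelism
In an operating system:
- Concurrency: within a given time period, several programs are all somewhere between started and finished, and they all run on the same CPU, but at any single instant only one of them is actually running on that CPU;
- Parallelism: the system executes multiple tasks at the same time, each on a different CPU core, so they genuinely run simultaneously;
The difference between the two:
- Concurrency: the logical ability to deal with multiple tasks at once;
- Parallelism: physically executing multiple concurrent tasks at the same instant, which depends on hardware such as a multi-core processor;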
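1.4. Coroutines
A coroutine is a program component that generalizes the concept of a subroutine (procedure, function, routine, method, subprogram).
A subroutine has a single entry point and returns only once, whereas a coroutine allows multiple entry points and can be suspended and resumed at specified positions.
In behavior, coroutines resemble threads and processes: all of them switch between and schedule different logical flows. The difference is that coroutines are implemented at the compiler/language level in user space, while processes and threads are managed by the operating system.
A coroutine can be understood as solving an asynchronous problem with synchronous semantics: the business logic reads as if it were synchronous, yet it does not actually block the current thread (typically an event loop handles and dispatches the messages).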
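To make "suspend and resume at specified positions" concrete, here is a small sketch built on the POSIX ucontext API (getcontext(), makecontext(), swapcontext()). It is not how QEMU implements coroutines; the Counter type and the counter_*() functions are hypothetical names used only here. The coroutine body is an ordinary loop, yet control returns to the caller at every counter_yield() and later continues right after it.

```c
#include <stddef.h>
#include <ucontext.h>

#define STACK_SIZE (64 * 1024)

typedef struct {
    ucontext_t caller;        /* context of whoever resumed the coroutine */
    ucontext_t self;          /* context of the coroutine itself */
    char stack[STACK_SIZE];   /* private stack of the coroutine */
    int value;                /* value handed over at each yield */
    int finished;             /* set once the body has run to completion */
} Counter;

static Counter *current;      /* coroutine being run (single-threaded sketch) */

/* Suspend the coroutine and hand `value` back to the caller. */
void counter_yield(int value)
{
    Counter *co = current;
    co->value = value;
    swapcontext(&co->self, &co->caller);
}

/* Coroutine body: looks like plain sequential code. */
static void counter_entry(void)
{
    for (int i = 1; i <= 3; i++) {
        counter_yield(i);
    }
    current->finished = 1;    /* returning switches to uc_link (the caller) */
}

int counter_init(Counter *co)
{
    if (getcontext(&co->self) != 0) {
        return -1;
    }
    co->self.uc_stack.ss_sp = co->stack;
    co->self.uc_stack.ss_size = sizeof(co->stack);
    co->self.uc_link = &co->caller;
    co->value = 0;
    co->finished = 0;
    makecontext(&co->self, counter_entry, 0);
    return 0;
}

/* Resume the coroutine; returns 1 if it yielded a value, 0 if it finished. */
int counter_resume(Counter *co)
{
    if (co->finished) {
        return 0;
    }
    current = co;
    swapcontext(&co->caller, &co->self);
    return !co->finished;
}

/* Drive the coroutine to completion and add up everything it yields. */
int counter_sum(void)
{
    static Counter co;        /* static: the struct embeds a 64 KiB stack */
    int sum = 0;

    if (counter_init(&co) != 0) {
        return -1;
    }
    while (counter_resume(&co)) {
        sum += co.value;
    }
    return sum;
}
```

counter_sum() re-enters the same function body three times and collects 1, 2 and 3, which is the "multiple entry points" property described above.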
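1.5. sigsetjmp() & siglongjmp()
sigsetjmp() saves the current execution environment (stack context and registers, and optionally the signal mask) and marks the current location. A later call to siglongjmp() elsewhere in the program jumps straight back to that mark, restores the saved environment, and continues execution from there.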
DESCRIPTION
    A call to sigsetjmp() saves the calling environment in its env parameter
    for later use by siglongjmp().
    If the value of savemask is not zero,
    sigsetjmp() also saves the current signal mask of the calling thread
    as part of the calling environment.
PARAMETERS
    @env
        Is the buffer where the calling thread's environment is saved.
    @savemask
        Determines whether or not the calling thread's signal mask
        is saved as part of the environment.
        If non-zero, the signal mask is saved.
        If zero, the signal mask is not saved.
#include <setjmp.h>
int sigsetjmp(sigjmp_buf env, int savemask);
-------------------------------------------------------------------
DESCRIPTION
    The siglongjmp() function restores the environment saved by
    the most recent invocation of sigsetjmp() in the same thread,
    with the corresponding env argument.
    If there is no such invocation,
    or if the function containing the invocation of sigsetjmp()
    has terminated execution in the interim,
    the behavior is undefined.
    The siglongjmp() function restores the calling thread's signal mask
    if and only if env was initialized by a call to sigsetjmp()
    with a non-zero value for savemask.
PARAMETERS
    @env
        Is the buffer where the calling thread's environment has been saved.
    @val
        Is the preferred return value from sigsetjmp().
#include <setjmp.h>
void siglongjmp(sigjmp_buf env, int val);
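2. GLib - Main Event Loop
GLib implements an event loop, a mechanism that repeatedly collects and dispatches events. Each iteration proceeds as follows:
- prepare(): call each event source's prepare() callback to check whether the source already has an event. A source that needs no polling (such as idle) returns TRUE, meaning the idle event has already occurred; a source that needs polling (such as an fd) returns FALSE, because only after the fd has been polled can it be known whether an event occurred;
- query(): collect the file descriptors (fds) that need to be polled;
- check(): poll the fds, then call each event source's check() callback to check whether an event occurred;
- dispatch(): if a source has an event (its prepare() or check() returned TRUE), call its event handler;
Event sources predefined by GLib:
- file descriptors (fd, pipe, socket);
- timeouts;
- idle events;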
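The four phases above can be sketched in plain C on top of poll(). This is a simplified model and not GLib code: the Source struct, loop_iterate() and the demo sources are hypothetical, and real GLib additionally handles priorities, timeouts and thread ownership.

```c
#include <poll.h>
#include <stdbool.h>
#include <stddef.h>
#include <unistd.h>

#define MAX_SOURCES 8

typedef struct Source Source;
struct Source {
    bool (*prepare)(Source *s);   /* may be NULL: ready without polling? */
    bool (*check)(Source *s);     /* may be NULL: ready after polling? */
    void (*dispatch)(Source *s);  /* event handler */
    struct pollfd pfd;            /* pfd.fd < 0 means "nothing to poll" */
    int dispatched;               /* how many times dispatch() ran */
};

/* Run one loop iteration; returns the number of sources dispatched, or -1. */
int loop_iterate(Source **src, size_t n, int timeout_ms)
{
    struct pollfd pfds[MAX_SOURCES];
    size_t owner[MAX_SOURCES];
    bool ready[MAX_SOURCES] = { false };
    size_t n_pfds = 0;
    int count = 0;

    if (n > MAX_SOURCES) {
        return -1;
    }
    /* prepare: find sources that are ready without polling */
    for (size_t i = 0; i < n; i++) {
        if (src[i]->prepare && src[i]->prepare(src[i])) {
            ready[i] = true;
            timeout_ms = 0;       /* something is ready: do not sleep in poll() */
        }
    }
    /* query: collect the fds that need polling */
    for (size_t i = 0; i < n; i++) {
        if (src[i]->pfd.fd >= 0) {
            owner[n_pfds] = i;
            pfds[n_pfds] = src[i]->pfd;
            pfds[n_pfds].revents = 0;
            n_pfds++;
        }
    }
    /* poll, then check */
    if (poll(pfds, (nfds_t)n_pfds, timeout_ms) < 0) {
        return -1;
    }
    for (size_t k = 0; k < n_pfds; k++) {
        src[owner[k]]->pfd.revents = pfds[k].revents;
    }
    for (size_t i = 0; i < n; i++) {
        if (!ready[i] && src[i]->check && src[i]->check(src[i])) {
            ready[i] = true;
        }
    }
    /* dispatch: run the handler of every ready source */
    for (size_t i = 0; i < n; i++) {
        if (ready[i]) {
            src[i]->dispatch(src[i]);
            count++;
        }
    }
    return count;
}

/* Idle-like source: always ready in prepare(). */
bool idle_prepare(Source *s) { (void)s; return true; }
void count_dispatch(Source *s) { s->dispatched++; }

/* fd source: ready when poll() reported input; dispatch consumes one byte. */
bool fd_check(Source *s) { return (s->pfd.revents & POLLIN) != 0; }
void fd_dispatch(Source *s)
{
    char byte;
    if (read(s->pfd.fd, &byte, 1) == 1) {
        s->dispatched++;
    }
}

/* Demo: one pending byte on a pipe plus an idle source, two iterations. */
int demo_loop(void)
{
    int fds[2];
    if (pipe(fds) != 0 || write(fds[1], "x", 1) != 1) {
        return -1;
    }
    Source idle = { .prepare = idle_prepare, .dispatch = count_dispatch,
                    .pfd = { .fd = -1 } };
    Source in = { .check = fd_check, .dispatch = fd_dispatch,
                  .pfd = { .fd = fds[0], .events = POLLIN } };
    Source *all[] = { &idle, &in };
    int first = loop_iterate(all, 2, 100);   /* idle and fd source both fire */
    int second = loop_iterate(all, 2, 100);  /* pipe is drained: only idle fires */
    close(fds[0]);
    close(fds[1]);
    return first * 10 + second;
}
```

In the demo the first iteration dispatches both sources and the second only the idle one, which mirrors why an fd source must wait for check() while an idle source is already ready in prepare().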
2.1. Event Context - GMainContext
So that multiple independent sets of event sources can be handled in different threads, every event source is associated with a GMainContext.
A GMainContext can only be running in a single thread at a time, but event sources can be added to it and removed from it from other threads.
// /usr/include/glib-2.0/glib/gmain.h
typedef struct _GMainContext GMainContext;
-----------------------------------------------------
// glib/gmain.c
struct _GMainContext
{
  /* The following lock is used for both the list of sources
   * and the list of poll records
   */
  GMutex mutex;
  GCond cond;
  GThread *owner;
  guint owner_count;
  GMainContextFlags flags;
  GSList *waiters;
  gint ref_count; /* (atomic) */
  GHashTable *sources; /* guint -> GSource */
  GPtrArray *pending_dispatches;
  gint64 timeout_usec; /* Timeout for current iteration */
  guint next_id;
  GQueue source_lists;
  gint in_check_or_prepare;
  GPollRec *poll_records;
  guint n_poll_records;
  GPollFD *cached_poll_array;
  guint cached_poll_array_size;
  GWakeup *wakeup;
  GPollFD wake_up_rec;
  /* Flag indicating whether the set of fd's changed during a poll */
  gboolean poll_changed;
  GPollFunc poll_func;
  gint64 time;
  gboolean time_is_fresh;
};
2.2. Event Source - GSource
GSource represents an event source; its priority is stored in the priority field.
GSourcePrivate holds the private data of an event source (child sources, ready time, fds, and so on).
// /usr/include/glib-2.0/glib/gmain.h
typedef struct _GSource GSource;
typedef struct _GSourcePrivate GSourcePrivate;
-------------------------------------------------------
// glib/gmain.h
struct _GSource
{
  /*< private >*/
  gpointer callback_data;
  GSourceCallbackFuncs *callback_funcs;
  const GSourceFuncs *source_funcs;
  guint ref_count;
  GMainContext *context;
  gint priority;
  guint flags; /* (atomic) */
  guint source_id;
  GSList *poll_fds;
  GSource *prev;
  GSource *next;
  char *name;
  GSourcePrivate *priv;
};
-------------------------------------------------------
// glib/gmain.c
struct _GSourcePrivate
{
  GSList *child_sources;
  GSource *parent_source;
  gint64 ready_time;
  /* This is currently only used on UNIX, but we always declare it (and
   * let it remain empty on Windows) to avoid #ifdef all over the place.
   */
  GSList *fds;
  GSourceDisposeFunc dispose;
  gboolean static_name;
};
2.3. Main Event Loop - GMainLoop
GMainLoop represents a main event loop.
A GMainLoop object is created with g_main_loop_new().
After the initial event sources have been added, g_main_loop_run() is called. The main loop then keeps checking every event source for new events and dispatches any it finds, until the handling of some event calls g_main_loop_quit() and the loop exits.
// /usr/include/glib-2.0/glib/gmain.h
typedef struct _GMainLoop GMainLoop;
--------------------------------------------------
// glib/gmain.c
struct _GMainLoop
{
  GMainContext *context;
  gboolean is_running; /* (atomic) */
  gint ref_count; /* (atomic) */
};
2.4. Event Handling - GSourceFuncs
GSourceFuncs contains the methods used to dispatch and handle a GSource: prepare(), check(), dispatch(), and finalize().
// /usr/include/glib-2.0/glib/gmain.h
typedef struct _GSourceFuncs GSourceFuncs;
-----------------------------------------------------
// glib/gmain.h
struct _GSourceFuncs
{
  GSourceFuncsPrepareFunc prepare; /* Can be NULL */
  GSourceFuncsCheckFunc check; /* Can be NULL */
  GSourceFuncsDispatchFunc dispatch;
  GSourceFuncsFinalizeFunc finalize; /* Can be NULL */
  /*< private >*/
  /* For use by g_source_set_closure */
  GSourceFunc closure_callback;
  GSourceDummyMarshal closure_marshal; /* Really is of type GClosureMarshal */
};
2.5. Event Callbacks - GSourceCallbackFuncs
GSourceCallbackFuncs holds the functions used to manage the callback object of a GSource.
// /usr/include/glib-2.0/glib/gmain.h
typedef struct _GSourceCallbackFuncs GSourceCallbackFuncs;
-------------------------------------------------------------
// glib/gmain.h
struct _GSourceCallbackFuncs
{
  void (*ref) (gpointer cb_data);
  void (*unref) (gpointer cb_data);
  void (*get) (gpointer cb_data,
               GSource *source,
               GSourceFunc *func,
               gpointer *data);
};
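3. Coroutines in QEMU
QEMU, as virtualization software, is built around an event-driven model: the main loop monitors files, events, messages, and state changes and acts on them. When many blocking operations occur, they are usually performed asynchronously so that guest execution is not slowed down. Plain asynchronous code, however, has to arrange when each callback is invoked and carry a large amount of execution state between callbacks, which fragments the logic and makes it complex and hard to follow. QEMU therefore uses coroutines, which let asynchronous execution be written in a synchronous style. The rough flow for using a coroutine in QEMU is as follows:
3.1. Creating a Coroutine - qemu_coroutine_create()
When a coroutine is created, it is bound to its entry function and to the argument that function needs.
// util/qemu-coroutine.c
Coroutine *qemu_coroutine_create(CoroutineEntry *entry, void *opaque)
{
    Coroutine *co = NULL;
    if (IS_ENABLED(CONFIG_COROUTINE_POOL)) {
        co = coroutine_pool_get();
    }
    if (!co) {
        co = qemu_coroutine_new(); // create a new coroutine
    }
    co->entry = entry; // entry function of the coroutine
    co->entry_arg = opaque;
    QSIMPLEQ_INIT(&co->co_queue_wakeup);
    return co;
}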
3.2. Entering a Coroutine - qemu_coroutine_enter()
qemu_coroutine_enter() calls qemu_get_current_aio_context() to obtain the current event context; qemu_aio_coroutine_enter() then binds that context to the coroutine and switches into it.
// util/qemu-coroutine.c
void qemu_coroutine_enter(Coroutine *co)
{
    qemu_aio_coroutine_enter(qemu_get_current_aio_context(), co);
}
-----------------------------------------------------------------
// util/qemu-coroutine.c
void qemu_aio_coroutine_enter(AioContext *ctx, Coroutine *co)
{
    QSIMPLEQ_HEAD(, Coroutine) pending = QSIMPLEQ_HEAD_INITIALIZER(pending);
    Coroutine *from = qemu_coroutine_self();
    QSIMPLEQ_INSERT_TAIL(&pending, co, co_queue_next);
    /* Run co and any queued coroutines */
    while (!QSIMPLEQ_EMPTY(&pending)) {
        Coroutine *to = QSIMPLEQ_FIRST(&pending);
        CoroutineAction ret;
        /*
         * Read to before to->scheduled; pairs with qatomic_cmpxchg in
         * qemu_co_sleep(), aio_co_schedule() etc.
         */
        smp_read_barrier_depends();
        const char *scheduled = qatomic_read(&to->scheduled);
        QSIMPLEQ_REMOVE_HEAD(&pending, co_queue_next);
        trace_qemu_aio_coroutine_enter(ctx, from, to, to->entry_arg);
        /* if the Coroutine has already been scheduled, entering it again will
         * cause us to enter it twice, potentially even after the coroutine has
         * been deleted */
        if (scheduled) {
            fprintf(stderr,
                    "%s: Co-routine was already scheduled in '%s'\n",
                    __func__, scheduled);
            abort();
        }
        if (to->caller) {
            fprintf(stderr, "Co-routine re-entered recursively\n");
            abort();
        }
        to->caller = from; // the coroutine's caller
        to->ctx = ctx; // the event context
        /* Store to->ctx before anything that stores to. Matches
         * barrier in aio_co_wake and qemu_co_mutex_wake.
         */
        smp_wmb();
        ret = qemu_coroutine_switch(from, to, COROUTINE_ENTER);
        /* Queued coroutines are run depth-first; previously pending coroutines
         * run after those queued more recently.
         */
        QSIMPLEQ_PREPEND(&pending, &to->co_queue_wakeup);
        switch (ret) {
        case COROUTINE_YIELD:
            break;
        case COROUTINE_TERMINATE:
            assert(!to->locks_held);
            trace_qemu_coroutine_terminate(to);
            coroutine_delete(to);
            break;
        default:
            abort();
        }
    }
}
3.3. Yielding Control - qemu_coroutine_yield()
qemu_coroutine_yield() transfers control from the coroutine back to its caller.
The switch itself relies on the pair described in 1.5: sigsetjmp() saves the environment of the coroutine being left and returns 0, siglongjmp() then jumps into the target coroutine, whose own earlier sigsetjmp() call returns the action value.
// util/qemu-coroutine.c
void coroutine_fn qemu_coroutine_yield(void)
{
    Coroutine *self = qemu_coroutine_self();
    Coroutine *to = self->caller;
    trace_qemu_coroutine_yield(self, to);
    if (!to) {
        fprintf(stderr, "Co-routine is yielding to no one\n");
        abort();
    }
    self->caller = NULL;
    qemu_coroutine_switch(self, to, COROUTINE_YIELD);
}
--------------------------------------------------------------
// util/coroutine-sigaltstack.c
CoroutineAction qemu_coroutine_switch(Coroutine *from_, Coroutine *to_,
                                      CoroutineAction action)
{
    CoroutineSigAltStack *from = DO_UPCAST(CoroutineSigAltStack, base, from_);
    CoroutineSigAltStack *to = DO_UPCAST(CoroutineSigAltStack, base, to_);
    CoroutineThreadState *s = coroutine_get_thread_state();
    int ret;
    s->current = to_;
    ret = sigsetjmp(from->env, 0);
    if (ret == 0) {
        siglongjmp(to->env, action);
    }
    return ret;
}
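4. AIO in QEMU
Asynchronous I/O (AIO) is a way of handling input and output in which the thread that issues an I/O request does not wait for the operation to complete but continues with the code that follows; the result is delivered to the requester by some other means.
Its counterpart is the more common synchronous (blocking) I/O, where the issuing thread does not return from the I/O function it called (it is blocked) until the operation completes.
In QEMU, AIO handles the events that occur in the main loop and is a concrete implementation of GSourceFuncs. As the code shows, the handler functions of the event loop mechanism are packed into a complete GSourceFuncs here:
// util/async.c
static GSourceFuncs aio_source_funcs = {
    aio_ctx_prepare,
    aio_ctx_check,
    aio_ctx_dispatch,
    aio_ctx_finalize
};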
4.1. Creating the AIO Instance - qemu_aio_context
QEMU defines a basic AIO instance, qemu_aio_context.
When the main loop is initialized, the aio-context is created and registered (its event handling methods are defined by GSourceFuncs), so qemu_aio_context is one GSource in the loop:
// util/main-loop.c
static AioContext *qemu_aio_context;
------------------------------------
// util/main-loop.c
int qemu_init_main_loop(Error **errp)
{
    ...
    qemu_aio_context = aio_context_new(errp);
    |--> AioContext *ctx;
    |--> ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
    src = aio_get_g_source(qemu_aio_context);
    g_source_set_name(src, "aio-context");
    g_source_attach(src, NULL);
--------------------------------------------------
// system/runstate.c
int qemu_main_loop(void)
{
    ...
    while (!main_loop_should_exit(&status)) {
        main_loop_wait(false);
        |--> ret = os_host_main_loop_wait(timeout_ns);
             |--> g_main_context_acquire(context);
             |--> glib_pollfds_fill(&timeout);
                  |--> g_main_context_prepare(context, &max_priority);
                  |--> n = g_main_context_query(context, max_priority, &timeout, pfds, glib_n_poll_fds);
             |--> ret = qemu_poll_ns((GPollFD *)gpollfds->data, gpollfds->len, timeout);
             |--> glib_pollfds_poll();
                  |--> if (g_main_context_check(context, max_priority, pfds, glib_n_poll_fds))
                  |-->     g_main_context_dispatch(context);
             |--> g_main_context_release(context);
4.2. Registering FDs to Monitor - aio_set_fd_handler()
aio_set_fd_handler() registers an fd to be monitored with an AioContext. In the call chain below, the signal fd is registered with iohandler_ctx:
// util/main-loop.c
int qemu_init_main_loop(Error **errp)
{
    ...
    ret = qemu_signal_init(errp);
    |--> qemu_set_fd_handler(sigfd, sigfd_handler, NULL, (void *)(intptr_t)sigfd);
         |--> aio_set_fd_handler(iohandler_ctx, fd, fd_read, fd_write, NULL, NULL, opaque);
              |--> new_node = g_new0(AioHandler, 1);
              |--> new_node->pfd.fd = fd;
              |--> g_source_add_poll(&ctx->source, &new_node->pfd);
              |--> new_node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
              |--> new_node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);
              |--> QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, new_node, node);
When a change on the fd is detected (the corresponding event has occurred), the dispatch() method is called.
aio_dispatch_handler() looks at which event occurred on the fd (read or write) and calls the matching handler.
// util/async.c
static GSourceFuncs aio_source_funcs = {
    aio_ctx_prepare,
    aio_ctx_check,
    aio_ctx_dispatch,
    aio_ctx_finalize
};
--------------------------------------------------
// util/async.c
static gboolean
aio_ctx_dispatch(GSource *source,
                 GSourceFunc callback,
                 gpointer user_data)
{
    AioContext *ctx = (AioContext *) source;
    assert(callback == NULL);
    aio_dispatch(ctx);
    |--> aio_bh_poll(ctx);
    |--> aio_dispatch_handlers(ctx);
         |--> progress = aio_dispatch_handler(ctx, node) || progress;
    return true;
}
--------------------------------------------------
// util/aio-posix.c
static bool aio_dispatch_handler(AioContext *ctx, AioHandler *node)
{
    ...
    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
        node->io_read) {
        node->io_read(node->opaque);
        /* aio_notify() does not count as progress */
        if (node->opaque != &ctx->notifier) {
            progress = true;
        }
    }
    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        (revents & (G_IO_OUT | G_IO_ERR)) &&
        node->io_write) {
        node->io_write(node->opaque);
        progress = true;
    }
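The register-then-dispatch pattern above can be modeled with poll() alone. The sketch below is an illustration under simplifying assumptions, not QEMU's implementation: HandlerTable, table_set_fd_handler() and table_poll_and_dispatch() are hypothetical names, the table is a fixed array rather than an RCU list, and handlers cannot be removed.

```c
#include <poll.h>
#include <stdbool.h>
#include <stddef.h>
#include <unistd.h>

#define MAX_HANDLERS 8

typedef void IOHandler(void *opaque);

typedef struct {
    struct pollfd pfd;
    IOHandler *io_read;    /* called when the fd is readable (or hung up) */
    IOHandler *io_write;   /* called when the fd is writable */
    void *opaque;          /* passed back to both handlers */
} FdHandler;

typedef struct {
    FdHandler nodes[MAX_HANDLERS];
    size_t count;
} HandlerTable;

/* Register fd with optional read/write handlers; returns 0, or -1 if full. */
int table_set_fd_handler(HandlerTable *t, int fd, IOHandler *io_read,
                         IOHandler *io_write, void *opaque)
{
    if (t->count == MAX_HANDLERS) {
        return -1;
    }
    FdHandler *node = &t->nodes[t->count++];
    node->pfd.fd = fd;
    /* poll() always reports POLLHUP/POLLERR, so only IN/OUT are requested. */
    node->pfd.events = (short)((io_read ? POLLIN : 0) | (io_write ? POLLOUT : 0));
    node->pfd.revents = 0;
    node->io_read = io_read;
    node->io_write = io_write;
    node->opaque = opaque;
    return 0;
}

/* Poll every registered fd once and call the handlers that match revents. */
bool table_poll_and_dispatch(HandlerTable *t, int timeout_ms)
{
    struct pollfd pfds[MAX_HANDLERS];
    bool progress = false;

    for (size_t i = 0; i < t->count; i++) {
        pfds[i] = t->nodes[i].pfd;
    }
    if (poll(pfds, (nfds_t)t->count, timeout_ms) <= 0) {
        return false;
    }
    for (size_t i = 0; i < t->count; i++) {
        FdHandler *node = &t->nodes[i];
        short revents = pfds[i].revents;

        if ((revents & (POLLIN | POLLHUP | POLLERR)) && node->io_read) {
            node->io_read(node->opaque);
            progress = true;
        }
        if ((revents & (POLLOUT | POLLERR)) && node->io_write) {
            node->io_write(node->opaque);
            progress = true;
        }
    }
    return progress;
}

typedef struct {
    int fd;
    int bytes_seen;
} ReadState;

/* Read handler for the demo: counts the bytes it receives. */
void on_readable(void *opaque)
{
    ReadState *st = opaque;
    char buf[16];
    ssize_t n = read(st->fd, buf, sizeof(buf));
    if (n > 0) {
        st->bytes_seen += (int)n;
    }
}

/* Returns the number of bytes the read handler saw, or -1 on failure. */
int demo_fd_handler(void)
{
    int fds[2];
    HandlerTable table = { .count = 0 };

    if (pipe(fds) != 0) {
        return -1;
    }
    ReadState st = { .fd = fds[0], .bytes_seen = 0 };
    table_set_fd_handler(&table, fds[0], on_readable, NULL, &st);
    bool idle = table_poll_and_dispatch(&table, 0);    /* nothing to read yet */
    bool wrote = (write(fds[1], "abc", 3) == 3);
    bool busy = table_poll_and_dispatch(&table, 100);  /* handler runs now */
    close(fds[0]);
    close(fds[1]);
    return (!idle && wrote && busy) ? st.bytes_seen : -1;
}
```

As in aio_dispatch_handler(), the read handler is also invoked on hang-up or error so that it can observe end of file or the failure itself.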
5. BH in QEMU
A bottom half (BH) inserts an operation into an aio_context so that it is carried out while that aio_context is being processed. This is similar to how a Linux kernel interrupt defers part of its work to the interrupt bottom half, for example by moving operations that may sleep into a kworker.
BH is part of the AIO Context, and BH execution is triggered through the Notifier. The AIO Notifier is really just an eventfd built into the AIO Context.
// util/async.c
AioContext *aio_context_new(Error **errp)
{
    ...
    aio_set_event_notifier(ctx, &ctx->notifier,
                           aio_context_notifier_cb,
                           aio_context_notifier_poll,
                           aio_context_notifier_poll_ready);
------------------------------------------------------------
// util/aio-posix.c
void aio_set_event_notifier(AioContext *ctx,
                            EventNotifier *notifier,
                            EventNotifierHandler *io_read,
                            AioPollFn *io_poll,
                            EventNotifierHandler *io_poll_ready)
{
    aio_set_fd_handler(ctx, event_notifier_get_fd(notifier),
                       (IOHandler *)io_read, NULL, io_poll,
                       (IOHandler *)io_poll_ready, notifier);
}
------------------------------------------------------------
// util/event_notifier-posix.c
int event_notifier_get_fd(const EventNotifier *e)
{
    return e->rfd;
}
5.1. Active AIO Notification - aio_notify()
Other modules are notified actively by calling aio_notify(), which writes 1 to the EventNotifier's eventfd and thereby wakes up poll().
// include/qemu/event_notifier.h
struct EventNotifier {
    int rfd;
    int wfd;
    bool initialized;
--------------------------------------------
// util/async.c
void aio_notify(AioContext *ctx)
{
    ...
    if (qatomic_read(&ctx->notify_me)) {
        event_notifier_set(&ctx->notifier);
--------------------------------------------
// util/event_notifier-posix.c
int event_notifier_set(EventNotifier *e)
{
    ...
    do {
        ret = write(e->wfd, &value, sizeof(value));
    } while (ret < 0 && errno == EINTR);
5.2. Passive AIO Notification - aio_context_notifier_cb()
When the AIO Context's notifier produces an event, the callback registered for it is triggered; aio_context_notifier_cb() clears the notifier so that it can fire again.
// util/async.c
AioContext *aio_context_new(Error **errp)
{
    ...
    aio_set_event_notifier(ctx, &ctx->notifier,
                           aio_context_notifier_cb,
                           aio_context_notifier_poll,
                           aio_context_notifier_poll_ready);
------------------------------------------------------------
// util/async.c
static void aio_context_notifier_cb(EventNotifier *e)
{
    AioContext *ctx = container_of(e, AioContext, notifier);
    event_notifier_test_and_clear(&ctx->notifier);
}
------------------------------------------------------------
// util/async.c
static bool aio_context_notifier_poll(void *opaque)
{
    EventNotifier *e = opaque;
    AioContext *ctx = container_of(e, AioContext, notifier);
    ...
    return qatomic_read(&ctx->notified);
}
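The set / poll / test-and-clear cycle of sections 5.1 and 5.2 can be reproduced with a bare Linux eventfd. This is a sketch under the assumption of a Linux host (eventfd(2) is Linux-specific); the notifier_*() helpers are hypothetical stand-ins and not QEMU's EventNotifier API.

```c
#include <errno.h>
#include <poll.h>
#include <stdint.h>
#include <sys/eventfd.h>
#include <unistd.h>

/* Create the notifier; returns the eventfd or -1 on error. */
int notifier_init(void)
{
    return eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
}

/* Signal the notifier by adding 1 to the eventfd counter. */
int notifier_set(int efd)
{
    uint64_t value = 1;
    ssize_t ret;

    do {
        ret = write(efd, &value, sizeof(value));
    } while (ret < 0 && errno == EINTR);
    return ret == (ssize_t)sizeof(value) ? 0 : -1;
}

/* Returns 1 if poll() reports the notifier as readable, 0 otherwise. */
int notifier_pending(int efd, int timeout_ms)
{
    struct pollfd pfd = { .fd = efd, .events = POLLIN };

    return poll(&pfd, 1, timeout_ms) == 1 && (pfd.revents & POLLIN);
}

/* Read the counter, which resets it to 0; returns 1 if it had been set. */
int notifier_test_and_clear(int efd)
{
    uint64_t value;
    ssize_t ret;

    do {
        ret = read(efd, &value, sizeof(value));
    } while (ret < 0 && errno == EINTR);
    return ret == (ssize_t)sizeof(value);
}

/* Returns 1 if the notifier behaved as described, 0 or -1 otherwise. */
int demo_notify(void)
{
    int efd = notifier_init();

    if (efd < 0) {
        return -1;
    }
    int before = notifier_pending(efd, 0);        /* nothing signalled yet */
    notifier_set(efd);
    notifier_set(efd);                            /* repeated sets coalesce */
    int after = notifier_pending(efd, 0);         /* poll() now wakes up */
    int cleared = notifier_test_and_clear(efd);   /* one read clears both */
    int again = notifier_pending(efd, 0);         /* quiet again */
    close(efd);
    return before == 0 && after == 1 && cleared == 1 && again == 0;
}
```

Because an eventfd is a counter, several notifications issued before the loop wakes up collapse into a single readable event, and one read clears them all.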
5.3. Inserting a BH into an AIO Context - aio_bh_schedule_oneshot()
aio_bh_schedule_oneshot() inserts a BH into an AIO Context and uses the Notifier to notify that context.
// include/block/aio.h
void aio_bh_schedule_oneshot_full(AioContext *ctx, QEMUBHFunc *cb, void *opaque,
                                  const char *name);
-------------------------------------------------------------------
// include/block/aio.h
#define aio_bh_schedule_oneshot(ctx, cb, opaque) \
    aio_bh_schedule_oneshot_full((ctx), (cb), (opaque), (stringify(cb)))
-------------------------------------------------------------------
// util/async.c
void aio_bh_schedule_oneshot_full(AioContext *ctx, QEMUBHFunc *cb,
                                  void *opaque, const char *name)
{
    QEMUBH *bh;
    bh = g_new(QEMUBH, 1);
    *bh = (QEMUBH){
        .ctx = ctx,
        .cb = cb,
        .opaque = opaque,
        .name = name,
    };
    aio_bh_enqueue(bh, BH_SCHEDULED | BH_ONESHOT);
}
-------------------------------------------------------------------
// util/async.c
static void aio_bh_enqueue(QEMUBH *bh, unsigned new_flags)
{
    AioContext *ctx = bh->ctx;
    unsigned old_flags;
    /*
     * Synchronizes with atomic_fetch_and() in aio_bh_dequeue(), ensuring that
     * insertion starts after BH_PENDING is set.
     */
    old_flags = qatomic_fetch_or(&bh->flags, BH_PENDING | new_flags);
    if (!(old_flags & BH_PENDING)) {
        /*
         * At this point the bottom half becomes visible to aio_bh_poll().
         * This insertion thus synchronizes with QSLIST_MOVE_ATOMIC in
         * aio_bh_poll(), ensuring that:
         * 1. any writes needed by the callback are visible from the callback
         *    after aio_bh_dequeue() returns bh.
         * 2. ctx is loaded before the callback has a chance to execute and bh
         *    could be freed.
         */
        QSLIST_INSERT_HEAD_ATOMIC(&ctx->bh_list, bh, next);
    }
    aio_notify(ctx);
When the AIO Context runs its dispatch method, aio_bh_poll() calls aio_bh_call(), which invokes the BH's callback.
// util/async.c
aio_ctx_dispatch()
|--> aio_dispatch()
     |--> aio_bh_poll()
          |--> aio_bh_call()
--------------------------------------
// util/async.c
void aio_bh_call(QEMUBH *bh)
{
    ...
    bh->cb(bh->opaque);
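The schedule-then-poll life cycle of a one-shot BH can be summarized in a single-threaded sketch. It deliberately leaves out what makes the real code subtle (atomic flags, lock-free list insertion, the eventfd wake-up), and MiniContext, mini_bh_schedule_oneshot() and mini_bh_poll() are hypothetical names. The order in which this sketch runs callbacks is a property of the sketch only, not a statement about QEMU.

```c
#include <stdlib.h>

typedef void BHFunc(void *opaque);

typedef struct BH {
    BHFunc *cb;
    void *opaque;
    struct BH *next;
} BH;

typedef struct {
    BH *bh_list;    /* pending bottom halves */
    int notified;   /* stand-in for the event notifier */
} MiniContext;

/* Queue a one-shot BH and "notify" the context; returns 0 or -1. */
int mini_bh_schedule_oneshot(MiniContext *ctx, BHFunc *cb, void *opaque)
{
    BH *bh = malloc(sizeof(*bh));

    if (!bh) {
        return -1;
    }
    bh->cb = cb;
    bh->opaque = opaque;
    bh->next = ctx->bh_list;   /* insert at the head of the list */
    ctx->bh_list = bh;
    ctx->notified = 1;         /* plays the role of aio_notify() */
    return 0;
}

/* Run and free every pending BH; returns how many callbacks were called. */
int mini_bh_poll(MiniContext *ctx)
{
    /* Detach the list first so callbacks may schedule new BHs safely. */
    BH *slice = ctx->bh_list;
    int calls = 0;

    ctx->bh_list = NULL;
    ctx->notified = 0;
    while (slice) {
        BH *bh = slice;

        slice = bh->next;
        bh->cb(bh->opaque);
        free(bh);              /* one-shot: the BH is gone after it runs */
        calls++;
    }
    return calls;
}

/* Demo callback: increments the int that opaque points to. */
void bump(void *opaque)
{
    ++*(int *)opaque;
}

/* Returns the final counter value, or -1 if anything went wrong. */
int demo_bh(void)
{
    MiniContext ctx = { NULL, 0 };
    int counter = 0;

    if (mini_bh_schedule_oneshot(&ctx, bump, &counter) != 0 ||
        mini_bh_schedule_oneshot(&ctx, bump, &counter) != 0) {
        return -1;
    }
    int first = mini_bh_poll(&ctx);    /* runs both callbacks */
    int second = mini_bh_poll(&ctx);   /* nothing left to run */
    return (first == 2 && second == 0) ? counter : -1;
}
```

Detaching the whole list before running callbacks is the same idea as moving bh_list into a local slice: a callback that schedules another BH adds it to the fresh list, and it runs on the next poll.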