纯C协程框架NtyCo
原文是由写的,写的真的很好,原文链接:纯c协程框架NtyCo实现与原理-CSDN博客
1.为什么会有协程,协程解决了什么问题?
网络IO优化
在CS,BS的开发模式下,服务器的吞吐量是一个受关注的参数,吞吐量等于1秒内业务处理的次数,那么这个业务处理其实是 由 网络IO事件 + 业务处理时间 组成的。 业务不同处理时间也就不同,但是网络IO时间是可以进行优化的。
也就是说,如何提升recv和send的性能?以epoll管理百万长连接为例,测试IO同步操作与异步操作的性能差别。
对于响应式服务器来说,所有客户端的操作都是源于这个大循环,对于服务器处理网络IO,有两种方式。第一种,IO同步;第二种,IO异步。
IO同步 操作性能测试
对于IO同步操作来说,handle(sockfd) 函数内部实现如下
同步:检测 IO 与 读写 IO 在同一个流程中
测试出来,每一千个连接,耗时7.5秒左右。
优点:
1.sockfd 管理方便
2.代码逻辑清晰
缺点:
1.服务器程序依赖 epoll_wait 的循环,响应速度慢。
2.程序性能差
IO异步 操作性能测试
对于IO 异步操作来说,将任务push到线程池中,有其他线程进行读写。
异步:检测 IO 与 读写 IO 不在同一个流程中
IO操作与epoll_wait不在一个处理流程中,实现了解耦,这是IO异步操作,每一千个连接耗时2.5秒左右
优点:
1.子模块好规划
2.程序性能高
缺点:
1.管理fd麻烦,需要避免一个fd被多个线程操作的情况发生。
协程的诞生
对比项 | IO 同步操作 | IO 异步操作 | 协程 |
sockfd 管理 | 管理方便 | 多个线程共同管理 | 管理方便 |
代码逻辑 | 程序整体逻辑清晰 | 子模块逻辑清晰 | 程序整体逻辑清晰 |
程序性能 | 响应时间长,性能差 | 响应时间短,性能好 | 响应时间短,性能好 |
从上面我们知道了IO同步操作,写代码逻辑清晰,但是效率低;而IO异步操作,fd管理复杂,但是效率高。由此,协程便出现了。
协程:把两者结合起来,以同步的编程方式,实现异步的性能。
即写代码的时候,同步;运行的逻辑,异步。
2.原语
yield()
让出,将当前的执行流程让出,让出给调度器。
那么什么时候需要yield让出呢?很明显在recv之前,send之前,也就是在io之前,因为我们不知道io是否准备就绪了,所以我们先将fd加入epoll中,然后yield让出,将执行流程给调度器运行。
schedule
schedule调度器做什么事情呢?调度器就是io检测,调度器就是不断的调用epoll_wait,来检测哪些fd准备就绪了,然后就恢复相应fd的执行流程执行现场
。注意schedule不是原语,schedule是调度器。
resume()
从上面我们得知恢复是被schedule恢复的,那么现在恢复到了原来流程的哪里呢?其实是恢复到了yield的下一条代码处
。通常下面的代码都会将fd从epoll中移除,然后执行recv或send操作,因为一旦被resume,就说明肯定是准备就绪的。
如何实现yield和resume
- yield :从io操作流程切换到调度器流程(让出)
- resume : 从调度器流程切换到io操作流程
可以基于以下方法实现yield和resume:
1.setjmp/longjmp
#include <stdio.h>
#include <setjmp.h>jmp_buf env; // 定义一个jmp_buf类型的变量env,用于保存跳转环境void func(int arg) {printf("func: %d\n", arg);longjmp(env, ++arg); // 使用longjmp函数跳转回之前设置的环境,并传递增加后的参数值
}int main() {int ret = setjmp(env); // 调用setjmp函数,将当前环境保存到env中,并返回0if (ret == 0) { // 如果setjmp返回0,表示这是第一次调用setjmpfunc(ret);} else if (ret == 1) { // 如果setjmp返回1,表示这是通过longjmp跳转回来的func(ret);} else if (ret == 2) { // 如果setjmp返回2,表示这是通过longjmp跳转回来的func(ret);} else if (ret == 3) { // 如果setjmp返回3,表示这是通过longjmp跳转回来的func(ret);}return 0;
}
2.ucontext
void func1(void) {while (count ++ < 30) {printf("1\n");//swapcontext(&ctx[0], &ctx[1]); // 注释掉的代码:交换上下文,从ctx[0]切换到ctx[1]swapcontext(&ctx[0], &main_ctx); // 实际执行的代码:交换上下文,从ctx[0]切换到main_ctx,即主程序的上下文printf("4\n");}}
// coroutine2
void func2(void) {while (count ++ < 30) {printf("2\n");//swapcontext(&ctx[1], &ctx[2]);swapcontext(&ctx[1], &main_ctx); // 注释掉的代码:将当前上下文ctx[1]切换到上下文ctx[2]printf("5\n"); // 将当前上下文ctx[1]切换到主上下文main_ctx}
}// coroutine3
void func3(void) {while (count ++ < 30) {printf("3\n");//swapcontext(&ctx[2], &ctx[0]);swapcontext(&ctx[2], &main_ctx); // 注释掉的代码:将当前上下文ctx[2]切换到上下文ctx[0]printf("6\n"); // 将当前上下文ctx[2]切换到主上下文main_ctx}
}char stack1[2048] = {0}; // 定义三个栈,每个栈大小为2048字节,并初始化为0char stack2[2048] = {0};char stack3[2048] = {0};getcontext(&ctx[0]); // 获取当前上下文并保存到ctx[0]ctx[0].uc_stack.ss_sp = stack1; // 设置ctx[0]的栈指针为stack1ctx[0].uc_stack.ss_size = sizeof(stack1); // 设置ctx[0]的栈大小为stack1的大小ctx[0].uc_link = &main_ctx; // 设置ctx[0]的链接上下文为main_ctx,当ctx[0]执行完毕后,会切换到main_ctxmakecontext(&ctx[0], func1, 0); // 创建一个新的上下文ctx[0],并指定其执行的函数为func1,参数个数为0getcontext(&ctx[1]);ctx[1].uc_stack.ss_sp = stack2;ctx[1].uc_stack.ss_size = sizeof(stack2);ctx[1].uc_link = &main_ctx;makecontext(&ctx[1], func2, 0);getcontext(&ctx[2]);ctx[2].uc_stack.ss_sp = stack3;ctx[2].uc_stack.ss_size = sizeof(stack3);ctx[2].uc_link = &main_ctx;makecontext(&ctx[2], func3, 0);printf("swapcontext\n");//int i = 30;while (count <= 30) { // schedulerswapcontext(&main_ctx, &ctx[count%3]);}
3.用汇编代码自己实现切换
//new_ctx[%rdi]:即将运行协程的上下文寄存器列表; cur_ctx[%rsi]:正在运行协程的上下文寄存器列表
int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);// yield让出
void nty_coroutine_yield(nty_coroutine *co) {_switch(&co->sched->ctx, &co->ctx);
}// resume协程恢复执行
int nty_coroutine_resume(nty_coroutine *co) {//...nty_schedule * sched = nty_coroutine_get_sched();sched->curr_thread = co;_switch(&co->ctx, &co->sched->ctx);//...
}
如何从一个协程切换到另一个协程呢?我们只需要将当前协程的上下文从寄存器组中保存下来;将下一个要运行的协程的上下文放到寄存器组上去,即可实现协程的切换。
3.切换
寄存器介绍
下面介绍的都是x86_64的寄存器。
- %rdi,%rsi,%rdx,%rcx,%r8,%r9 用作函数参数,依次对应第1参数,第2参数…(这里我们只需关注%rdi和%rsi)
- %rbx,%rbp,%r12,%r13,%14,%15 用作数据存储,遵循被调用者使用规则,简单说就是随便用,调用子函数之前要备份它,以防他被修改
- new_ctx是一个指针,指向一块内存,它现在存在%rid里面,同理cur_ctx存在%rsi里面
- %rsp代表栈顶,%rbp代表栈底,%eip代表cpu下一条待取指令的地址(这也就是为什么resume之后会接着运行代码流程的原因)
//new_ctx[%rdi]:即将运行协程的上下文寄存器列表; cur_ctx[%rsi]:正在运行协程的上下文寄存器列表
int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);
汇编实现切换
//寄存器 cpu上下文
typedef struct _nty_cpu_ctx {void *rsp;//栈顶void *rbp;//栈底void *eip;//CPU通过EIP寄存器读取即将要执行的指令void *edi;void *esi;void *rbx;void *r1;void *r2;void *r3;void *r4;void *r5;
} nty_cpu_ctx;//new_ctx[%rdi]:即将运行协程的上下文寄存器列表; cur_ctx[%rsi]:正在运行协程的上下文寄存器列表
int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);
//默认x86_64
__asm__(
" .text \n"
" .p2align 4,,15 \n"
".globl _switch \n"
".globl __switch \n"
"_switch: \n"
"__switch: \n"
" movq %rsp, 0(%rsi) # save stack_pointer \n"
" movq %rbp, 8(%rsi) # save frame_pointer \n"
" movq (%rsp), %rax # save insn_pointer \n"
" movq %rax, 16(%rsi) # save eip \n"
" movq %rbx, 24(%rsi) # save rbx,r12-r15 \n"
" movq %r12, 32(%rsi) \n"
" movq %r13, 40(%rsi) \n"
" movq %r14, 48(%rsi) \n"
" movq %r15, 56(%rsi) \n"" movq 56(%rdi), %r15 \n"
" movq 48(%rdi), %r14 \n"
" movq 40(%rdi), %r13 \n"
" movq 32(%rdi), %r12 \n"
" movq 24(%rdi), %rbx # restore rbx,r12-r15 \n"
" movq 8(%rdi), %rbp # restore frame_pointer \n"
" movq 0(%rdi), %rsp # restore stack_pointer \n"
" movq 16(%rdi), %rax # restore insn_pointer \n"
" movq %rax, (%rsp) # restore eip \n"
" ret # 出栈,回到栈指针,执行eip指向的指令。\n"
);
上下文切换,就是将 CPU 的寄存器暂时保存,再将即将运行的协程的上下文寄存器,分别mov 到相对应的寄存器上。此时上下文完成切换。
4.协程的运行流程
协程如何使用,协程的api
在网络IO编程的时候,如果每次accept返回的时候,为新来的fd单独分配一个线程,这一个fd对应一个线程,就不会存在多个线程共用一个fd的问题了,虽然这样代码逻辑清晰易读,但是这是无稽之谈,线程创建与线程调度的代价是很大的
但是如果把线程换成协程,线程API的思维来使用协程,那不就可行了吗?
NtyCo封装了两类接口
- 协程本身的api
-
//创建协程 int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg); //调度器运行 void nty_schedule_run(void);
- posix api的异步封装协程api
-
//POSIX 异步封装 API int nty_socket(int domain, int type, int protocol);int nty_accept(int fd, struct sockaddr *addr, socklen_t *len);ssize_t nty_recv(int fd, void *buf, size_t len, int flags);ssize_t nty_send(int fd, const void *buf, size_t len, int flags);int nty_close(int fd);int nty_connect(int fd, struct sockaddr *name, socklen_t len);ssize_t nty_recvfrom(int fd, void *buf, size_t len, int flags, struct sockaddr *src_addr, socklen_t *addrlen);ssize_t nty_sendto(int fd, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen);
协程工作流程
创建协程
int nty_coroutine_create(nty_coroutine **new_co,proc_coroutine func,void *arg);
- nty_coroutine **new_co:需要传入空的协程对象,这个对象是由内部创建的,并且在函数返回的时候,会返回一个内部创建的协程对象。
- proc_coroutine func:协程的子过程。当协程被调度的时候,就会执行该函数
- void *arg : 需要传入到新协程子过程的参数。
协程不存在亲属关系,都是一致的调度关系,接受调度器的调度。调用 create API就会创建一个新协程,新协程就会加入到调度器的就绪队列中。
回调协程的子过程
在 create 协程后,何时回调子过程?何种方式回调子过程?我们知道CPU的EIP寄存器就是存储cpu下一条指令的地址,我们可以把回调函数的地址存储到 EIP 中。这样在resume之后,就会执行协程的子过程了。
// eip 执行入口
static void _exec(void *lt) {nty_coroutine *co = (nty_coroutine *) lt;co->func(co->arg);
}
// 初始化协程栈
static void nty_coroutine_init(nty_coroutine *co) {void **stack = (void **) (co->stack + co->stack_size);stack[-3] = NULL;stack[-2] = (void *) co;//设置参数co->ctx.rsp = (void *) stack - (4 * sizeof(void *));co->ctx.rbp = (void *) stack - (3 * sizeof(void *));co->ctx.eip = (void *) _exec;//设置回调函数入口co->status = BIT(NTY_COROUTINE_STATUS_READY);
}
协程封装posix api异步原理
在send与recv 调用的时候,如何实现异步操作?
在进行 IO 操作(recv,send)之前,先执行了 epoll_ctl 的 del 操作,将相应的 sockfd 从 epfd中删除掉,在执行完 IO 操作(recv,send)再进行 epoll_ctl 的 add 的动作。这段代码看起来似乎好像没有什么作用。
如果是在多个上下文中,这样的做法就很有意义了。能够保证 sockfd 只在一个上下文中能够操作 IO 的。不会出现在多个上下文同时对一个 IO 进行操作的。协程的 IO 异步操作正式是采用此模式进行的。
// 创建协程recv接口
ssize_t nty_recv(int fd, void *buf, size_t len, int flags) {struct epoll_event ev;ev.events = POLLIN | POLLERR | POLLHUP;ev.data.fd = fd;//加入epoll,然后yieldnty_epoll_inner(&ev, 1, 1);//resumessize_t ret = recv(fd, buf, len, flags);return ret;
}
// 加入epoll,更改状态,加入wait集合,然后yield与resume
static int nty_epoll_inner(struct epoll_event *ev, int ev_num, int timeout) {nty_schedule * sched = nty_coroutine_get_sched();nty_coroutine *co = sched->curr_thread;int i;for (i = 0; i < ev_num; i++) {epoll_ctl(sched->epfd, EPOLL_CTL_ADD, ev->data.fd, ev);co->events = ev->events;//加入wait集合,添加wait状态nty_schedule_sched_wait(co, ev->data.fd, ev->events, timeout);}//yieldnty_coroutine_yield(co);for (i = 0; i < ev_num; i++) {epoll_ctl(sched->epfd, EPOLL_CTL_DEL, ev->data.fd, ev);//移除wait集合,移除wait状态nty_schedule_desched_wait(ev->data.fd);}return ev_num;
}
一个简单的使用案例
可以看到,我们编写代码只需以同步的编程方式,就能实现异步的性能了。
#include "nty_coroutine.h"
#include <arpa/inet.h>void server_reader(void *arg) {int fd = *(int *) arg;ssize_t ret;struct pollfd fds;fds.fd = fd;fds.events = POLLIN;while (1) {char buf[1024] = {0};ret = nty_recv(fd, buf, 1024, 0);if (ret > 0) {nty_send(fd, buf, strlen(buf), 0);}else if (ret == 0) {nty_close(fd);break;}}
}void server(void *arg) {unsigned short port = *(unsigned short *) arg;int fd = nty_socket(AF_INET, SOCK_STREAM, 0);if (fd < 0) return;struct sockaddr_in local, remote;local.sin_family = AF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = INADDR_ANY;bind(fd, (struct sockaddr *) &local, sizeof(struct sockaddr_in));listen(fd, 128);while (1) {socklen_t len = sizeof(struct sockaddr_in);int cli_fd = nty_accept(fd, (struct sockaddr *) &remote, &len);printf("new client comming\n");nty_coroutine *read_co;nty_coroutine_create(&read_co, server_reader, &cli_fd);}
}int main(int argc, char *argv[]) {nty_coroutine *co = NULL;unsigned short port = 8080;nty_coroutine_create(&co, server, &port);nty_schedule_run(); //runreturn 0;
}
5.协程 与 调度器 结构体定义
协程定义
一个协程会有哪些状态呢?如果协程sleep了,那么就是睡眠状态,如果协程刚创建出来,那它肯定是就绪状态,如果协程在等待数据的到来,那就是等待状态。这里这里定义协程的三个运行状态{就绪,睡眠,等待}。
- 新创建的协程,加入就绪集合等待调度
- io未就绪的协程,加入等待集合等待epoll_wait
- 有sleep操作的协程,加入睡眠集合
- 就绪集合没有设置优先级,所以在就绪集合里面的协程优先级一样,那么就可以用队列来存储,先进先出
- 等待集合就是等待IO准备就绪,这个等待IO是有时间长短的,这里用红黑树来存储
- 睡眠集合需要按照睡眠时间的长短进行唤醒,所以也用红黑树存储,key为睡眠时长
我们描述了每一个协程有自己的上下文环境,需要保存 CPU 的寄存器 ctx;需要有子过程的回调函数 func;需要有子过程回调函数的参数 arg;需要定义自己的栈空stack;需要有自己栈空间的大小 stack_size;需要定义协程的创建时间birth;需要定义协程当前的运行状态 status;需要定当前运行状态的结点(ready_next, wait_node, sleep_node);需要定义协程 id;需要定义调度器的全局对象 sched。
typedef struct _nty_coroutine {//cpu ctxnty_cpu_ctx ctx;// funcproc_coroutine func;void *arg;// create timeuint64_t birth;//stackvoid *stack;size_t stack_size;size_t last_stack_size;//statusnty_coroutine_status status;//rootnty_schedule *sched;//co iduint64_t id;//fd eventint fd;uint16_t events;//sleep timeuint64_t sleep_usecs;//setRB_ENTRY(_nty_coroutine) sleep_node;RB_ENTRY(_nty_coroutine) wait_node;TAILQ_ENTRY(_nty_coroutine) ready_node;
} nty_coroutine;
调度器定义
每个协程所需要使用的,而且不同的,就是协程的属性,那么每个协程所需要的,且相同的,就是调度器的属性。用来管理所有协程的属性,作为调度器的属性。调度器的属性,需要有保存 CPU 的寄存器上下文 ctx,可以从协程运行状态yield 到调度器运行的。从协程到调度器用 yield,从调度器到协程用 resume。
typedef struct _nty_schedule {// create timeuint64_t birth;//cpu ctxnty_cpu_ctx ctx;//stack_sizesize_t stack_size;//coroutine numint spawned_coroutines;//default_timeoutuint64_t default_timeout;//当前调度的协程struct _nty_coroutine *curr_thread;//页大小int page_size;//epoll fdint epfd;//线程通知相关,暂未实现int eventfd;//eventsstruct epoll_event eventlist[NTY_CO_MAX_EVENTS];int num_new_events;//setnty_coroutine_queue ready;nty_coroutine_rbtree_sleep sleeping;nty_coroutine_rbtree_wait waiting;
} nty_schedule;
调度的策略
调度器的实现,有两种方案
1.生产者消费者模式
2.多状态运行
生产者消费者模式
多状态运行
7.协程api 与 hook
需要封装为异步的posix api 分析
所有对io的操作,我们都需要取重新封装一遍。为什么不能用posix api,而是我们需要再去封装一次呢?比如我们调用recv的时候,如果我们调用系统的,那么这个fd怎么yield到调度器上呢,所以我们需要在posix api的基础上封装,当然有些接口需要封装,有些不需要。
就像下面的伪代码一样,从同步的recv变成异步的ney_recv
//伪代码
ney_recv(){epoll add fd;yield();epoll del fd;recv(fd);
}
站在同步封装成异步的角度,如果不需要判断io是否就绪的这些api,则不需要封装为异步的。
需要封装的api,这些api在实现的时候,皆采用上面伪代码的策略
1. accept()
2. connect()
3. recv()
4. read()
5. send()
6. write()
7. recvfrom()
8. sendto()
不需要封装的api,这些api因为不会引起阻塞,所以不用封装。
socket()
listen()
close()
fcntl()
setsockopt()
getsockopt()
hook
hook提供了两个接口;1. dlsym()是针对系统的,系统原始的api。2. dlopen()是针对第三方的库。
connect_f = dlsym(RTLD_NEXT, "connect");
#define _GNU_SOURCE
#include <stdio.h>
#include <unistd.h>
#include <dlfcn.h>
#include<mysql/mysql.h>
//
// Created by 68725 on 2022/7/17.
//typedef int (*connect_t)(int, struct sockaddr *, socklen_t);connect_t connect_f;typedef ssize_t (*recv_t)(int, void *buf, size_t, int);recv_t recv_f;typedef ssize_t (*send_t)(int, const void *buf, size_t, int);send_t send_f;typedef ssize_t (*read_t)(int, void *buf, size_t);read_t read_f;typedef ssize_t (*write_t)(int, const void *buf, size_t);write_t write_f;int connect(int fd, struct sockaddr *name, socklen_t len) {printf("in connect\n");return connect_f(fd, name, len);
}ssize_t recv(int fd, void *buf, size_t len, int flags) {printf("in recv\n");return recv_f(fd, buf, len, flags);
}ssize_t send(int fd, const void *buf, size_t len, int flags) {printf("in send\n");return send_f(fd, buf, len, flags);
}
ssize_t read(int fd, void *buf, size_t len) {printf("in read\n");return read_f(fd, buf, len);
}ssize_t write(int fd, const void *buf, size_t len) {printf("in write\n");return write_f(fd, buf, len);
}static int init_hook() {connect_f = dlsym(RTLD_NEXT, "connect");recv_f = dlsym(RTLD_NEXT, "recv");send_f = dlsym(RTLD_NEXT, "send");read_f = dlsym(RTLD_NEXT, "read");write_f = dlsym(RTLD_NEXT, "write");
}void main() {init_hook();MYSQL *m_mysql = mysql_init(NULL);if (!m_mysql) {printf("mysql_init failed\n");return;}if (!mysql_real_connect(m_mysql, "192.168.109.1", "root", "123456", "cdb", 3306, NULL, 0)) {printf("mysql_real_connect failed\n");return;}else {printf("mysql_real_connect success\n");}
}
//gcc -o hook hook.c -lmysqlclient -I /usr/include/mysql/ -ldl
8.多核模式
解决协程多核的问题有两种方式,多线程/多进程 与 CPU核心做亲和性。
- 多进程(实现起来容易,对协程代码本身不用去改)
- 多线程(复杂,需要对调度器进行加锁)
那么做多线程对调度器进行加锁,锁放在哪呢?锁放在调度器结构体里面,因为调度器是全局唯一的,那么要锁哪里呢,很明显,<取协程,恢复协程>,这里需要加锁。