从 C 到 C++20 协程编写方法的演变。第一部分:函数 + 宏 = 协程
大家好!我是大聪明-PLUS!
在我看来,协程是 C++20 标准中最难用的新特性。尽管网上有很多关于这个主题的文章(例如,一、二、三),以及大量的会议演示视频,但由于多种原因,学习 C++20 中的协程仍然面临诸多挑战。
首先,C++20 没有专门的协程,只有一个实现它们的框架。这个框架包含相当多的函数。这为开发人员提供了一个灵活的工具,可以根据他们的需求进行调整。然而,对于那些只想用协程编写代码而不深入研究其实现细节的开发人员来说,这带来了额外的挑战。
其次,关于协程如何工作的各种技术细节,出现了许多问题。例如,如何实现协程的暂停?它的状态是什么?如何保存?协程如何从中断的地方恢复?何时应该使用协程?何时它们会使代码更具可读性和运行速度更快?协程和多线程可以在同一个程序中共存吗?是否应该像在多线程代码中那样对协程使用同步原语?
我花了大量时间寻找这些问题的答案,发现早在 C++20 之前,使用协程进行编程就已经是可行的。你甚至不必使用 C++,用 C 就可以开发。现在,我想与读者分享我的经验。我希望这些经验对 C++ 和其他语言的开发者有所帮助,他们和我一样,总是试图从内部理解某项技术的实现方式。实现这种理解的最佳方法是尝试自己创建协程背后的机制。由于我积累了大量的资料,我计划撰写一系列文章,尝试回答这些问题并分享我的解决经验。那么,让我们开始吧。
让我们先回顾一下定义。协程是一个专门组织起来的软件模块,它使用协作式多任务处理原则与其他模块交互:一个模块在某个时间点暂停,保存其完整状态(包括调用堆栈和程序计数器),然后将控制权移交给另一个模块。另一个模块则执行该任务并返回控制权,同时保留其状态。在应用于协程时,可以使用术语“延续”来代替“状态”——程序在特定时间点状态的抽象表示,可以保存并用于转换到该状态。
因此,理解延续的实现原理是编写协程的关键。因此,本文及后续文章将详细探讨这些原理。使用哪种编程语言的问题就变得次要了。如果你能实现延续,那么你就可以使用协程编写代码。
如果你深入研究该理论,你会发现有几种类型的延续。最常见的是无界延续,它表示程序在特定时刻的状态。调用这种延续与调用函数不同,因为它对应于到已保存的程序状态的转换,并且不返回任何值。这种延续通常不能被多次调用。
还有所谓的有界延续,它抽象了程序块结果对该块子表达式结果的依赖关系。使用有界延续比无界延续需要存储的信息更少。这样的延续可以像函数一样使用,并且可以多次调用。
C 和 C++ 不内置堆栈保留支持。因此,要实现无界延续,必须借助内联汇编代码。下次再讨论这个主题。
协议线程 (Protothread) 是一种不使用操作系统线程机制的软件处理单元。它是一种协程实现。它以轻量级线程的形式运行,不使用堆栈来存储程序状态。因此,这项任务落到了程序员身上。可以使用全局变量(这通常不是一个好的做法)或用户定义的数据结构来实现。协议线程不可抢占,因此上下文切换只能在阻塞操作期间进行。C 和 C++ 中存在用于实现协议线程的库。本文将讨论协议线程库。它主要用于为内存有限的系统(例如基于微控制器的系统)编写程序。
对于 C++ 程序员来说,坏消息是整个库都是使用宏实现的,正如 Scott Meyers 的建议,应尽可能避免使用宏。然而,在这种情况下,我们仍然需要使用宏,因为与简单函数不同,宏只能使用标准 C 语言结构来更改程序的控制流。重要的是要理解,protothread 在单个 C 函数内运行,不能跨越其他函数。protothread 函数声明如下:
#define PT_THREAD(name_args) char name_args 
name_args 变量将被替换为线程执行的函数名称及其参数。返回一个 char 变量,其值反映传递给 PT_THREAD 宏的函数的当前状态。有四种状态可用:
-  
PT_WAITING //等待某些条件发生
 -  
PT_EXITED //protothread 退出
 -  
PT_ENDED //函数已结束
 -  
PT_YIELDED //返回此值表示,遵循协作多任务原则,协议线程将处理器时间让给调用代码
 
协议线程 API 包含四个基本操作:初始化本地延续 (PT_INIT)、执行协议线程 (PT_BEGIN)、从线程返回并保留其状态 (PT_YIELD) 以及终止线程 (PT_END)。此外,还有针对 PT_WAIT_UNTIL 和 PT_WAIT_WHILE 条件的锁,以及针对协议线程的锁 (PT_WAIT_THREAD)。您还可以等待子协议线程执行 (PT_WAIT_THREAD 和 PT_SPAWN)、重新启动线程 (PT_RESTART) 以及使用 PT_SCHEDULE 调度线程。
原型线程通过重复调用其所在函数来管理。每次调用该函数时,线程都会运行,直到锁定或终止。因此,线程管理由使用它的应用程序执行。
让我们看一下基本 protothreads 数据类型的实现。
typedef unsigned short lc_t; // 用于存储状态的变量,该状态是本地延续正确运行所需的信息。存储首次或重复调用时要跳转到的protoflow源代码的行号。
struct pt { //структура хранящая состояние
  lc_t lc; };
现在让我们看看如何实现辅助宏来与 protothreads API 协同工作。
为使用其他语言编写代码的程序员提供帮助。C和 C++ 具有一种称为预处理器的机制。它的作用是在编译程序源代码之前对其进行处理。预处理器有其自己的指令,特别是 #define,它指定要替换的文本,例如 #define PI 3.14。程序文本使用符号名 PI。预处理本质上是文本替换,其结果是程序中所有位置的 PI 都将替换为值 3.14。
宏的实现很简单,具有以下形式
#define LC_INIT(s) s = 0; 
此宏用于将上面讨论的 lc 变量初始化为零。Protothreads 中宏的主要目的是使用 switch-case 结构将状态组合成有限状态机。LC_INIT 调用是必需的,以确保进入到 switch 状态(代码中标记为 case 0)的第一次转换。
#define LC_RESUME(s) switch(s) { case 0:
LC_RESUME 用于将开关块和机器的初始状态嵌入到流函数中。
#define LC_SET(s) s = __LINE__ ; case __LINE__:
LC_SET 用于根据程序逻辑生成剩余状态。pt 结构中的变量 lc(局部延续)用作输入参数 s。该宏的目的是使用源代码的当前行号(__LINE__ 宏)初始化 lc,并使用 case 生成有限状态机的新状态。稍后我们将通过一个具体示例讨论其用途。
#define LC_END(s)  }
LC_END 将 switch 的右括号嵌入到 protothread 函数代码中。
现在我们来看看protothreads API的主要宏。
#define PT_INIT(pt)   LC_INIT((pt)->lc)
PT_INIT 将本地延续 lc 初始化为 0。
#define PT_BEGIN(pt) { char PT_YIELD_FLAG = 1; LC_RESUME((pt)->lc)
PT_BEGIN 初始化 PT_YIELD_FLAG 变量,该变量用于使用协作多任务将控制转移到另一个函数,并且还将 switch case 0 块嵌入到代码中。
#define PT_END(pt) LC_END((pt)->lc); PT_YIELD_FLAG = 0; \
                                     PT_INIT(pt); return PT_ENDED; }
为 switch 语句插入一个右括号,将 PT_YIELD_FLAG 重置为 0,将 lc 设置为 0,并返回 PT_ENDED,表示原始线程已执行完毕。之后可以再次调用该线程,并从零状态开始执行。
最复杂的多行宏 PT_YIELD 负责返回逻辑,同时保留局部延续。
#define PT_YIELD(pt)				\do {						\PT_YIELD_FLAG = 0;				\LC_SET((pt)->lc);				\if(PT_YIELD_FLAG == 0) {			\return PT_YIELDED;			\}						\} while(0)       
它将 lc 设置为线程源代码的行号,下一个 protothread 调用将跳转并返回 PT_YIELDED 代码,这表示协程尚未完成并可以再次调用。
为了理解上述宏是如何协同工作的,让我们使用 ProtoThreads 创建第一个协程。按照惯例,我们将用 C 语言编写一个斐波那契数列生成器。源代码位于代码库fibonacci.c 文件中。为了清晰起见,预处理器(命令 gcc -E fibonacci.c)的输出包含在源代码旁边(左栏)。
|   
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
  |       
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
  | 
创建此结构的动机是希望使代码更加通用,并避免使用静态变量。
变量 task.a 和 task.b 最初存储第一个和第二个斐波那契数(分别为 0 和 1),并用于进一步的计算。主函数使用 PT_INIT 宏初始化任务(包括局部延续),并将 lc 设置为 0(右侧列第 30 行)。
接下来,循环中调用 fibonacci 协程获取前 10 个数字,并显示结果。我们来看一下预处理后的协程代码(右侧列中的 8-20 行)。在第一次调用 fibonacci 之前,lc 的值为 0。进入函数后,将 PT_YIELD_FLAG 设置为 1,然后在 switch 语句中,使用 lc 的值跳转到 case 0,进而进入 while(1) 循环。
第 13 行包含 PT_YIELD 宏代码。首先,PT_YIELD_FLAG 被设置为 0,准备退出函数。接下来,lc 被设置为 17——PT_YIELD 宏所在的行号。此后,函数分支到 case 17。由于 PT_YIELD_FLAG 为 0,函数将控制权转移到 main 函数,并返回 PT_YIELDED 代码。
函数退出以这种方式完成,是因为第一个斐波那契数不需要计算;它是在主函数初始化 tsk 时设置的。在第二次调用 fibonacci 期间,PT_YIELD_FLAG 再次设置为 1,但 switch 语句跳转到 case 17,因为 lc 在上一次调用期间已设置为 17。这确保记住了协程的执行位置。if(PT_YIELD_FLAG == 0) 条件语句失败,并执行计算下一个斐波那契数的代码行。之后,while(1) 循环跳转到顶部,将 PT_YIELD_FLAG 设置为 0 并退出函数。这就是获得第二个数字的方式。然后重复所述操作。
尽管斐波那契计算是在无限循环中进行的,但仍然需要在协程末尾插入 PT_END 宏。否则,程序将无法编译。
现在让我们看一个两个协程之间交互的例子,它实现了最简单的协作式多任务处理。该程序打印一个文本文件的内容,并插入行号。为了简单起见,我们假设文件行的长度限制为 256 字节。此外,为了简洁起见,我们省略了一些输入参数的验证检查。
1:#include <stdio.h>2:#include <stdlib.h>3:#include "lib/pt.h"4:5:#define LINE_LENGTH 2566:struct line_task {7:	unsigned long lc;8:	struct pt state;9:};10:11:struct file_task {12:	FILE* stream;13:	char* line;14:	int line_length;15:	struct pt state;16:};17:18:PT_THREAD(line_counter(struct line_task *tsk))19:{20:  PT_BEGIN(&tsk->state);21:  while(1) {22:	PT_YIELD(&tsk->state);23:	tsk->lc++;24:  }25:  26:  PT_END(&tsk->state);27:}28:29:PT_THREAD(line_reader(struct file_task *fltsk))30:{31:  PT_BEGIN(&fltsk->state);32:  while(1) {33:	if(!fgets(fltsk->line, fltsk->line_length, fltsk->stream))34:      	break;35:	PT_YIELD(&fltsk->state);36:  }37:  38:  PT_END(&fltsk->state);39:}40:41:int main(int argc, char *argv[])42:{43:   char line[LINE_LENGTH];   44:   struct file_task fltsk;45:46:   if (argc != 2) {47:        printf("Usage : %s <file_name>\n", argv[0]);48:        return EXIT_FAILURE;49:    }50:51:    fltsk.stream = fopen(argv[1], "r");52:    53:    if (!fltsk.stream) {54:        printf("File not found\n");55:        return EXIT_FAILURE;56:    }57:    fltsk.line = (char *)&line;58:    fltsk.line_length = sizeof(line);59:   60:    PT_INIT(&fltsk.state);61:    62:    struct line_task tsk;63:    tsk.lc=1;64:65:    PT_INIT(&tsk.state);66:67:    while(line_reader(&fltsk) != PT_ENDED) {68:        line_counter(&tsk);69:        printf("%lu:", tsk.lc);70:	  printf("%s", fltsk.line);71:    }	72:73:    fclose(fltsk.stream);74:    return EXIT_SUCCESS;75:} 
该程序的核心函数是 line_counter 和 line_reader 协程。line_counter 生成行号,每次调用时递增(第 21-24 行)。line_reader 每次调用时使用 fgets 函数逐行读取文件,直到到达文件末尾。
这两个协程背后的逻辑与上例相同。主函数循环调用 line_counter 和 line_reader (第 67-71 行)。如果 line_reader 协程尚未完成,则从文件中读取下一行,为其生成一个序列号,并显示该行。
在上面的示例中,协作式多任务处理是通过循环顺序调用协程来实现的。在更复杂的情况下,可以添加一个调度程序,该调度程序按顺序从队列中选择任务执行。与抢占式多任务处理相比,这种类型的多任务处理具有以下几个优势:
-  
如果所有协程都在同一个线程上运行,则无需使用同步原语来保护数据。
 -  
每个协议线程的内存开销较低。为局部变量分配内存基本上是程序员的责任。在多线程应用程序中,分配给堆栈的内存量以兆字节为单位。
 -  
线程之间的切换速度明显更快,因为它发生在用户空间,而不是像多线程程序那样发生在内核空间。因此,协程的数量可以远远超过线程的数量,达到数百甚至数千。
 -  
使用协程可以省去回调函数,回调函数在多线程开发中被广泛使用,并且会降低程序的可读性。
 
使用协程的一个明显缺点是锁定问题。如果协程等待某个事件(例如从文件读取),则整个程序将被暂停,直到该事件发生。
在某些情况下,可以通过对 I/O 操作使用非阻塞模式来避免这种阻塞。本文将研究在类 Unix 操作系统上使用此模式的一个例子。
默认情况下,Unix 系统上的所有文件描述符都以“阻塞”模式创建。这意味着 I/O 系统调用(例如 read 或 write)会阻塞程序执行,直到数据准备就绪。例如,在 stdin 上调用 read 时,后续执行将被阻塞,直到用户从键盘输入数据并由系统读取。其他文件描述符也是如此。有两种消除阻塞的方法,它们可以相互补充:
-  
非阻塞 I/O 模式。
 -  
使用特殊 API 进行多路复用,例如 select、epoll、io_uring。
 
在本文中,我们只讨论第一种方法。要将文件描述符设置为非阻塞模式,请使用 fcntl 函数将 O_NONBLOCK 标志添加到描述符的现有标志集中。
一旦设置了 O_NONBLOCK,描述符就会变为非阻塞状态。任何 I/O 系统调用(例如 read 和 write)如果没有数据都会阻塞。现在它们返回 -1。此外,全局 errno 变量可以设置为 EWOULDBLOCK 或 EAGAIN。利用这些知识,让我们编写一个程序,使用协程以非阻塞模式从标准输入读取一行。
1:#include <stdio.h>2:#include <stdlib.h>3:4:#include <unistd.h>5:#include <fcntl.h>6:#include <errno.h>7:8:#include "lib/pt.h"9:10:#define READ_SIZE 25611:12:struct read_task {13:	char* line;14:	int line_length;15:	int index;16:	ssize_t bytes_read;17:	struct pt state;18:};19:20:PT_THREAD(stdin_reader(struct read_task *rdtsk))21:{22:  PT_BEGIN(&rdtsk->state);23:  while(1) {24:	PT_YIELD(&rdtsk->state);25:	26:	rdtsk->bytes_read = read(STDIN_FILENO, &rdtsk->line[rdtsk->index], rdtsk->line_length);27:	28:	if((rdtsk->bytes_read < 0) && (errno != EAGAIN) && (errno != EWOULDBLOCK))29:		break;30:		31:	if(rdtsk->bytes_read >= 0)32:	{33:	rdtsk->index += rdtsk->bytes_read;34:	if(rdtsk->line[rdtsk->index-1] == '\n') {35:		rdtsk->line[rdtsk->index] = '\0';36:		break;37:	}38:    }39:  }40:  41:  PT_END(&rdtsk->state);42:}43:44:int set_stdin_nonblock_mode() {45:    46:    int flags = fcntl(STDIN_FILENO, F_GETFL, 0);47:    if (flags == -1) {48:        perror("Error fcntl getting flag F_GETFL\n");49:        return EXIT_FAILURE;50:    }51:    if (fcntl(STDIN_FILENO, F_SETFL, flags | O_NONBLOCK) == -1) {52:        perror("Error fcntl setting flag F_SETFL\n");53:        return EXIT_FAILURE;54:    }55:    return EXIT_SUCCESS;56:}57:58:void init_reader(struct read_task rtask, char buffer, int size) {59:    rtask->line = buffer;60:    rtask->line_length = size;61:    rtask->index = 0;62:    PT_INIT(&rtask->state);63:}64:65:int main(void) {66:    char buffer[READ_SIZE];67:    struct read_task rtask;68:69:    if (set_stdin_nonblock_mode())70:    return EXIT_FAILURE;71:72:    init_reader(&rtask, (char *)&buffer, sizeof(buffer));73:74:     printf("Enter some string:\n");75:    do{76:    }while(stdin_reader(&rtask) != PT_ENDED);77:    78:    if(rtask.bytes_read >= 0)79:		printf("Read: %s \n", rtask.line);80:	else 81:		printf("Error reading from STDIN\n");82:83:  return EXIT_SUCCESS;84:} 
该程序在 set_stdin_nonblock_mode 函数中将 stdin 设置为非阻塞模式。为此,我们首先使用 fcntl 函数获取当前标志。接下来,我们为其添加 O_NONBLOCK 并使用 fcntl 写入更改。之后,在 init_reader 函数中,我们为 stdin_reader 协程准备数据结构。在协程主体(第 26 行)中,我们尝试通过调用 read 来读取数据。它返回读取的字节数,如果用户未输入任何内容或发生其他错误,则返回 -1。
在第 28 行,我们检查是否发生了数据不完整错误(errno 设置为 EWOULDBLOCK 或 EAGAIN)。如果 read 函数返回负值,且错误代码与数据不完整错误无关,则协程终止。如果读取了数据,数据会累积在缓冲区中(第 31-37 行),直到收到换行符。之后,协程终止。在缓冲区中累积输入字符是必要的,因为数据现在是逐渐接收的,而不是像阻塞模式那样一次性接收。与上例一样,数据累积以简化的方式实现。假设缓冲区溢出永远不会发生。
总而言之,我想指出使用协议线程实现协程的以下优点:
-  
使用 protothreads 允许您使用协程编写可移植代码,而无需使用汇编来保存协程状态。
 -  
创建和切换协程的开销较低,这对于资源受限的嵌入式系统尤其重要。
 -  
操作系统独立性。
 
我们可以强调以下明显的缺点:
1. 需要手动管理协程的状态并将控制权转移给调用代码。由于基于嵌入式宏的“特定”编程风格,最终的程序可能更难调试。
2. 该库的实现对使用 ProtoThreads 的代码引入了一个小限制:代码本身无法使用 switch 语句。不过,这个问题可以通过使用 gcc 编译器及其扩展来解决。
