RTT操作系统(2)
RTT操作系统(2)
本笔记为作者再学习RTT操作系统的一些心得体会,如有不对的地方,请包含与谅解!
————by wsoz
紧接上文的时钟同步,下面我们继续RTT的学习
线程间同步
线程间同步研究和解决的问题就是1. 共享资源竞争问题(同时对一个资源进行修改,导致资源发生错误) 2. 临界区保护(某些代码段必须==原子性执行==,不能被其他线程打断) 3. 线程执行顺序控制(对一些特定的线程按照特定的执行顺序进行控制) 4. 资源访问协调(有限资源【如串口、文件句柄】需要合理分配)
同步是指按预定的先后次序进行运行,线程同步是指多个线程通过特定的机制(如互斥量,事件对象,临界区)来控制线程之间的执行顺序,也可以说是在线程之间通过同步建立起执行顺序的关系,如果没有同步,那线程之间将是无序的。
多个线程操作 / 访问同一块区域(代码),这块代码就称为临界区,比如共享内存块就是临界区。线程互斥是指对于临界区资源访问的排它性。当多个线程都要使用临界区资源时,任何时刻最多只允许一个线程去使用,其它要使用该资源的线程必须等待,直到占用资源者释放该资源。线程互斥可以看成是一种特殊的线程同步。
RTT提供了三种线程间同步的方式:信号量(semaphore)、互斥量(mutex)、和事件集(event)
信号量
信号量是一种轻型的用于解决线程间同步问题的内核对象,线程可以获取或释放它,从而达到同步或互斥的目的。
下面为一些应用:
信号量实现同步是通过设置多个信号量,每一个线程对应修改下一个线程的信号量
信号量实现互斥是通过设置一个信号量值为1,然后使用完了,恢复信号量
锁,单一的锁常应用于多个线程间对同一共享资源(即临界区)的访问。信号量在作为锁来使用时,通常应将信号量资源实例初始化成 1,代表系统默认有一个资源可用,因为信号量的值始终在 1 和 0 之间变动,所以这类锁也叫做二值信号量。
资源计数,信号量是一个非负的值,我们可以利用获取或者释放信号量的方式实现资源计数器。资源计数适合于线程间工作处理速度不匹配的场合,这个时候信号量可以做为前一线程工作完成个数的计数,而当调度到后一线程时,它也可以以一种连续的方式一次处理多个事件。(比如采集传感器数据很快,我们处理很慢,此时就可以通过信号量来表示待处理的数据个数)。
中断与线程的同步,不是通过互斥二值信号量实现的,而是通过开关量(一个线程关,一个线程开),常见的就是通过中断触发了导致某一个线程需要执行,此时就释放信号量让需要执行的线程唤醒,然后在需要执行的线程中看有没有成功获取到信号量进而是否执行下面的内容,这就是典型的开关量实现方式。下面的FinSH线程就是常见的中断与线程的应用。
信号量工作机制
每个信号量对象都有一个信号量值和一个线程等待队列,信号量的值对应了信号量对象的实例数目、资源数目,假如信号量值为 5,则表示共有 5 个信号量实例(资源)可以被使用,当信号量实例数目为零时,再申请该信号量的线程就会被挂起在该信号量的等待队列上,等待可用的信号量实例(资源)。
==重点:==如果没有成功获取到信号量就会进入到挂起同时加入到该信号量的等待序列,当另一个线程释放了信号量之后,就会立刻进行检查有没有线程在该信号量的等待队列中,如果有线程就被唤醒,同时线程状态由挂起态变为就绪态。
信号量控制块
信号量控制块结构体原型如下:
struct rt_semaphore
{struct rt_ipc_object parent; /* 继承自 ipc_object 类 */rt_uint16_t value; /* 信号量的值 */
};
/* rt_sem_t 是指向 semaphore 结构体的指针类型 */
typedef struct rt_semaphore* rt_sem_t;
rt_semaphore 对象从 rt_ipc_object 中派生,由 IPC 容器所管理,信号量的最大值是 65535。
信号量使用
信号量控制块中含有信号量相关的重要参数,在信号量各种状态间起到纽带的作用。信号量常用的函数如下:
动态创建信号量
原型:
rt_sem_t rt_sem_create(const char *name, //信号量名rt_uint32_t value, //信号量值rt_uint8_t flag); //信号量标志, RT_IPC_FLAG_FIFO 或 RT_IPC_FLAG_PRIO
当调用这个函数时,系统将先从对象管理器中分配一个 semaphore 对象,并初始化这个对象,然后初始化父类 IPC 对象以及与 semaphore 相关的部分。
RT_IPC_FLAG_FIFO
(先进先出):那么等待线程队列将按照先进先出的方式排队,先进入的线程将先获得等待的信号量; RT_IPC_FLAG_PRIO
(优先级等待):等待线程队列将按照优先级进行排队,优先级高的等待线程将先获得等待的信号量。
注: RT_IPC_FLAG_FIFO
属于非实时调度方式,除非应用程序非常在意先来后到,并且你清楚地明白所有涉及到该信号量的线程都将会变为非实时线程,方可使用 RT_IPC_FLAG_FIFO
,否则建议采用 RT_IPC_FLAG_PRIO
,即确保线程的实时性。
返回值:
返回 | 描述 |
---|---|
RT_NULL | 创建失败 |
信号量的控制块指针(句柄) | 创建成功 |
动态信号量删除
原型:
rt_err_t rt_sem_delete(rt_sem_t sem); //信号量句柄
如果删除该信号量时,有线程正在等待该信号量,那么删除操作会先唤醒等待在该信号量上的线程(等待线程的返回值是 - RT_ERROR
),然后再释放信号量的内存资源。
返回值:
返回 | 描述 |
---|---|
RT_EOK | 删除成功 |
初始化静态信号量
原型:
rt_err_t rt_sem_init(rt_sem_t sem, //静态(static)信号量句柄const char *name, //信号量名rt_uint32_t value, //信号量值rt_uint8_t flag) //信号量标志位,RT_IPC_FLAG_FIFO 或 RT_IPC_FLAG_PRIO
当调用这个函数时,编译时就会分配空间给我们的信号量,系统将对这个 semaphore 对象进行初始化,然后初始化 IPC 对象以及与 semaphore 相关的部分。
返回值:
返回 | 描述 |
---|---|
RT_EOK | 初始化成功 |
静态信号量脱离
原型:
rt_err_t rt_sem_detach(rt_sem_t sem); //信号量句柄
如果删除该信号量时,有线程正在等待该信号量,那么删除操作会先唤醒等待在该信号量上的线程(等待线程的返回值是 - RT_ERROR
),然后再释放信号量的内存资源。
返回值:
返回 | 描述 |
---|---|
RT_EOK | 脱离成功 |
信号量获取
原型:
rt_err_t rt_sem_take (rt_sem_t sem, rt_int32_t time); //信号量句柄 等待时间(时钟节拍)
等待时间宏: RT_WAITING_FOREVER
永久等待 RT_WAITING_NO
不等待立刻返回
返回值:
返回 | 描述 |
---|---|
RT_EOK | 成功获得信号量 |
-RT_ETIMEOUT | 超时依然未获得信号量 |
-RT_ERROR | 其他错误 |
无等待获取信号量
原型:
rt_err_t rt_sem_trytake(rt_sem_t sem); //信号量句柄
该函数相当于 rt_sem_take(sem, RT_WAITING_NO)
,直接不等待不加入等待序列,有信号量就直接返回成功,没有就返回失败
返回值:
返回 | 描述 |
---|---|
RT_EOK | 成功获得信号量 |
-RT_ETIMEOUT | 获取失败 |
信号量释放
原型:
rt_err_t rt_sem_release(rt_sem_t sem); //信号量句柄
当信号量的值等于零时,并且有线程等待这个信号量时,释放信号量将唤醒等待在该信号量线程队列中的第一个线程,由它获取信号量;否则将把信号量的值加 1。
返回值:
返回 | 描述 |
---|---|
RT_EOK | 成功释放信号量 |
信号量应用
#include <rtthread.h>
//可自己尝试去除信号量锁,会发现有问题
#define DBG_TAG "main"
#define DBG_LVL DBG_LOG
#include <rtdbg.h>static uint16_t count_num=0;
rt_sem_t sem1=NULL;void my_thread1(void* param)
{while(1){rt_sem_take(sem1, RT_WAITING_FOREVER);for(int i = 0; i < 10000; i++) {count_num++; // 大量快速操作}rt_kprintf("thread1 batch->num=%d\r\n", count_num);rt_thread_mdelay(1);rt_sem_release(sem1);}
}void my_thread2(void* param)
{while(1){rt_sem_take(sem1, RT_WAITING_FOREVER);for(int i = 0; i < 10000; i++) {count_num--; // 大量快速操作}rt_kprintf("thread2 batch->num=%d\r\n", count_num);rt_thread_mdelay(1);rt_sem_release(sem1);}
}int main(void)
{rt_kprintf("||main start!||\r\n");//线程创建rt_thread_t thread1=NULL;thread1=rt_thread_create("thread1", my_thread1, NULL, 512, 10, 20);static struct rt_thread thread2;static uint8_t thread2_stack[512];rt_thread_init(&thread2, "thread2", my_thread2, NULL, thread2_stack, 512, 11, 20);//信号量创建sem1=rt_sem_create("sem1", 1, RT_IPC_FLAG_PRIO);//启动线程rt_thread_startup(thread1);rt_thread_startup(&thread2);return RT_EOK;
}
互斥量
互斥量专为互斥而生,拥有互斥量的线程拥有互斥量的所有权(获取了就属于该线程),互斥量支持递归访问(允许同一个线程多次获取同一个互斥量而不会死锁,但是每次上锁都需要对应的解锁)且能防止线程优先级翻转(低优先级上锁后被高优先级抢断,进而导致最高优先级也需要获取互斥量的进入等待【虽然最终也会执行,但是时间可能太长无法承受】);并且互斥量只能由持有线程释放,而信号量则可以由任何线程释放。
下图为优先级翻转具体描述:所谓优先级翻转,即当一个高优先级线程试图通过信号量机制访问共享资源时,如果该信号量已被一低优先级线程持有,而这个低优先级线程在运行过程中可能又被其它一些中等优先级的线程抢占,因此造成高优先级线程被许多具有较低优先级的线程阻塞,实时性难以得到保证。
==如何解决:==互斥量可以解决优先级翻转问题,实现的是优先级继承协议 (Sha, 1990)。优先级继承是通过在线程 A 尝试获取共享资源而被挂起的期间内,将线程 C 的优先级提升到线程 A 的优先级别,从而解决优先级翻转引起的问题。优先级继承是指,提高某个占有某种资源的低优先级线程的优先级,使之与所有等待该资源的线程中优先级最高的那个线程的优先级相等,然后执行,而当这个低优先级线程释放该资源时,优先级重新回到初始设定。因此,继承优先级的线程避免了系统资源被任何中间优先级的线程抢占。
互斥量的状态只有两种,开锁或闭锁(两种状态值),当有线程持有它时,互斥量处于闭锁状态,由这个线程获得它的所有权。相反,当这个线程释放它时,将对互斥量进行开锁,失去它的所有权。
**注意:**在获得互斥量后,请尽快释放互斥量,并且在持有互斥量的过程中,不得再行更改持有互斥量线程的优先级,否则可能人为引入无界优先级反转的问题。
互斥量控制块
在 RT-Thread 中,互斥量控制块是操作系统用于管理互斥量的一个数据结构,由结构体struct rt_mutex
表示。原型表示如下:
struct rt_mutex{struct rt_ipc_object parent; /* 继承自 ipc_object 类 */rt_uint16_t value; /* 互斥量的值 */rt_uint8_t original_priority; /* 持有线程的原始优先级 */rt_uint8_t hold; /* 持有线程的持有次数 */struct rt_thread *owner; /* 当前拥有互斥量的线程 */};/* rt_mutext_t 为指向互斥量结构体的指针类型 */typedef struct rt_mutex* rt_mutex_t;
rt_mutex
对象从rt_ipc_object
中派生,由 IPC
容器所管理。
互斥量使用
互斥量使用包括初始化删除,主要使用是通过获取和释放互斥量实现的。常见的API
函数如下:
动态创建互斥量
原型:
rt_mutex_t rt_mutex_create (const char* name, rt_uint8_t flag); //互斥量名 标志位RT_IPC_FLAG_PRIO
注意: 该标志已经作废,无论用户选择 RT_IPC_FLAG_PRIO
还是 RT_IPC_FLAG_FIFO
,内核均按照 T_IPC_FLAG_PRIO
处理,同时注意如果在其他地方使用RT_IPC_FLAG_FIFO
会破坏线程的优先级调度。
返回值:
返回 | 描述 |
---|---|
互斥量句柄 | 创建成功 |
RT_NULL | 创建失败 |
动态互斥量删除
原型:
rt_err_t rt_mutex_delete (rt_mutex_t mutex); //互斥量句柄
当删除一个互斥量时,所有等待此互斥量的线程都将被唤醒,等待线程获得的返回值是 - RT_ERROR
。然后系统将该互斥量从内核对象管理器链表中删除并释放互斥量占用的内存空间。
返回值:
返回 | 描述 |
---|---|
互斥量句柄 | 创建成功 |
RT_NULL | 创建失败 |
静态互斥量初始化
原型:
rt_err_t rt_mutex_init (rt_mutex_t mutex, const char* name, rt_uint8_t flag);//互斥量句柄 互斥量名 标志位
返回值:
返回 | 描述 |
---|---|
RT_EOK | 初始化成功 |
静态互斥量脱离
原型:
rt_err_t rt_mutex_detach (rt_mutex_t mutex); //互斥量句柄
返回值:
返回 | 描述 |
---|---|
RT_EOK | 成功 |
获取互斥量
原型:
rt_err_t rt_mutex_take (rt_mutex_t mutex, rt_int32_t time); //互斥量句柄 等待时间
线程获取互斥量之后,该互斥量就被该线程所有,只有该线程可以释放掉该线程,同时支持递归获取互斥量(重复上锁),相应的上了多少锁就需要对应多少把钥匙来解锁。
当线程获取的该互斥量已经上锁时,就会等待该互斥量解锁才可以让该线程唤醒执行。
返回值:
返回 | 描述 |
---|---|
RT_EOK | 成功获得互斥量 |
-RT_ETIMEOUT | 超时 |
-RT_ERROR | 获取失败 |
无等待获取互斥量
原型:
rt_err_t rt_mutex_trytake(rt_mutex_t mutex); //互斥量句柄
等于rt_mutex_take(mutex,RT_WAITING_NO)
,当获取互斥量失败后不会进入等待直接返回错误。
返回值:
返回 | 描述 |
---|---|
RT_EOK | 成功获得互斥量 |
-RT_ETIMEOUT | 获取失败 |
释放信号量
原型:
rt_err_t rt_mutex_release(rt_mutex_t mutex); //互斥量句柄
当持有线程释放时会在原有计数值上减1,当减到0就说明已经解锁了,此时将会唤醒等待该信号量的线程。
返回值:
返回 | 描述 |
---|---|
RT_EOK | 成功 |
互斥量应用
互斥量最重要的应用就是保护临界资源区和防止优先级反转,下面为你展示应用:
#include <rtthread.h>#define DBG_TAG "main"
#define DBG_LVL DBG_LOG
#include <rtdbg.h>static uint16_t count1_num=65535;
rt_mutex_t mutex1=NULL;void my_thread1(void* param) //最高优先级->等待互斥量
{while(1){rt_thread_mdelay(1000); //保证低优先级运行rt_mutex_take(mutex1, RT_WAITING_FOREVER); //获取互斥量for (uint16_t i = 0; i < 1000; ++i){count1_num++;}rt_kprintf("thread1 num=%d\r\n",count1_num);rt_mutex_release(mutex1); //释放信号量}
}void my_thread2(void* param) //中优先级->正常运行
{while(1){rt_thread_mdelay(1000); //保证低优先级运行rt_kprintf("thread2 is working...\r\n");}
}void my_thread3(void* param) //低优先级首先持有互斥量
{while(1){rt_mutex_take(mutex1, RT_WAITING_FOREVER); //获取互斥量for (uint16_t i = 0; i < 1000; ++i){count1_num--;}rt_kprintf("thread3 num=%d\r\n",count1_num);rt_mutex_release(mutex1); //释放信号量rt_thread_mdelay(1000);}
}int main(void)
{rt_kprintf("||main start!||\r\n");//线程创建rt_thread_t thread1=NULL;thread1=rt_thread_create("thread1", my_thread1, NULL, 512, 9, 20);static struct rt_thread thread2;static uint8_t thread2_stack[512];rt_thread_init(&thread2, "thread2", my_thread2, NULL, thread2_stack, 512, 10, 20);rt_thread_t thread3=NULL;thread3=rt_thread_create("thread3", my_thread3, NULL, 512, 11, 20);//互斥量创建mutex1=rt_mutex_create("mutex1", RT_IPC_FLAG_PRIO);//启动线程rt_thread_startup(thread1);rt_thread_startup(thread3);rt_thread_startup(&thread2);return RT_EOK;
}
注意:需要切记的是互斥量不能在中断服务例程中使用。因为互斥量只能拥有互斥量的线程释放,同时中断没有优先级概念,继承机制无法实现。
事件集
事件集(Event Set)是一种轻量级的线程间同步机制,主要用于实现一对多、多对一的线程同步。
事件集工作机制
一对多:当其中任意一个事件被标志后,可以唤醒等待这个事件的线程
多对一:当几个事件都到达后才唤醒线程
事件集也有一个队列,当需要等待该事件集的线程都会加入到该队列,下面为主要工作流程:
- 线程A和线程B都会被挂起到==同一个等待队列==中
- 每个等待的线程会记录自己的等待条件(等待哪些事件、AND还是OR)
- 当有事件发生时,系统会==遍历整个等待队列==
- 检查每个等待线程的条件是否满足,满足就唤醒
下面为事件集的核心特性
- 事件标志位
- 事件集由32个事件标志位组成(32位整数)
- 每个标志位代表一个事件的发生状态
- 标志位为1表示事件发生,为0表示事件未发生
- 同步模式
- 逻辑与(AND):等待多个事件同时发生
RT_EVENT_FLAG_AND
- 逻辑或(OR):等待任意一个事件发生
RT_EVENT_FLAG_OR
- 清除模式:接收事件后自动清除标志位
RT_EVENT_FLAG_CLEAR
事件集控制块
事件集控制块是操作系统用于管理事件的一个数据结构,由结构体 struct rt_event
表示
struct rt_event
{struct rt_ipc_object parent; /* 继承自 ipc_object 类 *//* 事件集合,每一 bit 表示 1 个事件,bit 位的值可以标记某事件是否发生 */rt_uint32_t set;
};
/* rt_event_t 是指向事件结构体的指针类型 */
typedef struct rt_event* rt_event_t;
事件集使用
事件集使用包括下面这些常见的API,对一个事件集的操作包含:创建 / 初始化事件集、发送事件、接收事件、删除 / 脱离事件集。
动态创建事件集
原型:
rt_event_t rt_event_create(const char* name, rt_uint8_t flag); //事件集名 标志RT_IPC_FLAG_PRIO
注意:flag标志可选FIFO,但是会破环线程调度优先级不推荐
返回值:
返回 | 描述 |
---|---|
RT_NULL | 创建失败 |
事件对象的句柄 | 创建成功 |
删除动态事件集
原型:
rt_err_t rt_event_delete(rt_event_t event); //事件集句柄
在调用 rt_event_delete 函数删除一个事件集对象时,应该确保该事件集不再被使用。在删除前会唤醒所有挂起在该事件集上的线程(线程的返回值是 - RT_ERROR),然后释放事件集对象占用的内存块。
返回值:
返回 | 描述 |
---|---|
RT_EOK | 成功 |
初始化静态事件集
原型:
rt_err_t rt_event_init(rt_event_t event, const char* name, rt_uint8_t flag); //事件集句柄 名字 标志位
返回值:
返回 | 描述 |
---|---|
RT_EOK | 成功 |
静态事件集脱离
原型:
rt_err_t rt_event_detach(rt_event_t event);
在调用 rt_event_detach函数删除一个事件集对象时,应该确保该事件集不再被使用。在删除前会唤醒所有挂起在该事件集上的线程(线程的返回值是 - RT_ERROR),然后释放事件集对象占用的内存块。
返回值:
返回 | 描述 |
---|---|
RT_EOK | 成功 |
发送事件
原型:
rt_err_t rt_event_send(rt_event_t event, rt_uint32_t set); //事件句柄 发送的一个或多个事件的标志值
发送事件函数,会把对应的事件由0置1,标志着发送事件完成。
==注意:==由于该函数只会把哪个标志位置位,所以如果重复调用同一事件现象不会发生变化。
返回值:
返回 | 描述 |
---|---|
RT_EOK | 成功 |
接收事件
原型:
rt_err_t rt_event_recv(rt_event_t event, //事件集句柄rt_uint32_t set, //事件(具体位数) 可以用|多选rt_uint8_t option, //接收选项如 或与 清除标志位 可以用|多选rt_int32_t timeout, //等待时间rt_uint32_t* recved); //指向接收到的事件
recved
是一个输出参数,用于返回实际接收到的事件标志位。可以知道具体是哪一个事件触发了还是没触发
#define EVENT_A (1 << 0) // 0x01
#define EVENT_B (1 << 1) // 0x02
#define EVENT_C (1 << 2) // 0x04rt_uint32_t received_events;// 等待事件A、B、C中的任意一个
rt_event_recv(event, EVENT_A | EVENT_B | EVENT_C, RT_EVENT_FLAG_OR | RT_EVENT_FLAG_CLEAR,RT_WAITING_FOREVER, &received_events);// 通过received_events可以知道具体是哪个事件触发了
if (received_events & EVENT_A) {rt_kprintf("事件A发生了\n");
}
if (received_events & EVENT_B) {rt_kprintf("事件B发生了\n");
}
if (received_events & EVENT_C) {rt_kprintf("事件C发生了\n");
}
但是在AND模式下,只有这几个事件都发生才可以触发返回,常用来再OR模式下判断哪个事件触发
事件集应用
事件集常用在一对多和多对一的线程间同步中,下面为应用代码:
#include <rtthread.h>#define DBG_TAG "main"
#define DBG_LVL DBG_LOG
#include <rtdbg.h>rt_event_t envent1=NULL;
// 定义事件标志,避免冲突
#define EVENT3_FLAG (0X01<<3) // 只给thread2
#define EVENT4_T2_FLAG (0X01<<4) // thread2专用的EVENT4
#define EVENT4_T3_FLAG (0X01<<5) // thread3专用的EVENT4
#define EVENT5_FLAG (0X01<<6) // 只给thread3void my_thread1(void* param)
{while(1){static uint16_t count=0;rt_kprintf("thread1 is working...\r\n");count++;if(count==1){rt_event_send(envent1, EVENT3_FLAG);rt_kprintf("thread1 send EVENT3\r\n");}if(count>=5&&count%2==1){// 同时发送给两个线程rt_event_send(envent1, EVENT4_T2_FLAG | EVENT4_T3_FLAG);rt_kprintf("thread1 send EVENT4 to both threads\r\n");}if(count>=5&&count%2==0){rt_event_send(envent1, EVENT5_FLAG);rt_kprintf("thread1 send EVENT5\r\n");}rt_thread_mdelay(1000);}
}void my_thread2(void* param)
{while(1){rt_uint32_t rec_event;rt_event_recv(envent1, EVENT3_FLAG | EVENT4_T2_FLAG,RT_EVENT_FLAG_OR | RT_EVENT_FLAG_CLEAR,RT_WAITING_FOREVER, &rec_event);rt_kprintf("thread2 is working...\r\n");if(rec_event & EVENT3_FLAG){rt_kprintf("thread2 recieve EVENT3\r\n");}if(rec_event & EVENT4_T2_FLAG){rt_kprintf("thread2 recieve EVENT4\r\n");}rt_thread_mdelay(1000);}
}void my_thread3(void* param)
{while(1){rt_uint32_t rec_event;rt_event_recv(envent1, EVENT4_T3_FLAG | EVENT5_FLAG,RT_EVENT_FLAG_AND | RT_EVENT_FLAG_CLEAR,RT_WAITING_FOREVER, &rec_event);rt_kprintf("thread3 is working...\r\n");rt_kprintf("thread3 recieve EVENT4 and EVENT5\r\n");rt_thread_mdelay(1000);}
}int main(void)
{rt_kprintf("||main start!||\r\n");//线程创建rt_thread_t thread1=NULL;thread1=rt_thread_create("thread1", my_thread1, NULL, 512, 9, 20);static struct rt_thread thread2;static uint8_t thread2_stack[512];rt_thread_init(&thread2, "thread2", my_thread2, NULL, thread2_stack, 512, 10, 20);rt_thread_t thread3=NULL;thread3=rt_thread_create("thread3", my_thread3, NULL, 512, 10, 20);//事件集创建envent1=rt_event_create("event1", RT_IPC_FLAG_PRIO);//启动线程rt_thread_startup(thread3);rt_thread_startup(thread1);rt_thread_startup(&thread2);return RT_EOK;
}
注意:事件集中很容易出现竞争关系,合理设计事件的生命周期管理!最好是对每一个线程单独设置一个事件标志位,避免出现竞争。
线程间同步对比选择
由于线程间同步的方法比较多,因此为了凸显每一种同步方法,我们进行如下对比:
信号量:
场景类型 | 具体应用 | 示例 |
---|---|---|
资源池管理 | 管理有限数量的资源 | 串口缓冲区、内存池 |
生产消费 | 数据流控制 | 队列满/空控制 |
任务同步 | 简单的任务协调 | 启动顺序控制 |
中断通知 | 中断与线程同步 | 数据接收完成通知 |
互斥量:
场景类型 | 具体应用 | 示例 |
---|---|---|
临界区保护 | 保护共享数据结构 | 全局变量、链表操作 |
设备独占 | 硬件资源互斥访问 | SPI、I2C设备 |
文件系统 | 文件读写保护 | 配置文件访问 |
复杂算法 | 多步骤原子操作 | 状态机更新 |
事件集:
场景类型 | 具体应用 | 示例 |
---|---|---|
多条件等待 | 等待多个事件同时发生 | 系统初始化完成 |
状态机控制 | 复杂状态转换 | 设备状态管理 |
GUI事件 | 用户界面事件处理 | 按键、触摸、定时器 |
协议栈 | 网络事件处理 | 连接、数据、错误事件 |
推荐场景:
应用需求 | 推荐机制 | 理由 |
---|---|---|
资源池管理 | 信号量 | 天然支持计数,管理多个相同资源 |
共享数据保护 | 互斥量 | 优先级继承,防止优先级反转 |
设备驱动同步 | 事件集 | 多种中断事件的灵活组合 |
生产者消费者 | 信号量 | 经典模式,简单高效 |
系统初始化 | 事件集 | 多个模块初始化状态的AND组合 |
文件系统访问 | 互斥量 | 复杂操作的原子性保护 |
GUI事件处理 | 事件集 | 多种用户事件的OR组合 |
网络协议栈 | 事件集 | 连接、数据、错误等多事件 |
避免误用
- ❌ 用信号量保护临界区(无优先级继承)
- ❌ 用互斥量做简单计数(功能过重)
- ❌ 用事件集传递数据(只能传递状态)
线程间通信
首先在裸机变成中我们使用的是传统的全局变量来进行数据传输,但是在多线程中如果继续使用全局变量可能会导致数据错乱的情况,在简单情况下我们是可以使用互斥锁保护临界区+全局变量的方式来进行线程间的通信的,但是为了提高我们的一个工作效率与应对复杂情况,利用邮箱、消息队列、信号来进行线程间通信也是必不可少的。
邮箱
邮箱是RT-Thread提供的一种轻量级消息传递机制,主要用于在线程间传递少量数据(通常是4字节的指针或数值)。
核心特性
- 数据传递方式
- 传递内容:4字节数据(通常是指针、句柄或小整数)
- 传递方式:值拷贝,不是引用传递
- 数据大小:固定4字节,适合传递地址或简单数值
- 缓冲机制
- FIFO队列:先进先出的消息队列
- 容量限制:创建时指定最大邮件数量
- 阻塞机制:支持发送和接收的超时等待
邮箱工作机制
邮箱特点是开销比较低,效率较高。邮箱中的每一封邮件只能容纳固定的 4 字节内容(32位系统则刚好可以容纳一个指针)。线程或中断服务例程把一封 4 字节长度的邮件发送到邮箱中,而一个或多个线程可以从邮箱中接收这些邮件并进行处理。
非阻塞方式的邮件发送过程能够安全的应用于中断服务中,是线程、中断服务、定时器向线程发送消息的有效手段。通常来说,邮件收取过程可能是阻塞的,这取决于邮箱中是否有邮件,以及收取邮件时设置的超时时间。当邮箱中不存在邮件且超时时间不为 0 时,邮件收取过程将变成阻塞方式。
当一个线程向邮箱发送邮件时,如果邮箱没满,将把邮件复制到邮箱中。如果邮箱已经满了,发送线程可以设置超时时间,选择等待挂起或直接返回 - RT_EFULL
。如果发送线程选择挂起等待,那么当邮箱中的邮件被收取而空出空间来时,等待挂起的发送线程将被唤醒继续发送。
当一个线程从邮箱中接收邮件时,如果邮箱是空的,接收线程可以选择是否等待挂起直到收到新的邮件而唤醒,或可以设置超时时间。当达到设置的超时时间,邮箱依然未收到邮件时,这个选择超时等待的线程将被唤醒并返回 - RT_ETIMEOUT
。如果邮箱中存在邮件,那么接收线程将复制邮箱中的 4 个字节邮件到接收缓存中。
邮箱控制块
邮箱控制块是操作系统用于管理邮箱的一个数据结构,由结构体 struct rt_mailbox
表示,具体如下:
struct rt_mailbox
{struct rt_ipc_object parent;rt_uint32_t* msg_pool; /* 邮箱缓冲区的开始地址 */rt_uint16_t size; /* 邮箱缓冲区的大小 */rt_uint16_t entry; /* 邮箱中邮件的数目 */rt_uint16_t in_offset, out_offset; /* 邮箱缓冲的进出指针 */rt_list_t suspend_sender_thread; /* 发送线程的挂起等待队列 */
};
typedef struct rt_mailbox* rt_mailbox_t;
邮箱使用
邮箱的使用常见的操作函数有创建 / 初始化邮箱、发送邮件、接收邮件、删除 / 脱离邮箱。如下:
创建动态邮箱
原型:
rt_mailbox_t rt_mb_create (const char* name, rt_size_t size, rt_uint8_t flag); //邮箱名 邮箱大小 等待标志
邮箱动态分配一块内存空间来存邮箱,这块内存的大小等于邮件大小(4 字节)与邮箱容量的乘积。
返回值:
返回 | 描述 |
---|---|
RT_NULL | 创建失败 |
邮箱对象的句柄 | 创建成功 |
删除动态邮箱
原型:
rt_err_t rt_mb_delete (rt_mailbox_t mb); //邮箱句柄
删除邮箱时,如果有线程被挂起在该邮箱对象上,内核先唤醒挂起在该邮箱上的所有线程(线程返回值是 -RT_ERROR),然后再释放邮箱使用的内存,最后删除邮箱对象。
返回值:
返回 | 描述 |
---|---|
RT_EOK | 成功 |
初始化静态邮箱
原型:
rt_err_t rt_mb_init(rt_mailbox_t mb, //邮箱句柄const char* name, //邮箱名void* msgpool, //邮箱内存起始地址rt_size_t size, //邮箱大小rt_uint8_t flag) //标志
注意:邮箱内存和邮箱结构体一定要设置位静态的数组,static
或者全局变量。
返回值:
返回 | 描述 |
---|---|
RT_EOK | 成功 |
静态邮箱脱离
原型:
rt_err_t rt_mb_detach(rt_mailbox_t mb); //邮箱句柄
返回值:
返回 | 描述 |
---|---|
RT_EOK | 成功 |
发送邮件
原型:
rt_err_t rt_mb_send (rt_mailbox_t mb, rt_uint32_t value); //邮箱句柄 邮箱值
该函数如果在邮箱满的情况下不会等待会直接返回错误,同时这里需要注意一下,我们发送的数据可以是一个值也可以是指针下面讲解一下
rt_mb_send(mb, 100); // 传递整数
//接收
rt_uint32_t value;
rt_mb_recv(cmd_mb, &value, RT_WAITING_FOREVER);//最常用的传递指针
char* message = rt_malloc(100);
strcpy(message, "Hello RT-Thread");
rt_mb_send(data_mb, (rt_uint32_t)message); // 强制转换为uint32_t
// 接收
rt_uint32_t addr;
rt_mb_recv(data_mb, &addr, RT_WAITING_FOREVER);
char* received_msg = (char*)addr; // 转换回指针
rt_kprintf("收到消息: %s\n", received_msg);
rt_free(received_msg); // 记得释放内存
返回值:
返回 | 描述 |
---|---|
RT_EOK | 发送成功 |
-RT_EFULL | 邮箱已经满了 |
带等待的发送邮件
原型:
rt_err_t rt_mb_send_wait (rt_mailbox_t mb, //邮箱句柄rt_uint32_t value, //发送值rt_int32_t timeout); //等待时间
该函数就可以等待邮箱从满的状态变为有空间来存数据,之后再进行返回。
返回值:
返回 | 描述 |
---|---|
RT_EOK | 发送成功 |
-RT_ETIMEOUT | 超时 |
-RT_ERROR | 失败,返回错误 |
发送紧急邮件
原型:
rt_err_t rt_mb_urgent (rt_mailbox_t mb, rt_ubase_t value); //邮箱句柄 发送值
该函数会直接把我们的邮件插入到邮箱首部位置,但是如果邮箱满的情况下一样的返回错误。
返回值:
返回 | 描述 |
---|---|
RT_EOK | 发送成功 |
-RT_EFULL | 邮箱已满 |
接收邮件
原型:
rt_err_t rt_mb_recv (rt_mailbox_t mb, rt_uint32_t* value, rt_int32_t timeout); //邮箱句柄 接收地址 超时时间
只有当接收者接收的邮箱中有邮件时,接收者才能立即取到邮件并返回 RT_EOK 的返回值,否则接收线程会根据超时时间设置,或挂起在邮箱的等待线程队列上,或直接返回。
注意:邮件取出的顺序就是FIFO模式
返回值:
返回 | 描述 |
---|---|
RT_EOK | 接收成功 |
-RT_ETIMEOUT | 超时 |
-RT_ERROR | 失败,返回错误 |
邮箱应用
邮箱最主要的目的是传递指针,通过传递结构体消息指针可以更加方便的传递我们想要的数据,下面为应用代码:
#include <rtthread.h>#define DBG_TAG "main"
#define DBG_LVL DBG_LOG
#include <rtdbg.h>rt_mailbox_t mailbox1=NULL;
char* str1="Hello RT_Thread!";
char* str2="Try My best to study!";//消息结构体
typedef enum {MSG_TYPE_INT = 1,MSG_TYPE_STRING = 2
} msg_type_t;typedef struct{msg_type_t Flag;union {int number;char* string;} data;
}message_t;message_t msg1;
message_t msg2;
message_t msg3;void my_thread1(void* param) //发送邮件
{while(1){rt_kprintf("thread1 is working...\r\n");static int16_t count;static int16_t count_time;count++;if(count==5){count%=5;count_time++;msg3.data.number=count_time;rt_mb_send(mailbox1, (rt_uint32_t)&msg3);rt_kprintf("thread1 send num ok\r\n");}if(count==0&&count_time==5){rt_mb_send(mailbox1, (rt_uint32_t)&msg1);rt_kprintf("thread1 send message ok\r\n");}else if(count==0&&count_time==10){rt_mb_send(mailbox1, (rt_uint32_t)&msg2);rt_kprintf("thread1 send message ok\r\n");break;}rt_thread_mdelay(1000);}
}void my_thread2(void* param) //接收邮件
{while(1){rt_kprintf("thread2 is working...\r\n");rt_uint32_t temp;rt_mb_recv(mailbox1, &temp, RT_WAITING_FOREVER);message_t* msg = (message_t*)temp;if(msg->Flag == MSG_TYPE_INT){rt_kprintf("receive num:%d\r\n", msg->data.number);} else if(msg->Flag == MSG_TYPE_STRING){rt_kprintf("receive string:%s\r\n", msg->data.string);}rt_thread_mdelay(1000);}
}int main(void)
{rt_kprintf("||main start!||\r\n");//线程创建rt_thread_t thread1=NULL;thread1=rt_thread_create("thread1", my_thread1, NULL, 512, 9, 20);static struct rt_thread thread2;static uint8_t thread2_stack[512];rt_thread_init(&thread2, "thread2", my_thread2, NULL, thread2_stack, 512, 10, 20);//邮箱创建mailbox1=rt_mb_create("mailbox1", 15, RT_IPC_FLAG_PRIO);//消息结构体初始化msg1.Flag=MSG_TYPE_STRING;msg1.data.string="Hello RT_Thread!";msg2.Flag=MSG_TYPE_STRING;msg2.data.string="Try My best to study!";msg3.Flag=MSG_TYPE_INT;//启动线程rt_thread_startup(thread1);rt_thread_startup(&thread2);return RT_EOK;
}
技巧:消息结构体,通过枚举定义加联合体创造一个消息结构体,可以更加方便的传递任何我们想要的数据
//消息结构体
typedef enum {MSG_TYPE_INT = 1,MSG_TYPE_STRING = 2
} msg_type_t;typedef struct{msg_type_t Flag;union {int number;char* string;} data;
}message_t;
//消息结构体初始化
msg1.Flag=MSG_TYPE_STRING;
msg1.data.string="Hello RT_Thread!";
消息队列
消息队列是另一种常用的线程间通讯方式,是邮箱的扩展。可以应用在多种场合:线程间的消息交换、使用串口接收不定长数据等。
核心特性
- 数据传递方式
- 传递内容:可变长度的数据(不限于4字节)
- 传递方式:数据拷贝,完整传递数据内容
- 数据大小:创建时指定,支持任意大小的消息
- 缓冲机制
- FIFO队列:先进先出的消息队列
- 容量管理:指定最大消息数量和每条消息的最大大小
- 阻塞机制:支持发送和接收的超时等待
消息队列工作机制
消息队列能够接收来自线程或中断服务例程中不固定长度的消息,并把消息缓存在自己的内存空间中。其他线程也能够从消息队列中读取相应的消息,而当消息队列是空的时候,可以挂起读取线程。当有新的消息到达时,挂起的线程将被唤醒以接收并处理消息。消息队列是一种异步的通信方式。
线程先得到的是最先进入消息队列的消息,即先进先出原则 (FIFO)。
消息队列被创建时,它就被分配了消息队列控制块,每个消息队列对象中包含着多个消息框,每个消息框可以存放一条消息;消息队列中的第一个和最后一个消息框被分别称为消息链表头和消息链表尾,对应于消息队列控制块中的 msg_queue_head
和 msg_queue_tail
;有些消息框可能是空的,它们通过 msg_queue_free
形成一个空闲消息框链表。所有消息队列中的消息框总数即是消息队列的长度,这个长度可在消息队列创建时指定。
每一个消息框(消息节点)都必须有一个消息头,下图为结构
// 每个消息框的完整结构
┌─────────────────────────────────┐
│ 消息框(Message Node) │
├─────────────────────────────────┤
│ 消息头(8字节) │
│ ┌─────────────────────────────┐│
│ │ next指针 (4字节) ││ ← 指向下一个消息框
│ │ length字段 (2字节) ││ ← 消息实际长度
│ │ reserved字段 (2字节) ││ ← 保留/对齐
│ └─────────────────────────────┘│
├─────────────────────────────────┤
│ 消息数据区域(N字节) │ ← 实际的用户数据
│ ┌─────────────────────────────┐│
│ │ 用户的实际消息内容 ││
│ │ (结构体、数组、字符串等) ││
│ └─────────────────────────────┘│
└─────────────────────────────────┘
消息队列控制块
消息队列控制块是操作系统用于管理消息队列的一个数据结构,由结构体 struct rt_messagequeue 表示。下面为具体表示
struct rt_messagequeue
{struct rt_ipc_object parent;void* msg_pool; /* 指向存放消息的缓冲区的指针 */rt_uint16_t msg_size; /* 每个消息的长度 */rt_uint16_t max_msgs; /* 最大能够容纳的消息数 */rt_uint16_t entry; /* 队列中已有的消息数 */void* msg_queue_head; /* 消息链表头 */void* msg_queue_tail; /* 消息链表尾 */void* msg_queue_free; /* 空闲消息链表 */rt_list_t suspend_sender_thread; /* 发送线程的挂起等待队列 */
};
typedef struct rt_messagequeue* rt_mq_t;
消息队列使用
消息队列的操作包含:创建消息队列 - 发送消息 - 接收消息 - 删除消息队列。下图可以看出来
动态创建消息队列
原型:
rt_mq_t rt_mq_create(const char* name, rt_size_t msg_size, //消息队列名 消息最大长度字节rt_size_t max_msgs, rt_uint8_t flag); //消息队列最大个数 标志RT_IPC_FLAG_PRIO
创建消息队列时先从对象管理器中分配一个消息队列对象,然后给消息队列对象分配一块内存空间,组织成空闲消息链表,这块内存的大小 =[消息大小 + 消息头(用于链表连接)的大小]X 消息队列最大个数,接着再初始化消息队列,此时消息队列为空。
返回值:
返回 | 描述 |
---|---|
消息队列对象的句柄 | 成功 |
RT_NULL | 失败 |
删除动态消息队列
原型:
rt_err_t rt_mq_delete(rt_mq_t mq); //消息队列句柄
返回值:
返回 | 描述 |
---|---|
RT_EOK | 成功 |
静态消息队列初始化
原型:
rt_err_t rt_mq_init(rt_mq_t mq, const char* name, //消息队列句柄 消息队列名void *msgpool, rt_size_t msg_size, //指向存放消息的缓冲区的指针 一条消息的最大长度字节rt_size_t pool_size, rt_uint8_t flag); //存放消息的缓冲区大小 标志
返回值:
返回 | 描述 |
---|---|
RT_EOK | 成功 |
静态消息队列脱离
原型:
rt_err_t rt_mq_detach(rt_mq_t mq); //消息队列句柄
返回值:
返回 | 描述 |
---|---|
RT_EOK | 成功 |
发送消息
原型:
rt_err_t rt_mq_send (rt_mq_t mq, void* buffer, rt_size_t size); //消息队列句柄 消息内容 消息大小字节
当发送消息时,消息队列对象先从空闲消息链表上取下一个空闲消息块,把线程或者中断服务程序发送的消息内容复制到消息块上,然后把该消息块挂到消息队列的尾部。当且仅当空闲消息链表上有可用的空闲消息块时,发送者才能成功发送消息;当空闲消息链表上无可用消息块,说明消息队列已满。
该函数不会等待,如果无空闲消息块的情况下,会直接返回错误。
消息大小单位是==字节==,然后通常可以使用sizeof
以及strlen
来得到
返回值:
返回 | 描述 |
---|---|
RT_EOK | 成功 |
-RT_EFULL | 消息队列已满 |
-RT_ERROR | 失败,表示发送的消息长度大于消息队列中消息的最大长度 |
带等待的发送消息
原型:
rt_err_t rt_mq_send_wait(rt_mq_t mq, //消息句柄const void *buffer, //消息内容rt_size_t size, //消息大小字节rt_int32_t timeout); //超时时间
返回值:
返回 | 描述 |
---|---|
RT_EOK | 成功 |
-RT_EFULL | 消息队列已满 |
-RT_ERROR | 失败,表示发送的消息长度大于消息队列中消息的最大长度 |
紧急消息发送
原型:
rt_err_t rt_mq_urgent(rt_mq_t mq, void* buffer, rt_size_t size); //消息队列句柄 消息内容 消息大小字节
当发送紧急消息时,从空闲消息链表上取下来的消息块不是挂到消息队列的队尾,而是挂到队首
返回值:
返回 | 描述 |
---|---|
RT_EOK | 成功 |
-RT_EFULL | 消息队列已满 |
-RT_ERROR | 失败 |
接收消息
原型:
rt_ssize_t rt_mq_recv (rt_mq_t mq, void* buffer, //消息队列句柄 消息内容接收区rt_size_t size, rt_int32_t timeout); //接收区大小字节 超时时间
当消息队列中有消息时,接收者才能接收消息,否则接收者会根据超时时间设置,或挂起在消息队列的等待线程队列上。
接收消息时,接收者需指定存储消息的消息队列对象句柄,并且指定一个内存缓冲区,接收到的消息内容将被复制到该缓冲区里。
返回值:
返回 | 描述 |
---|---|
接收到消息的长度 | 成功收到 |
-RT_ETIMEOUT | 超时 |
-RT_ERROR | 失败,返回错误 |
消息队列应用
消息队列可以应用于发送不定长消息的场合,包括线程与线程间的消息交换,以及中断服务例程中给线程发送消息(中断服务例程不能接收消息)。(通过邮箱传指针也可以实现)下面为具体应用:
#include <rtthread.h>#define DBG_TAG "main"
#define DBG_LVL DBG_LOG
#include <rtdbg.h>rt_mq_t messagequeue1 = NULL; // 改为消息队列
char* str1 = "Hello RT_Thread!";
char* str2 = "Try My best to study!";// 消息结构体
typedef enum {MSG_TYPE_INT = 1,MSG_TYPE_STRING = 2
} msg_type_t;typedef struct{msg_type_t Flag;union {int number;char string[64]; // 改为固定长度字符数组,避免指针问题} data;
} message_t;message_t msg1;
message_t msg2;
message_t msg3;void my_thread1(void* param) // 发送消息
{while(1){rt_kprintf("thread1 is working...\r\n");static int16_t count;static int16_t count_time;count++;if(count == 3){count=0;count_time++;msg3.data.number = count_time;// 发送整个消息结构体rt_mq_send(messagequeue1, &msg3, sizeof(message_t));rt_kprintf("thread1 send num ok\r\n");}if(count == 0 && count_time == 2){rt_mq_send(messagequeue1, &msg1, sizeof(message_t));rt_kprintf("thread1 send message ok\r\n");}else if(count == 0 && count_time == 4){rt_mq_send(messagequeue1, &msg2, sizeof(message_t));rt_kprintf("thread1 send message ok\r\n");break;}rt_thread_mdelay(1000);}
}void my_thread2(void* param) // 接收消息
{while(1){rt_kprintf("thread2 is working...\r\n");message_t received_msg;// 接收消息到本地变量if(rt_mq_recv(messagequeue1, &received_msg, sizeof(message_t), RT_WAITING_FOREVER) == RT_EOK){if(received_msg.Flag == MSG_TYPE_INT){rt_kprintf("receive num:%d\r\n", received_msg.data.number);}else if(received_msg.Flag == MSG_TYPE_STRING){rt_kprintf("receive string:%s\r\n", received_msg.data.string);}}rt_thread_mdelay(1000);}
}int main(void)
{rt_kprintf("||main start!||\r\n");// 线程创建rt_thread_t thread1 = NULL;thread1 = rt_thread_create("thread1", my_thread1, NULL, 512, 9, 20);static struct rt_thread thread2;static uint8_t thread2_stack[512];rt_thread_init(&thread2, "thread2", my_thread2, NULL, thread2_stack, 512, 10, 20);// 消息队列创建 (队列长度15,每个消息大小为message_t结构体大小)messagequeue1 = rt_mq_create("mq1", sizeof(message_t), 15, RT_IPC_FLAG_PRIO);// 消息结构体初始化msg1.Flag = MSG_TYPE_STRING;rt_strncpy(msg1.data.string, str1, sizeof(msg1.data.string) - 1);msg2.Flag = MSG_TYPE_STRING;rt_strncpy(msg2.data.string, str2, sizeof(msg2.data.string) - 1);msg3.Flag = MSG_TYPE_INT;// 启动线程rt_thread_startup(thread1);rt_thread_startup(&thread2);return RT_EOK;
}
邮箱消息队列对比
特性 | 邮箱 (Mailbox) | 消息队列 (Message Queue) |
---|---|---|
数据单位 | 4字节 的整数值 或指针 | 任意长度的数据块(复制)或指针(传递) |
容量 | 固定数量的槽位(每个槽位4字节) | 固定数量的消息条数(每条消息长度可变) |
效率 | 极高(只拷贝4字节) | 相对较低(需要拷贝整个数据块) |
适用场景 | 传递小型控制命令、状态标志、或指针 | 传递实际的数据内容、结构体、字符串等 |
灵活性 | 低 | 高 |