FreeRTOS消息队列剖析讲解(思路+源码)
消息队列的概念
在FreeRTOS中,消息队列的实现就是通过在内存中开辟了一个共享的区域,通过其内部的一些指针、变量实现的任务间通讯。说的比较宽泛,但是要记住核心的点,那就是消息队列的实现方式就是一块共享的区域。
消息队列中成员
在这里我会根据具体成员在消息队列中担任的角色入手,让大家不仅仅知道消息队列里有它,也要让大家知道为什么消息队列里会有它,我自认为这样的学习方式会更有记忆点。
首先要明白的是,消息队列的写和读,是在一个环形缓冲区内实现的,至于环形缓冲区怎么实现的,我会在后面给大家讲。
环形缓冲区
环形缓冲区的实现实际上非常简单,它设计的成员有四个
int8_t *pcHead; //消息队列的头指针
int8_t *pcTail; //消息队列的尾指针
int8_t *pcWriteTo; //消息队列下一个要写的位置的指针
int8_t *pcReadFrom; //消息队列下一个要读的位置的指针
有关这四个指针的初始化如下所示
pxNewQueue->pcHead = ( int8_t * ) pxNewQueue; //在prvInitialiseNewQueue被赋值,也就是指向我们传入的消息队列指针的位置
//后面三个都是在初始化函数中的xQueueGenericReset函数中被赋值,也就是消息队列的重置函数
pxQueue->pcTail = pxQueue->pcHead + ( pxQueue->uxLength * pxQueue->uxItemSize );
pxQueue->pcWriteTo = pxQueue->pcHead;
pxQueue->u.pcReadFrom = pxQueue->pcHead + ( ( pxQueue->uxLength - ( UBaseType_t ) 1U ) * pxQueue->uxItemSize );
//值得一提的是消息队列中,读指针的在读消息时是先++后去读的,所以其初始化的位置是在消息队列空间的尾部。
现在有了这四名大酱,相比大家对于如何实现缓冲区也有了一点想法,实际上非常之简单。源码如图所示。
if( pxQueue->pcWriteTo >= pxQueue->pcTail ){pxQueue->pcWriteTo = pxQueue->pcHead;} //在prvCopyDataToQueue()函数中if( pxQueue->pcReadFrom >= pxQueue->pcTail ){pxQueue->pcReadFrom = pxQueue->pcHead;} //在prvCopyDataFromQueue()函数中
这样一看是不是对于RTOS的实现祛魅了,想着,我寻思多厉害呢,这不就是两个if判断嘛,对,其实就是两个if判断,实际上FreeRTOS的很多功能都是由这些if判断来堆叠起来的。
但是有聪明的小伙伴看到这里有疑问了,怎么感觉消息队列的读和写是分开来搞的,一个是闷头读,一个是闷头写,这样的话不就会出问题吗,如果我的读指针超过了写指针怎么办?我岂不是在瞎读。能考虑到这些的同学真的很棒,因为你们在学习中加入了自己的思考,值得鼓励。其实这时候你就可以加入自己的优化思路,你就可以在读的时候加一个判断指针的位置,当然,FReeRTOS本身也考虑到了这一点,但是它并没有选择在读的时候判断指针的位置,而是用了一个更简单的方式,用一个变量来表示当前队列中消息的个数,这就引申出来我们下一个功能要实现的重要人物。
阻塞等待
书接上文,如果我当前的读指针“追”上了写指针,那是不是我们就不能再去读了,或者说我们的写指针“追”上了读指针,那么我们也没有地方去读了,这时候就有两种选择,一个是我写不进去我就不写了,我去干别的,另一个就是我写不进去我就等等,等我能写进去了我就写写,这个等等的行为在操作系统中就叫阻塞。
下面清楚本章涉及到的成员
volatile UBaseType_t uxMessagesWaiting; //表示当前队列中存在的消息个数
UBaseType_t uxLength; //表示当前队列的总长度
List_t xTasksWaitingToSend; //等待发送的列表
List_t xTasksWaitingToReceive; //等待接受的列表
介绍一下每个成员,首先要介绍的就是两个列表项,通过观察他们两个的名字不难推断出他们两个的作用,第一个waitingToSend就是,当你想要发送消息给这个消息队列的时候,这个队列是满的,这时候你这个发送的动作就会阻塞,而你任务控制块的xEventListItem就会被挂载到这里,同时你的任务控制块的xStateListItem会相应的挂在阻塞态。一定要分清这两个列表项的作用,一个是表示任务的状态,一个表示任务在等待的事件,可以根据名字进行理解。对于任务控制块和任务调度的内容,我会在后面单独写一个给大家。
其次,FreeRTOS是怎么判断我们当前的这个发送和接受动作需不需要阻塞的呢,这就进一步引申出来另外两个元素uxMessagesWaiting和uxLength,首先说一下他们两个的含义,一个表示当前消息队列中的消息个数(这里的消息个数和前面的两个writeto和readfrom指针是没有关系的,他们是两不同的系统,两个指针只负责读和写并不负责判断阻塞,这点要分清,要不然会想不通)。另一个就是表示整个队列的长度,也就是队列能够容纳的消息个数。然后我们就可以通过判断当前uxMessagesWaiting的状态来判断是否需要阻塞,如果他等于0那就意味着当前队列没有消息给你读,这时候想要去读,就会导致读阻塞,如果他的值现在已经等于uxLength了,就意味着队列已经满了,也就不再允许再向队列中写消息了。具体的源码如下
taskENTER_CRITICAL();
if( ( pxQueue->uxMessagesWaiting < pxQueue->uxLength ) || ( xCopyPosition == queueOVERWRITE ) )
{traceQUEUE_SEND( pxQueue );xYieldRequired = prvCopyDataToQueue( pxQueue, pvItemToQueue, xCopyPosition );......
} //这个就是发送的时候判断是不是等于uxLength后面的或上的是根据当前模式的不同进行的不同操作,这里只是简单介绍原理,有关模式的部分不过多介绍,可以自己问一下ai。
else
{if( xTicksToWait == ( TickType_t ) 0 ){taskEXIT_CRITICAL(traceQUEUE_SEND_FAILED( pxQueue );return errQUEUE_FULL;}else if( xEntryTimeSet == pdFALSE ){vTaskSetTimeOutState( &xTimeOut );xEntryTimeSet = pdTRUE;}else{mtCOVERAGE_TEST_MARKER();}
} //上面这一部分就是发送条件不满足以后,会设置一个结构体xTimeOut用来控制后续的阻塞
taskEXIT_CRITICAL();vTaskSuspendAll(); // 挂起调度器,防止其他任务同时操作队列列表
prvLockQueue( pxQueue ); // 给队列上锁(记账用) /* ②-3 真正阻塞点:循环等待“队列有空”或“超时” */
while( xTaskCheckForTimeOut( &xTimeOut, &xTicksToWait ) != pdTRUE )
{/* 如果队列仍有空位,跳出循环去重试 */if( pxQueue->uxMessagesWaiting < pxQueue->uxLength ){break; /* 跳出去重试写入 */}/* 队列还是满的 -> 把当前任务挂到 xTasksWaitingToSend 链表 */ vTaskPlaceOnEventList( &( pxQueue->xTasksWaitingToSend ), xTicksToWait ); //这这里也将当前任务挂载到阻塞队列中去的/* 解锁队列 + 恢复调度器,允许任务切换 */prvUnlockQueue( pxQueue );if( xTaskResumeAll() == pdFALSE ){/* 如果有更高优先级任务就绪,强制一次切换 */portYIELD_WITHIN_API();}/* 再次被唤醒 -> 重新上锁进入下一轮判断 */prvLockQueue( pxQueue );
}/* ②-4 超时或空位出现 -> 解锁 + 恢复调度器 */
prvUnlockQueue( pxQueue );
( void ) xTaskResumeAll();
同样的接受消息的处理同样如此。
taskENTER_CRITICAL();
{const UBaseType_t uxMessagesWaiting = pxQueue->uxMessagesWaiting;/* ① 有消息 → 立即拷贝,不会阻塞 */if( uxMessagesWaiting > ( UBaseType_t ) 0 ){/* 记录读指针,防止仅 peek 时还要还原 */pcOriginalReadPosition = pxQueue->u.pcReadFrom;/* 把消息拷到用户缓冲区 */prvCopyDataFromQueue( pxQueue, pvBuffer );/* 如果是“真正取出”(peek=pdFALSE) 才减计数 */if( xJustPeeking == pdFALSE ){traceQUEUE_RECEIVE( pxQueue );pxQueue->uxMessagesWaiting = uxMessagesWaiting - 1;/* 队列现在有空位,唤醒一个等写任务 */if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE ){if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE ){/* 被唤醒任务优先级更高,需要切换 */xYieldRequired = pdTRUE;}}}else{ /* peek 模式:只读不出队,还原读指针 */pxQueue->u.pcReadFrom = pcOriginalReadPosition;}/* 退出临界区,直接成功返回 */taskEXIT_CRITICAL();return pdPASS;}/* ② 没消息 → 进入“阻塞”分支 */else{/* 用户不给阻塞时间 → 立即返回空错误 */if( xTicksToWait == ( TickType_t ) 0 ){taskEXIT_CRITICAL();traceQUEUE_RECEIVE_FAILED( pxQueue );return errQUEUE_EMPTY;}/* 需要阻塞,但第一次进函数才初始化超时结构体 */if( xEntryTimeSet == pdFALSE ){vTaskInternalSetTimeOutState( &xTimeOut );xEntryTimeSet = pdTRUE;}}
}
taskEXIT_CRITICAL();
vTaskSuspendAll();
prvLockQueue( pxQueue );/* 每轮循环先检查“用户指定的剩余时间”是否耗尽 */
while( xTaskCheckForTimeOut( &xTimeOut, &xTicksToWait ) != pdTRUE )
{/* 如果队列里突然有消息了,立刻跳出 while 去重读 */if( pxQueue->uxMessagesWaiting > 0 ){break;}/* 依旧空 → 把当前任务挂到“等读”链表 + 延时列表 */traceBLOCKING_ON_QUEUE_RECEIVE( pxQueue );vTaskPlaceOnEventList( &( pxQueue->xTasksWaitingToReceive ), xTicksToWait );/* 解锁队列 + 恢复调度器 → 立即切换,任务正式阻塞 */prvUnlockQueue( pxQueue );if( xTaskResumeAll() == pdFALSE ){portYIELD_WITHIN_API();}/* 再次被唤醒后,重新上锁进入下一轮判断 */prvLockQueue( pxQueue );
}/* 超时或消息到达 → 统一出口:解锁 + 恢复调度器 */
prvUnlockQueue( pxQueue );
( void ) xTaskResumeAll();/* 最后再进一次临界区,决定返回 pdPASS 还是 errQUEUE_EMPTY */
taskENTER_CRITICAL();
{if( pxQueue->uxMessagesWaiting > 0 ){/* 消息已到 → 重试拷贝(仅一次) */xYieldRequired = prvCopyDataFromQueue( pxQueue, pvBuffer );if( xJustPeeking == pdFALSE ){pxQueue->uxMessagesWaiting--;/* 有空位,唤醒一个等写任务 */if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE ){if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE ){xYieldRequired = pdTRUE;}}}taskEXIT_CRITICAL();if( xYieldRequired != pdFALSE ) portYIELD_WITHIN_API();return pdPASS;}else{/* 依旧空 → 超时返回 */taskEXIT_CRITICAL();return errQUEUE_EMPTY;}
}
至此整个消息队列实现阻塞的过程就已经梳理完毕,总的来说,消息队列没有什么很高深的代码,整个功能的实现其实就是这里面介绍的几个变量的配合。其实还有两个锁变量我没有介绍,因为我暂时还没搞清楚他们两个存在的作用,以后如果用到了再回来梳理。还有一些消息队列写和读的模式不同也会导致其中的细节不同,但是大部分情况下我们用的就是这么个情况。后面我会放几张消息队列运行时的图片,方便大家更好的理解。谢谢大家的阅读。
初始化以后指针情况
正常运行时指针情况
重点放在环形缓冲区的实现和如何判断阻塞以及阻塞唤醒的时机。