【freertos-kernel】queue(发送)
文章目录
- 补充
- 各种yeild
- TCB的xStateListItem和xEventListItem
- xQueueGenericSend
- prvCopyDataToQueue
- prvNotifyQueueSetContainer
- vTaskInternalSetTimeOutState
- vTaskSuspendAll
- xTaskResumeAll
- prvLockQueue
- prvUnlockQueue
- prvIncrementQueueTxLock
- vTaskPlaceOnEventList
- prvAddCurrentTaskToDelayedList
- xQueueGenericSendFromISR
补充
各种yeild
各种 yield 宏和函数的作用都是为了让系统知道“现在应该检查是否需要切换到更高优先级任务”,它们的区别在于使用的上下文和条件不同。
名称 | 含义 | 使用场景 |
---|---|---|
portYIELD_WITHIN_API() | 强制进行一次任务调度 | 在非中断上下文中调用,比如队列操作后 |
taskYIELD() | 同上,是 portYIELD_WITHIN_API() 的别名 | 同上 |
portEND_SWITCHING_ISR(xHigherPriorityTaskWoken) | 在中断中唤醒任务后使用 | ISR 中调用 xQueueSendFromISR() 等函数后 |
vTaskMissedYield() | 标记需要调度,但不在当前上下文立即执行 | 在调度器被挂起期间唤醒任务时使用 |
queueYIELD_IF_USING_PREEMPTION() | 如果启用抢占式调度,就 yield | 多用于队列、信号量内部逻辑中 |
TCB的xStateListItem和xEventListItem
xStateListItem
用于任务状态切换,它决定了任务处于哪个系统级(全局)链表中(如就绪、延时、挂起等)。
xEventListItem
用于将任务加入到某个特定事件的等待列表中(例如队列的发送/接收等待列表、信号量的等待列表等)。
xQueueGenericSend
实现了将数据放入队列、唤醒等待任务、以及在队列满时让任务进入阻塞状态等核心功能。
过程挺啰嗦的,简单来说就是xCopyPosition 确定新数据pvItemToQueue写入队列数据缓存区的位置,如果队列没满,就复制数据进队列,唤醒该队列的接受等待任务链表xTasksWaitingToReceive 中的任务;如果满了,等待xTicksToWait。
流程如图
BaseType_t xQueueGenericSend( QueueHandle_t xQueue,const void * const pvItemToQueue,TickType_t xTicksToWait,const BaseType_t xCopyPosition )
初始化参数和断言检查
BaseType_t xEntryTimeSet = pdFALSE, xYieldRequired;
TimeOut_t xTimeOut;
Queue_t * const pxQueue = xQueue;
traceENTER_xQueueGenericSend( xQueue, pvItemToQueue, xTicksToWait, xCopyPosition );
//检查 pxQueue 是否合法。
configASSERT( pxQueue );
//确保传入的 pvItemToQueue 非空且队列项大小不为零。
configASSERT( !( ( pvItemToQueue == NULL ) && ( pxQueue->uxItemSize != ( UBaseType_t ) 0U ) ) );
//如果使用 queueOVERWRITE 模式,则确保队列长度为1(因为只能覆盖已有的一项)。
configASSERT( !( ( xCopyPosition == queueOVERWRITE ) && ( pxQueue->uxLength != 1 ) ) );
#if ( ( INCLUDE_xTaskGetSchedulerState == 1 ) || ( configUSE_TIMERS == 1 ) )
{//若调度器被挂起且需要等待时间,触发断言失败(调度器挂起时不能阻塞等待)。configASSERT( !( ( xTaskGetSchedulerState() == taskSCHEDULER_SUSPENDED ) && ( xTicksToWait != 0 ) ) );
}
#endif
进入循环尝试发送数据
for( ; ; )
{taskENTER_CRITICAL();{//如果当前消息数小于最大容量,或使用的是覆盖写,则可以继续发送if( ( pxQueue->uxMessagesWaiting < pxQueue->uxLength ) || ( xCopyPosition == queueOVERWRITE ) ){traceQUEUE_SEND( pxQueue );#if ( configUSE_QUEUE_SETS == 1 )//如果启用了 configUSE_QUEUE_SETS{const UBaseType_t uxPreviousMessagesWaiting = pxQueue->uxMessagesWaiting;//将数据拷贝进队列缓冲区xYieldRequired = prvCopyDataToQueue( pxQueue, pvItemToQueue, xCopyPosition );if( pxQueue->pxQueueSetContainer != NULL )//如果这个队列属于某个队列集合{//覆盖写并且之前有数据,只是替换已有数据,不需要再通知队列集合if( ( xCopyPosition == queueOVERWRITE ) && ( uxPreviousMessagesWaiting != ( UBaseType_t ) 0 ) ){mtCOVERAGE_TEST_MARKER();}//通知队列集合else if( prvNotifyQueueSetContainer( pxQueue ) != pdFALSE ){queueYIELD_IF_USING_PREEMPTION();}...}else//如果这个队列不属于某个队列集合{//如果有任务在等待从队列接收数据(即 xTasksWaitingToReceive 不为空),唤醒一个任务if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE ){if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE ){queueYIELD_IF_USING_PREEMPTION();}...}else if( xYieldRequired != pdFALSE ){queueYIELD_IF_USING_PREEMPTION();}...}}#else /* configUSE_QUEUE_SETS */{//拷贝数据xYieldRequired = prvCopyDataToQueue( pxQueue, pvItemToQueue, xCopyPosition );//如果有任务等待该队列消息,就唤醒该任务if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE ){if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE ){queueYIELD_IF_USING_PREEMPTION();}...}else if( xYieldRequired != pdFALSE ){queueYIELD_IF_USING_PREEMPTION();}...}#endif /* configUSE_QUEUE_SETS */taskEXIT_CRITICAL(); //退出临界区traceRETURN_xQueueGenericSend( pdPASS );return pdPASS;}else//消息数量满了{if( xTicksToWait == ( TickType_t ) 0 )//不允许等{taskEXIT_CRITICAL();traceQUEUE_SEND_FAILED( pxQueue );traceRETURN_xQueueGenericSend( errQUEUE_FULL );return errQUEUE_FULL;}else if( xEntryTimeSet == pdFALSE )//允许等待{//初始化超时vTaskInternalSetTimeOutState( &xTimeOut );xEntryTimeSet = pdTRUE;}else{mtCOVERAGE_TEST_MARKER();}}}taskEXIT_CRITICAL();//挂起调度器并锁定队列vTaskSuspendAll();prvLockQueue( pxQueue );//检查是否超时if( xTaskCheckForTimeOut( &xTimeOut, &xTicksToWait ) == pdFALSE ){if( prvIsQueueFull( pxQueue ) != pdFALSE ){ //如果未超时且仍满,则将当前任务加入发送等待列表traceBLOCKING_ON_QUEUE_SEND( pxQueue );vTaskPlaceOnEventList( &( pxQueue->xTasksWaitingToSend ), xTicksToWait );prvUnlockQueue( pxQueue );if( xTaskResumeAll() == pdFALSE ){taskYIELD_WITHIN_API();}}else//未超时队列未满{prvUnlockQueue( pxQueue );//解锁队列( void ) xTaskResumeAll();//恢复调度器}}else //超时了{prvUnlockQueue( pxQueue );//解锁队列( void ) xTaskResumeAll();//恢复调度器traceQUEUE_SEND_FAILED( pxQueue );traceRETURN_xQueueGenericSend( errQUEUE_FULL );return errQUEUE_FULL;}
}
prvCopyDataToQueue
指定位置拷贝数据进队列缓存区。
static BaseType_t prvCopyDataToQueue( Queue_t * const pxQueue,const void * pvItemToQueue,//要入队的数据const BaseType_t xPosition )//插入位置
{BaseType_t xReturn = pdFALSE;UBaseType_t uxMessagesWaiting;uxMessagesWaiting = pxQueue->uxMessagesWaiting;if( pxQueue->uxItemSize == ( UBaseType_t ) 0 )//判断是否是空队列(uxItemSize == 0){#if ( configUSE_MUTEXES == 1 ){if( pxQueue->uxQueueType == queueQUEUE_IS_MUTEX )//如果是互斥锁{xReturn = xTaskPriorityDisinherit( pxQueue->u.xSemaphore.xMutexHolder );//解除优先级继承pxQueue->u.xSemaphore.xMutexHolder = NULL;//清除当前持有者}...}#endif /* configUSE_MUTEXES */}else if( xPosition == queueSEND_TO_BACK )//uxItemSize不为0,发送到队列尾部{//将数据从 pvItemToQueue 拷贝到队列的写入位置 pcWriteTo( void ) memcpy( ( void * ) pxQueue->pcWriteTo, pvItemToQueue, ( size_t ) pxQueue->uxItemSize );pxQueue->pcWriteTo += pxQueue->uxItemSize;//写完后,pcWriteTo 指针偏移uxItemSizeif( pxQueue->pcWriteTo >= pxQueue->u.xQueue.pcTail ){pxQueue->pcWriteTo = pxQueue->pcHead;//如果写到了队列末尾,则绕回到队列头部(循环队列)}...}else //uxItemSize不为0,发送到队列头部 或 覆盖写{//数据被拷贝到 pcReadFrom 的当前位置(即下一个读取位置)( void ) memcpy( ( void * ) pxQueue->u.xQueue.pcReadFrom, pvItemToQueue, ( size_t ) pxQueue->uxItemSize );pxQueue->u.xQueue.pcReadFrom -= pxQueue->uxItemSize;//将 pcReadFrom 指针向前偏移uxItemSizeif( pxQueue->u.xQueue.pcReadFrom < pxQueue->pcHead ){ //如果指针小于队列头部,则绕回队列末尾pxQueue->u.xQueue.pcReadFrom = ( pxQueue->u.xQueue.pcTail - pxQueue->uxItemSize );}...if( xPosition == queueOVERWRITE ){if( uxMessagesWaiting > ( UBaseType_t ) 0 ){//如果是 queueOVERWRITE 模式,表示这是替换已有元素,所以减少当前等待的消息数 uxMessagesWaiting(如果之前有数据的话)--uxMessagesWaiting;}...}...}//更新队列当前消息数pxQueue->uxMessagesWaiting = ( UBaseType_t ) ( uxMessagesWaiting + ( UBaseType_t ) 1 );return xReturn;
}
prvNotifyQueueSetContainer
static BaseType_t prvNotifyQueueSetContainer( const Queue_t * const pxQueue )
{Queue_t * pxQueueSetContainer = pxQueue->pxQueueSetContainer;//获取队列集合指针BaseType_t xReturn = pdFALSE;configASSERT( pxQueueSetContainer ); //确保集合存在configASSERT( pxQueueSetContainer->uxMessagesWaiting < pxQueueSetContainer->uxLength );//确保集合没满if( pxQueueSetContainer->uxMessagesWaiting < pxQueueSetContainer->uxLength ){const int8_t cTxLock = pxQueueSetContainer->cTxLock;//记录当前集合的发送锁状态traceQUEUE_SET_SEND( pxQueueSetContainer );xReturn = prvCopyDataToQueue( pxQueueSetContainer, &pxQueue, queueSEND_TO_BACK );//将当前队列加入集合if( cTxLock == queueUNLOCKED )//如果集合没有被锁定{ //检查是否要唤醒等待接收的任务if( listLIST_IS_EMPTY( &( pxQueueSetContainer->xTasksWaitingToReceive ) ) == pdFALSE ){if( xTaskRemoveFromEventList( &( pxQueueSetContainer->xTasksWaitingToReceive ) ) != pdFALSE ){xReturn = pdTRUE;}...}...}else{prvIncrementQueueTxLock( pxQueueSetContainer, cTxLock );//增加集合的锁计数器(防止并发访问冲突)}}...return xReturn;
}
vTaskInternalSetTimeOutState
void vTaskInternalSetTimeOutState( TimeOut_t * const pxTimeOut )
{traceENTER_vTaskInternalSetTimeOutState( pxTimeOut );pxTimeOut->xOverflowCount = xNumOfOverflows;// 溢出计数器(记录 tick 计数器回绕次数)pxTimeOut->xTimeOnEntering = xTickCount; // 开始等待的时间点(tick 值)traceRETURN_vTaskInternalSetTimeOutState();
}
关于定时器详细的信息后面再看。
vTaskSuspendAll
暂停调度器,防止任务切换 ,以确保在执行某些关键操作期间,系统状态不会被其他任务修改。
使用一个全局变量 uxSchedulerSuspended
来记录调度器被挂起的次数。
//gcc/arm-cm3
#define portASSERT_IF_IN_ISR() configASSERT( uxInterruptNesting == 0 )
#define portMEMORY_BARRIER() __asm volatile ( "" ::: "memory" )
"memory"
:告诉编译器这行代码会读写内存,不能对内存操作进行重排序
void vTaskSuspendAll( void )
{traceENTER_vTaskSuspendAll();#if ( configNUMBER_OF_CORES == 1 ){portSOFTWARE_BARRIER();//在某些模拟/仿真平台中,用来模拟“内存屏障”行为,防止编译器优化导致的指令重排序问题。uxSchedulerSuspended = ( UBaseType_t ) ( uxSchedulerSuspended + 1U );portMEMORY_BARRIER();}#else /* #if ( configNUMBER_OF_CORES == 1 ) */{UBaseType_t ulState;portASSERT_IF_IN_ISR(); // 确保不在中断中调用if( xSchedulerRunning != pdFALSE ){ulState = portSET_INTERRUPT_MASK(); // 关中断configASSERT( portGET_CRITICAL_NESTING_COUNT() == 0 ); // 确保不在临界区portSOFTWARE_BARRIER();portGET_TASK_LOCK(); // 获取任务锁(多核同步)if( uxSchedulerSuspended == 0U ){prvCheckForRunStateChange(); // 第一次挂起时可能要做一些检查}...portGET_ISR_LOCK();// 获取 ISR 锁++uxSchedulerSuspended;// 增加挂起计数portRELEASE_ISR_LOCK();// 释放 ISR 锁portCLEAR_INTERRUPT_MASK( ulState ); // 开中断}...}#endif /* #if ( configNUMBER_OF_CORES == 1 ) */traceRETURN_vTaskSuspendAll();
}
xTaskResumeAll
恢复任务调度器,唤醒等待中的任务,并处理挂起期间积压的 tick 和待调度任务
BaseType_t xTaskResumeAll( void )
{TCB_t * pxTCB = NULL;BaseType_t xAlreadyYielded = pdFALSE;traceENTER_xTaskResumeAll();#if ( configNUMBER_OF_CORES > 1 )if( xSchedulerRunning != pdFALSE )#endif{taskENTER_CRITICAL();{BaseType_t xCoreID;xCoreID = ( BaseType_t ) portGET_CORE_ID();configASSERT( uxSchedulerSuspended != 0U );uxSchedulerSuspended = ( UBaseType_t ) ( uxSchedulerSuspended - 1U );portRELEASE_TASK_LOCK();if( uxSchedulerSuspended == ( UBaseType_t ) 0U ){if( uxCurrentNumberOfTasks > ( UBaseType_t ) 0U ){ //处理挂起期间加入就绪列表的任务while( listLIST_IS_EMPTY( &xPendingReadyList ) == pdFALSE ){pxTCB = listGET_OWNER_OF_HEAD_ENTRY( ( &xPendingReadyList ) );listREMOVE_ITEM( &( pxTCB->xEventListItem ) );portMEMORY_BARRIER();listREMOVE_ITEM( &( pxTCB->xStateListItem ) );prvAddTaskToReadyList( pxTCB );#if ( configNUMBER_OF_CORES == 1 ){if( pxTCB->uxPriority > pxCurrentTCB->uxPriority ){xYieldPendings[ xCoreID ] = pdTRUE;}}}if( pxTCB != NULL ){ //更新下一个要解除阻塞的任务时间prvResetNextTaskUnblockTime();}{//处理挂起期间积累的 tick 数量TickType_t xPendedCounts = xPendedTicks; /* Non-volatile copy. */if( xPendedCounts > ( TickType_t ) 0U ){do{//每处理一个 tick,调用 xTaskIncrementTick() 检查是否有任务超时或就绪;//如果有高优先级任务就绪,标记调度需求if( xTaskIncrementTick() != pdFALSE ){xYieldPendings[ xCoreID ] = pdTRUE;}--xPendedCounts;} while( xPendedCounts > ( TickType_t ) 0U );xPendedTicks = 0;}}if( xYieldPendings[ xCoreID ] != pdFALSE ){ //如果需要调度,执行一次上下文切换#if ( configUSE_PREEMPTION != 0 ){xAlreadyYielded = pdTRUE;}#endif /* #if ( configUSE_PREEMPTION != 0 ) */#if ( configNUMBER_OF_CORES == 1 ){taskYIELD_TASK_CORE_IF_USING_PREEMPTION( pxCurrentTCB );}#endif /* #if ( configNUMBER_OF_CORES == 1 ) */}}}}taskEXIT_CRITICAL();}traceRETURN_xTaskResumeAll( xAlreadyYielded );return xAlreadyYielded;
}
prvLockQueue
进入临界区并标记队列为“已锁定”,确保当前上下文独占访问队列,防止其他任务或中断修改队列状态。
#define prvLockQueue( pxQueue ) \taskENTER_CRITICAL(); \{ \if( ( pxQueue )->cRxLock == queueUNLOCKED ) \{ \( pxQueue )->cRxLock = queueLOCKED_UNMODIFIED; \} \if( ( pxQueue )->cTxLock == queueUNLOCKED ) \{ \( pxQueue )->cTxLock = queueLOCKED_UNMODIFIED; \} \} \taskEXIT_CRITICAL()
prvUnlockQueue
释放队列的发送锁 (cTxLock) 和接收锁 (cRxLock),并唤醒等待在该队列上的任务(如果有)。
void vTaskMissedYield( void )
{traceENTER_vTaskMissedYield();xYieldPendings[ portGET_CORE_ID() ] = pdTRUE;traceRETURN_vTaskMissedYield();
}static void prvUnlockQueue( Queue_t * const pxQueue )
{taskENTER_CRITICAL();{int8_t cTxLock = pxQueue->cTxLock;while( cTxLock > queueLOCKED_UNMODIFIED )//在此期间有嵌套操作(比如中断中再次写入队列),循环处理这些嵌套操作,直到减到 queueLOCKED_UNMODIFIED;{#if ( configUSE_QUEUE_SETS == 1 ){if( pxQueue->pxQueueSetContainer != NULL ){//通知集合//集合可能会唤醒等待它的任务if( prvNotifyQueueSetContainer( pxQueue ) != pdFALSE ){vTaskMissedYield();}...}else{if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE ){if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE ){vTaskMissedYield();//如果唤醒的是优先级更高的任务,标记需要调度}...}else{break;}}}#else /* configUSE_QUEUE_SETS */{//不为空的话,唤醒任务,vTaskMissedYield}#endif /* configUSE_QUEUE_SETS */--cTxLock;}pxQueue->cTxLock = queueUNLOCKED;}taskEXIT_CRITICAL();//对cRxLock 做同样的操作
}
prvIncrementQueueTxLock
没啥好说的,递增队列的发送锁
#define prvIncrementQueueTxLock( pxQueue, cTxLock ) \do { \const UBaseType_t uxNumberOfTasks = uxTaskGetNumberOfTasks(); \if( ( UBaseType_t ) ( cTxLock ) < uxNumberOfTasks ) \{ \configASSERT( ( cTxLock ) != queueINT8_MAX ); \( pxQueue )->cTxLock = ( int8_t ) ( ( cTxLock ) + ( int8_t ) 1 ); \} \} while( 0 )
vTaskPlaceOnEventList
将当前任务加入指定的事件列表(如 xTasksWaitingToReceive 或 xTasksWaitingToSend),并把它添加到延时列表中,使其进入阻塞状态。
void vTaskPlaceOnEventList( List_t * const pxEventList,const TickType_t xTicksToWait )
{traceENTER_vTaskPlaceOnEventList( pxEventList, xTicksToWait );configASSERT( pxEventList );vListInsert( pxEventList, &( pxCurrentTCB->xEventListItem ) );//插入后,任务会进入阻塞状态,不再参与调度prvAddCurrentTaskToDelayedList( xTicksToWait, pdTRUE );//会把当前任务加入系统延时列表traceRETURN_vTaskPlaceOnEventList();
}
prvAddCurrentTaskToDelayedList
当前任务从就绪列表中移除,并根据设定的等待时间,将其插入到延时列表中,从而让任务进入阻塞状态。
static void prvAddCurrentTaskToDelayedList( TickType_t xTicksToWait,const BaseType_t xCanBlockIndefinitely )
{TickType_t xTimeToWake;const TickType_t xConstTickCount = xTickCount;List_t * const pxDelayedList = pxDelayedTaskList;List_t * const pxOverflowDelayedList = pxOverflowDelayedTaskList;#if ( INCLUDE_xTaskAbortDelay == 1 ){ //清除“延迟被中断”标志pxCurrentTCB->ucDelayAborted = ( uint8_t ) pdFALSE;}#endif//从就绪列表中移除当前任务if( uxListRemove( &( pxCurrentTCB->xStateListItem ) ) == ( UBaseType_t ) 0 ){ //如果这个优先级上没有其他任务了(即返回值为 0),就更新最高就绪优先级portRESET_READY_PRIORITY( pxCurrentTCB->uxPriority, uxTopReadyPriority );}#if ( INCLUDE_vTaskSuspend == 1 ){if( ( xTicksToWait == portMAX_DELAY ) && ( xCanBlockIndefinitely != pdFALSE ) )//无限期等待{ //将任务加入 挂起任务列表listINSERT_END( &xSuspendedTaskList, &( pxCurrentTCB->xStateListItem ) );}else{xTimeToWake = xConstTickCount + xTicksToWait;//计算唤醒时间点listSET_LIST_ITEM_VALUE( &( pxCurrentTCB->xStateListItem ), xTimeToWake );//设置任务状态项的值为唤醒时间if( xTimeToWake < xConstTickCount )//判断是否发生 tick 溢出{ //如果溢出就插入 pxOverflowDelayedListtraceMOVED_TASK_TO_OVERFLOW_DELAYED_LIST();vListInsert( pxOverflowDelayedList, &( pxCurrentTCB->xStateListItem ) );}else{//插入正常的 pxDelayedListtraceMOVED_TASK_TO_DELAYED_LIST();vListInsert( pxDelayedList, &( pxCurrentTCB->xStateListItem ) );//如果该任务是下一个要解除阻塞的任务,更新全局变量 xNextTaskUnblockTimeif( xTimeToWake < xNextTaskUnblockTime ){xNextTaskUnblockTime = xTimeToWake;}}}}#else /* INCLUDE_vTaskSuspend */{xTimeToWake = xConstTickCount + xTicksToWait;计算唤醒时间点listSET_LIST_ITEM_VALUE( &( pxCurrentTCB->xStateListItem ), xTimeToWake );//设置任务状态项的值为唤醒时间//检查xTimeToWake 是否溢出if( xTimeToWake < xConstTickCount ){traceMOVED_TASK_TO_OVERFLOW_DELAYED_LIST();vListInsert( pxOverflowDelayedList, &( pxCurrentTCB->xStateListItem ) );}else{traceMOVED_TASK_TO_DELAYED_LIST();vListInsert( pxDelayedList, &( pxCurrentTCB->xStateListItem ) );if( xTimeToWake < xNextTaskUnblockTime ){xNextTaskUnblockTime = xTimeToWake;}}( void ) xCanBlockIndefinitely;}#endif /* INCLUDE_vTaskSuspend */
}
xQueueGenericSendFromISR
与xQueueGenericSend的区别:
- 不可阻塞
- 需要手动处理调度请求