FreeRTOS “探究任务调度机制魅力”
引入
现如今随着单片机的资源越来越多,主频越来越高,在面临更复杂的功能实现以及对MCU性能的充分压榨,会RTOS已经成为一个必要的技能,新手刚开始学习的时候就很好奇“为什么代码可以放到两个循环里同时运行?”。接下来我将简要的介绍一下FreeRTOS的任务是怎么调度的(基于 FreeRTOS Kernel V10.5.1)。
一、装载任务的火车“列表(链表)”
列表是一个很基础的数据结构,它像一列火车,火车车厢就是列表项(节点),在FreeRTOS中,列表项的定义如下:
struct xLIST_ITEM
{listFIRST_LIST_ITEM_INTEGRITY_CHECK_VALUE /*< Set to a known value if configUSE_LIST_DATA_INTEGRITY_CHECK_BYTES is set to 1. */configLIST_VOLATILE TickType_t xItemValue; /*< The value being listed. In most cases this is used to sort the list in ascending order. */struct xLIST_ITEM * configLIST_VOLATILE pxNext; /*< Pointer to the next ListItem_t in the list. */struct xLIST_ITEM * configLIST_VOLATILE pxPrevious; /*< Pointer to the previous ListItem_t in the list. */void * pvOwner; /*< Pointer to the object (normally a TCB) that contains the list item. There is therefore a two way link between the object containing the list item and the list item itself. */struct xLIST * configLIST_VOLATILE pxContainer; /*< Pointer to the list in which this list item is placed (if any). */listSECOND_LIST_ITEM_INTEGRITY_CHECK_VALUE /*< Set to a known value if configUSE_LIST_DATA_INTEGRITY_CHECK_BYTES is set to 1. */
};
typedef struct xLIST_ITEM ListItem_t; /* For some reason lint wants this as two separate definitions. */
在这里,xItemValue 值得是列表项的值,多用与按照升序对列表项进行排序。pxNext 指向下一个列表项。pxPrevious 指向前一个列表项。pvOwner 列表项的拥有者(通常指向包含该列表的任务的TCB),常用来快速查看TCB。pxContainer 指向该列表项所在的列表。listFIRST_LIST_ITEM_INTEGRITY_CHECK_VALUE和 listSECOND_LIST_ITEM_INTEGRITY_CHECK_VALUE则是一个空白的宏,编译以后不存在。
列表则定义在以下:
struct xMINI_LIST_ITEM{listFIRST_LIST_ITEM_INTEGRITY_CHECK_VALUE /*< Set to a known value if configUSE_LIST_DATA_INTEGRITY_CHECK_BYTES is set to 1. */configLIST_VOLATILE TickType_t xItemValue;struct xLIST_ITEM * configLIST_VOLATILE pxNext;struct xLIST_ITEM * configLIST_VOLATILE pxPrevious;};typedef struct xMINI_LIST_ITEM MiniListItem_t;typedef struct xLIST
{listFIRST_LIST_INTEGRITY_CHECK_VALUE /*< Set to a known value if configUSE_LIST_DATA_INTEGRITY_CHECK_BYTES is set to 1. */volatile UBaseType_t uxNumberOfItems;ListItem_t * configLIST_VOLATILE pxIndex; /*< Used to walk through the list. Points to the last item returned by a call to listGET_OWNER_OF_NEXT_ENTRY (). */MiniListItem_t xListEnd; /*< List item that contains the maximum possible item value meaning it is always at the end of the list and is therefore used as a marker. */listSECOND_LIST_INTEGRITY_CHECK_VALUE /*< Set to a known value if configUSE_LIST_DATA_INTEGRITY_CHECK_BYTES is set to 1. */
} List_t;
uxNumberOfItems 是该列表列表项的数目(不包含xListEnd)。pxIndex 是用于指向某一个列表项,常用于遍历列表所用。xListEnd 是迷你列表项,在这里的类型是 xLIST_ITEM,迷你列表项的值一般设置为最大值,以便在排序时排在末尾,同时,xListEnd也用于挂载插入到其他列表中的列表项,说人话就是xListEnd就是链表中的哨兵节点,它的下一个列表项就是表头,前一个就是表尾。
因此,在FreeRTOS中的列表是一个带头双向环形列表,简化出来就是:
通过这种机制可以及时快速的访问到自己想要的列表项节点。
至于链表的增删查改等内容在这里便不再做过多赘述,相反的,这种双向链表的增删查改比单项不带头链表的操作简单的多,在RTOS的文件里也就寥寥一百多行。
二、RTOS中的几个列表
1.就绪列表
处于正常调度状态的列表项会被放到就绪列表。
· 在创建任务以后,新创建的任务在未开启调度器之前是被添加到就绪列表的,在源代码中找到如下:
#define prvAddTaskToReadyList( pxTCB ) \traceMOVED_TASK_TO_READY_STATE( pxTCB ); \taskRECORD_READY_PRIORITY( ( pxTCB )->uxPriority ); \listINSERT_END( &( pxReadyTasksLists[ ( pxTCB )->uxPriority ] ), &( ( pxTCB )->xStateListItem ) ); \tracePOST_MOVED_TASK_TO_READY_STATE( pxTCB )
其中 listINSERT_END( &( pxReadyTasksLists[ ( pxTCB )->uxPriority ] ), &( ( pxTCB )->xStateListItem ) ); 代表将 pxTCB ->xStateListItem 指向pxReadyTasksLists 的 pxTCB ->uxPriority 的列表的位置。函数实现如下:
#define listINSERT_END( pxList, pxNewListItem ) \{ \ListItem_t * const pxIndex = ( pxList )->pxIndex; \\/* Only effective when configASSERT() is also defined, these tests may catch \* the list data structures being overwritten in memory. They will not catch \* data errors caused by incorrect configuration or use of FreeRTOS. */ \listTEST_LIST_INTEGRITY( ( pxList ) ); \listTEST_LIST_ITEM_INTEGRITY( ( pxNewListItem ) ); \\/* Insert a new list item into ( pxList ), but rather than sort the list, \* makes the new list item the last item to be removed by a call to \* listGET_OWNER_OF_NEXT_ENTRY(). */ \( pxNewListItem )->pxNext = pxIndex; \( pxNewListItem )->pxPrevious = pxIndex->pxPrevious; \\pxIndex->pxPrevious->pxNext = ( pxNewListItem ); \pxIndex->pxPrevious = ( pxNewListItem ); \\/* Remember which list the item is in. */ \( pxNewListItem )->pxContainer = ( pxList ); \\( ( pxList )->uxNumberOfItems )++; \}
其中,pxReadyTasksLists是一个全局变量,原型如下:
PRIVILEGED_DATA static List_t pxReadyTasksLists[ configMAX_PRIORITIES ]; /*< Prioritised ready tasks. */
它的成员数量就是我们设置的RTOS支持的最大优先级(使用计算前导零指令的值应该不超过32)。从0到configMAX_PRIORITIES - 1 储存着0到configMAX_PRIORITIES - 1的优先级的列表。同时搭配 uxTopReadyPriority 可以快速定位到最高优先级的那个队列。uxTopReadyPriority是一个32位无符号整数,如果使用计算前导零指令,则不能将configMAX_PRIORITIES设置的大于32,的原因就在于此。(计算前导零指令是用来计算一个数在二进制的条件下,从最高位向最低位数起,连续0的个数,例如 0000 1011 1111 1111 0000 1111 1111 0000最后计算出的值就是4)。
2.阻塞列表
当任务遇到阻塞状态,如延时,等待某一个事件的时候,任务会被插入到阻塞列表
3.挂起列表
当任务被vTsakSuspend挂起时,会被添加到挂起列表
4.事件列表
当任务处于某个等待事件的时候,会被添加到该列表
其中,任务的TCB中只包含时间列表和状态列表(就绪,挂起,阻塞),这样做可以减少空间的占用,同时任务也不可能同时处于就绪,挂起,阻塞这几种状态。
三、任务调度的决策者
处于就绪列表中的任务要进行调度,那就肯定要有一个调度的时机,多久调度一次,调度哪一个任务都有讲究。
FreeRTOS中,任务调度由两个中断完成,系统滴答定时器中断,和PendSV中断。
系统滴答定时器中断负责处理调度的时机和给系统提供时基,PendSV中断则负责处理切换的上下文。
1.系统滴答定时器中断
void xPortSysTickHandler( void )
{/* The SysTick runs at the lowest interrupt priority, so when this interrupt* executes all interrupts must be unmasked. There is therefore no need to* save and then restore the interrupt mask value as its value is already* known - therefore the slightly faster vPortRaiseBASEPRI() function is used* in place of portSET_INTERRUPT_MASK_FROM_ISR(). */vPortRaiseBASEPRI();{/* Increment the RTOS tick. */if( xTaskIncrementTick() != pdFALSE ){/* A context switch is required. Context switching is performed in* the PendSV interrupt. Pend the PendSV interrupt. */portNVIC_INT_CTRL_REG = portNVIC_PENDSVSET_BIT;}}vPortClearBASEPRIFromISR();
}
xTaskIncrementTick的原型如下:
BaseType_t xTaskIncrementTick( void )
{TCB_t * pxTCB;TickType_t xItemValue;BaseType_t xSwitchRequired = pdFALSE;/* Called by the portable layer each time a tick interrupt occurs.* Increments the tick then checks to see if the new tick value will cause any* tasks to be unblocked. */traceTASK_INCREMENT_TICK( xTickCount );if( uxSchedulerSuspended == ( UBaseType_t ) pdFALSE ){/* Minor optimisation. The tick count cannot change in this* block. */const TickType_t xConstTickCount = xTickCount + ( TickType_t ) 1;/* Increment the RTOS tick, switching the delayed and overflowed* delayed lists if it wraps to 0. */xTickCount = xConstTickCount;if( xConstTickCount == ( TickType_t ) 0U ) /*lint !e774 'if' does not always evaluate to false as it is looking for an overflow. */{taskSWITCH_DELAYED_LISTS();}else{mtCOVERAGE_TEST_MARKER();}/* See if this tick has made a timeout expire. Tasks are stored in* the queue in the order of their wake time - meaning once one task* has been found whose block time has not expired there is no need to* look any further down the list. */if( xConstTickCount >= xNextTaskUnblockTime ){for( ; ; ){if( listLIST_IS_EMPTY( pxDelayedTaskList ) != pdFALSE ){/* The delayed list is empty. Set xNextTaskUnblockTime* to the maximum possible value so it is extremely* unlikely that the* if( xTickCount >= xNextTaskUnblockTime ) test will pass* next time through. */xNextTaskUnblockTime = portMAX_DELAY; /*lint !e961 MISRA exception as the casts are only redundant for some ports. */break;}else{/* The delayed list is not empty, get the value of the* item at the head of the delayed list. This is the time* at which the task at the head of the delayed list must* be removed from the Blocked state. */pxTCB = listGET_OWNER_OF_HEAD_ENTRY( pxDelayedTaskList ); /*lint !e9079 void * is used as this macro is used with timers and co-routines too. Alignment is known to be fine as the type of the pointer stored and retrieved is the same. */xItemValue = listGET_LIST_ITEM_VALUE( &( pxTCB->xStateListItem ) );if( xConstTickCount < xItemValue ){/* It is not time to unblock this item yet, but the* item value is the time at which the task at the head* of the blocked list must be removed from the Blocked* state - so record the item value in* xNextTaskUnblockTime. */xNextTaskUnblockTime = xItemValue;break; /*lint !e9011 Code structure here is deemed easier to understand with multiple breaks. */}else{mtCOVERAGE_TEST_MARKER();}/* It is time to remove the item from the Blocked state. */listREMOVE_ITEM( &( pxTCB->xStateListItem ) );/* Is the task waiting on an event also? If so remove* it from the event list. */if( listLIST_ITEM_CONTAINER( &( pxTCB->xEventListItem ) ) != NULL ){listREMOVE_ITEM( &( pxTCB->xEventListItem ) );}else{mtCOVERAGE_TEST_MARKER();}/* Place the unblocked task into the appropriate ready* list. */prvAddTaskToReadyList( pxTCB );/* A task being unblocked cannot cause an immediate* context switch if preemption is turned off. */#if ( configUSE_PREEMPTION == 1 ){/* Preemption is on, but a context switch should* only be performed if the unblocked task's* priority is higher than the currently executing* task.* The case of equal priority tasks sharing* processing time (which happens when both* preemption and time slicing are on) is* handled below.*/if( pxTCB->uxPriority > pxCurrentTCB->uxPriority ){xSwitchRequired = pdTRUE;}else{mtCOVERAGE_TEST_MARKER();}}#endif /* configUSE_PREEMPTION */}}}/* Tasks of equal priority to the currently running task will share* processing time (time slice) if preemption is on, and the application* writer has not explicitly turned time slicing off. */#if ( ( configUSE_PREEMPTION == 1 ) && ( configUSE_TIME_SLICING == 1 ) ){if( listCURRENT_LIST_LENGTH( &( pxReadyTasksLists[ pxCurrentTCB->uxPriority ] ) ) > ( UBaseType_t ) 1 ){xSwitchRequired = pdTRUE;}else{mtCOVERAGE_TEST_MARKER();}}#endif /* ( ( configUSE_PREEMPTION == 1 ) && ( configUSE_TIME_SLICING == 1 ) ) */#if ( configUSE_TICK_HOOK == 1 ){/* Guard against the tick hook being called when the pended tick* count is being unwound (when the scheduler is being unlocked). */if( xPendedTicks == ( TickType_t ) 0 ){vApplicationTickHook();}else{mtCOVERAGE_TEST_MARKER();}}#endif /* configUSE_TICK_HOOK */#if ( configUSE_PREEMPTION == 1 ){if( xYieldPending != pdFALSE ){xSwitchRequired = pdTRUE;}else{mtCOVERAGE_TEST_MARKER();}}#endif /* configUSE_PREEMPTION */}else{++xPendedTicks;/* The tick hook gets called at regular intervals, even if the* scheduler is locked. */#if ( configUSE_TICK_HOOK == 1 ){vApplicationTickHook();}#endif}return xSwitchRequired;
}
大致做了以下几个步骤:
1.在任务调度器未被挂起的条件下更新时基。若发生溢出,则调换溢出阻塞列表和阻塞列表(这种机制完美解决了溢出以后任务调度的问题)。
2.如果当前时基大于等于下一个任务的阻塞到期时间,那么就要进行任务切换。
a.查看阻塞列表是否为空,为空则退出。
b.得到当前阻塞列表的表头列表项,并得到他的阻塞时间到期的值,如果当前的时基值小于它,那么久更新下一个任务的阻塞到期时间为表头列表项的时间并退出。
c.倘若以上都未退出,则证明这个任务阻塞到期,将他从阻塞列表中移除,倘若该列表正在等待事件,则从事件列表中移除,最后插入到就绪列表。
d.倘若插入的任务列表优先级大于当前的任务优先级,那么将返回值设置为pdTRUE(使能抢占式调度)
3.如果就绪列表不为空,那么将返回值设为pdTRUE
4.如果xYieldPending不为pdFALSE(延时任务切换)那么设定返回值为pdTRUE
当返回值为pdTRUE的时候则会执行 “portNVIC_INT_CTRL_REG = portNVIC_PENDSVSET_BIT;”挂起PendSV中断,执行任务切换。
总的来说就是当出现任务阻塞时间到期,或者使能时间片调度,或者xYieldPending
为pdTRUE就会触发任务切换。
2.PendSV中断
PendSV中断中采用汇编代码编写,对于了解任务调度来书,知道他做了什么即可:
__asm void xPortPendSVHandler( void )
{extern uxCriticalNesting;extern pxCurrentTCB;extern vTaskSwitchContext;/* *INDENT-OFF* */PRESERVE8mrs r0, pspisbldr r3, =pxCurrentTCB /* Get the location of the current TCB. */ldr r2, [ r3 ]stmdb r0 !, { r4 - r11 } /* Save the remaining registers. */str r0, [ r2 ] /* Save the new top of stack into the first member of the TCB. */stmdb sp !, { r3, r14 }mov r0, #configMAX_SYSCALL_INTERRUPT_PRIORITYmsr basepri, r0dsbisbbl vTaskSwitchContextmov r0, #0msr basepri, r0ldmia sp !, { r3, r14 }ldr r1, [ r3 ]ldr r0, [ r1 ] /* The first item in pxCurrentTCB is the task top of stack. */ldmia r0 !, { r4 - r11 } /* Pop the registers and the critical nesting count. */msr psp, r0isbbx r14nop
/* *INDENT-ON* */
}
代码主要分两部分:
1.保存当前任务上下文
2.恢复调度的任务上下文
由于CortexM3内核寄存器的R4到R11是需要手动保存的,所以也在代码中体现出来
mrs r0, pspisbldr r3, =pxCurrentTCB /* Get the location of the current TCB. */ldr r2, [ r3 ]stmdb r0 !, { r4 - r11 } /* Save the remaining registers. */str r0, [ r2 ] /* Save the new top of stack into the first member of the TCB. */
表示保存当前任务上下文
mrs r0, psp 获取栈顶指针到r0ldr r3, =pxCurrentTCB 获取当前任务控制块的地址(二级指针)到r3ldr r2, [ r3 ] 得到当前任务控制块到r2(一级指针)stmdb r0 !, { r4 - r11 } 依次将r0到r11的数据入栈,并更新r0str r0, [ r2 ] 将新的栈顶指针赋值给r2,即TCB中的volatile StackType_t * pxTopOfStack 成员。
stmdb sp !, { r3, r14 }mov r0, #configMAX_SYSCALL_INTERRUPT_PRIORITYmsr basepri, r0dsbisbbl vTaskSwitchContextmov r0, #0msr basepri, r0ldmia sp !, { r3, r14 }ldr r1, [ r3 ]ldr r0, [ r1 ] /* The first item in pxCurrentTCB is the task top of stack. */ldmia r0 !, { r4 - r11 } /* Pop the registers and the critical nesting count. */msr psp, r0isbbx r14
表示恢复任务上下文
stmdb sp !, { r3, r14 } 保存r3,r14的值到栈mov r0, #configMAX_SYSCALL_INTERRUPT_PRIORITY
msr basepri, r0 关中断dsb
isb 确保操作已经完成bl vTaskSwitchContext 选择下一个任务mov r0, #0
msr basepri, r0 开中断ldmia sp !, { r3, r14 } 从栈中恢复r3,r14ldr r1, [ r3 ] pxCurrentTCB的地址给r1(pxCurrentTCB在vTaskSwitchContext 函数被改变)
ldr r0, [ r1 ] pxCurrentTCB给r0 此时r0为volatile StackType_t * pxTopOfStack ldmia r0 !, { r4 - r11 } 将r0的值出栈到r4到r11,即任务的恢复msr psp, r0 将r0的值给psp,恢复栈顶指针bx r14 退出
在此期间,vTaskSwitchContext为寻找下一个任务,函数原型如下:
void vTaskSwitchContext( void )
{if( uxSchedulerSuspended != ( UBaseType_t ) pdFALSE ){/* The scheduler is currently suspended - do not allow a context* switch. */xYieldPending = pdTRUE;}else{xYieldPending = pdFALSE;traceTASK_SWITCHED_OUT();#if ( configGENERATE_RUN_TIME_STATS == 1 ){#ifdef portALT_GET_RUN_TIME_COUNTER_VALUEportALT_GET_RUN_TIME_COUNTER_VALUE( ulTotalRunTime );#elseulTotalRunTime = portGET_RUN_TIME_COUNTER_VALUE();#endif/* Add the amount of time the task has been running to the* accumulated time so far. The time the task started running was* stored in ulTaskSwitchedInTime. Note that there is no overflow* protection here so count values are only valid until the timer* overflows. The guard against negative values is to protect* against suspect run time stat counter implementations - which* are provided by the application, not the kernel. */if( ulTotalRunTime > ulTaskSwitchedInTime ){pxCurrentTCB->ulRunTimeCounter += ( ulTotalRunTime - ulTaskSwitchedInTime );}else{mtCOVERAGE_TEST_MARKER();}ulTaskSwitchedInTime = ulTotalRunTime;}#endif /* configGENERATE_RUN_TIME_STATS *//* Check for stack overflow, if configured. */taskCHECK_FOR_STACK_OVERFLOW();/* Before the currently running task is switched out, save its errno. */#if ( configUSE_POSIX_ERRNO == 1 ){pxCurrentTCB->iTaskErrno = FreeRTOS_errno;}#endif/* Select a new task to run using either the generic C or port* optimised asm code. */taskSELECT_HIGHEST_PRIORITY_TASK(); /*lint !e9079 void * is used as this macro is used with timers and co-routines too. Alignment is known to be fine as the type of the pointer stored and retrieved is the same. */traceTASK_SWITCHED_IN();/* After the new task is switched in, update the global errno. */#if ( configUSE_POSIX_ERRNO == 1 ){FreeRTOS_errno = pxCurrentTCB->iTaskErrno;}#endif#if ( ( configUSE_NEWLIB_REENTRANT == 1 ) || ( configUSE_C_RUNTIME_TLS_SUPPORT == 1 ) ){/* Switch C-Runtime's TLS Block to point to the TLS* Block specific to this task. */configSET_TLS_BLOCK( pxCurrentTCB->xTLSBlock );}#endif}
}
但是关键在于函数“taskSELECT_HIGHEST_PRIORITY_TASK()”,其他都是预编译条件决定。taskSELECT_HIGHEST_PRIORITY_TASK的原型如下:
#define taskSELECT_HIGHEST_PRIORITY_TASK() \{ \UBaseType_t uxTopPriority; \\/* Find the highest priority list that contains ready tasks. */ \portGET_HIGHEST_PRIORITY( uxTopPriority, uxTopReadyPriority ); \configASSERT( listCURRENT_LIST_LENGTH( &( pxReadyTasksLists[ uxTopPriority ] ) ) > 0 ); \listGET_OWNER_OF_NEXT_ENTRY( pxCurrentTCB, &( pxReadyTasksLists[ uxTopPriority ] ) ); \} /* taskSELECT_HIGHEST_PRIORITY_TASK() */
portGET_HIGHEST_PRIORITY表示得到当前的最高优先级,通过特殊算法或者计算前导零指令实现。
listGET_OWNER_OF_NEXT_ENTRY表示获取下一个列表的拥有者,原型如下:
#define listGET_OWNER_OF_NEXT_ENTRY( pxTCB, pxList ) \{ \List_t * const pxConstList = ( pxList ); \/* Increment the index to the next item and return the item, ensuring */ \/* we don't return the marker used at the end of the list. */ \( pxConstList )->pxIndex = ( pxConstList )->pxIndex->pxNext; \if( ( void * ) ( pxConstList )->pxIndex == ( void * ) &( ( pxConstList )->xListEnd ) ) \{ \( pxConstList )->pxIndex = ( pxConstList )->pxIndex->pxNext; \} \( pxTCB ) = ( pxConstList )->pxIndex->pvOwner; \}
以上代码用一句话来说,就是将pxCurrentTCB 指向pxReadyTasksLists(就绪列表)中优先级最高的那一个任务控制块。因为pxCurrentTCB是一个任务控制块的一级指针,定义如下:
portDONT_DISCARD PRIVILEGED_DATA TCB_t * volatile pxCurrentTCB = NULL;
因此PendSV函数可以通过操作pxCurrentTCB以及寄存器,来间接保存和恢复任务上下文。
值得注意的是,就绪列表是按照如下方式组成的:
因此在这里也解答了问什么通过计算前导零指令可以快速获得就绪列表中最高优先级的任务了。