以太网全双工通信的异步模型:基于队列、信号量与任务的收 / 发包标准实现
以下是参考一些例子用AI做的总结,做简单的参考:
在嵌入式以太网应用中,全双工通信(同时收发数据)的高效性与稳定性是核心需求。采用 “队列 + 信号量 + 多任务” 的异步架构,可实现接收、发送流程的解耦,避免 CPU 资源冲突、CPU 空耗及实时性不足等问题。本文以以太网全双工通信为场景,结合 RTOS(uC/OS-III 为例)详细阐述收包、发包的标准实现,包括组件设计、任务划分及代码逻辑,提供可复用的工程模型。
一、全双工异步通信的核心设计逻辑
以太网全双工通信中,接收(从网络获取数据)与发送(向网络传输数据)是独立流程,但需共享硬件资源(如 MAC 控制器)和系统资源(如 CPU)。同步处理(如在中断中直接处理数据或单任务轮询)会导致三类问题:
- 实时性冲突:收包中断被长耗时的发送操作阻塞,导致数据溢出;
- 资源浪费:单任务轮询收发状态,无数据时仍占用 CPU;
- 逻辑混乱:收发流程耦合,难以维护和扩展(如增加协议解析模块)。
异步模型通过 “分层隔离” 解决上述问题,核心逻辑为:
- 硬件交互层:中断服务程序(ISR)处理底层数据读写,仅做 “数据搬运”;
- 缓冲层:用独立队列分别缓存收 / 发数据,隔离速率差异;
- 调度层:用信号量触发收 / 发任务,实现 “有数据才处理”;
- 业务层:收 / 发任务分别处理协议解析、数据生产,与硬件解耦。
二、组件数量与功能定义
全双工模型需严格区分接收与发送通道,避免组件复用导致的优先级混乱,具体定义如下:
组件类型 | 数量 | 核心功能 | 设计依据 |
---|---|---|---|
消息队列 | 2 个 | 1 个收包队列(存储接收到的以太网帧)+ 1 个发包队列(存储待发送的以太网帧) | 独立队列确保收发流程隔离,避免数据混叠;队列深度根据带宽需求配置(如各 10~20 项)。 |
二进制信号量 | 2 个 | 1 个收包信号量(收队列有数据时唤醒收包任务)+ 1 个发包信号量(发队列有数据时唤醒发送任务) | 二进制信号量仅标识 “有无数据”,避免计数溢出;初始值均为 0(无数据时阻塞任务)。 |
任务 | 3 + 个 | 1 个收包任务(处理接收数据)+ 1 个发包任务(处理发送数据)+ N 个业务任务(生产 / 消费数据,N≥1) | 收 / 发任务独立运行,优先级高于业务任务但低于硬件中断,确保实时性;业务任务数量根据功能模块划分(如传感器采集、协议解析)。 |
三、标准实现流程(以 uC/OS-III 为例)
1. 数据结构定义(统一收发格式)
定义以太网帧通用结构,包含硬件交互所需的元信息,收 / 发任务共用该结构:
#include "os.h"
#include "enet.h" // 硬件层:MAC/PHY驱动接口(如ENET_Init、ENET_Send、ENET_Recv)// 以太网帧结构体(兼容IEEE 802.3标准)
typedef struct {uint8_t *buf; // 帧数据缓冲区(包含MAC头、 payload、FCS)uint32_t len; // 帧长度(64~1518字节,符合以太网规范)uint8_t port; // 接收/发送端口(多端口场景用,单端口可省略)bool is_dynamic; // 标记缓冲区是否动态分配(用于释放内存)
} EthFrame_t;// 接收通道组件
#define ETH_RX_QUEUE_DEPTH 10
OS_Q g_eth_rx_queue; // 收包队列
OS_SEM g_eth_rx_sem; // 收包信号量(初始0)
OS_TCB g_eth_rx_task_tcb;
CPU_STK g_eth_rx_task_stack[1024];// 发送通道组件
#define ETH_TX_QUEUE_DEPTH 10
OS_Q g_eth_tx_queue; // 发包队列
OS_SEM g_eth_tx_sem; // 发包信号量(初始0)
OS_TCB g_eth_tx_task_tcb;
CPU_STK g_eth_tx_task_stack[1024];// 业务层示例:协议解析任务(消费收包数据)
OS_TCB g_proto_parse_task_tcb;
CPU_STK g_proto_parse_task_stack[1024];
2. 组件初始化(系统启动阶段)
初始化需在 RTOS 调度器启动前完成,确保所有队列、信号量、任务处于可用状态:
/*** @brief 以太网全双工组件初始化* @return OS_ERR:初始化结果(OS_ERR_NONE为成功)*/
OS_ERR eth_full_duplex_init(void) {OS_ERR err;// 1. 初始化接收队列与信号量OSQCreate(&g_eth_rx_queue, "ETH_RX_QUEUE", ETH_RX_QUEUE_DEPTH, &err);if (err != OS_ERR_NONE) return err;OSSemCreate(&g_eth_rx_sem, "ETH_RX_SEM", 0, &err);if (err != OS_ERR_NONE) return err;// 2. 初始化发送队列与信号量OSQCreate(&g_eth_tx_queue, "ETH_TX_QUEUE", ETH_TX_QUEUE_DEPTH, &err);if (err != OS_ERR_NONE) return err;OSSemCreate(&g_eth_tx_sem, "ETH_TX_SEM", 0, &err);if (err != OS_ERR_NONE) return err;// 3. 创建收包任务(优先级高于业务任务,低于中断)OSTaskCreate(&g_eth_rx_task_tcb, "ETH_RX_TASK", eth_rx_task, NULL,7, // 收包任务优先级(高于发任务,避免漏包)&g_eth_rx_task_stack[0], g_eth_rx_task_stack[1024/10], 1024,0, 0, NULL, OS_OPT_TASK_STK_CHK | OS_OPT_TASK_STK_CLR, &err);if (err != OS_ERR_NONE) return err;// 4. 创建发包任务OSTaskCreate(&g_eth_tx_task_tcb, "ETH_TX_TASK", eth_tx_task, NULL,8, // 发任务优先级低于收任务&g_eth_tx_task_stack[0], g_eth_tx_task_stack[1024/10], 1024,0, 0, NULL, OS_OPT_TASK_STK_CHK | OS_OPT_TASK_STK_CLR, &err);if (err != OS_ERR_NONE) return err;// 5. 创建业务任务(示例:协议解析)OSTaskCreate(&g_proto_parse_task_tcb, "PROTO_PARSE_TASK", proto_parse_task, NULL,9, // 业务任务优先级最低&g_proto_parse_task_stack[0], g_proto_parse_task_stack[1024/10], 1024,0, 0, NULL, OS_OPT_TASK_STK_CHK | OS_OPT_TASK_STK_CLR, &err);// 6. 初始化以太网硬件(使能中断,配置全双工模式)ENET_InitTypeDef enet_cfg = {.mode = ENET_MODE_FULL_DUPLEX,.speed = ENET_SPEED_100M,.rx_buf_size = 1536, // 大于最大帧长1518.tx_buf_size = 1536};ENET_Init(&enet_cfg);ENET_EnableIrq(ENET_IRQ_RX | ENET_IRQ_TX); // 使能收/发中断return err;
}
3. 收包流程实现(从硬件到业务层)
收包流程由 “中断服务程序(ISR)→ 收包队列 → 收包任务 → 业务任务” 组成,核心是快速响应中断、避免阻塞。
(1)收包中断服务程序(ISR)
ISR 仅负责将硬件数据搬运到内存并触发任务,不做复杂处理:
/*** @brief 以太网接收中断服务程序*/
void ENET_RX_IRQHandler(void) {OS_ERR err;EthFrame_t *frame;// 1. 清除中断标志(硬件必需步骤)ENET_ClearIrqFlag(ENET_IRQ_FLAG_RX);// 2. 从硬件FIFO读取帧(ENET_Recv返回true表示成功)uint8_t *rx_buf = ENET_GetRxBuffer(); // 获取预分配的接收缓冲区uint32_t rx_len = 0;if (!ENET_Recv(rx_buf, &rx_len)) {OSIntExit(); // 无有效帧,退出中断return;}// 3. 封装帧数据到EthFrame_t结构体(用动态内存避免栈溢出)frame = (EthFrame_t *)OSMemAllocISR(sizeof(EthFrame_t), &err);if (err != OS_ERR_NONE) {ENET_ReleaseRxBuffer(rx_buf); // 释放缓冲区OSIntExit();return;}frame->buf = rx_buf;frame->len = rx_len;frame->port = 0; // 单端口frame->is_dynamic = false; // 缓冲区为预分配,不释放// 4. 将帧送入收包队列(中断中必须用FromISR接口)OSQPostFromISR(&g_eth_rx_queue, (void *)frame, sizeof(EthFrame_t *),OS_OPT_POST_FIFO, &err);if (err == OS_ERR_NONE) {// 5. 释放收包信号量,唤醒收包任务OSSemPostFromISR(&g_eth_rx_sem, OS_OPT_POST_1, &err);} else {// 队列满:释放帧和缓冲区(避免内存泄漏)OSMemFreeISR(frame, &err);ENET_ReleaseRxBuffer(rx_buf);}// 6. 触发任务调度(切换到收包任务)OSIntExit();
}
(2)收包任务(处理接收数据并转发给业务层)
收包任务从队列取数据,完成初步处理(如校验、过滤)后,转发给业务任务:
/*** @brief 收包任务:处理接收帧并转发给业务层*/
void eth_rx_task(void *p_arg) {OS_ERR err;OS_MSG_SIZE msg_size;EthFrame_t *frame;while (1) {// 1. 阻塞等待收包信号量OSSemPend(&g_eth_rx_sem, 0, OS_OPT_PEND_BLOCKING, NULL, &err);if (err != OS_ERR_NONE) continue;// 2. 从收包队列取帧frame = (EthFrame_t *)OSQPostGet(&g_eth_rx_queue, &msg_size, &err);if (err != OS_ERR_NONE || frame == NULL) continue;// 3. 初步处理:校验帧长度、过滤无效帧(如广播帧按需处理)if (frame->len < 64 || frame->len > 1518) {ENET_ReleaseRxBuffer(frame->buf); // 释放预分配缓冲区continue;}// 4. 转发给业务任务(示例:发送到协议解析任务的队列,此处简化为直接调用)proto_parse_input(frame); // 业务层入口函数// 5. 检查队列剩余数据,有则再次触发信号量if (OSQEntries(&g_eth_rx_queue, &err) > 0) {OSSemPost(&g_eth_rx_sem, OS_OPT_POST_1, &err);}}
}
(3)业务任务(协议解析示例)
业务任务消费收包数据,实现具体功能(如解析 TCP/UDP 协议):
/*** @brief 协议解析任务:处理接收的以太网帧*/
void proto_parse_task(void *p_arg) {OS_ERR err;// (实际场景需创建业务队列,接收收包任务转发的数据)while (1) {// 模拟处理:从收包任务获取数据并解析// ...(协议解析逻辑,如提取IP地址、端口、payload)OSTimeDlyHMSM(0, 0, 0, 10, OS_OPT_TIME_PERIODIC, &err); // 周期处理}
}/*** @brief 收包任务向业务层输入数据的接口*/
void proto_parse_input(EthFrame_t *frame) {// 示例:打印帧信息(实际需根据协议解析)APP_TRACE_INFO(("Received frame: len=%d, first 4 bytes: 0x%02X%02X%02X%02X\n",frame->len, frame->buf[0], frame->buf[1], frame->buf[2], frame->buf[3]));// 处理完成后释放缓冲区(预分配模式下归还硬件)ENET_ReleaseRxBuffer(frame->buf);
}
4. 发包流程实现(从业务层到硬件)
发包流程由 “业务任务→发包队列→发包任务→硬件发送” 组成,核心是确保数据有序发送、避免硬件冲突。
(1)业务任务(生产待发送数据)
业务任务生成数据并封装为以太网帧,送入发包队列:
/*** @brief 业务任务示例:生成传感器数据并发送*/
void sensor_data_task(void *p_arg) {OS_ERR err;EthFrame_t *frame;while (1) {// 1. 生成数据(示例:2字节温度数据)uint8_t sensor_data[2] = {0x12, 0x34};// 2. 分配帧缓冲区(动态分配,发送后释放)frame = (EthFrame_t *)OSMemAlloc(sizeof(EthFrame_t), &err);if (err != OS_ERR_NONE) goto delay;frame->buf = (uint8_t *)OSMemAlloc(1536, &err); // 足够容纳最大帧if (err != OS_ERR_NONE) {OSMemFree(frame, &err);goto delay;}// 3. 封装以太网帧(MAC头+数据+FCS)frame->buf[0] = 0xAA; // 目的MAC[0]frame->buf[1] = 0xBB; // 目的MAC[1]frame->buf[2] = 0xCC; // 目的MAC[2]frame->buf[3] = 0xDD; // 目的MAC[3]frame->buf[4] = 0xEE; // 目的MAC[4]frame->buf[5] = 0xFF; // 目的MAC[5]frame->buf[6] = 0x11; // 源MAC[0](本地MAC)frame->buf[7] = 0x22; // 源MAC[1]frame->buf[8] = 0x33; // 源MAC[2]frame->buf[9] = 0x44; // 源MAC[3]frame->buf[10] = 0x55; // 源MAC[4]frame->buf[11] = 0x66; // 源MAC[5]frame->buf[12] = 0x08; // 类型字段:IP协议(0x0800)frame->buf[13] = 0x00;memcpy(&frame->buf[14], sensor_data, 2); // 数据载荷frame->len = 16; // MAC头14字节 + 数据2字节(省略FCS,硬件生成)frame->is_dynamic = true;// 4. 送入发包队列OSQPost(&g_eth_tx_queue, (void *)frame, sizeof(EthFrame_t *),OS_OPT_POST_FIFO, &err);if (err == OS_ERR_Q_FULL) {// 队列满:释放内存OSMemFree(frame->buf, &err);OSMemFree(frame, &err);APP_TRACE_WARN(("TX queue full, drop frame\n"));} else if (err == OS_ERR_NONE) {// 触发发包任务OSSemPost(&g_eth_tx_sem, OS_OPT_POST_1, &err);}delay:OSTimeDlyHMSM(0, 0, 1, 0, OS_OPT_TIME_PERIODIC, &err); // 1秒周期}
}
(2)发包任务(从队列取数据并发送)
发包任务负责检查硬件状态,将队列中的数据通过 MAC 发送:
/*** @brief 发包任务:从队列取帧并发送*/
void eth_tx_task(void *p_arg) {OS_ERR err;OS_MSG_SIZE msg_size;EthFrame_t *frame;while (1) {// 1. 阻塞等待发包信号量OSSemPend(&g_eth_tx_sem, 0, OS_OPT_PEND_BLOCKING, NULL, &err);if (err != OS_ERR_NONE) continue;// 2. 从发包队列取帧frame = (EthFrame_t *)OSQPostGet(&g_eth_tx_queue, &msg_size, &err);if (err != OS_ERR_NONE || frame == NULL) continue;// 3. 检查硬件状态,忙则重新入队if (!ENET_CheckTxReady()) {OSQPost(&g_eth_tx_queue, (void *)frame, sizeof(EthFrame_t *),OS_OPT_POST_FIFO, &err);OSTimeDlyHMSM(0, 0, 0, 5, OS_OPT_TIME_PERIODIC, &err); // 5ms后重试continue;}// 4. 调用硬件接口发送bool ret = ENET_Send(frame->buf, frame->len);if (!ret) {APP_TRACE_ERROR(("Send failed, len=%d\n", frame->len));}// 5. 释放动态分配的内存if (frame->is_dynamic) {OSMemFree(frame->buf, &err);OSMemFree(frame, &err);}// 6. 检查队列剩余数据,有则再次触发if (OSQEntries(&g_eth_tx_queue, &err) > 0) {OSSemPost(&g_eth_tx_sem, OS_OPT_POST_1, &err);}}
}
(3)发送完成中断(可选,优化发送效率)
若硬件支持发送完成中断,可用于释放缓冲区或处理重试逻辑:
/*** @brief 以太网发送完成中断(可选)*/
void ENET_TX_IRQHandler(void) {// 清除中断标志ENET_ClearIrqFlag(ENET_IRQ_FLAG_TX);// 可在此处理发送完成后的逻辑(如统计发送成功数、释放硬件缓冲区)
}
四、关键注意事项
- 优先级设计:
- 中断优先级 > 收包任务优先级 > 发包任务优先级 > 业务任务优先级,确保收包实时性(避免漏包)。
- 内存管理:
- 收包用预分配缓冲区(避免中断中动态分配失败),发包用动态分配(灵活适应不同长度),均需严格释放避免泄漏。
- 队列深度:
- 根据最大带宽计算:如 100Mbps 以太网每秒约 14880 帧(1518 字节),队列深度需至少容纳 100ms 数据(约 1500 帧),实际可根据内存调整。
- 错误处理:
- 队列满时需丢弃数据并记录日志(便于调试);发送失败时可重试有限次(如 3 次)后丢弃,避免死循环。