【线程邮箱】
线程邮箱(Mailbox) 是一种线程间通信机制(IPC, Inter-Process Communication),
用于在线程之间传递消息指针或数据引用。
本质上是一个消息缓冲区(通常存储消息指针)。
线程 A 发送消息到邮箱,线程 B 从邮箱接收消息。
邮箱提供同步机制:若邮箱为空,接收线程可阻塞等待;若满,发送线程可阻塞等待。
发送消息(send)
检查邮箱是否已满:
若未满 → 将消息存入缓冲区;
若满 → 根据模式(阻塞/非阻塞)决定等待或返回错误;
更新写指针;
通知可能等待接收的线程。
接收消息(recv)
检查邮箱是否为空:
若非空 → 取出消息;
若为空 → 阻塞等待消息到来;
更新读指针;
唤醒等待发送的线程。
每个线程在系统中注册后,都拥有:
一个独立的消息队列(
SeqQueue
)一个线程私有的互斥锁(
qmutex
)全局系统则由一个全局链表与全局互斥锁管理。
消息通过 SendMSG()
从发送线程的上下文入队到目标线程的消息队列中,
目标线程使用 RecvMSG()
从自己的队列中取出消息。
MBS(Message Bus System):消息系统核心,负责线程节点注册与查找。
typedef struct {struct list_head head; // 所有线程节点链表头pthread_mutex_t mutex; // 全局互斥锁 } MBS;
MBS_NODE:线程节点,包含线程名、ID、消息队列与队列锁。
🔹 每个线程节点都有自己的消息队列和互斥锁。
typedef struct {char th_name[64]; // 线程名称pthread_t tid; // 线程IDSeqQueue *sq; // 线程私有消息队列pthread_mutex_t qmutex; // 队列锁struct list_head node; // 链表节点 } MBS_NODE;
SeqQueue:顺序队列(先进先出),用于保存线程收到的消息。
DATATYPE:消息内容结构体,保存发送者、接收者与消息数据。
typedef struct {char sendname[64];char recvname[64];char buf[256]; } DATATYPE;
1️⃣ CreateMBS()
创建整个消息系统;
初始化链表与全局互斥锁;
返回 MBS*结构体指针。
🔒 使用
pthread_mutex_init()
初始化全局锁,保证线程注册与删除的同步。
2️⃣Register_TH(MBS *mbs, char *name, PFUN th)
分配一个线程节点结构;
初始化线程名与队列;
初始化节点队列锁;
将节点加入全局链表;
创建线程执行函数th()。
⚠️ 若线程创建失败,会回滚操作(删除节点、销毁锁、释放内存)。
3️⃣SendMBS(MBS *mbs, char recvname[], char buf[], DATATYPE *data)
线程间消息发送:
加全局锁,遍历链表找到发送者与目标线程;
填充消息结构
data
;锁定目标线程的队列;
入队后解锁。
关键设计点:
使用双锁机制:
全局锁保护链表;
目标队列锁保护队列入队;
先加队列锁,再解全局锁,避免死锁。
4️⃣ RecvMSG(MBS *mbs, DATATYPE *data)
线程接收消息:
加全局锁找到当前线程节点;
加队列锁并释放全局锁;
若队列非空,取出消息;
若为空返回失败。
🔄 避免全局锁长时间占用,提升并发效率。
5️⃣wait_all_end(MBS *mbs)
遍历线程节点,调用pthread_join() 等待所有线程退出。
6️⃣get_name_bytid(MBS *mbs)
根据当前线程 ID 查找线程名称(带全局锁保护)。