线程间-数据缓存机制(线程邮箱)
1、线程邮箱概念
2、线程邮箱作用
3、详细代码
3.1 数据结构定义(小-大)
(1)
typedef void*(*the_fun)(void* arg);
typedef char DATATYPE[256];
(2)定义邮件数据结构,包含发送者和接收者信息以及消息内容
typedef struct mail_data
{pthread_t id_send;char name_send[256];pthread_t id_recver;char name_recver[256];DATATYPE data;
}MAIL_DATA;
(3)定义队列节点结构,用于构建邮件队列
typedef struct Queue
{MAIL_DATA data;struct Queue* next;
}SeqQueue;
(4) 定义线程节点结构,包含线程信息和邮件队列
typedef struct thread_node
{pthread_t id;char name[256];SeqQueue *quehead ,*quetail;// 邮件队列头尾指针the_fun th;
}LIST_DATA;
(5)定义链表节点结构,用于管理所有线程
typedef struct node
{LIST_DATA elem;struct node *next;
}LINKLIST;
(6)声明链表操作函数:初始化、遍历查找和添加节点
void list_add( LINKLIST*head, LINKLIST*info);
LINKLIST *list_for_each(LINKLIST *head,char *name);
3.2 链表:用于管理线程
(1)初始化链表,分配内存并设置next为NULL
LIST_LINK * list_init()
{LIST_LINK *temp = malloc(sizeof(LIST_LINK));temp->next = NULL;return temp;
}
(2) 在链表头部添加新节点
void list_add( LINKLIST*head, LINKLIST*info)
{info->next=head->next;head->next=info;
}
(3) 遍历链表查找指定名称的节点,使用strncmp比较名称
LINKLIST *list_for_each(LINKLIST *head,char *name)
{LINKLIST *tmp=NULL;tmp=head;while(tmp!=NULL){if(strncmp(tmp->elem.name,name,strlen(name))==0){return tmp;}tmp=tmp->next;}return NULL;
}
3.3 队列:每个线程有自己的消息队列
(1)queue.h队列声明
#ifndef __SEQQUE_H__
#define __SEQQUE_H__
#include "list.h"int init_que(LINKLIST *list_head);
int in_queue(LINKLIST *list_head,MAIL_DATA *data);
int out_queue(LINKLIST *list_head,MAIL_DATA *data);#endif
(2)初始化队列,创建空节点并设置头尾指针
int init_que(LINKLIST *list_head)
{SeqQueue *sq=malloc(sizeof(SeqQueue));if(NULL==sq){perror("malloc error");}sq->next=NULL;list_head->elem.quehead=sq;list_head->elem.quetail=sq;return 0;
}
(3) 入队操作,分配新节点并复制数据,更新尾指针
int in_queue(LINKLIST *list_head,MAIL_DATA *data)
{SeqQueue *sq=malloc(sizeof(SeqQueue));memcpy(&sq->data,data,sizeof(MAIL_DATA));sq->next=NULL;list_head->elem.quetail->next=sq;list_head->elem.quetail=list_head->elem.quetail->next;return 0;
}
3.4 主函数
(1)定义进入和退出临界区的宏,使用互斥锁保护共享资源
全局变量,指向链表末尾
#define ENTER_CRITICAL_AREA(mutex) do{pthread_mutex_lock(mutex);}while(0)
#define QUIT_CRITICAL_AREA(mutex) do{pthread_mutex_unlock(mutex);}while(0)LINKLIST *end_list=NULL;
(2)定义邮件系统结构体和全局实例
typedef struct mail_box_system {pthread_mutex_t mutex; // 保护邮件系统的互斥锁LIST_LINK *thread_list; // 线程链表
}MBS;
MBS* mbs;
(3)创建邮件系统,初始化互斥锁和线程链表
MBS* create_mail_box_system(void)
{int ret=0;MBS *tmp=malloc(sizeof(MBS));if(NULL==tmp){perror("fail mail");return NULL;}//初始化互斥锁ret = pthread_mutex_init(&tmp->mutex, NULL);if (0 != ret){perror("fail to pthread_mutex_init");return NULL;}//创建线程链表tmp->thread_list = malloc(sizeof(LINKLIST));tmp->thread_list->next = NULL;printf("mail box create successfully!\n");return tmp;
}
(4)销毁邮件系统,释放所有资源
int destroy_mail_box_system(MBS *mbs)
{pthread_mutex_destroy(&mbs->mutex);LINKLIST *ptmp = NULL;LINKLIST *find = mbs->thread_list;while (find != NULL){ptmp = find;find = find->next;free(ptmp);}free(find);return 0;
}
(5)数据收集线程,定期发送消息给show和sock线程
void *data_collect_th(void *arg)
{while (1){printf("this is the show th\n");sleep(3);send_msg(mbs, "show", "aabb");send_msg(mbs, "show", "1111");send_msg(mbs, "show", "2222");send_msg(mbs, "sock", "3333");send_msg(mbs, "sock", "4444");send_msg(mbs, "sock", "5555");}return NULL;
}
(6)显示线程,接收并打印消息
void *show_th(void *arg)
{while(1){printf("this is the show th\n");char sendname[256];DATATYPE data;recv_msg(mbs, sendname, data);printf("show recv msg from %s msg is %s\n", sendname, data);sleep(1);}return NULL;
}
(7) 获取当前线程名称
//通过线程tid找到相应的名字
char *get_th_name(MBS *mbs)
{pthread_t tid = pthread_self();LINKLIST *find = mbs->thread_list;LINKLIST *end = end_list; //全局变量初始为空while (find != end){if (find->elem.id == tid){break;}find = find->next;}if (find->elem.id == tid){return find->elem.name;}else{return NULL;}
}
(8)注册线程到邮件系统,创建线程并初始化队列
int register_to_mail_system(MBS *mbs, char *name, the_fun th)
{LINKLIST *tmp = malloc(sizeof(LINKLIST));if (NULL == tmp){perror("fail to register");return -1;}strcpy(tmp->elem.name, name);tmp->elem.th = th;init_que(tmp); //创建邮箱需要的队列list_add(mbs->thread_list, tmp); //将注册的邮箱插入链表int ret = pthread_create(&tmp->elem.id, NULL, th, NULL);if (0 != ret){perror("fail to pthread_create");return -1;}printf("register mail system |%s| ok !\n", tmp->elem.name);return 0;
}
(9)发送消息,查找接收者并将消息加入其队列
int send_msg(MBS *mbs, char *recvname, DATATYPE data)
{MAIL_DATA *ptmp = malloc(sizeof(MAIL_DATA));strcpy(ptmp->data, data);//获取线程的tidptmp->id_send = pthread_self();//链表中遍历查找接收方名字LINKLIST *find = list_for_each(mbs->thread_list, recvname); if (NULL == find){printf("can't find recv mailbox\n");}//查找发送方的名字并保存char *name = get_th_name(mbs);strcpy(ptmp->name_send, name);strcpy(ptmp->name_recver, recvname);//入队上锁防止此时被资源竞争ENTER_CRITICAL_AREA(&mbs->mutex);in_queue(find, ptmp);QUIT_CRITICAL_AREA(&mbs->mutex);return 0;
}
(10)接收消息,从当前线程的队列中获取消息
//接收消息
int recv_msg(MBS *mbs, char *sendname, DATATYPE data)
{MAIL_DATA *ptmp = malloc(sizeof(MAIL_DATA));pthread_t tid = pthread_self();//链表中遍历查找自身tid(找到自己所在的地方)LINKLIST *find = mbs->thread_list->next;while (find != NULL){if (find->elem.id == tid){break;}find = find->next;}if (find != NULL && find->elem.id == tid) //如果找到了{while (1){//出队拿数据if (find->elem.quehead != find->elem.quetail){ENTER_CRITICAL_AREA(&mbs->mutex);out_queue(find, ptmp);QUIT_CRITICAL_AREA(&mbs->mutex);break;}}}strcpy(sendname, ptmp->name_send);strcpy(data, ptmp->data);free(ptmp);return 0;
}
(11)等待并销毁邮件系统中所有注册的线程。
//遍历pthread_join销毁线程
int wait_all_end(MBS *mbs)
{LINKLIST *find = mbs->thread_list->next;//MAILBOX *end = end_list;//while (find != end)while (find->next != NULL){pthread_join(find->elem.id, NULL);find = find->next;}pthread_join(find->elem.id, NULL);return 0;
}
(12)主函数,创建邮件系统并注册线程
int main(void)
{//MBS *mbs = NULL;mbs = create_mail_box_system();//printf("mbs = %p", mbs);register_to_mail_system(mbs, "show", show_th);register_to_mail_system(mbs, "sock", sock_th);register_to_mail_system(mbs, "data", data_collect_th);wait_all_end(mbs);destroy_mail_box_system(mbs);printf("Hello World!");return 0;}