线程邮箱(1)
线程邮箱
1.概念
- 线程邮箱本质是多线程任务中进行消息传递的编程模型
- 原理:每个线程拥有一个专属的消息队列,即邮箱。其他线程通过向该线程投递消息与他通信。线程从自己的邮箱中拿取信息来处理消息
2.优势
- 避免共享状态下产生的数据竞争和繁复的加锁
- 发送者只需要知道对方的名字即可发送消息,不用关心对方何时处理
3.使用方式
1.定义要使用的结构体
/*这是一个mail box system,里面定义一个链表的头节点和一个用于同步的互斥锁*/
typedef struct{pthread_mutex_t mutex;struct list_head head;
}MBS;
- 定义这把锁的意义:在程序运行时,可能会同时出现向链表中插入节点和遍历链表寻找目标节点两个任务,在无锁的情况下,插入与遍历会出现数据竞争
typedef void *(*PFUN)(void *);//定义一个线程
/*定义一个链表以及所属队列信息的结构体,包含链表的名字,链表代表线程的pid,函数指针,指向队列的指针,一个链表节点*/
typedef struct{char name[50];pthread_t tid;PFUN th;SeqQueue *qe;struct list_head node;
}MBS_NODE;
2.定义相关函数
-
创建一个邮箱系统
MBS *mbs_create(){MBS *m = NULL;m = malloc(sizeof(MBS));if(m == NULL){perror("error to create");return NULL;}INIT_LIST_HEAD(&m->head);pthread_mutex_init(&m->mutex,NULL);return m; }
- 先申请一个邮箱系统所需要的空间,利用内核链表初始化头节点,将链表的next和prev都指向自身,从而将链表设置为空链表,再将互斥锁初始化
-
注册一个线程
int register_th(MBS *mbs,char *name,PFUN th){MBS_NODE *newnode = malloc(sizeof(MBS_NODE));if(newnode == NULL){perror("error to register");return 1;}strcpy(newnode->name,name);newnode->qe = CreateSeqQueue(100);pthread_mutex_lock(&mbs->mutex);list_add(&newnode->node,&mbs->head);pthread_mutex_unlock(&mbs->mutex);pthread_create(&newnode->tid,NULL,th,NULL);return 0; }
- 先申请一个链表节点大小的空间,定义节点的名字,将队列大小初始化
- 加锁,再将要添加的节点加入到链表中去,将锁释放掉,这样保证在添加链表节点时不会被其他任务占用
- 创建线程
-
等待所有线程结束
int wait_all_end(MBS *mbs){MBS_NODE *p,*n;list_for_each_entry_safe(p,n,&mbs->head,node){pthread_join(p->tid,NULL);}return 0; }
- 遍历链表,结束所有线程
-
获得线程的pid
char* get_name_by_tid(MBS *mbs){pthread_t tid = pthread_self();MBS_NODE *p,*n;list_for_each_entry_safe(p,n,&mbs->head,node){if(tid == p->tid){return p->name;}}
- 遍历链表,获得和传入的pid相同的线程的名字
-
发送数据
int send_msg(MBS *msg,char *recvname,char *buf,DATATYPE *data){MBS_NODE *p,*n;char *sendname = get_name_by_tid(msg);strcpy(data->sendname,sendname);strcpy(data->recvname,recvname);strcpy(data->buf,buf);pthread_mutex_lock(&msg->mutex);list_for_each_entry_safe(p,n,&msg->head,node){if(strcmp(data->recvname,p->name) == 0){EnterSeqQueue(p->qe,data);pthread_mutex_unlock(&msg->mutex);return 0;}}pthread_mutex_unlock(&msg->mutex);printf("warning:recvname:%s not exist\n",data->recvname);return 1; }
- 传入接收方的名字和数据
- 利用函数获得发送方的名字
- 加锁,遍历链表,如果有链表名字和传入的名字相同就将数据入队到链表下的队列中,释放锁资源
- 如果没有找到相关节点,释放锁资源并打印警告
-
接收数据
int recv_msg(MBS *mbs,DATATYPE *data){MBS_NODE *p,*n;pthread_mutex_lock(&mbs->mutex);list_for_each_entry_safe(p,n,&mbs->head,node){if(p->tid == pthread_self()){if(!IsEmptySeqQueue(p->qe)){memcpy(data,GetHeadSeqQueue(p->qe),sizeof(DATATYPE));QuitSeqQueue(p->qe);pthread_mutex_unlock(&mbs->mutex);return 0;}}}pthread_mutex_unlock(&mbs->mutex);return 1; }
- 加锁,遍历链表,如果找到节点pid是本线程的pid,并且队列不为空,那么将数据赋值给data中,并将数据出队,释放锁资源
- 如果没有找到,释放锁资源