当前位置: 首页 > news >正文

Linux线程同步(四)

引入

线程同步:让所有的线程访问临界资源具有了一定的顺序性。
条件变量 —— 实现线程间同步的

理解条件变量

条件变量=通知+队列

快速认识接口

pthread_cond_init 函数

用于初始化条件变量(condition variable)的函数。

#include <pthread.h>int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);

参数说明
cond:指向要初始化的条件变量(pthread_cond_t 类型)的指针。
attr:指向条件变量属性(pthread_condattr_t 类型)的指针,用于设置条件变量的属性(如进程共享性等)。若为 NULL,则使用默认属性。
返回值
成功时返回 0。
失败时返回非零错误码

pthread_cond_destroy 函数

用于销毁条件变量(condition variable)的函数,用于释放条件变量所占用的系统资源。

#include <pthread.h>int pthread_cond_destroy(pthread_cond_t *cond);

参数说明
cond:指向要销毁的条件变量(pthread_cond_t 类型)的指针,该条件变量必须是已通过 pthread_cond_init 初始化或静态初始化为 PTHREAD_COND_INITIALIZER 的变量。
返回值
成功时返回 0。
失败时返回非零错误码

pthread_cond_wait 函数

用于线程等待条件变量的函数,它让线程阻塞等待某个条件满足,同时释放关联的互斥锁

#include <pthread.h>int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);

参数说明
cond:指向条件变量(pthread_cond_t 类型)的指针,线程将等待该条件变量的通知。
mutex:指向与条件变量关联的互斥锁(pthread_mutex_t 类型),调用前必须已由当前线程锁定。
返回值
成功时返回 0(通常是被 pthread_cond_signal 或 pthread_cond_broadcast 唤醒)。
失败时返回非零错误码(EINVAL:条件变量或互斥锁未初始化)。

pthread_cond_signal 函数

唤醒一个线程
用于唤醒等待条件变量的线程的函数,它可以触发一个阻塞在 pthread_cond_wait 或 pthread_cond_timedwait 上的线程继续执行

#include <pthread.h>int pthread_cond_signal(pthread_cond_t *cond);

参数说明
cond:指向条件变量(pthread_cond_t 类型)的指针,用于唤醒等待该条件变量的线程。
返回值
成功时返回 0。
失败时返回非零错误码(EINVAL:条件变量未初始化)。

pthread_cond_broadcast 函数

用于唤醒所有等待某个条件变量的线程的函数,与 pthread_cond_signal(唤醒至少一个线程)不同,它会触发所有阻塞在 pthread_cond_wait 或 pthread_cond_timedwait 上的线程,适用于需要多个线程响应同一条件变化的场景。

#include <pthread.h>int pthread_cond_broadcast(pthread_cond_t *cond);

参数说明
cond:指向条件变量(pthread_cond_t 类型)的指针,用于唤醒所有等待该条件变量的线程。
返回值
成功时返回 0。
失败时返回非零错误码(EINVAL:条件变量未初始化)。

pthread_cond_t 类型

用于表示条件变量(condition variable) 的数据类型

初始化

静态初始化:全局或静态变量可直接赋值 PTHREAD_COND_INITIALIZER,无需调用 pthread_cond_init。

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

动态初始化:通过 pthread_cond_init 函数初始化。

pthread_cond_t cond;
pthread_cond_init(&cond, NULL);  // 第二个参数为NULL表示使用默认属性

使用

与条件变量相关的核心操作函数:
pthread_cond_wait:线程阻塞等待条件变量,同时释放关联的互斥锁。
pthread_cond_signal:唤醒至少一个等待该条件变量的线程。
pthread_cond_broadcast:唤醒所有等待该条件变量的线程。

销毁

不再使用时需通过 pthread_cond_destroy 释放资源。

一个测试代码–验证线程同步机制

一个线程控制一批现场

#include <iostream>
#include <vector>
#include <unistd.h>
#include <string>
#include <pthread.h>
#include <mutex>pthread_cond_t cond = PTHREAD_COND_INITIALIZER;//条件变量
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;//互斥锁
void* SlaverCore(void *args)
{std::string name = static_cast<const char*>(args);while(true){//1.加锁pthread_mutex_lock(&mutex);//2.一般条件变量是在加锁和解锁之间使用的pthread_cond_wait(&cond, &mutex);//等待条件变量满足,自动解锁mutex,阻塞等待(锁是用来被释放的)std::cout << "当前被叫醒的线程是:" << name << std::endl;//2.解锁pthread_mutex_unlock(&mutex);sleep(1);}
}
void *MasterCore(void *args)
{sleep(3);std::cout << "Master 线程准备唤醒其他线程..." << std::endl;std::string name = static_cast<const char*>(args);while(true){// std::cout << name << " is running..." << std::endl;// sleep(1);//pthread_cond_signal(&cond);//唤醒一个等待该条件变量的线程pthread_cond_broadcast(&cond);//唤醒所有等待该条件变量的线程sleep(1);std::cout << name << "Master 唤醒其中一个线程... " << std::endl;}
}
void StartMaster(std::vector<pthread_t> *tidsptr)//创建线程,并加入到线程容器中
{pthread_t tid;int n = pthread_create(&tid, nullptr, MasterCore, (void*)"Master Thread");if(n==0){std::cout << "create master success" << std::endl;}tidsptr->emplace_back(tid);
}
void StartSlaver(std::vector<pthread_t> *tidsptr, int threadnum=3)
{for(int i = 0; i < threadnum; i++){char *name = new char[64];snprintf(name, 64, "Slaver Thread-%d", i+1);pthread_t tid;int n = pthread_create(&tid, nullptr, SlaverCore, (void*)name);if(n==0){pthread_mutex_lock(&mutex);std::cout << "create " << name << " success" << std::endl;pthread_mutex_unlock(&mutex);tidsptr->emplace_back(tid);}}
}
void WaitThread(std::vector<pthread_t> &tids)
{for(auto &tid : tids){pthread_join(tid, nullptr);}
}
int main()
{std::vector<pthread_t> tids;StartMaster(&tids);StartSlaver(&tids, 5);WaitThread(tids);return 0;
}

生产消费模型

生产消费模型讨论的是如何并发地进行数据的传递的问题。

在这里插入图片描述

交易平台上的产品(数据):共享资源 – 临界资源
生产者,消费者:多个线程。
交易平台:(数据交易的场所)临时保存数据的内存空间——某种数据结构对象。
交易平台本质是商品的缓冲。

生产消费模型中的概念
3种关系
生产者与生产者—互斥(||同步)
消费者与消费者—互斥(||同步)
生产者与消费者—互斥&&同步
2种角色:生产者与消费者
1个交易场所

生产消费者模型的优势。
生产消费模型,可以提供比较好的并发度。
生产和消费数据,进行解耦。
支持忙闲不均。

阻塞队列(BlockingQueue)

在多线程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通队列区别在于:

  • 当队列为空时,从队列获取元素的操作会阻塞,直到队列中被放入了元素;
  • 当队列满时,往队列中存放元素的操作也会被阻塞,直到有元素被从队列中取出

以上操作都是基于不同线程来说的,线程在对阻塞队列的进程操作时会被阻塞。
在这里插入图片描述

#ifndef __BLOCKING_QUEUE_HPP__
#define __BLOCKING_QUEUE_HPP__
#include<iostream>
#include<queue>
#include<string>
#include<pthread.h>template<typename T>
class BlockingQueue
{
private:bool IsFull(){return _block_queue.size() >= _capacity;}bool IsEmpty(){return _block_queue.empty();}
public:BlockingQueue(int cap){_capacity = cap;pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond_produce, nullptr);pthread_cond_init(&_cond_consume, nullptr);}void Push(const T& in)//生产者用接口{pthread_mutex_lock(&_mutex);if(IsFull())//bug:防止伪唤醒,这里应该是while{//生产者线程去等待,是在临界区中休眠的,现在还持有锁!//1.pthread_cond_wait调用是:a.让调用进程等待 b.自动释放曾经持有的_mutex锁 c.当被唤醒后,重新获得_mutex锁pthread_cond_wait(&_cond_produce, &_mutex);}//进行生产_block_queue.push(in);//通知消费者线程可以消费了pthread_cond_signal(&_cond_consume);pthread_mutex_unlock(&_mutex);}void Pop(T* out)//消费者用接口{pthread_mutex_lock(&_mutex);if(IsEmpty())//bug:防止伪唤醒,这里应该是while{//消费者线程去等待,是在临界区中休眠的,现在还持有锁!//1.pthread_cond_wait调用是:a.让调用进程等待 b.自动释放曾经持有的_mutex锁 c.当被唤醒后,重新获得_mutex锁pthread_cond_wait(&_cond_consume, &_mutex);}//进行消费*out = _block_queue.front();_block_queue.pop();//通知生产者线程可以生产了pthread_cond_signal(&_cond_produce);pthread_mutex_unlock(&_mutex);}~BlockingQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond_produce);pthread_cond_destroy(&_cond_consume);}
private:std::queue<T> _block_queue;int _capacity;//总上限pthread_mutex_t _mutex;//保护_block_queue的互斥锁pthread_cond_t _cond_produce;//生产者条件变量pthread_cond_t _cond_consume;//消费者条件变量
};#endif // __BLOCKING_QUEUE_HPP__

伪唤醒

阻塞队列中会有两处bug。

以生产者为例
这里如果阻塞队列满了,通过if判断,会进行pthread_cond_wait生产者队列等待,直到消费者进行消费,唤醒_cond_produce条件变量队列,但是有可能出现其他意外情况,导致这里生产者线程等待结束,然后继续执行代码,向阻塞队列中生产数据,导致出错(阻塞队列是满的)。

void Push(const T& in)//生产者用接口
{pthread_mutex_lock(&_mutex);if(IsFull())//bug:防止伪唤醒,这里应该是while{pthread_cond_wait(&_cond_produce, &_mutex);}_block_queue.push(in);pthread_cond_signal(&_cond_consume);pthread_mutex_unlock(&_mutex);
}

同理,以消费者为例
这里如果阻塞队列空了,通过 if 判断,会进行pthread_cond_wait消费者队列等待,直到生产者进行生产,唤醒_cond_consume 条件变量队列,但是有可能出现其他意外情况,导致这里消费者线程等待结束,然后继续执行代码,向阻塞队列中拿取数据,导致出错(阻塞队列是空的)。

void Pop(T* out)//消费者用接口
{pthread_mutex_lock(&_mutex);if(IsEmpty())//bug:防止伪唤醒,这里应该是while{pthread_cond_wait(&_cond_consume, &_mutex);}*out = _block_queue.front();_block_queue.pop();pthread_cond_signal(&_cond_produce);pthread_mutex_unlock(&_mutex);
}

伪唤醒概念
伪唤醒指的是线程在没有被其他线程显式唤醒(调用 pthread_cond_signalpthread_cond_broadcast)的情况下,从 pthread_cond_wait 中返回。

操作系统可能为了简化实现或提高性能,在某些情况下(如信号中断、资源竞争等)让等待线程提前返回。因此,不能假设线程被唤醒就一定是因为其他线程调用了 pthread_cond_signalpthread_cond_broadcast
并不是一定会发生伪唤醒,但是避免这种情况可以增加代码的健壮性。

通用解决方法:始终用 while (条件不满足) 包裹 pthread_cond_wait,而非 if。这

以下再举两个例子

例子1pthread_cond_signal 与伪唤醒
场景:多个线程等待 “数据就绪” 信号,主线程通过 signal 唤醒一个线程处理数据。但可能因伪唤醒导致线程在无数据时被唤醒。

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int data = 0; // 共享数据(0表示无数据,1表示有数据)void *worker(void *arg) {int id = *(int *)arg;//收到id1,id2while (1) {pthread_mutex_lock(&mutex);//加锁// 错误:用if判断,伪唤醒时会错误执行if (data == 0) {  //这时主线程还在休眠,并没有将data——>1,线程都进入printf("线程%d:等待数据...\n", id);pthread_cond_wait(&cond, &mutex); //两个线程走到这里都进条件变量队列等待}//被唤醒第一个线程是有数据的(data=1),但是之后的数据是没有数据的(因为data被重置)printf("线程%d:处理数据(data=%d)\n", id, data);data = 0; // 重置数据pthread_mutex_unlock(&mutex);//解锁sleep(1);}return NULL;
}int main() {pthread_t t1, t2;//两个线程int id1 = 1, id2 = 2;pthread_create(&t1, NULL, worker, &id1);pthread_create(&t2, NULL, worker, &id2);// 主线程:3秒后发送一次数据,为了让线程通过“无数据”的判断,从而进行等待sleep(3);pthread_mutex_lock(&mutex);//加锁data = 1;printf("主线程:发送数据,唤醒一个线程\n");pthread_cond_signal(&cond); // 唤醒一个线程pthread_mutex_unlock(&mutex);//解锁sleep(10); // 等待子线程执行return 0;
}

主线程只发了一次 data=1,也只调用了一次 pthread_cond_signal正常情况下,被唤醒的那一个线程确实能拿到数据,不会出现 “唤醒后无数据”。
但需要注意的是 “伪唤醒” 的核心风险,恰恰是 “不依赖显式的 signal/broadcast 也可能唤醒”—— 哪怕你只调用了一次 signal,操作系统也可能因为内核实现细节(比如信号中断、调度优化),让其他等待的线程(或已被唤醒的线程)额外触发一次 “无理由的唤醒”,这才是需要用 while 循环防御的关键。

在上面例子中我们有两个子线程 A 和 B,都在等 data=1,主线程只做一次 data=1signal

  1. 主线程执行 data=1,调用 pthread_cond_signal,唤醒了线程 A。
  2. 线程 A 拿到锁,检查 data=1(此时用 if 判断会直接执行),处理数据后把 data 重置为 0,释放锁。
  3. 此时如果发生伪唤醒:操作系统因为某种原因,让线程 B 从 pthread_cond_wait 中返回没有任何线程调用 signal)。
  4. 线程 B 拿到锁后,因为用的是 if 判断(没重新检查 data),会直接执行 “处理数据” 的逻辑,但此时 data 已经是 0 了,打印出来的数据也是0 —— 就出现了 “唤醒后无数据” 的错误。

解决方法(循环检查条件)
if (data == 0) 改为 while (data == 0),确保被唤醒后重新检查条件:

// 用while循环防止伪唤醒
while (data == 0) {printf("线程%d:等待数据...\n", id);pthread_cond_wait(&cond, &mutex);
}

即使发生伪唤醒,while 循环会重新检查 data,发现仍为 0 时继续等待,避免错误执行。

例子2pthread_cond_broadcast 与伪唤醒
场景:多个线程等待 “任务通知”,主线程通过 broadcast 唤醒所有线程,但只有部分线程有任务可执行。伪唤醒会导致无任务的线程错误执行。

有问题的代码(未处理伪唤醒)

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int task_count = 0; // 任务数量void *worker(void *arg) {int id = *(int *)arg;while (1) {pthread_mutex_lock(&mutex);// 错误:用if判断,伪唤醒时会错误执行if (task_count == 0) { //先运行的2个子线程就可以跳过判断,还有一个进入等待printf("线程%d:等待任务...\n", id);pthread_cond_wait(&cond, &mutex); // 可能伪唤醒,3s后广播由于先运行的任务已经执行完毕,等待的任务唤醒后非法执行任务(task_count=-1)}//先运行的2个子线程就可以跳过判断,过来执行任务,执行完后task_count--;printf("线程%d:处理任务(剩余任务:%d)\n", id, task_count);pthread_mutex_unlock(&mutex);sleep(1);}return NULL;
}int main() {pthread_t t1, t2, t3;int id1=1, id2=2, id3=3;pthread_create(&t1, NULL, worker, &id1);pthread_create(&t2, NULL, worker, &id2);pthread_create(&t3, NULL, worker, &id3);// 主线程:添加2个任务,唤醒所有线程sleep(3);pthread_mutex_lock(&mutex);task_count = 2;//任务数printf("主线程:添加2个任务,广播唤醒所有线程\n");pthread_cond_broadcast(&cond); // 唤醒所有等待线程pthread_mutex_unlock(&mutex);sleep(10);return 0;
}

问题分析

  • 正常情况下,broadcast 唤醒 3 个线程,但只有 2 个任务,其中 1 个线程会因 task_count=0 正确等待。
  • 但如果发生伪唤醒,无任务的线程会从 pthread_cond_wait 返回,错误执行 task_count–(导致 task_count 变为负数)。

解决方法(循环检查条件)
if (task_count == 0) 改为 while (task_count == 0)

// 正确:用while循环防止伪唤醒
while (task_count == 0) {printf("线程%d:等待任务...\n", id);pthread_cond_wait(&cond, &mutex);
}

即使被 broadcast 唤醒或伪唤醒,线程会重新检查 task_count,只有当 task_count > 0 时才执行任务,避免错误。

单生产者单消费者模型

先简化一下模型,不用考虑生产者与生产者关系,与消费者与消费者的关系。

#include "BlockingQueue.hpp"
#include "Thread.hpp"
#include <iostream>
#include <string>
#include <vector>
#include <unistd.h>using namespace ThreadModule;
void Consumer(BlockingQueue<int>* data)
{while(true){sleep(1);int data_out;data->Pop(&data_out);std::cout << "consume data: " << data_out << std::endl;    }}
void Producer(BlockingQueue<int>* data)
{int cnt = 0;while(true){//sleep(1);data->Push(cnt);std::cout << "produce data: " << cnt << std::endl;cnt++;}
}
void StartComm(std::vector<Thread<BlockingQueue<int>*>>* threads, int num, BlockingQueue<int>* bq, func_t<BlockingQueue<int>*> func)
{for(int i = 0; i<num; i++){std::string threadname = "comm-" + std::to_string(i+1);//Thread<int> t(Consumer, 10, threadname);threads->emplace_back(func, bq, threadname);threads->back().start();}
}
void StartConsumer(std::vector<Thread<BlockingQueue<int>*>>* threads, int num, BlockingQueue<int>* bq)
{StartComm(threads, num, bq, Consumer);
}void StartProducer(std::vector<Thread<BlockingQueue<int>*>>* threads, int num, BlockingQueue<int>* bq)
{StartComm(threads, num, bq, Producer);
}void WaitAllThread(std::vector<Thread<BlockingQueue<int>*>>& threads)
{for(auto& t : threads){t.join();}
}
int main()
{BlockingQueue<int> *bq = new BlockingQueue<int>(5);std::vector<Thread<BlockingQueue<int>*>> threads;StartConsumer(&threads,1,bq);//一个消费者StartProducer(&threads,1,bq);//一个生产者WaitAllThread(threads);delete bq;return 0;
}

生产者与消费者接口

void Push(const T& in)//生产者用接口
{pthread_mutex_lock(&_mutex);if(IsFull())//临界区//bug{//生产者线程去等待,是在临界区中休眠的,现在还持有锁!//1.pthread_cond_wait调用是:a.让调用进程等待 b.自动释放曾经持有的_mutex锁 c.当被唤醒后,重新获得_mutex锁pthread_cond_wait(&_cond_produce, &_mutex);}//进行生产_block_queue.push(in);//通知消费者线程可以消费了pthread_cond_signal(&_cond_consume);pthread_mutex_unlock(&_mutex);
}

情况1生产者每隔1s生产一次,消费者一直消费
无论怎样都是先生产一条在消费一条

void Consumer(BlockingQueue<int>* data)
{while(true){//sleep(1);int data_out;data->Pop(&data_out);std::cout << "consume data: " << data_out << std::endl;    }}
void Producer(BlockingQueue<int>* data)
{int cnt = 0;while(true){sleep(1);data->Push(cnt);std::cout << "produce data: " << cnt << std::endl;cnt++;}
}

无论一开始是哪一方执行,都会有如下结果。生产者每一秒生成一个,填充到队列中,消费者立即消费,之后进行阻塞等待生产。若消费者先消费,此时队列中无数据,消费者阻塞,直到生产者生产数据放入队列,所以顺序一定是先生产一个后消费一个交替进行
在这里插入图片描述

情况2消费者每隔1s执行一次,生产者一直生产

void Consumer(BlockingQueue<int>* data)
{while(true){sleep(1);int data_out;data->Pop(&data_out);std::cout << "consume data: " << data_out << std::endl;    }}
void Producer(BlockingQueue<int>* data)
{int cnt = 0;while(true){//sleep(1);data->Push(cnt);std::cout << "produce data: " << cnt << std::endl;cnt++;}
}

生产者迅速填满队列,然后进行阻塞,1s后消费者进行pop数据,只要阻塞队列中被消费者“使用”(pop)数据,生产者就会立即填补,结果如下。
在这里插入图片描述

多生产者多消费者模型

只需要在StartConsumer函数与StartProducer函数中,更改线程数量即可

int main()
{BlockingQueue<int> *bq = new BlockingQueue<int>(5);std::vector<Thread<BlockingQueue<int>*>> threads;StartConsumer(&threads,2,bq);//2个消费者StartProducer(&threads,3,bq);//3个生产者WaitAllThread(threads);delete bq;return 0;
}

补充:打印当前线程名

通过定义一个新的类的方式打印当前线程名——比较冗余

#include "BlockingQueue.hpp"
#include "Thread.hpp"
#include "Task.hpp"
#include <iostream>
#include <string>
#include <vector>
#include <unistd.h>using namespace ThreadModule;
using blockingqueue_t = BlockingQueue<Task>;class ThreadData
{
public:ThreadData(): _bq(nullptr), _who("none-name"){};ThreadData(blockingqueue_t* bq, const std::string& who): _bq(bq), _who(who){}blockingqueue_t* GetBQ(){return _bq;}const std::string& GetWho() const{return _who;}   
private:blockingqueue_t* _bq;std::string _who = "none-name";
};void Consumer(ThreadData* data)
{while(true){   // 1.从阻塞队列中获取任务Task task;data->GetBQ()->Pop(&task);// 2.执行任务task.Execute();std::cout << "[" << data->GetWho() << "] consume data: " << task.ResultToString() << std::endl;}
}
void Producer(ThreadData* data)
{srand(time(nullptr)^pthread_self());//随机数int cnt = 0;while(true){// 1.获取任务int a = rand()%10 + 1;usleep(1000);int b = rand()%20 + 1;Task task(a, b);// 2.把获取的任务放入阻塞队列data->GetBQ()->Push(task);std::cout << "[" << data->GetWho() << "] produce data: " << task.DebugToString() << std::endl;}
}
void StartComm(std::vector<Thread<ThreadData*>>* threads, int num, blockingqueue_t* bq, func_t<ThreadData*> func, const std::string& prefix)
{for(int i = 0; i<num; i++){std::string threadname = prefix + std::to_string(i+1);//创建ThreadData对象,传递给线程函数ThreadData* td = new ThreadData(bq, threadname);threads->emplace_back(func, td, threadname);threads->back().start();}
}
void StartConsumer(std::vector<Thread<ThreadData*>>* threads, int num, blockingqueue_t* bq)
{StartComm(threads, num, bq, Consumer, "consumer-");
}void StartProducer(std::vector<Thread<ThreadData*>>* threads, int num, blockingqueue_t* bq)
{StartComm(threads, num, bq, Producer, "producer-");
}void WaitAllThread(std::vector<Thread<ThreadData*>>& threads)
{for(auto& t : threads){t.join();}
}
int main()
{blockingqueue_t *bq = new blockingqueue_t(5);std::vector<Thread<ThreadData*>> threads;StartConsumer(&threads,3,bq);StartProducer(&threads,1,bq);WaitAllThread(threads);delete bq;return 0;
}

这种方式比较冗余,不够简洁,因为Thread中已经保存了当前线程的名字,直接用这个更好。

更简洁的方式:直接使用类Thread中已经保存的成员变量_threadname

#include "Task.hpp"
#include "Thread.hpp"
#include "BlockingQueue.hpp"
#include <iostream>
#include <string>
#include <vector>
#include <unistd.h>
#include <cstdlib>
#include <ctime>using namespace ThreadModule;
using blockingqueue_t = BlockingQueue<Task>;void Consumer(Thread<blockingqueue_t*>* thread)//传类,利用成员中的name函数获取线程名
{const std::string& thread_name = thread->name();  // 获取线程名blockingqueue_t* bq = thread->data();             // 获取共享队列while (true){Task task;bq->Pop(&task);  task.Execute();  std::cout << "[" << thread_name << "] 消费: " << task.ResultToString() << std::endl;}
}void Producer(Thread<blockingqueue_t*>* thread)//传类,利用成员中的name函数获取线程名
{const std::string& thread_name = thread->name();  // 获取线程名blockingqueue_t* bq = thread->data();             // 获取共享队列srand(time(nullptr) ^ pthread_self());           while (true){// 生成随机任务(1-10和1-20的加法)int a = rand() % 10 + 1;int b = rand() % 20 + 1;Task task(a, b);bq->Push(task);  // 放入队列std::cout << "[" << thread_name << "] 生产: " << task.DebugToString() << std::endl;}
}void StartThreads(std::vector<Thread<blockingqueue_t*>>* threads, int num,blockingqueue_t* bq, func_t<Thread<blockingqueue_t*>*> func,const std::string& prefix)
{for (int i = 0; i < num; ++i){std::string thread_name = prefix + std::to_string(i + 1);  threads->emplace_back(func, bq, thread_name);             threads->back().start();                                   
}void StartConsumers(std::vector<Thread<blockingqueue_t*>>* threads, int num, blockingqueue_t* bq)
{StartThreads(threads, num, bq, Consumer, "consumer-");
}void StartProducers(std::vector<Thread<blockingqueue_t*>>* threads, int num, blockingqueue_t* bq)
{StartThreads(threads, num, bq, Producer, "producer-");
}void WaitForAll(std::vector<Thread<blockingqueue_t*>>& threads)
{for (auto& t : threads){t.join();}
}int main()
{blockingqueue_t* bq = new blockingqueue_t(5);std::vector<Thread<blockingqueue_t*>> threads;StartConsumers(&threads, 3, bq);StartProducers(&threads, 2, bq);WaitForAll(threads);delete bq;return 0;
}

生产者,消费者函数中将BlockingQueue<Task>* data换成Thread<blockingqueue_t*>* thread,这个Thread中既有阻塞队列的信息,又有当前线程名的信息。

生产消费模型用的是一个阻塞队列,生产者与消费者中放入任务读取任务的过程都是被加锁的,所以这个过程是串行的,那么生产消费模型为什么可以提供比较好的并发度?

角色流程是否需要加锁 / 串行耗时特征
生产者1. 生产任务(如计算数据、读取文件)否(并行)耗时较长(核心工作)
2. 向阻塞队列插入任务是(串行)耗时极短(仅队列操作)
消费者1. 从阻塞队列取出任务是(串行)耗时极短(仅队列操作)
2. 执行任务(如处理数据、写入数据库)否(并行)耗时较长(核心工作)

在生产消费模型中,阻塞队列的操作(任务的入队与出队)因需保证线程安全而通过同步机制(如加锁)实现串行化,但这一串行性仅局限于对队列本身的瞬时操作。从整体流程看,模型的高并发特性源于 “任务生产 / 执行的并行性”
生产者:任务的生产过程(如数据计算、资源准备等耗时操作)是多线程并行执行的;仅当任务生产完成后,生产者才会申请锁执行入队操作(瞬时串行)。在此期间,未获得锁的生产者无需等待,可立即投入下一个任务的生产过程。因此,生产者的核心工作(任务生产)始终处于并行状态,仅在入队这一短耗时环节存在串行等待,整体时间利用率极高。
消费者:消费者获取任务时,仅在出队操作(瞬时串行)阶段需要竞争锁;一旦成功获取任务,便进入任务执行阶段(如业务处理、结果计算等耗时操作),此过程为多线程并行执行。当部分消费者在并行执行任务时,其他消费者可竞争锁获取新任务,使得任务执行这一核心工作始终保持并行,串行仅存在于出队这一短环节。

http://www.dtcms.com/a/553349.html

相关文章:

  • 全新升级!山海鲸4.6.3版本正式亮相
  • 图像压缩-将8bit数据压缩为2bit
  • 海宁网站怎么做seo企业网站能起到什么作用
  • 手机在网上怎么创建自己的网站网站开发人员职位
  • LineSlam线特征投影融合(Fuse) 中pML->GetLineNormalVector()的理解代码理解
  • 【图像处理基石】多波段图像融合算法入门:从概念到实践
  • Docker核心文件:DockerCompose文件
  • 企业网站 自助建站网站做竞价经常会被攻击吗
  • Android ble理解
  • 深入理解 Spring Boot Web 开发中的静态资源处理机制
  • 网站建设费记什么科目产品ui设计是什么
  • 衡水企业网站设计国内app开发公司
  • 【java EE】IDEA 中创建或迁移 Spring 或 Java EE 项目的核心步骤和注意事项
  • 如何保证缓存与数据库更新时候的一致性
  • 【Spring Boot Starter 设计思考:分离模式是否适用于所有场景】
  • HTTP 头部参数数据注入测试sqlilabs less 18
  • 网站速度慢的原因做网站建设优化的电话话术
  • 【数据结构】单链表 练习记录
  • mac 安装 jdk17
  • 【项目实战1-瑞吉外卖|day22】
  • 怎么用dw做响应式网站网站主持人制作网站代言人
  • Android开发自学笔记 --- Kotlin
  • 从VB到PyCharm:编程工具跨越时代的传承与革命
  • 网站建设创新成果四年级写一小段新闻
  • 生产环境用Go语言完成微服务搭建和业务融入
  • 第九课 四川料理は辛いです
  • DevEco Studio在模拟器中改变运行的 ets 文件
  • 第5讲:项目依赖管理与资源管理
  • 网站定制案例微安电力wordpress 分类合并
  • Orleans 的异步