【Linux】线程同步和生产者消费者模型
在线程互斥【Linux】线程的互斥文章中我们演示的抢票的代码,运行结果虽然不会出现票数减到负数的问题了,但是仔细观察会发现,一段时间内抢票的线程是同一个。
相当于当前释放锁的进程,一释放又立马进到循环重新申请锁,导致别的线程申请不到锁,一直在等待,这就是其他线程饥饿问题。
如果我们在while循环里加一个休眠,申请到锁的线程就会均匀一些。
1.条件变量
- 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从⽽有效避免饥饿问题,叫做同步
- 竞态条件:因为时序问题,⽽导致程序异常,我们称之为竞态条件。
- 条件变量:条件变量是同步的核心机制,用于线程间传递“特定条件已满足”的信号,让等待该条件的线程从阻塞状态唤醒,避免无意义的消费CPU资源。
- 条件变量要与互斥锁配合使用,核心是解决“线程等待某个成立条件”的场景。
条件变量的类型就叫 phread_cond_t ,接口使用和互斥锁大差不差。
- 初始化和销毁:和互斥锁一样,有全局的和局部的条件变量。
- 等待:有两个接口,第一个参数都是线程要在哪个条件变量下等待,第二个参数是要传一把锁,而timedwait还有第三个参数,是设置等待时间的(此接口暂时不考虑)。
- 唤醒:唤醒的接口也有两个,signal是唤醒在该条件变量下等待的一个线程,broadcast是唤醒所有的线程。
线程等待之前,要对资源的数量进行判定,判定本身就是访问临界资源,所以判定一定要在临界区内部,所以条件变量要在互斥锁内部,判定结果也一定会是在临界区内部。
#include <iostream>
#include <pthread.h>
#include <string>
#include <vector>pthread_mutex_t g_lock = PTHREAD_MUTEX_INITIALIZER; // 互斥锁
pthread_cond_t g_cond = PTHREAD_COND_INITIALIZER; // 条件变量int num = 1;void *ThreadFunc(void *args)
{std::string name = static_cast<const char *>(args);while (true){pthread_mutex_lock(&g_lock); // 加锁pthread_cond_wait(&g_cond, &g_lock); // 等待std::cout << name << " 修改num:" << num << std::endl;num++;pthread_mutex_unlock(&g_lock); // 解锁}return nullptr;
}int main()
{std::vector<pthread_t> threads;for (int i = 0; i < 5; i++){pthread_t tid;char *name = new char[64];snprintf(name, 64, "thread - %d", i);int n = pthread_create(&tid, nullptr, ThreadFunc, name);if (n != 0)std::cerr << "pthread_create error" << std::endl;threads.push_back(tid);}for(auto &id : threads){pthread_join(id,nullptr);}return 0;
}
上面的代码让创建的所有线程都去条件变量下等待了,所以运行这个代码会在pthread_cond_wait卡住。
- 判定结果不满足的时候线程要休眠,那么休眠也一定是在临界区内部的!
- 要休眠时,线程是持有锁的,所以pthread_mutex_wait调用成功的话,在挂起当前线程之前会先自动释放锁!这就是pthread_mutex_wait函数还要传锁进去的原因。
- 当线程被唤醒时,一定是在pthread_mutex_wait内部被唤醒然后再继续向后执行,意思就是线程被唤醒的时候默认就在临界区内。
- 如果要从pthread_mutex_wait成功返回,需要当前线程重新申请锁,整个释放锁和申请锁的过程由pthread_mutex_wait内部自己完成。
- 如果线程成功被唤醒,但是申请锁失败了,抢不过别的线程,此时这个线程就会在锁上阻塞等待。
我们现在让主线程调用pthread_cond_signal,每隔一秒唤醒一个线程。
int main()
{std::vector<pthread_t> threads;for (int i = 0; i < 5; i++){pthread_t tid;char *name = new char[64];snprintf(name, 64, "thread - %d", i);int n = pthread_create(&tid, nullptr, ThreadFunc, name);if (n != 0)std::cerr << "pthread_create error" << std::endl;threads.push_back(tid);}sleep(2);while (true){std::cout << "唤醒一个线程" << std::endl;pthread_cond_signal(&g_cond);sleep(1);}for (auto &id : threads){pthread_join(id, nullptr);}return 0;
}
也可以用pthread_cond_broadcast一次性把所有线程全部唤醒。
while(true)
{std::cout << "唤醒所有线程" << std::endl;pthread_cond_broadcast(&g_cond);sleep(1);
}
所以,条件变量允许线程等待,也允许一个线程唤醒在条件变量下等待的其他线程。
2.生产者消费者模型
生产者消费者模型有3个要素:生产者,消费者,一个“交易”场所。
生产者未来往交易场所里放数据,消费者未来从交易场所里拿数据,所以这个交易场所是一种临界资源。
3种关系:
- 生产者之间:互斥关系
- 消费者之间:互斥关系
- 生产者和消费者之间:互斥和同步关系
2种角色:生产者和消费者
1个交易场所:以特定结构构成的一种“内存”空间
优点:
- 生产过程和消费过程解耦
- 支持并发,提高效率
- 支持忙闲不均
3.基于阻塞队列的生产者消费者模型
3.1 阻塞队列
在多线程编程中阻塞队列(Blocking Queue)是⼀种常⽤于实现⽣产者和消费者模型的数据结构,其实就是生产者消费者模型里的一个交易场所。
3.2 单生产者、单消费者
因为生产者和消费者都只有一个,所以维护的关系就只有一个,就是生产者和消费者的互斥与同步关系。
//BlockQueue.hpp文件
#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>const int defaultcap = 5;
template <typename T>
class BlockQueue
{
public:BlockQueue(const int cap = defaultcap) : _cap(cap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_producer_cond, nullptr);pthread_cond_init(&_consumer_cond, nullptr);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_producer_cond);pthread_cond_destroy(&_consumer_cond);}private:std::queue<T> _bq; // 队列int _cap; // 队列的容量pthread_mutex_t _mutex; // 互斥锁pthread_cond_t _producer_cond; // 生产者的条件变量pthread_cond_t _consumer_cond; // 消费者的条件变量
};
当队列为空的时候,消费者不能再消费,会被阻塞;当队列为满的时候,生产者不能在生产,会被阻塞。为了方便实现,这里使用两个条件变量,分别给到生产者和消费者。
void Equeue(const T &data) // 生产者:入数据{pthread_mutex_lock(&_mutex); // 加锁if (IsFull()) // 队列满了{pthread_cond_wait(&_producer_cond, &_mutex); // 在条件变量里等}// 队列还有空间_bq.push(data); // 入数据pthread_mutex_unlock(&_mutex); // 解锁}
T Pop() // 消费者:拿数据{pthread_mutex_lock(&_mutex); // 加锁if (IsEmpty()) // 队列里没东西{pthread_cond_wait(&_consumer_cond, &_mutex); // 在条件变量里等}// 队列里还有数据T data = _bq.front(); // 拿数据std::cout << "消费了一个数据:" << data << std::endl;_bq.pop();pthread_mutex_unlock(&_mutex); // 解锁return data;}
现在面临的问题就是,生产者或者消费者进入等待状态后,谁来唤醒他们?答案是互相唤醒对方。
我们可以增加两个变量,记录生产者和消费者的休眠数量,生产者放入数据后队列里一定不为空,就可以唤醒消费者,消费者消费数据后队列就一定不为满,就能唤醒生产者。
#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>const int defaultcap = 5;
template <typename T>
class BlockQueue
{
private:bool IsFull() { return _bq.size() >= _cap; }bool IsEmpty() { return _bq.empty(); }public:BlockQueue(const int cap = defaultcap): _cap(cap),_p_sleep_num(0),_c_sleep_num(0){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_producer_cond, nullptr);pthread_cond_init(&_consumer_cond, nullptr);}void Equeue(const T &data) // 生产者:入数据{pthread_mutex_lock(&_mutex); // 加锁if (IsFull()) // 队列满了{_p_sleep_num++;pthread_cond_wait(&_producer_cond, &_mutex); // 在条件变量里等_p_sleep_num--;}// 队列还有空间_bq.push(data); // 入数据if (_c_sleep_num > 0) // 如果有消费者线程在休眠{pthread_cond_signal(&_consumer_cond); // 唤醒一个消费者std::cout << "唤醒消费者..." << std::endl;}pthread_mutex_unlock(&_mutex); // 解锁}T Pop() // 消费者:拿数据{pthread_mutex_lock(&_mutex); // 加锁if(IsEmpty()) // 队列里没东西{_c_sleep_num++;pthread_cond_wait(&_consumer_cond, &_mutex); // 在条件变量里等_c_sleep_num--;}// 队列里还有数据T data = _bq.front(); // 拿数据_bq.pop();if (_p_sleep_num > 0) // 如果有生产者线程在休眠{pthread_cond_signal(&_producer_cond); // 唤醒一个生产者std::cout << "唤醒生产者..." << std::endl;}pthread_mutex_unlock(&_mutex); // 解锁return data;}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_producer_cond);pthread_cond_destroy(&_consumer_cond);}private:std::queue<T> _bq; // 队列int _cap; // 队列的容量pthread_mutex_t _mutex; // 互斥锁pthread_cond_t _producer_cond; // 生产者的条件变量pthread_cond_t _consumer_cond; // 消费者的条件变量int _p_sleep_num; // 生产者休眠数量int _c_sleep_num; // 消费者休眠数量
};
- 问题一:pthread_cond_wait是一个函数,就有可能调用失败,如果调用失败,这个函数就不会让新线程阻塞,而是立即返回,线程继续往队列里push数据,但是当前队列是满的!
- 问题二:上面的代码是单生产单消费,如果是单生产多消费,队列里只有一个位置留给生产者了,但唤醒生产者的时候把所有生产者全唤醒了,如果我此时不是第一个醒来的,仅有的一个位置被第一个醒来的线程用了,但我也醒来了,然后我也去插入数据
上面的情况就叫pthread_cond_wait被伪唤醒了。所以,在条件判断的时候就不能用if,而要用while循环判断,这样可以增加代码的健壮性。
void Equeue(const T &data) // 生产者:入数据{pthread_mutex_lock(&_mutex); // 加锁while (IsFull()) // 队列满了{_p_sleep_num++;pthread_cond_wait(&_producer_cond, &_mutex); // 在条件变量里等_p_sleep_num--;}// 队列还有空间_bq.push(data); // 入数据if (_c_sleep_num > 0) // 如果有消费者线程在休眠{pthread_cond_signal(&_consumer_cond); // 唤醒一个消费者std::cout << "唤醒消费者..." << std::endl;}pthread_mutex_unlock(&_mutex); // 解锁}T Pop() // 消费者:拿数据{pthread_mutex_lock(&_mutex); // 加锁while (IsEmpty()) // 队列里没东西{_c_sleep_num++;pthread_cond_wait(&_consumer_cond, &_mutex); // 在条件变量里等_c_sleep_num--;}// 队列里还有数据T data = _bq.front(); // 拿数据_bq.pop();if (_p_sleep_num > 0) // 如果有生产者线程在休眠{pthread_cond_signal(&_producer_cond); // 唤醒一个生产者std::cout << "唤醒生产者..." << std::endl;}pthread_mutex_unlock(&_mutex); // 解锁return data;}
我们可以测试一下,让生产者没1秒生产一个数据,消费者在生产者没生产的时候只能等着。
//Main.cc文件
#include "BlockQueue.hpp"
#include <unistd.h>void *PThreadFunc(void *arg) // 生产者
{int data = 1;BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(arg);while (true){bq->Equeue(data); // 入数据std::cout << "生产了一个数据" << data << std::endl;data++;sleep(1);}return nullptr;
}void *CThreadFunc(void *arg) // 消费者
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(arg);while (true){int data = bq->Pop(); // 拿数据std::cout << "消费了一个数据:" << data << std::endl;}return nullptr;
}int main()
{BlockQueue<int> bq; // 阻塞队列pthread_t p_tid, c_tid;pthread_create(&p_tid, nullptr, PThreadFunc, &bq);pthread_create(&c_tid, nullptr, CThreadFunc, &bq);pthread_join(p_tid, nullptr);pthread_join(c_tid, nullptr);return 0;
}
如果是生产者快,消费者慢,生产者一瞬间就会把队列干满。
3.3 任务
这个队列除了可以传内置类型,还可以传自定义类型,我们现在可以写一个很简单的任务。
形式1:
#include <iostream>
#include <string>class Task
{
public:Task(int a, int b) : _a(a), _b(b), _result(0){}void Excute(){_result = _a + _b;}std::string ResultToString(){return std::to_string(_a) + "+" + std::to_string(_b) + "=" +std::to_string(_result);}std::string DebugToString(){return std::to_string(_a) + "+" + std::to_string(_b) + "=?";}private:int _a;int _b;int _result;
};
然后把这个头文件包含在Main.cc里,往队列里传递任务。
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <unistd.h>void *PThreadFunc(void *arg) // 生产者
{int x = 1;int y = 1;BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(arg);while (true){sleep(1);Task t(x, y);std::cout << "生产了一个任务:" << t.DebugToString() << std::endl;bq->Equeue(t);x++, y++;}return nullptr;
}void *CThreadFunc(void *arg) // 消费者
{BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(arg);while (true){Task t = bq->Pop();t.Excute(); // 拿到任务执行std::cout << "完成了一个任务:" << t.ResultToString() << std::endl;}return nullptr;
}int main()
{BlockQueue<Task> bq; // 阻塞队列pthread_t p_tid, c_tid;pthread_create(&p_tid, nullptr, PThreadFunc, &bq);pthread_create(&c_tid, nullptr, CThreadFunc, &bq);pthread_join(p_tid, nullptr);pthread_join(c_tid, nullptr);return 0;
}
形式2:
#include <iostream>
#include <string>
#include <functional>//任务形式2
using Task = std::function<void()>; //返回值void,参数为空的函数类型
void Flush()
{std::cout << "我是一个刷新的任务" << std::endl;
}
void *PThreadFunc(void *arg) // 生产者
{BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(arg);while (true){sleep(1);std::cout << "生产了一个任务" << std::endl;bq->Equeue(Flush);}return nullptr;
}void *CThreadFunc(void *arg) // 消费者
{BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(arg);while (true){Task t = bq->Pop();t(); // 拿到任务执行}return nullptr;
}int main()
{BlockQueue<Task> bq; // 阻塞队列pthread_t p_tid, c_tid;pthread_create(&p_tid, nullptr, PThreadFunc, &bq);pthread_create(&c_tid, nullptr, CThreadFunc, &bq);pthread_join(p_tid, nullptr);pthread_join(c_tid, nullptr);return 0;
}
3.4 多生产多消费
前面我们实现的单生产单消费模型,其实就是多生产多消费模型,前面的代码已经维护好了生产多消费者模型中的3中关系。我们现在用2个生产者和三个消费者做实验。
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <unistd.h>void *PThreadFunc(void *arg) // 生产者
{BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(arg);while (true){sleep(1);std::cout << "生产了一个任务" << std::endl;bq->Equeue(Flush);}return nullptr;
}void *CThreadFunc(void *arg) // 消费者
{BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(arg);while (true){Task t = bq->Pop();t(); // 拿到任务执行}return nullptr;
}int main()
{BlockQueue<Task> bq; // 阻塞队列pthread_t p1, p2, c1, c2, c3;pthread_create(&p1, nullptr, PThreadFunc, &bq);pthread_create(&p2, nullptr, PThreadFunc, &bq);pthread_create(&c1, nullptr, CThreadFunc, &bq);pthread_create(&c2, nullptr, CThreadFunc, &bq);pthread_create(&c3, nullptr, CThreadFunc, &bq);pthread_join(p1, nullptr);pthread_join(p2, nullptr);pthread_join(c1, nullptr);pthread_join(c2, nullptr);pthread_join(c3, nullptr);return 0;
}
如果一次性生产满了,生产者就会休眠,我们可以打印出来看看。
void *PThreadFunc(void *arg) // 生产者
{BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(arg);while (true){std::cout << "生产了一个任务" << std::endl;bq->Equeue(Flush);}return nullptr;
}void *CThreadFunc(void *arg) // 消费者
{BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(arg);while (true){sleep(10); //10秒之后再消费Task t = bq->Pop();t(); // 拿到任务执行}return nullptr;
}
void Equeue(const T &data) // 生产者:入数据{pthread_mutex_lock(&_mutex); // 加锁while (IsFull()) // 队列满了{_p_sleep_num++;printf("生产者休眠:%d\n", _p_sleep_num);pthread_cond_wait(&_producer_cond, &_mutex); // 在条件变量里等_p_sleep_num--;}// 队列还有空间_bq.push(data); // 入数据if (_c_sleep_num > 0) // 如果有消费者线程在休眠{pthread_cond_signal(&_consumer_cond); // 唤醒一个消费者std::cout << "唤醒消费者..." << std::endl;}pthread_mutex_unlock(&_mutex); // 解锁}
4.封装条件变量
条件变量的接口需要用到互斥锁,互斥锁我们已经封装过了,这里直接呈现代码。
//Mutex.hpp文件
#pragma once
#include <iostream>
#include <pthread.h>
#include <cstring>
#include <cstdio>namespace MyMutex
{class Mutex{public:Mutex(){pthread_mutex_init(_plock, nullptr); // 锁初始化}void Lock() // 加锁{int n = pthread_mutex_lock(_plock);if (n != 0)std::cerr << "pthread_mutex_lock fail: " << strerror(n) << std::endl;}void UnLock() // 解锁{int n = pthread_mutex_unlock(_plock);if (n != 0)std::cerr << "pthread_mutex_unlock fail: " << strerror(n) << std::endl;}pthread_mutex_t *GetMutex(){return _plock;}~Mutex(){pthread_mutex_destroy(_plock); // 锁释放}private:pthread_mutex_t *_plock;};class LockGuard{public:LockGuard(Mutex *mutex): _mutex(mutex){_mutex->Lock(); // 构造时加锁}~LockGuard(){_mutex->UnLock(); // 析构时解锁}private:Mutex *_mutex;};
}
有了锁就能封装条件变量了。
//Cond.hpp文件
#pragma once
#include <iostream>
#include <string>
#include <pthread.h>
#include "Mutex.hpp"namespace MyCond
{using namespace MyMutex;class Cond{public:Cond(){int n = pthread_cond_init(&_cond, nullptr);(void)n; }void Wait(Mutex &mutex){int n = pthread_cond_wait(&_cond, mutex.GetMutex());(void)n;}void Signal(){int n = pthread_cond_signal(&_cond);(void)n;}void Broadcast(){int n = pthread_cond_broadcast(&_cond);(void)n;}~Cond(){int n = pthread_cond_destroy(&_cond);(void)n; }private:pthread_cond_t _cond;};
}
验证我们写的条件变量是否正确,可以将前面我们实现的组的队列里有关锁和条件变量的所有接口都换成现在我们自己实现的。
#pragma once
#include <iostream>
#include "Mutex.hpp"
#include "Cond.hpp"
#include <queue>using namespace MyCond;
using namespace MyMutex;const int defaultcap = 5;
template <typename T>
class BlockQueue
{
private:bool IsFull() { return _bq.size() >= _cap; }bool IsEmpty() { return _bq.empty(); }public:BlockQueue(const int cap = defaultcap): _cap(cap),_p_sleep_num(0),_c_sleep_num(0){}void Equeue(const T &data) // 生产者:入数据{{LockGuard lg(&_mutex);while (IsFull()) // 队列满了{_p_sleep_num++;printf("生产者休眠:%d\n", _p_sleep_num);_producer_cond.Wait(_mutex); // 在条件变量里等_p_sleep_num--;}// 队列还有空间_bq.push(data); // 入数据if (_c_sleep_num > 0) // 如果有消费者线程在休眠{_consumer_cond.Signal(); // 唤醒一个消费者std::cout << "唤醒消费者..." << std::endl;}}}T Pop() // 消费者:拿数据{T data;{LockGuard lg(&_mutex);while (IsEmpty()) // 队列里没东西{_c_sleep_num++;_consumer_cond.Wait(_mutex); // 在条件变量里等_c_sleep_num--;}// 队列里还有数据data = _bq.front(); // 拿数据_bq.pop();if (_p_sleep_num > 0) // 如果有生产者线程在休眠{_producer_cond.Signal(); // 唤醒一个生产者std::cout << "唤醒生产者..." << std::endl;}}return data;}~BlockQueue(){}private:std::queue<T> _bq; // 队列int _cap; // 队列的容量Mutex _mutex; // 互斥锁Cond _producer_cond; // 生产者的条件变量Cond _consumer_cond; // 消费者的条件变量int _p_sleep_num; // 生产者休眠数量int _c_sleep_num; // 消费者休眠数量
};
本次分享就到这里,我们下篇见~