linux线程同步
线程互斥解决错误问题,而线程同步解决高效问题,让执行流以一定的顺序访问临界资源
1. 线程同步
1.1 同步概念与竞态条件
同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免
饥饿问题,叫做同步
竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。
1.2 条件变量
当⼀个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
例如⼀个线程访问队列时,发现队列为空,它只能等待,直到其它线程将⼀个节点添加到队列
中。这种情况就需要用到条件变量。
1.2.1 初始化与销毁
初始化条件变量
int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);
参数:
cond:指向要初始化的条件变量的指针
attr:条件变量属性,通常设为NULL表示默认属性返回值:成功返回0,失败返回错误码静态初始化:
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
销毁条件变量
int pthread_cond_destroy(pthread_cond_t *cond)
参数:cond:要销毁的条件变量返回值:成功返回0,失败返回错误码
1.2.2 等待与唤醒
等待条件变量
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
参数:
cond:要在这个条件变量上等待
mutex:互斥量行为:
原子性地释放互斥锁并进入等待状态,被唤醒后,重新获取互斥锁返回值:成功返回0,失败返回错误码int pthread_cond_timedwait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex,
const struct timespec *restrict abstime);功能:带超时的等待条件变量参数:
abstime:绝对时间点(不是时间间隔),超过此时间不再等待返回值:成功返回0,超时返回ETIMEDOUT,失败返回其他错误码
通知条件变量
int pthread_cond_signal(pthread_cond_t *cond);功能:唤醒至少一个等待该条件变量的线程
参数:cond:要通知的条件变量int pthread_cond_broadcast(pthread_cond_t *cond);
功能:唤醒所有等待该条件变量的线程
参数:cond:要广播通知的条件变量返回值:成功返回0,失败返回错误码
#include <iostream>
#include <string>
#include <pthread.h>
#include <vector>
#include <unistd.h>pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t gcond = PTHREAD_COND_INITIALIZER;int cnt = 1000;
#define NUM 5void *threadrun(void *args)
{std::string name = static_cast<char *>(args);while(true){pthread_mutex_lock(&glock);//判定本身就是访问临界资源,判定一定是在临界区内部的,判定结果也一定是在临界区内部的//所以,条件不满足需要休眠,也是在临界区内部休眠的pthread_cond_wait(&gcond, &glock); //锁在wait之前会被自动释放掉std::cout << name << "计算:" << cnt << std::endl;cnt++;pthread_mutex_unlock(&glock);sleep(1);}return nullptr;
}int main()
{std::vector<pthread_t> threads;for(int i = 0; i < NUM; i++){pthread_t tid;char *name = new char[64];snprintf(name, 64, "thread-%d", i);int n = pthread_create(&tid, nullptr, threadrun, name);if(n != 0) continue;threads.push_back(tid);}sleep(3);while(true) //每隔一秒唤醒一个线程{std::cout << "唤醒一个线程" << std::endl;pthread_cond_signal(&gcond);//std::cout << "唤醒所有线程" << std::endl;//pthread_cond_broadcast(&gcond);sleep(1);}for(auto &id : threads){int n = pthread_join(id, nullptr);}return 0;
}
线程按顺序被唤醒
条件变量允许线程进行等待,允许一个线程唤醒在cond等待的其他线程
2. 生产者消费者模型
2.1 简介
生产者-消费者模型(Producer-Consumer Model)是一种典型的多线程同步模型,用于处理生产者和消费者之间的协作问题。它常用于操作系统、并发编程、线程池、任务队列等场景。
(1)基本概念
生产者(Producer): 负责生产数据(或任务),放入缓冲区。
消费者(Consumer): 负责从缓冲区取出数据(或任务)进行处理。
缓冲区(Buffer): 一个用于存储生产者生成、等待被消费者处理的数据的容器。可以是队列、数组等。
(2)存在的问题
-
缓冲区满时,生产者需要等待。
-
缓冲区空时,消费者需要等待。
-
多线程并发时,需要保证缓冲区的线程安全,防止数据竞争或丢失。
(3)解决方案
通常借助线程同步机制解决生产者消费者问题:
互斥锁(mutex): 保证临界区(缓冲区)操作的互斥性。
条件变量(condition variable): 实现线程的等待与通知。
信号量(semaphore): 用于控制可用资源数,适合计数型控制。
阻塞队列(BlockingQueue): 高级语言中已有封装(如 Java 的 LinkedBlockingQueue
)。
(4)模型示意图
生产者线程+-------------------+| 生成产品 || 加锁 || 判断缓冲区是否满 || 放入缓冲区 || 通知消费者 |+-------------------+↓缓冲区(队列)↑+--------------------+| 消费者线程 || 加锁 || 判断缓冲区是否空 || 从缓冲区取出数据 || 处理数据 || 通知生产者 |+--------------------+
(5)模型特点
解耦:生产者和消费者不需要知道对方的存在,只通过缓冲区交互
平衡:缓冲区的存在可以平衡生产速度和消费速度的差异
并发:生产者和消费者可以并行工作,提高系统吞吐量、效率
(6)工作流程
生产者逻辑:
1.获取互斥锁
2.检查缓冲区是否已满
3.如果满,等待"非满"条件
4.将数据放入缓冲区
5.发送"非空"信号
6.释放互斥锁
消费者逻辑:
1.获取互斥锁
2.查缓冲区是否为空
3.如果空,等待"非空"条件
4.从缓冲区取出数据
5.发送"非满"信号
6.释放互斥锁
(7)3种关系
生产者与生产者之间:竞争、互斥关系。
消费者与消费者之间:互斥关系。
生产者与消费者之间:互斥、同步关系。
(8)小故事帮助理解
将缓冲区看作超市,生产者看作工厂,消费者看作顾客,条件变量即超市客服。
🏭 工厂(生产者)
- 工厂不断生产商品(数据/任务)
- 生产速度有时快有时慢(取决于原材料、机器状态等)
🛒 超市(缓冲区)
- 货架容量有限
- 商品从工厂运来后放在货架上
- 消费者从货架取商品
👩💼 超市客服(条件变量)
- 有两位专门的客服人员:
1. "货架不满"客服:负责通知工厂可以送货了
2. "货架不空"客服:负责通知消费者可以购物了
👪 顾客(消费者)
- 不断来超市购买商品
- 购买速度因人而异(有的顾客买得快,有的买得慢)
---🎭 故事场景
早晨开业时:
1. 货架空空如也(buffer.empty()==true)
2. 顾客A想买东西,发现货架是空的 👉 去找"货架不空"客服登记等待
3. 此时工厂送来第一批货物 👉 放满货架后通知"货架不空"客服
4. 客服立即广播:"货架有货啦!"
5. 顾客A被唤醒,开始购物
营业高峰期:
- 工厂拼命生产(多个生产者线程)
- 顾客络绎不绝(多个消费者线程)
- 当货架快满时(buffer.size()>=10):
- "货架不满"客服会拦住工厂货车:"别送了!等卖出去一些再来!"
- 当货架快空时:
- "货架不空"客服会安抚排队顾客:"稍等,新货马上到!"
特殊状况处理:
- 有时客服可能会错误叫醒人(虚假唤醒)👉 被叫醒的人会再次确认货架状态(while循环检查)
- 超市有个保安(mutex锁)确保:
- 每次只有一个人能查看/修改货架状态
- 顾客拿商品时,其他人必须排队等待
---💡 关键启示
1. 客服协调机制(条件变量)避免了:
- 工厂不停白跑送货(忙等待)
- 顾客空排队(资源浪费)
2. 货架容量限制 防止:
- 商品堆积如山(内存溢出)
- 货物短缺(消费者饥饿)
3.保安的存在(互斥锁)确保:
- 不会出现多人同时搬货导致库存数量错误
- 不会发生顾客拿到破损商品(数据竞争)
传统单线程模式(串行)
[工厂] → [卡车送货] → [超市收货] → [货架补货] → [顾客购买]
-
工厂生产完才能送货
-
超市必须停止营业才能收货
-
顾客要等所有流程结束才能购物
👉 问题:大量时间浪费在等待上!
并发模式
[工厂]─┬─→[卡车A送货]→[收货区] ├─→[卡车B送货]→[收货区] ← 并行生产└─→[卡车C送货]→[收货区][顾客A]←─[收银台1]←─[货架]
[顾客B]←─[收银台2]←─[货架] ← 并行消费
[顾客C]←─[收银台3]←─[货架]
关键改进:
-
生产消费重叠:送货和购物同时进行
-
缓冲区作用:货架作为缓冲,消除等待
-
分工并发:多个收银台/送货通道并行工作
生产者消费者模型通过并发提高效率体现在:未来获取任务和处理任务时是并发的,而不是进出交易场所时的并发
2.2 基于阻塞队列的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是⼀种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放⼊了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
新角色:智能传送带(阻塞队列)
是一条自动化传送带,连接工厂和收银台
自带计数显示屏(原子计数器),精确显示商品数量
有两个智能挡板:
- 入口挡板:当传送带满时自动关闭(写阻塞)
- 出口挡板:当传送带空时自动关闭(读阻塞)
全自动阻塞:
- 写满时:工厂线程自动休眠(而不是忙等)
- 读空时:顾客线程自动等待(不浪费CPU)
Main.cc
#include "BlockQueue.hpp"
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
#include <functional>// 我们定义了一个任务类型,返回值void,参数为空
using task_t = std::function<void()>;void Download()
{std::cout << "我是一个下载任务..." << std::endl;sleep(3); //假设处理比较耗时
}void *consumer(void *args)
{BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t>*>(args);while(true){// 1. 消费任务task_t t = bq->Pop();// 2. 处理任务 -- 处理任务的时候,这个任务,已经被拿到线程的上下文中了,不属于队列了t();}
}void *productor(void *args)
{BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t>*>(args);int data = 1;while(true){sleep(1); //生产慢,生产一个消费一个// 1. 获得任务std::cout << "生产了一个任务: " << std::endl;// 2. 生产任务bq->EQueue(Download);}
}int main()
{//申请阻塞队列BlockQueue<task_t> *bq = new BlockQueue<task_t>();//构建生产者消费者pthread_t c[2], p[3];pthread_create(c, nullptr, consumer, bq);pthread_create(c+1, nullptr, consumer, bq);pthread_create(p, nullptr, productor, bq);pthread_create(p+1, nullptr, productor, bq);pthread_create(p+2, nullptr, productor, bq);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);pthread_join(p[2], nullptr);//单生产单消费// pthread_t c, p;// pthread_create(&c, nullptr, consumer, bq);// pthread_create(&p, nullptr, productor, bq);// pthread_join(c, nullptr);// pthread_join(p, nullptr);return 0;
}
BlockQueue.hpp
#pragma once
#include <iostream>
#include <string>
#include <pthread.h>
#include <queue>const int defaultcap = 5;template <typename T>
class BlockQueue
{
private:bool IsFull() { return _q.size() >= _cap; }bool IsEmpty() { return _q.empty(); }public:BlockQueue(int cap = defaultcap): _cap(cap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_full_cond, nullptr);pthread_cond_init(&_empty_cond, nullptr);}void EQueue(const T &in){pthread_mutex_lock(&_mutex);while (IsFull()){// 让生产者线程进行等待//1. pthread_cond_wait调用成功,挂起线程之前,要先自动释放锁//2. 当线程被唤醒时,默认从临界区被唤醒,从pthread_cond_wait开始//成功返回需要当前线程重新申请锁//3.如果一个线程被唤醒,但申请锁失败了,该线程就会在锁上阻塞等待_psleep_num++;std::cout << "生产者,进入休眠了: _psleep_num" << _psleep_num << std::endl;pthread_cond_wait(&_full_cond, &_mutex);_psleep_num--;}_q.push(in);if(_csleep_num > 0){pthread_cond_signal(&_empty_cond);std::cout << "唤醒消费者" << std::endl;}// pthread_cond_signal(&_empty_cond); // 可以pthread_mutex_unlock(&_mutex);// pthread_cond_signal(&_empty_cond); // 可以}T Pop(){pthread_mutex_lock(&_mutex);while(IsEmpty()){_csleep_num++;pthread_cond_wait(&_empty_cond, &_mutex);_csleep_num--;}T data = _q.front();_q.pop();if(_psleep_num > 0){pthread_cond_signal(&_full_cond);std::cout << "唤醒消费者" << std::endl;}// pthread_cond_signal(&_full_cond); // 可以pthread_mutex_unlock(&_mutex);return data;// pthread_cond_signal(&_full_cond); // 可以}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_full_cond);pthread_cond_destroy(&_empty_cond);}private:std::queue<T> _q; // 临界资源int _cap; // 容量大小pthread_mutex_t _mutex;pthread_cond_t _full_cond;pthread_cond_t _empty_cond;int _csleep_num; // 消费者休眠个数int _psleep_num; // 生产者休眠个数
};
3. POSIX信号量
信号量的本质是一个计数器,是对特定资源的预定机制。
多线程使用资源,有两种场景:
1.将目标资源整体使用:mutex + 2元信号量
2.将目标按不同的块分批使用
3.1 信号量操作
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
sem:指向信号量变量的指针。pshared:
0:信号量在线程间共享(常用)。
非0:信号量在进程间共享(需位于共享内存中)。value:信号量的初始值(例如,环形队列的空槽数量)。返回值:成功返回 0,失败返回 -1 并设置 errno。
销毁信号量
int sem_destroy(sem_t *sem);
功能:销毁一个未命名的POSIX信号量,释放内核资源。注意:
必须确保没有线程在等待该信号量,否则行为未定义。
等待信号量(P操作)
int sem_wait(sem_t *sem);
功能:
信号量值 减1(原子操作)。
如果信号量值为 0,则调用线程 阻塞,直到信号量变为正数。
int sem_post(sem_t *sem);
发布信号量(V操作)
功能:
信号量值 加1(原子操作)。
如果有线程因该信号量阻塞,则唤醒其中一个。
3.2 基于环形队列的生产者消费者模型
环形队列(Ring Buffer):
固定大小的循环数组,避免动态内存分配。
通过 head
和 tail
指针管理读写位置。
同步机制:
互斥锁(Mutex):保护 head
和 tail
的修改。
POSIX信号量:控制资源可用性(空槽和满槽)。
信号量的另一个本质:
信号量把对临界资源是否存在、就绪等条件的判断,以原子性的形式,在访问临界资源之前就完成了。
RingQueue.hpp
#pragma once#include <iostream>
#include <vector>
#include "Sem.hpp"
#include "Mutex.hpp"static const int gcap = 5; // for debugusing namespace SemModule;
using namespace MutexModule;template <typename T>
class RingQueue
{
public:RingQueue(int cap = gcap): _rq(cap),_cap(cap),_blank_sem(cap),_p_step(0),_data_sem(0),_c_step(0){}void EQueue(const T &in){// 1. 申请信号量, 空位置的信号量_blank_sem.P();{LockGuard lockguard(_pmutex);// 2. 生产_rq[_p_step] = in;// 3. 更新下表_p_step++;// 4. 维持环形特性_p_step %= _cap;}_data_sem.V();}void Pop(T *out){// 1. 申请信号量,数据信号量_data_sem.P();{LockGuard lockguard(_cmutex);// 2. 消费*out = _rq[_c_step];// 3. 更新下标_c_step++;// 4. 维持环形特性_c_step %= _cap;}_blank_sem.V();}~RingQueue() {}private:std::vector<T> _rq;int _cap;// 生产者Sem _blank_sem;int _p_step;// 消费者Sem _data_sem;int _c_step;Mutex _cmutex;Mutex _pmutex;
};
Mutex.hpp
#pragma once
#include <iostream>
#include <pthread.h>namespace MutexModule
{class Mutex{public:Mutex(){pthread_mutex_init(&_mutex, nullptr);}void Lock(){int n = pthread_mutex_lock(&_mutex);(void)n;}void Unlock(){int n = pthread_mutex_unlock(&_mutex);(void)n;}~Mutex(){pthread_mutex_destroy(&_mutex);}pthread_mutex_t *Get(){return &_mutex;}private:pthread_mutex_t _mutex;};class LockGuard{public:LockGuard(Mutex &mutex) : _mutex(mutex){_mutex.Lock();}~LockGuard(){_mutex.Unlock();}private:Mutex &_mutex;};
}
Sem.hpp
#include <iostream>
#include <semaphore.h>
#include <pthread.h>namespace SemModule
{const int defaultval = 1;class Sem{public:Sem(unsigned int sem_val = defaultval){sem_init(&_sem, 0, sem_val);}void P(){int n = sem_wait(&_sem); // 原子的}void V(){int n = sem_post(&_sem); // 原子的}~Sem(){sem_destroy(&_sem);}private:sem_t _sem;};
}
Main.cc
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include "RingQueue.hpp"struct threaddata
{RingQueue<int> *rq;std::string name;
};void *consumer(void *args)
{threaddata *td = static_cast<threaddata*>(args);while (true){sleep(3);// 1. 消费任务int t = 0;td->rq->Pop(&t);// 2. 处理任务std::cout << td->name << " 消费者拿到了一个数据: " << t << std::endl;}
}int data = 1;void *productor(void *args)
{threaddata *td = static_cast<threaddata*>(args);while (true){sleep(1);// 1. 获得任务std::cout << td->name << " 生产了一个任务: " << data << std::endl;// 2. 生产任务td->rq->EQueue(data);data++;}
}int main()
{RingQueue<int> *rq = new RingQueue<int>();pthread_t c[2], p[3];threaddata *td = new threaddata();td->name = "cthread-1";td->rq = rq;pthread_create(c, nullptr, consumer, td);threaddata *td2 = new threaddata();td2->name = "cthread-2";td2->rq = rq;pthread_create(c + 1, nullptr, consumer, td2);threaddata *td3 = new threaddata();td3->name = "pthread-3";td3->rq = rq;pthread_create(p, nullptr, productor, td3);threaddata *td4 = new threaddata();td4->name = "pthread-4";td4->rq = rq;pthread_create(p + 1, nullptr, productor, td4);threaddata *td5 = new threaddata();td5->name = "pthread-5";td5->rq = rq;pthread_create(p + 2, nullptr, productor, td5);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);pthread_join(p[2], nullptr);return 0;
}