当前位置: 首页 > news >正文

linux线程同步

线程互斥解决错误问题,而线程同步解决高效问题,让执行流以一定的顺序访问临界资源

1. 线程同步

1.1 同步概念与竞态条件

同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免

饥饿问题,叫做同步

竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。

1.2 条件变量

当⼀个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。

例如⼀个线程访问队列时,发现队列为空,它只能等待,直到其它线程将⼀个节点添加到队列

中。这种情况就需要用到条件变量。

1.2.1 初始化与销毁

初始化条件变量

int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);
参数:
cond:指向要初始化的条件变量的指针
attr:条件变量属性,通常设为NULL表示默认属性返回值:成功返回0,失败返回错误码静态初始化:
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

销毁条件变量 

int pthread_cond_destroy(pthread_cond_t *cond)
参数:cond:要销毁的条件变量返回值:成功返回0,失败返回错误码

1.2.2 等待与唤醒

等待条件变量

int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
参数:
cond:要在这个条件变量上等待
mutex:互斥量行为:
原子性地释放互斥锁并进入等待状态,被唤醒后,重新获取互斥锁返回值:成功返回0,失败返回错误码int pthread_cond_timedwait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex,
const struct timespec *restrict abstime);功能:带超时的等待条件变量参数:
abstime:绝对时间点(不是时间间隔),超过此时间不再等待返回值:成功返回0,超时返回ETIMEDOUT,失败返回其他错误码

通知条件变量 

int pthread_cond_signal(pthread_cond_t *cond);功能:唤醒至少一个等待该条件变量的线程
参数:cond:要通知的条件变量int pthread_cond_broadcast(pthread_cond_t *cond);
功能:唤醒所有等待该条件变量的线程
参数:cond:要广播通知的条件变量返回值:成功返回0,失败返回错误码
#include <iostream>
#include <string>
#include <pthread.h>
#include <vector>
#include <unistd.h>pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t gcond = PTHREAD_COND_INITIALIZER;int cnt = 1000;
#define NUM 5void *threadrun(void *args)
{std::string name = static_cast<char *>(args);while(true){pthread_mutex_lock(&glock);//判定本身就是访问临界资源,判定一定是在临界区内部的,判定结果也一定是在临界区内部的//所以,条件不满足需要休眠,也是在临界区内部休眠的pthread_cond_wait(&gcond, &glock); //锁在wait之前会被自动释放掉std::cout << name << "计算:" << cnt << std::endl;cnt++;pthread_mutex_unlock(&glock);sleep(1);}return nullptr;
}int main()
{std::vector<pthread_t> threads;for(int i = 0; i < NUM; i++){pthread_t tid;char *name = new char[64];snprintf(name, 64, "thread-%d", i);int n = pthread_create(&tid, nullptr, threadrun, name);if(n != 0) continue;threads.push_back(tid);}sleep(3);while(true) //每隔一秒唤醒一个线程{std::cout << "唤醒一个线程"  << std::endl;pthread_cond_signal(&gcond);//std::cout << "唤醒所有线程"  << std::endl;//pthread_cond_broadcast(&gcond);sleep(1);}for(auto &id : threads){int n = pthread_join(id, nullptr);}return 0;
}

线程按顺序被唤醒 

条件变量允许线程进行等待,允许一个线程唤醒在cond等待的其他线程

2. 生产者消费者模型

2.1 简介

生产者-消费者模型(Producer-Consumer Model)是一种典型的多线程同步模型,用于处理生产者和消费者之间的协作问题。它常用于操作系统、并发编程、线程池、任务队列等场景。

(1)基本概念

生产者(Producer): 负责生产数据(或任务),放入缓冲区。

消费者(Consumer): 负责从缓冲区取出数据(或任务)进行处理。

缓冲区(Buffer): 一个用于存储生产者生成、等待被消费者处理的数据的容器。可以是队列、数组等。

(2)存在的问题

  1. 缓冲区满时,生产者需要等待。

  2. 缓冲区空时,消费者需要等待。

  3. 多线程并发时,需要保证缓冲区的线程安全,防止数据竞争或丢失。

(3)解决方案

通常借助线程同步机制解决生产者消费者问题:

互斥锁(mutex): 保证临界区(缓冲区)操作的互斥性。

条件变量(condition variable): 实现线程的等待与通知。

信号量(semaphore): 用于控制可用资源数,适合计数型控制。

阻塞队列(BlockingQueue): 高级语言中已有封装(如 Java 的 LinkedBlockingQueue)。

(4)模型示意图

            生产者线程+-------------------+|  生成产品         ||  加锁             ||  判断缓冲区是否满 ||  放入缓冲区       ||  通知消费者       |+-------------------+↓缓冲区(队列)↑+--------------------+|  消费者线程        ||  加锁              ||  判断缓冲区是否空  ||  从缓冲区取出数据  ||  处理数据          ||  通知生产者        |+--------------------+

(5)模型特点

解耦:生产者和消费者不需要知道对方的存在,只通过缓冲区交互

平衡:缓冲区的存在可以平衡生产速度和消费速度的差异

并发:生产者和消费者可以并行工作,提高系统吞吐量、效率

(6)工作流程

生产者逻辑

1.获取互斥锁
2.检查缓冲区是否已满
3.如果满,等待"非满"条件
4.将数据放入缓冲区
5.发送"非空"信号
6.释放互斥锁

消费者逻辑

1.获取互斥锁
2.查缓冲区是否为空
3.如果空,等待"非空"条件
4.从缓冲区取出数据
5.发送"非满"信号
6.释放互斥锁

(7)3种关系

生产者与生产者之间:竞争、互斥关系。

消费者与消费者之间:互斥关系。

生产者与消费者之间:互斥、同步关系。

(8)小故事帮助理解

将缓冲区看作超市,生产者看作工厂,消费者看作顾客,条件变量即超市客服。

 🏭 工厂(生产者)
- 工厂不断生产商品(数据/任务)
- 生产速度有时快有时慢(取决于原材料、机器状态等)

🛒 超市(缓冲区)
- 货架容量有限
- 商品从工厂运来后放在货架上
- 消费者从货架取商品

👩💼 超市客服(条件变量)
- 有两位专门的客服人员:
  1. "货架不满"客服:负责通知工厂可以送货了
  2. "货架不空"客服:负责通知消费者可以购物了

 👪 顾客(消费者)
- 不断来超市购买商品
- 购买速度因人而异(有的顾客买得快,有的买得慢)

---🎭 故事场景

早晨开业时:
1. 货架空空如也(buffer.empty()==true)
2. 顾客A想买东西,发现货架是空的 👉 去找"货架不空"客服登记等待
3. 此时工厂送来第一批货物 👉 放满货架后通知"货架不空"客服
4. 客服立即广播:"货架有货啦!"
5. 顾客A被唤醒,开始购物

营业高峰期:
- 工厂拼命生产(多个生产者线程)
- 顾客络绎不绝(多个消费者线程)
- 当货架快满时(buffer.size()>=10):
  - "货架不满"客服会拦住工厂货车:"别送了!等卖出去一些再来!"
- 当货架快空时:
  - "货架不空"客服会安抚排队顾客:"稍等,新货马上到!"

特殊状况处理:
- 有时客服可能会错误叫醒人(虚假唤醒)👉 被叫醒的人会再次确认货架状态(while循环检查)
- 超市有个保安(mutex锁)确保:
  - 每次只有一个人能查看/修改货架状态
  - 顾客拿商品时,其他人必须排队等待

---💡 关键启示

1. 客服协调机制(条件变量)避免了:
   - 工厂不停白跑送货(忙等待)
   - 顾客空排队(资源浪费)

2. 货架容量限制 防止:
   - 商品堆积如山(内存溢出)
   - 货物短缺(消费者饥饿)

3.保安的存在(互斥锁)确保:
   - 不会出现多人同时搬货导致库存数量错误
   - 不会发生顾客拿到破损商品(数据竞争)

传统单线程模式(串行)

[工厂] → [卡车送货] → [超市收货] → [货架补货] → [顾客购买]
  1. 工厂生产完才能送货

  2. 超市必须停止营业才能收货

  3. 顾客要等所有流程结束才能购物
    👉 问题:大量时间浪费在等待上!

并发模式

[工厂]─┬─→[卡车A送货]→[收货区] ├─→[卡车B送货]→[收货区]    ← 并行生产└─→[卡车C送货]→[收货区][顾客A]←─[收银台1]←─[货架] 
[顾客B]←─[收银台2]←─[货架]      ← 并行消费
[顾客C]←─[收银台3]←─[货架]

关键改进

  1. 生产消费重叠:送货和购物同时进行

  2. 缓冲区作用:货架作为缓冲,消除等待

  3. 分工并发:多个收银台/送货通道并行工作

生产者消费者模型通过并发提高效率体现在:未来获取任务和处理任务时是并发的,而不是进出交易场所时的并发

2.2 基于阻塞队列的生产者消费者模型

在多线程编程中阻塞队列(Blocking Queue)是⼀种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放⼊了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

新角色:智能传送带(阻塞队列)

是一条自动化传送带,连接工厂和收银台

自带计数显示屏(原子计数器),精确显示商品数量

两个智能挡板

 - 入口挡板:当传送带满时自动关闭(写阻塞)

 - 出口挡板:当传送带空时自动关闭(读阻塞)

全自动阻塞

 - 写满时:工厂线程自动休眠(而不是忙等)

 -  读空时:顾客线程自动等待(不浪费CPU)

Main.cc

#include "BlockQueue.hpp"
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
#include <functional>// 我们定义了一个任务类型,返回值void,参数为空
using task_t = std::function<void()>;void Download()
{std::cout << "我是一个下载任务..." << std::endl;sleep(3); //假设处理比较耗时
}void *consumer(void *args)
{BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t>*>(args);while(true){// 1. 消费任务task_t t = bq->Pop();// 2. 处理任务 -- 处理任务的时候,这个任务,已经被拿到线程的上下文中了,不属于队列了t();}
}void *productor(void *args)
{BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t>*>(args);int data = 1;while(true){sleep(1); //生产慢,生产一个消费一个// 1. 获得任务std::cout << "生产了一个任务: " << std::endl;// 2. 生产任务bq->EQueue(Download);}
}int main()
{//申请阻塞队列BlockQueue<task_t> *bq = new BlockQueue<task_t>();//构建生产者消费者pthread_t c[2], p[3];pthread_create(c, nullptr, consumer, bq);pthread_create(c+1, nullptr, consumer, bq);pthread_create(p, nullptr, productor, bq);pthread_create(p+1, nullptr, productor, bq);pthread_create(p+2, nullptr, productor, bq);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);pthread_join(p[2], nullptr);//单生产单消费// pthread_t c, p;// pthread_create(&c, nullptr, consumer, bq);// pthread_create(&p, nullptr, productor, bq);// pthread_join(c, nullptr);// pthread_join(p, nullptr);return 0;
}

 BlockQueue.hpp

#pragma once
#include <iostream>
#include <string>
#include <pthread.h>
#include <queue>const int defaultcap = 5;template <typename T>
class BlockQueue
{
private:bool IsFull() { return _q.size() >= _cap; }bool IsEmpty() { return _q.empty(); }public:BlockQueue(int cap = defaultcap): _cap(cap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_full_cond, nullptr);pthread_cond_init(&_empty_cond, nullptr);}void EQueue(const T &in){pthread_mutex_lock(&_mutex);while (IsFull()){// 让生产者线程进行等待//1. pthread_cond_wait调用成功,挂起线程之前,要先自动释放锁//2. 当线程被唤醒时,默认从临界区被唤醒,从pthread_cond_wait开始//成功返回需要当前线程重新申请锁//3.如果一个线程被唤醒,但申请锁失败了,该线程就会在锁上阻塞等待_psleep_num++;std::cout << "生产者,进入休眠了: _psleep_num" <<  _psleep_num << std::endl;pthread_cond_wait(&_full_cond, &_mutex);_psleep_num--;}_q.push(in);if(_csleep_num > 0){pthread_cond_signal(&_empty_cond);std::cout << "唤醒消费者" << std::endl;}// pthread_cond_signal(&_empty_cond); // 可以pthread_mutex_unlock(&_mutex);// pthread_cond_signal(&_empty_cond); // 可以}T Pop(){pthread_mutex_lock(&_mutex);while(IsEmpty()){_csleep_num++;pthread_cond_wait(&_empty_cond, &_mutex);_csleep_num--;}T data = _q.front();_q.pop();if(_psleep_num > 0){pthread_cond_signal(&_full_cond);std::cout << "唤醒消费者" << std::endl;}// pthread_cond_signal(&_full_cond); // 可以pthread_mutex_unlock(&_mutex);return data;// pthread_cond_signal(&_full_cond); // 可以}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_full_cond);pthread_cond_destroy(&_empty_cond);}private:std::queue<T> _q; // 临界资源int _cap;           // 容量大小pthread_mutex_t _mutex;pthread_cond_t _full_cond;pthread_cond_t _empty_cond;int _csleep_num; // 消费者休眠个数int _psleep_num; // 生产者休眠个数
};

3. POSIX信号量

信号量的本质是一个计数器,是对特定资源的预定机制。

多线程使用资源,有两种场景:

1.将目标资源整体使用:mutex + 2元信号量

2.将目标按不同的块分批使用

 3.1 信号量操作

初始化信号量

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
sem:指向信号量变量的指针。pshared:
0:信号量在线程间共享(常用)。
非0:信号量在进程间共享(需位于共享内存中)。value:信号量的初始值(例如,环形队列的空槽数量)。返回值:成功返回 0,失败返回 -1 并设置 errno。

 销毁信号量

int sem_destroy(sem_t *sem);
功能:销毁一个未命名的POSIX信号量,释放内核资源。注意:
必须确保没有线程在等待该信号量,否则行为未定义。

 等待信号量(P操作)

int sem_wait(sem_t *sem);
功能:
信号量值 减1(原子操作)。
如果信号量值为 0,则调用线程 阻塞,直到信号量变为正数。
int sem_post(sem_t *sem);

 发布信号量(V操作)

功能:
信号量值 加1(原子操作)。
如果有线程因该信号量阻塞,则唤醒其中一个。

    3.2 基于环形队列的生产者消费者模型

    环形队列(Ring Buffer)

    固定大小的循环数组,避免动态内存分配。

    通过 head 和 tail 指针管理读写位置。

    同步机制

    互斥锁(Mutex):保护 head 和 tail 的修改。

    POSIX信号量:控制资源可用性(空槽和满槽)。

    信号量的另一个本质:

    信号量把对临界资源是否存在、就绪等条件的判断,以原子性的形式,在访问临界资源之前就完成了。

    RingQueue.hpp

    #pragma once#include <iostream>
    #include <vector>
    #include "Sem.hpp"
    #include "Mutex.hpp"static const int gcap = 5; // for debugusing namespace SemModule;
    using namespace MutexModule;template <typename T>
    class RingQueue
    {
    public:RingQueue(int cap = gcap): _rq(cap),_cap(cap),_blank_sem(cap),_p_step(0),_data_sem(0),_c_step(0){}void EQueue(const T &in){// 1. 申请信号量, 空位置的信号量_blank_sem.P();{LockGuard lockguard(_pmutex);// 2. 生产_rq[_p_step] = in;// 3. 更新下表_p_step++;// 4. 维持环形特性_p_step %= _cap;}_data_sem.V();}void Pop(T *out){// 1. 申请信号量,数据信号量_data_sem.P();{LockGuard lockguard(_cmutex);// 2. 消费*out = _rq[_c_step];// 3. 更新下标_c_step++;// 4. 维持环形特性_c_step %= _cap;}_blank_sem.V();}~RingQueue() {}private:std::vector<T> _rq;int _cap;// 生产者Sem _blank_sem;int _p_step;// 消费者Sem _data_sem;int _c_step;Mutex _cmutex;Mutex _pmutex;
    };

    Mutex.hpp

    #pragma once
    #include <iostream>
    #include <pthread.h>namespace MutexModule
    {class Mutex{public:Mutex(){pthread_mutex_init(&_mutex, nullptr);}void Lock(){int n = pthread_mutex_lock(&_mutex);(void)n;}void Unlock(){int n = pthread_mutex_unlock(&_mutex);(void)n;}~Mutex(){pthread_mutex_destroy(&_mutex);}pthread_mutex_t *Get(){return &_mutex;}private:pthread_mutex_t _mutex;};class LockGuard{public:LockGuard(Mutex &mutex) : _mutex(mutex){_mutex.Lock();}~LockGuard(){_mutex.Unlock();}private:Mutex &_mutex;};
    }

     Sem.hpp

    #include <iostream>
    #include <semaphore.h>
    #include <pthread.h>namespace SemModule
    {const int defaultval = 1;class Sem{public:Sem(unsigned int sem_val = defaultval){sem_init(&_sem, 0, sem_val);}void P(){int n = sem_wait(&_sem); // 原子的}void V(){int n = sem_post(&_sem); // 原子的}~Sem(){sem_destroy(&_sem);}private:sem_t _sem;};
    }

    Main.cc

    #include <iostream>
    #include <pthread.h>
    #include <unistd.h>
    #include "RingQueue.hpp"struct threaddata
    {RingQueue<int> *rq;std::string name;
    };void *consumer(void *args)
    {threaddata *td = static_cast<threaddata*>(args);while (true){sleep(3);// 1. 消费任务int t = 0;td->rq->Pop(&t);// 2. 处理任务std::cout << td->name << " 消费者拿到了一个数据:  " << t << std::endl;}
    }int data = 1;void *productor(void *args)
    {threaddata *td = static_cast<threaddata*>(args);while (true){sleep(1);// 1. 获得任务std::cout << td->name << " 生产了一个任务: " << data << std::endl;// 2. 生产任务td->rq->EQueue(data);data++;}
    }int main()
    {RingQueue<int> *rq = new RingQueue<int>();pthread_t c[2], p[3];threaddata *td = new threaddata();td->name = "cthread-1";td->rq = rq;pthread_create(c, nullptr, consumer, td);threaddata *td2 = new threaddata();td2->name = "cthread-2";td2->rq = rq;pthread_create(c + 1, nullptr, consumer, td2);threaddata *td3 = new threaddata();td3->name = "pthread-3";td3->rq = rq;pthread_create(p, nullptr, productor, td3);threaddata *td4 = new threaddata();td4->name = "pthread-4";td4->rq = rq;pthread_create(p + 1, nullptr, productor, td4);threaddata *td5 = new threaddata();td5->name = "pthread-5";td5->rq = rq;pthread_create(p + 2, nullptr, productor, td5);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);pthread_join(p[2], nullptr);return 0;
    }

    相关文章:

  • ES6 扩展运算符与 Rest 参数
  • yum命令常用选项
  • nginx 基于IP和用户的访问
  • leetcode hot100刷题日记——15.岛屿数量
  • Docker 安装 Harbor 教程(搭建 Docker 私有仓库 harbor 避坑指南)【woodwhales.cn】
  • java基础(面向对象进阶)
  • STM32中的IIC协议和OLED显示屏
  • ARM笔记-ARM指令集
  • 算法学习——从零实现循环神经网络
  • 7:QT加载保存参数(读写日志)
  • 5 分钟速通密码学!
  • List<Integer> list=new ArrayList<>()
  • Nginx stub_status 指南从启用到监控落地的全流程详解
  • 廉价却有效?ESD防护中的电容
  • 企业批量处理刚需PrintPDF 网络财务办公打印 网页到 Office 一键转 PDF
  • 【PhysUnits】10 减一操作(sub1.rs)
  • css五边形
  • 无需会员可一键转换
  • 【go】多线程编程如何识别和避免死锁,常见死锁场景分析,pprof使用指引
  • 【RK3588新品】嵌入式人工智能实验箱EDU-AIoT ELF 2发布
  • 怎么做网站的代理商/百度一下百度搜索官网
  • 宁夏网站建设优化/百度一下首页网址
  • 网站排版教程/电脑培训学校网站
  • 多品牌网站建设/做网站建设公司
  • 综合门户类网站有哪些/seo怎样
  • 南宁网站设计方案/ios aso优化工具