当前位置: 首页 > news >正文

【Linux】线程同步与生产消费者模型

一、线程同步:

什么是线程同步呢?

我们回顾上一篇文章中的自习室故事:

在一个学校有一个vip自习室,有一下规则:

1、在门外只有一把钥匙,只有一个人拿到钥匙后才能进入这个自习室
2、自习室内可以直到自己忙完后,自愿退出才将钥匙交出去

当早上的时候,小王来到了这个自习室,此时小王拿到了这个钥匙进入了自习室,那么其他学生想要进这个自习室就需要在门外等待 -----这就是多线程的阻塞等待

当小王想要出去吃饭的时候,刚一出去把钥匙挂在门口,但是此时发现有很多其他的人都在排队等待,于是又拿着钥匙回去了,此时别的人是抢不过小王的,因为小王是离钥匙最近的,其他人的竞争力比不过小王,长时间下去,外面的人进不了自习室也就是线程拿不到锁资源 ----- 这样就会导致多线程的饥饿问题

3、让所有的线程获取锁资源,按照一定的顺序性获取资源 -----同步

这个VIP自习室有一个管理员,当看到小王一直这样搞,导致其他人都没有得到很好的复习,并且当小王真的走了之后,发现外面的人一窝蜂地去抢这个钥匙,于是管理员认为不能够这样,于是增加了两条规则:

a)当里面的人把钥匙挂在门口后,不能马上拿钥匙,需要重新排队
b)外面的人需要有序排队

这样,所有的线程都能够有序地获得锁资源,这种按照一定的顺序性获取的资源叫做同步

也就是说:

同步:同步问题是保证数据安全的情况下,让我们的线程访问资源具有一定的顺序性

那么在多线程中如何快速实现同步呢?

条件变量:

在上述的自习室中,在外面的人访问自习室资源,发现没有钥匙,所以就在外面的等待队列中排好队,等到自习室里面的人出来后,会通过一个铃铛将等待队列中的人进行唤醒,接着队列中的第一个人就会拿着钥匙进去自习室,其中我们说的条件变量就可以看做这里的等待队列和一个唤醒机制

在使用条件变量的时候必须配合着锁,因为为什么会去等待队列中进行排队呢?为什么要将他们唤醒呢?这都是因为锁,因为申请锁失败就需要去等待队列中进行排队,当有人释放锁之后就需要唤醒他们来重新申请锁

条件变量接口:

这是条件变量的接口,可以看到和锁的接口使用几乎是一模一样的,条件变量类型为pthread_cond_t,在使用的时候和锁一样要进行初始化

int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr)

参数1:pthread_cond_t* 表示想要初始化的条件变量

参数2:const pthread_condattr_t* 表示初始化时的相关属性  设置为nullptr表示使用默认属性

返回值:成功返回 0,失败返回error number

在将条件变量使用完后要将条件变量销毁

int pthread_cond_destroy(pthread_cond_t *cond);

参数:pthread_cond_t* 表示想要销毁的条件变量

返回值:成功返回 0,失败返回error number

当然也可以对全局条件变量进行宏初始化

PTHREAD_COND_INITIALIZER表示自动初始化,自动销毁

int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);

参数1:进入等待队列的条件变量

参数2:这是一个互斥锁,用于辅助条件变量

返回值:成功返回0,失败返回error number

为什么这里要传锁的地址,因为进程进入等待对列的时候是带着锁进入的,此时就需要释放这个锁,不然的话,其他线程就无法申请锁资源,无法进入临界区了,所以等待队列拿到锁的地址是为了释放锁资源

int pthread_cond_signal(pthread_cond_t *cond);

参数1:将条件变量队列中的队头线程唤醒,重新申请锁

返回值:成功返回0,失败返回error number

int pthread_cond_broadcast(pthread_cond_t *cond);

参数:将条件变量队列中的所有线程唤醒,重新竞争锁

返回值:成功返回0,失败返回error number

接下来简单使用条件变量:

主要思路就是先创建多个线程,然后在临界区中直接将该线程放入等待队列,等到所有线程创建完后,每过1秒主线程将等待队列中的一个线程进行唤醒,然后依次将计数器cnt++

#include <iostream>
#include <pthread.h>
#include <unistd.h>

using namespace std;

const int N = 5;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

int cnt = 0;

void* Func(void* args)
{
    pthread_detach(pthread_self());//自我分离线程
    //这里要用无符号长整型,因为在Linux下,指针的大小是8字节,而int是4字节,如果使用int会丢失数据
    uint64_t number = (uint64_t)args;
    cout<<"thread-"<<number<<" 创建成功"<<endl;
    while(true)
    {
        pthread_mutex_lock(&mutex);
        pthread_cond_wait(&cond,&mutex);
        cnt++;
        cout<<"thread-"<<number<<" cnt = "<< cnt << endl;
        pthread_mutex_unlock(&mutex);
        usleep(100);
    }
}
int main()
{
    for(uint64_t i = 0;i<N;i++)
    {
        pthread_t tid;
        pthread_create(&tid,nullptr,Func,(void*)i);
        usleep(10);
    }
    while(true)
    {
        pthread_cond_signal(&cond);
        sleep(1);
    }
    return 0;
}

二、生产消费者模型:

我们依然通过一个实际的例子来理解:

如上,这是一个生产消费者模型,生产者生产数据放入超市,消费者消费数据

那么为什么要存在超市呢?为什么不消费者直接从生产者那里去拿呢?

当不存在超市的时候,生产者工厂就需要听从消费者的,如果消费者只要一包呢?那么生产者是不是也只生产一包呢?-----那这样的话赚的米连成本都不够,并且对于生产者在每次需要的时候都要跑到工厂中等待,这样显然效率是很慢的

当存在超市的时候,生产者工厂就不用等待消费者的了,只需要生产一批,放入超市,而消费者,也不用去工厂等了,这样,使数据流动效率变高了,能够节省生产者和消费者的时间,这样,超市作为一个缓存区域,将生产者所生产的数据和消费者所消费的数据管理起来,能够大大提高各自的效率

这样,生产者如果超市有空位的话,就只顾自己生产即可,消费者如果超市中有数据的话,就只顾自己消费即可,这样生产者和消费者就自顾自的,不需要考虑对方,这样就进行了一定程度上的解耦合

所以,生产消费者的模型的优点是支持忙闲不均,并实现了生产者和消费者的解耦

在我们的Linux系统中,扮演生产者和消费者的都是线程,扮演类似超市的缓存就是特定的数据结构

所以在Linux系统中的生产消费者模型本质上是线程间通信,更重要的是如何安全高效的进行通信,对于特定数据结构中的资源来说,这就是一个共享资源,一个线程进行生产数据,一个线程进行拿数据,既然是共享资源,就会存在并发问题,那么就要知道这两者之间的三种关系:

321原则:

生产者和生产者:互斥

这是很明显的竞争关系,就像康师傅和统一,这两者就都想占据更多的资源来将货物卖出去,所以生产者和生产者之间是很明显的互斥关系

消费者和消费者:互斥

其实这也是一种竞争互斥关系,在日常生活中,可能感受不到,这是因为资源是足够的,当如果世界末日了,超市里是只有一袋方便面了,这个时候消费者就会去抢,所以本质上,消费者和消费者之间也是一种互斥关系

生产者和消费者:同步与互斥

既存在同步又存在互斥:小王想去超市买方便面,但是超市老板告诉小王卖完了,但是小王很想吃,于是循环地问超市老板,但是这样是不行的,效率很低,并且在问的时候小王做不了其他事情,事实上不必这样,当超市老板从生产者工厂进货,当进货完成后再通知小王即可,这就证明了生产者和消费者之间是有顺序性的,所以这是同步,当老板在进货的时候小王是不能够去超市买方便面的,这是互斥关系

如上可以理解记忆为:3种关系,2种角色,1个特定数据结构的交易场所,简称321原则

生产消费者模型的高效问题:

如上,这是一个生产消费者模型,我们要理解一个问题:生产者生产的数据从何而来

在Linux系统中,生产者生产的数据是读取用户所输入的或者是网卡所传输的,生产者所生产的数据就是获取这些数据,并且获取这些数据是要花费时间的

同样的,当消费者拿到数据后,并不仅仅是拿到数据,更重要的是后续的处理数据,所以消费者在拿到数据后要进行处理也是要花费时间的

所以对生产者和消费者的理解不仅仅是生产数据消费数据

生产者:首先获得数据,在生产数据

消费者:首先消费数据,在处理数据

回到我们的问题:生产消费者模型为何高效?

1、生产者和消费者在同一个场所中进行生产消费操作

2、生产者在生产的时候是无需关心消费者的状态的,只需要关注特定数据结构的交易场所是否有存在空位

3、消费者在消费的时候是无需关心生产者的状态的,只需要关注特定数据结构的交易场所是否有存在数据

所以,生产消费者模型的高效问题本质上是生产者和消费者一个访问临界区的代码,一个访问非临界区的代码

当在生产者在进行生产的时候消费者是不能够消费数据的,当在消费者在进行消费的时候生产者是不能够生产数据的,这是互斥,

但是当生产者在进行生产的时候消费者是可以处理其拿到的数据的,当消费者在进行消费的时候生产者是可以获得数据的

所以,高效的本质是当生产者和消费者一个访问临界区资源,一个访问非临界区资源,在这种并发访问的状态下,实现生产者和消费者之间的高效问题

这样,生产消费者模型就能够做到忙线不均,生产者,消费者,交易场所各司其职,做到了解耦,便于维护

三、基于阻塞队列实现生产消费者模型:

阻塞队列:

这是一种特殊的队列,和队列一样,具备先进先出的特点,但是其大小是固定的,也就是具备最大值的概念

所以:

当阻塞队列为满时:无法入队列,也就是无法生产数据,此时阻塞休眠
当阻塞队列为空时:无法出队列,也就是无法消费数据,此时阻塞休眠

实现阻塞队列:

这里用封装queue来实现阻塞队列:

首先是主体思路:

#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>

template<class T>
class Blockqueue
{
public:
    Blockqueue()
    {}
    T pop()
    {}
    void push(T in)
    {}
    ~Blockqueue()
    {}
private:
    std::queue<T> _q;
    pthread_mutex_t _mutex;
    pthread_cond_t _c_cond;
    pthread_cond_t _p_cond;
    int _maxqueue;
};

如上,_q就是特定的数据结构,_mutex就是锁,_c_cond和_p_cond分别是生产者和消费者的条件变量,_maxqueue是特定数据结构中的最大容量

构造与析构:

    Blockqueue(int max = 5)
        :_maxqueue(max)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_c_cond,nullptr);
        pthread_cond_init(&_p_cond,nullptr);
    }

    ~Blockqueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_c_cond);
        pthread_cond_destroy(&_p_cond);
    }

如上,在构造函数中作为初始化,其中max = 5作为缺省值,在不传参构造的时候默认队列中的大小为5

push生产者生产数据:

    void push(T in)
    {
        pthread_mutex_lock(&_mutex);
        if(_q.size() == _maxqueue)
        {
            pthread_cond_wait(&_p_cond,&_mutex);
        }
        _q.push(in);
        pthread_cond_signal(&_c_cond);
        pthread_mutex_unlock(&_mutex);
    }

为了保证在生产的时候的原子性,要进行加锁操作

生产的时候当_q中的大小和设置的_maxqueue的大小一样的时候就证明该队列满了,就让该线程去生产者的等待队列中等待

当队列没满的时候,就将数据push进队列中即可,此时证明该队列中可能会存在数据,这个时候就可以唤醒消费者条件变量中的线程,进而使其消费

push消费者消费数据:

    T pop()
    {
        pthread_mutex_lock(&_mutex);
        if(_q.size() == 0)
        {
            pthread_cond_wait(&_c_cond,&_mutex);
        }
        T out = _q.front();
        _q.pop();
        pthread_cond_signal(&_p_cond);
        pthread_mutex_unlock(&_mutex);
        return out;
    }

为了保证在消费的时候的原子性,也要进行加锁操作

生产的时候当_q中的大小为0的时候就证明该队列为空,无法继续消费数据,就让该线程去消费者的等待队列中等待

当队列不为空的时候,就将数据pop出队列中即可,此时证明该队列一定不为满,这个时候就可以唤醒生产者条件变量中的线程,进而使其生产

主函数实现单生产消费者:

主函数逻辑就是创建生产者,创建消费者,然后在后面等待线程即可

int main()
{
    Blockqueue<int> *bq = new Blockqueue<int>();
    
    pthread_t ctid;
    pthread_create(&ctid, nullptr, Productor, bq);
    
    sleep(1);

    pthread_t ptid;
    pthread_create(&ptid, nullptr, Consumer, bq);

    pthread_join(ctid, nullptr);
    pthread_join(ptid, nullptr);

    delete bq;
    return 0;
}

生产者线程执行的函数:

// 生产者
void *Productor(void *args)
{
    Blockqueue<int> *bq = static_cast<Blockqueue<int> *>(args);
    int len = opers.size();
    int num = 0;
    while (true)
    {
        bq->push(num);
        std::cout << "生产了一个任务 : num = " << num++ <<std::endl;
        // sleep(1);
    }
}

消费者线程执行的函数:

// 消费者
void *Consumer(void *args)
{
    Blockqueue<int> *bq = static_cast<Blockqueue<int> *>(args);
    while (true)
    {
        int num = bq->pop();
        std::cout << "处理任务 : num = " << num << std::endl;
        sleep(1);
    }
}

前面我们知道,在拿到数据后不仅仅是拿到数据,更重要的是处理数据,那么我们可以通过随机数,实现随机的计算

随机计算任务:

首先建立一个任务类:

#include <iostream>
#include <string>

std::string opers = "+-*/%";

enum
{
    Divzero = 1,
    Modzero,
    Unknow
};

class Task
{
public:
    Task(int data1, int data2, char oper)
        : _data1(data1), _data2(data2), _oper(oper),_exitcode(0)
    {}

    void run()
    {
        switch (_oper)
        {
        case '+':
            _result = _data1 + _data2;
            break;
        case '-':
            _result = _data1 - _data2;
            break;
        case '*':
            _result = _data1 * _data2;
            break;
        case '/':
            if (_data2 == 0)
                _exitcode = Divzero;
            else
                _result = _data1 / _data2;
            break;
        case '%':
            if (_data2 == 0)
                _exitcode = Modzero;
            else
                _result = _data1 % _data2;
            break;
        default:
            _exitcode = Unknow;
            break;
        }
    }

    void operator()()
    {
        run();
    }

    std::string Getresult()
    {
        std::string ret = std::to_string(_data1);
        ret += _oper;
        ret += std::to_string(_data2);
        ret += "=";
        ret += std::to_string(_result);
        ret += "[exitcode=";
        ret += std::to_string(_exitcode);
        ret += "]";
        return ret;
    }

    std::string GetTask()
    {
        std::string ret = std::to_string(_data1);
        ret += _oper;
        ret += std::to_string(_data2);
        ret += "=?";
        return ret;
    }
    
    ~Task()
    {}

private:
    int _data1;
    int _data2;
    char _oper;

    int _exitcode;
    int _result;
};

在主函数中的逻辑是一样的,不同的是生产者和消费者所执行任务不同

生产者:

// 生产者
void *Productor(void *args)
{
    Blockqueue<Task> *bq = static_cast<Blockqueue<Task> *>(args);
    int len = opers.size();
    while (true)
    {
        int data1 = rand()%10+1;
        usleep(10);
        int data2 = rand()%10;
        char op = opers[rand()%len];
        Task task(data1,data2,op);
        task();
        bq->push(task);
        std::cout << "生产了一个任务 : " << task.GetTask() <<std::endl;
        sleep(1);
    }
}

如上,这就是通过随机值和随机符号来进行随机计算任务,在Task类中进行了()的重载,也就是仿函数的使用

消费者:

// 消费者
void *Consumer(void *args)
{
    Blockqueue<Task> *bq = static_cast<Blockqueue<Task> *>(args);

    while (true)
    {
        Task task = bq->pop();
        std::cout << "处理任务 : " << task.Getresult() << std::endl;
    }
}

执行结果:

多生产消费者模型:

对于我们的代码,我们只需要增加多线程即可,不需要对代码进行大面积修改,为什么呢?

1、生产者、消费者都是在对同一个_q操作,用一把锁,保护一个临界资源

2、当前的 _q始终是被当作一个整体使用的,无需再增加锁区分

但是会出现如上的错误,为什么呢?

伪唤醒:

这是在多生产多消费者的情况下,如果队列已经满了,然后在消费中从队列出了一个数据,接着如果多次signal或者使用broadcast将队列中的所有线程都唤醒,此时就会有多个线程执行push,但是我们只有一个空间的位置,这样,那些push多的线程就是处于伪唤醒的状态,是会出现如上错误的

那么就需要在唤醒后重新判断,即将if修改为while

相关文章:

  • Qt6.8.2中JavaScript调用WebAssembly的js文件<2>
  • Redis JSON 用id读取content总结(sendCommand())
  • VLLM专题(二十一)—分布式推理与服务
  • Unity URPShader:实现和PS一样的色相/饱和度调整参数效果
  • MarsCode AI实战:利用DeepSeek 快速搭建你的口语学习搭子
  • HttpClient通讯时间过久
  • 计算机网络技术服务管理基于Spring Boot-SSM
  • 前端流式输出实现详解:从原理到实践
  • 模型部署实战:PyTorch生产化指南
  • git clone项目报错fatal: fetch-pack: invalid index-pack output问题
  • 红日靶场(二)——个人笔记
  • TCP心跳消息
  • Multisim学习-01 特点安装使用和第一个仿真实例
  • 计算机组成原理 第六章 总线
  • 图像分割的mask有空洞怎么修补
  • tldr命令助记
  • Qt 控件概述 QLCDNumber 和 Progressbar
  • 手动集成sqlite的方法
  • 我开发的PDF转WORD免费工具
  • 【LangChain入门 4 Prompts组件】提示词追加示例 FewShotPromptTemplate和示例选择器ExampleSelector
  • 个人简历模板word可编辑免费/优化的定义
  • 怎样把自己做的网站上传到网上/今日桂林头条新闻
  • 简述网站开发的具体流程/关键词推广方式
  • 贵阳网站建设是什么/百度点击优化
  • 天津做网站的哪家好/seo赚钱暴利
  • 深圳做生鲜食材的网站叫什么/广州seo软件