生产消费者模型 读写者模型
概念
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
如图就是通过一个容器来解决生产者和消费者之间的强耦合问题。
生产者和消费者之间不直接进行通讯,而是通过一个任务队列来进行通讯。比如,把任务队列比作一个超市,生产者比作是仓库的供货商,消费者就是顾客。仓库并不需要把生产好的商品卖给消费者,因为消费者的消费水平可能有限,而给超市的话一次可以供给非常多的商品;消费者也不需要到供货商那去消费,直接到超市消费就行了。即就是:
- 生产者产生数据后不需要等待消费者消费,直接扔给任务队列;
- 消费者消费数据的时候,直接去任务队列当中消费即可。
所以任务队列就相当于我们之前的“管道”一样,平衡了生产者和消费者的处理能力。
基于阻塞队列的生产消费者模型
阻塞队列
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
基本结构
通过名字就可以知道,我们先需要创建一个队列,当然队列的容量也不是无限的,当超过某个值的时候,就要进行阻塞,因此还需要一个变量来标识最大容量。这个队列可以放任何类型的数据,所以我们通过模板来进行调用。
template <class T>
class BlockQueue
{
public:private:queue<T> _q; // 共享资源int _maxcap; // 极值
};
接下来就要开始考虑线程之间的同步与互斥问题了。我们先来看几个问题
- 生产者与生产者之间可以同时访问共享资源吗?
答:当然是不可以的。生产者和生产者之间相当于是竞争关系。比如说一个超市里面有100个货架,这100个货架只能摆放方便面,而“统一”和“白象”就是2个生产者,它们不能在同一时间摆放自己的产品,只能哪一家没有生产的时候,另一家才能摆放自己的产品。 - 消费者和消费者之间可以同时访问共享资源吗?
答:也是不可以的。这也是属于竞争关系。比如说世界末日的时候,只剩下一袋方便面了,那么消费者和消费者之间不就要竞争这袋方便面了吗? - 消费者和生产者之间可以同时访问共享资源吗?
答:也是不可以的。生产者和消费者之间首先要保证数据安全,只允许一个人去访问这个资源。比如说你今天去超市买方便面,但是方便面卖完了并且供货商也放假了,第二天你又去超市买方便面还是没有,你连续问了一个月,超市一直都说没有,所以这不仅浪费了自己的时间,也浪费了超市的时间,因为可能不止你一个消费者来问。同理,供货商来问超市需不需要方便面,超市说几乎没有客人,让供货商等等;每天供货商都来问超市,而超市给的答复都是一样的,同样都浪费了2者的时间。这2种方式都没有错,都保证了数据的安全性。但是并不合理!假如你去超市买方便面,但超市暂时没有,超市把你电话留下来等有了再告诉你,然后你再来,对供货商也是一样的做法,这样不就能让供货商和消费者协同起来了嘛,生产一部分消费一部分。
综上所述,得到的结论是:
- 生产者和生产者之间是互斥关系;
- 消费者和消费者之间是互斥关系;
- 生产者和消费者之间是同步和互斥关系。
生产消费者模型只需要记住“321原则”就行。3种关系,2个角色(生产者和消费者),1个共享资源(特定的共享缓冲区)。
那么需要几把锁来完成这个任务呢?其实只需要一把就行,因为都是互斥关系,所有线程访问阻塞队列时都需要互斥。
还有什么时候生产者可以把数据放入阻塞队列,消费者什么时候可以把数据从阻塞队列中拿出来呢?答案就是当队列中还有剩余空间时,生产者可以生产数据;当队列中还有数据时,消费者可以消费数据。
还有一点是生产者和消费者不可以共用一个条件变量,如果共用条件变量的话,当唤醒等待队列当中的线程的时候,不知道唤醒的是消费者还是生产者。这样的话,我们就需要定义2个条件变量了。
template <class T>
class BlockQueue
{
public:private:queue<T> _q; // 共享资源int _maxcap; // 极值pthread_cond_t _c_cond; // 消费者条件变量pthread_cond_t _p_cond; // 消费者条件变量pthread_mutex_t _mutex; // 互斥锁
};
接下来我们就需要写构造和析构函数了。
构造函数
static const int defaultnum = 5;
BlockQueue(int maxcap = defaultnum): _maxcap(maxcap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_c_cond, nullptr);pthread_cond_init(&_p_cond, nullptr);}
我们需要先指定队列的上限,设置为5,。然后初始化锁和条件变量。
析构函数
~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_c_cond);pthread_cond_destroy(&_p_cond);}
准备工作完了之后,接下来就正式开始完成生产消费者模型了。
生成数据
我们可以使用队列的push
接口来往阻塞队列中放数据。我们还要考虑线程的安全问题:
- 访问阻塞队列前先加锁,因为判断的时候必须访问临界资源。
- 阻塞队列满了的话,生产者就要等待;没有满的话,就可以往阻塞队列中放数据了。
- 放入数据完成之后,唤醒消费者来消费。
代码:
void push(const T &in)
{pthread_mutex_lock(&_mutex);// 为什么判断要放在加锁之后呢?因为判断的时候必须访问临界资源while (_maxcap == _q.size()) // 确保生成条件满足{pthread_cond_wait(&_p_cond, &_mutex); // 调用的时候,自动释放锁}_q.push(in);pthread_cond_signal(&_c_cond); // 只要生产了,通知消费者来消费pthread_mutex_unlock(&_mutex);
}
消费数据
消费者需要到队列中获取数据,我们可以使用队列的pop
接口。步骤和刚才的类似:
- 访问阻塞队列前先加锁,因为判断的时候必须访问临界资源。
- 如果阻塞队列为空的话,消费者就要进行等待;不为空的话,就可以进行消费数据了。
- 消费完成之后,就要唤醒生产者来生成数据。消费一个生产一个。
代码:
T pop()
{pthread_mutex_lock(&_mutex);while (_q.size() == 0){pthread_cond_wait(&_c_cond, &_mutex);}T out = _q.front();_q.pop();pthread_cond_signal(&_p_cond); // 只要消费了,通知生产者来生成pthread_mutex_unlock(&_mutex);return out;
}
把模型写完之后,我们就要来测试代码了,需要写一个主函数来进行测试。如下:
main.cc
#include "BlockQueue.hpp"
#include <unistd.h>void *Consumer(void *args)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);// 消费while (true){sleep(2);int data = bq->pop();cout << "消费了一个数据:" << data << endl;}return nullptr;
}void *Productor(void *args)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);int data = 0;// 生产while (true){data++;bq->push(data);cout << "生产了一个数据:" << data << endl;// sleep(2);}return nullptr;
}int main()
{BlockQueue<int> *bq = new BlockQueue<int>();pthread_t c, p;pthread_create(&c, nullptr, Consumer, bq);pthread_create(&p, nullptr, Productor, bq);pthread_join(c, nullptr);pthread_join(p, nullptr);delete bq;return 0;
}
BlockQueue.hpp
#pragma once
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <queue>using namespace std;template <class T>
class BlockQueue
{static const int defaultnum = 5;
public:BlockQueue(int maxcap = defaultnum): _maxcap(maxcap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_c_cond, nullptr);pthread_cond_init(&_p_cond, nullptr);_low_water = _maxcap / 3; // 剩余空间只要低于这个就通知生产者生产_high_water = (_maxcap * 2) / 3; // 剩余数据只要高于这个就通知消费者消费}// 消费数据T pop(){pthread_mutex_lock(&_mutex);while (_q.size() == 0){pthread_cond_wait(&_c_cond, &_mutex);}T out = _q.front();_q.pop();pthread_cond_signal(&_p_cond); // 只要消费了,通知生产者来生成pthread_mutex_unlock(&_mutex);return out;}// 生成数据void push(const T &in){pthread_mutex_lock(&_mutex);// 为什么判断要放在加锁之后呢?因为判断的时候必须访问临界资源while (_maxcap == _q.size()) // 确保生成条件满足{pthread_cond_wait(&_p_cond, &_mutex); // 调用的时候,自动释放锁}_q.push(in);pthread_cond_signal(&_c_cond); // 只要生产了,通知消费者来消费pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_c_cond);pthread_cond_destroy(&_p_cond);}
private:queue<T> _q; // 共享资源int _maxcap; // 极值pthread_cond_t _c_cond; // 消费者条件变量pthread_cond_t _p_cond; // 消费者条件变量pthread_mutex_t _mutex; // 互斥锁
};
我们先让消费者休眠2秒,在这2秒之间,生产者没有休眠而是一直在产生数据。来看一下结果。
可以看到,消费者开始消费的时候,消费的是之前的数据,而生产者一直在产生新数据。
接着,我们让生产者先休眠2秒,消费者不休眠,看看结果。
可以看到是产生一个数据,消费一个数据。
这回让消费者和生产者都不休眠。
环形队列的生产消费者模型
POSIX信号量
信号量的本质
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
信号量本质上是一个非负整数计数器,用于控制对共享资源的访问。它可以用来实现互斥(确保同一时间只有一个进程或线程访问共享资源)和同步(协调多个进程或线程的执行顺序)。这个计数器用于表示某种资源的可用数量或者是某种事件已经发生的次数等。例如,在一个简单的多线程环境中,信号量的值可以代表可用的共享缓冲区的数量。如果信号量初始值为 5,就意味着有 5 个缓冲区可供使用。
它是一种同步原语,用于协调多个进程或线程对共享资源的访问。进程或线程在访问共享资源之前,需要先检查信号量的值来确定是否能够获取资源。
二元信号量和计数信号量
- 二元信号量(互斥信号量):这是信号量的一种特殊情况,其值只能是 0 或者 1。它主要用于实现互斥访问,确保在同一时刻只有一个进程或线程能够访问共享资源。比如,在一个文件写入的场景中,为了避免多个进程同时写入导致文件内容混乱,使用二元信号量来控制对文件的访问权限。当一个进程想要写入文件时,它首先会尝试获取信号量(如果信号量的值为 1,获取成功后信号量的值变为 0),写完后再释放信号量(将信号量的值变回 1)。
- 计数信号量:其值可以是大于等于 0 的整数。计数信号量用于管理多个相同类型的资源。例如,在一个网络服务器中,有多个连接套接字资源,信号量的初始值可以设置为套接字的总数。当有一个新的客户端连接请求到来时,服务器进程会尝试获取一个信号量(信号量的值减 1),如果信号量的值大于等于 0,表示还有可用的套接字资源,就可以为客户端分配一个套接字;当客户端断开连接后,服务器进程会释放信号量(信号量的值加 1)。
等待(P 操作)和发布(V 操作)
- 等待(P 操作):在经典的信号量操作中,等待操作也称为 P 操作。当一个进程或线程执行 P 操作时,它会检查信号量的值。如果信号量的值大于 0,那么信号量的值减 1,进程或线程可以继续执行后续操作,这表示它成功获取了一个资源。如果信号量的值等于 0,那么执行 P 操作的进程或线程会被阻塞,进入等待状态,直到信号量的值大于 0,它才会被唤醒并执行信号量减 1 的操作。例如,在一个打印机资源共享的场景中,当一个用户(进程或线程代表)想要打印文档时,它执行 P 操作,如果打印机空闲(信号量值大于 0),信号量值减 1,用户可以使用打印机;如果打印机正在被使用(信号量值为 0),用户就会等待。
- 发布(V 操作):发布操作也称为 V 操作。当一个进程或线程执行 V 操作时,它会将信号量的值加 1。如果有其他进程或线程因为执行 P 操作而被阻塞等待这个信号量,那么其中一个等待的进程或线程会被唤醒,尝试再次执行 P 操作获取资源。例如,在打印机的场景中,当一个用户打印完文档后,执行 V 操作,信号量的值加 1,如果有其他用户在等待使用打印机,就会有一个用户被唤醒并可以使用打印机。
接口
信号量的类型是sem_t,所有对信号量的操作,都是基于一个sem_t类型的变量。头文件是<semaphore.h>
sem_init
函数原型:int sem_init(sem_t *sem, int pshared, unsigned int value);
功能:初始化一个信号量。
参数:对于无名信号量,使用sem_init
函数进行初始化。它接受三个参数,信号量指针、共享标志(用于指定信号量是在进程间还是线程间共享,0 表示线程间共享,非 0 表示进程间共享)和初始值。
返回值:成功返回0,失败返回-1。
sem_destroy
函数原型:int sem_destroy(sem_t *sem);
功能:用于销毁一个信号量。
返回值:成功返回0,失败返回-1。
sem_wait
函数原型:int sem_wait(sem_t *sem);
功能:等待信号量,会将信号量的值减1。
sem_post
函数原型:int sem_post(sem_t *sem);
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
环形队列
环形队列采用数组模拟,用模运算来模拟环状特性,我们使用信号量来完成生产消费者模型。还是先用单消费单生产来完成。
最开始它们指向同一个位置,生产者在放数据,消费者在拿数据。对于生产者和消费者来说,它们在什么情况下会处于同一个位置呢?答案就是当队列为空或者队列为满的时候。其它情况下,它们就处于不同的位置。
下面,我们来玩一个游戏帮助理解环形队列的运行逻辑。
如图所示,在一张桌子上放满了9个盘子,刚开始我们站在同一个盘子前。游戏规则是,我不断的向盘子中放一个水果,放完之后就往下一个盘子继续放,你必须从盘子中拿起水果,但是你觉得这个水果不好吃,然后你继续追我。我只能往盘子中放一个水果,你不能跳过盘子拿自己喜欢的水果。我们在玩游戏的时候,应该怎样保证游戏的正常运行呢?需要遵循以下几个原则。
- 你不能超过我;
- 我不能套你一圈;
- 指向同一个位置时,只能一个人访问
那我们什么时候会站在一起呢?只能是当所有的盘子为空或者所有的盘子都有水果的时候才会在一起。
当盘子全为空的时候谁先运行呢?当然是生产者,也就是我先运行,不然你怎么拿水果。
当盘子都是水果的时候谁先运行呢?当然是消费者,也就是你先运行,不然我往哪放水果。
综上所述,我们得出一个结论:在环形队列当中,大部分情况下,单生产者和单消费者是可以并发执行的,只有队列为满或者为空时,才会出现互斥和同步的问题。
其实对于生产者来说,看重的是队列中还有没有空间;对于消费者来说,看重的是队列中有没有数据。所以为了更好的衡量这两者的关系,我们给空间资源定义一个信号量,给数据资源定义一个信号量。
那么我们该如何使用信号量来完成环形队列的生产消费者模型呢?我们先来看一下信号量的定义。
struct semaphore
{int value;struct PCB* queue;
}
P原语所执行的操作可用如下函数wait(s)
来表示。
void wait(semaphore s)
{s.value = s.value-1;if(s.value < 0)block(s.queue); // 将进程阻塞,并将其投入等待队列s.queue
}
V原语所执行的操作可用下面的函数signal(s)
来表示。
void signal(semaphore s)
{s.value = s.value + 1;if(s.value < 0)wakeup(s.queue); // 唤醒阻塞进程,将其从等待队列s.queue取出。投入就绪队列
}
信号量的物理意义:
- 在信号量机制中,信号量的初值
s.value
表示系统中某种资源的数目,因而又称为资源信号量。 - P操作意味着进程请求一个资源,因此描述为
s.value=s.value-1
;当s.value<0
时,表示资源已经分配完毕,因而进程所申请的资源不能够满足,进程无法继续执行,所以进醒执行block(s.queue)
自我阻塞,放弃处理机,并插人等待该信号量的等待队列。 - V操作意味着进程释放一个资源,因此描述为
s.value=s.value+1
;当s.value<0
时,表示在该信号量的等待队列中有等待该资源的进程被阻塞,故应调用wakeup(s.queue)
原语将等待队列中的一个进程唤醒。 - 当
s.value<0
时,|s.value|
表示等待队列的进程数。
基本结构
我们用vector
来模拟环形队列,因为我们需要对队列进行随机访问。我们还有要以下几个基本变量:
_cap
:队列的最大容量。_c_step
:当前消费者访问哪一个数据块,也就是消费者的下标。_p_step
:当前生产者访问哪一个数据块,也就是生产者的下标。
还有就是生产者和消费者所关注的资源_cdata_sem
:消费者关注的数据资源。_pspace_sem
:生产者关注的空间资源。
还有两把锁:_c_mutex
:消费者和消费者之间的互斥锁。_p_mutex
:生产者和生产者之间的互斥锁。
代码:
template<class T>
class RingQueue
{
private:vector<T> _ringqueue;int _cap;int _c_step; // 消费者下标int _p_step; // 生产者下标sem_t _cdata_sem; // 消费者关注的数据资源sem_t _pspace_sem; // 生产者关注的空间资源pthread_mutex_t _c_mutex;pthread_mutex_t _p_mutex;
};
构造和析构
构造函数中,要先指定队列的大小,然后初始化信号量和互斥锁。析构函数就是销毁信号量和锁。
RingQueue(int cap = defalutcap):_ringqueue(cap),_cap(cap),_c_step(0),_p_step(0)
{sem_init(&_cdata_sem,0,0); // 第2个参数为0,表示是线程共享sem_init(&_pspace_sem,0,cap);pthread_mutex_init(&_c_mutex,nullptr);pthread_mutex_init(&_p_mutex,nullptr);
}~RingQueue()
{sem_destroy(&_cdata_sem);sem_destroy(&_pspace_sem);pthread_mutex_destroy(&_c_mutex);pthread_mutex_destroy(&_p_mutex);
}
生成数据
在此之前,我们先把P操作、V操作、加锁和解锁写成接口的形式。
void P(sem_t &sem) //P是等待
{sem_wait(&sem)
}void V(sem_t &sem) // V是发布
{sem_post(&sem);
}void Lock(pthread_mutex_t &mutex)
{pthread_mutex_lock(&mutex);
}void UnLock(pthread_mutex_t &mutex)
{pthread_mutex_unlock(&mutex);
}
我们生成数据的过程是先竞争信号量,再申请锁,释放锁,最后在放数据。
void Push(const T& in) // 生产
{P(_pspace_sem);Lock(_p_mutex);_ringqueue[_p_step] = in;// 位置后移,保持环状特性_p_step++;_p_step %= _cap;UnLock(_p_mutex);V(_cdata_sem);
}
先用P操作让生产者申请一个信号量_pspace_sem
,如果信号量的值大于 0,那么信号量的值减 1,线程可以继续执行后续操作,这表示它成功获取了一个资源。如果信号量的值等于 0,那么执行 P 操作的线程会被阻塞,进入等待状态,直到信号量的值大于0,它才会被唤醒。随后,就是加锁,与其它的生产者保持互斥的状态,再把数据投放给环形队列,_p_step %= _cap;
是为让数组具有环的特性。最后再执行V操作锁,让信号量的值加1,相当于队列中多了一个数据。
消费数据
void Pop(T* out) // 消费
{P(_cdata_sem);Lock(_c_mutex);*out = _ringqueue[_c_step];_c_step++;_c_step %= _cap;UnLock(_c_mutex);V(_pspace_sem);
}
跟上面的一样,首先消费者申请一个信号量_cdata_sem
,相当于让队列中的数据减1,然后进行加锁,与其它消费者保持互斥,进入临界区后,把生产者生产的数据交给out
变量,接着++和取模操作
是为了让消费者走到下一个位置上和保持环状特性,最后再执行V操作,相当于_pspace_sem++
,表示队列当中多了一个空位置。
完整代码:
RingQueue.hpp
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <pthread.h>
using namespace std;const static int defalutcap = 5;template <class T>
class RingQueue
{
private:void P(sem_t &sem) // P是等待{sem_wait(&sem);}void V(sem_t &sem) // V是发布{sem_post(&sem);}void Lock(pthread_mutex_t &mutex){pthread_mutex_lock(&mutex);}void UnLock(pthread_mutex_t &mutex){pthread_mutex_unlock(&mutex);}
public:RingQueue(int cap = defalutcap): _ringqueue(cap), _cap(cap), _c_step(0), _p_step(0){sem_init(&_cdata_sem, 0, 0); // 第2个参数为0,表示是线程共享sem_init(&_pspace_sem, 0, cap);pthread_mutex_init(&_c_mutex, nullptr);pthread_mutex_init(&_p_mutex, nullptr);}void Push(const T &in) // 生产{P(_pspace_sem);Lock(_p_mutex);_ringqueue[_p_step] = in;// 位置后移,保持环状特性_p_step++;_p_step %= _cap;UnLock(_p_mutex);V(_cdata_sem);}void Pop(T *out) // 消费{P(_cdata_sem);Lock(_c_mutex);*out = _ringqueue[_c_step];_c_step++;_c_step %= _cap;UnLock(_c_mutex);V(_pspace_sem);}~RingQueue(){sem_destroy(&_cdata_sem);sem_destroy(&_pspace_sem);pthread_mutex_destroy(&_c_mutex);pthread_mutex_destroy(&_p_mutex);}
private:vector<T> _ringqueue;int _cap;int _c_step; // 消费者下标int _p_step; // 生产者下标sem_t _cdata_sem; // 消费者关注的数据资源sem_t _pspace_sem; // 生产者关注的空间资源pthread_mutex_t _c_mutex;pthread_mutex_t _p_mutex;
};
其它常见的锁
- 悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等),当其他线程想要访问数据时,被阻塞挂起。
- 乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。
- CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。
- 自旋锁,公平锁,非公平锁。
读者写者模型
读写锁
在编写多线程的时候,有一种情况是十分常见的。那就是,有些公共数据修改的机会比较少。相比较改写,它们读的机会反而高的多。通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。给这种代码段加锁,会极大地降低我们程序的效率。那么有没有一种方法,可以专门处理这种多读少写的情况呢? 有,那就是读写锁。
- 注意:写独占,读共享,读锁优先级高
“读者” 线程主要是读取共享资源中的数据,“写者” 线程则是修改共享资源中的数据。
在读者——写者问题中,任何时刻要求写者最多只允许有一个,读者可以有很多,因为写者要改变数据对象的内容,如果它们同时操作,则数据对象的内容则会变得不可知。
我们先讲一个故事来理解。
我们都要通过写博客来帮助自己理解并加深所学的内容,博客的作者就是写者,看你博客的同学叫做读者。比如说当你写博客的时候,你的同学在旁边看你正在写的内容,当你写了一部分的时候,同学1说你正在写C语言部分的知识点,同学2说你写的是C++部分的知识点,当你写完之后,才看出来你写的是C和C++不同处的知识点,在这里其他人读到的数据都是内容的局部东西,都猜的不对,拿的数据都不正确。所以你为了保证其它人能够正确的读到完整的内容,你给其他人出了一个规定:当你写博客的时候,其他人不能在旁边看,要么你就不写,要么就是写完博客才允许其它人看。这里写好的博客并不会规定读者排好队一个一个的进行观看,其他人读完才能轮到下一个人观看,这是不对的;还有就是写博客的时候,并不会要求只能一个人写,其它写者也可以参与进来,但是当一个人在写的时候不会让另一个人也参与进来编写,还是要让写者一个一个的写。
故事讲完了,分析读者写者问题思路还是遵循生产者消费者模型的321原则。写博客的作者就是写者,看博客的人就是读者。
至此,我们可以来分析一下同步与互斥的关系:
- 写者和写者:是互斥关系。一个人写的时候,其他人不能写。
- 读者和读者:即不互斥,也不同步。读者可以同时观看你写的内容,彼此之间没有影响。
- 写者和读者:互斥和同步。写者正在写的时候,读者不能读,不然读到的数据就是不完整的,这就是互斥;你写完博客之后,要是没人看,那么你写的博客就没有意义,同样,要是发现你的博客数据有点不对,需要让写者更新数据,这就是同步关系。
读者写者模型和生产消费者模型之间的本质区别是什么?
写者和生产者一样,主要就是读者和消费者的区别,消费者可以消费你的数据,但是读者并不会修改你的数据,因为读者之间可以同时访问。
读者写者有2种不同实现的策略。
- 读者优先策略:只要有一个读者在访问共享资源,写者就不能访问,直到所有读者都完成访问。这种策略优先考虑读者的并发访问。
- 写者优先策略:写者一旦请求访问共享资源,就会阻止后续的读者和写者访问,直到写者完成操作。这种策略优先保证写者对资源的独占访问。
应用场景:
- 数据库系统:在数据库管理系统中,当多个用户(读者)查询数据库中的数据时,这些查询操作可以同时进行。但是,当一个用户(写者)执行插入、更新或删除操作时,需要对相应的数据表进行排他性的访问,以防止数据不一致。例如,在一个在线购物网站的数据库中,多个顾客可以同时查看商品信息(读者操作),但当库存管理员更新商品库存(写者操作)时,需要保证数据的准确性,防止其他操作干扰。
- 文件系统访问:多个进程可能需要读取一个文件的内容(读者),而在文件被修改(写者)时,需要保证修改操作的独占性。例如,在一个多人协作的文档编辑场景中,多个用户可以同时查看文档(读者),但当一个用户进行编辑(写者)时,需要阻止其他用户同时编辑和查看正在编辑中的部分,以避免出现混乱。
接口介绍
读写锁的数据类型是pthread_rwlock_t
,使用方式与互斥锁类似,包含在<pthread.h>
头文件中,它与互斥锁一样,可以全部和局部进行定义。
即pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;
初始化
函数原型:int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,const pthread_rwlockattr_t *restrict attr);
销毁
函数原型:int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
加锁和解锁
读者加锁:int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
写者加锁:int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
解锁:int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
解锁可以同时释放读者和写者的锁。
接下来我们通过伪代码来理解rwlock
的实现原理。
该伪代码的意思是先给读者加锁,如果读者进来了,那么执行if语句,写者就无法进入,只能阻塞等待;读取数据的时候,写者不能写入,当没有读者的时候,那就释放写者。
在读者写者模型中,大多数的情况都是读者多,写者少,这样会很容易造成写者饥饿的问题。 比如说,有10和读者和1个写者,这10个读者都会不断的访问共享资源,使得写者长时间无法获取对共享资源的访问权,从而导致写者被 “饿死”,即写者的请求一直被推迟或无法执行。所以这个模型就是天然具备写者“饥饿”问题,默认就是读者优先。要想解决“饥饿”问题,可以用以下几种方法:
- 公平策略:可以使用一个先进先出(FIFO)的队列来管理访问请求,无论是读者还是写者的请求,都按照它们到达的顺序排队。这样可以避免写者因为读者的频繁访问而一直无法获取资源。
- 优先级调整:给写者赋予一定的优先级。当写者等待时间过长时,可以适当提高其优先级,使得写者能够在一定程度上优先于读者获取资源。不过,这种方法需要谨慎使用,因为过度提高写者优先级可能会导致读者饥饿。
- 时间片机制:为读者和写者分配时间片。例如,规定读者在一个时间片内可以连续访问共享资源,时间片结束后,如果有写者等待,就把资源访问权交给写者。通过这种方式来平衡读者和写者的访问,避免写者长期无法访问的情况。