读者写者问题
读者写者问题
本文介绍操作系统经典的 读者-写者问题(Reader-Writers Problem)
并且用 C++20 的标准库提供的工具(如互斥锁、信号量等)来实现解决方案。
下面的注意事项是计算机操作系统的规定(也就是无论linux windows 还是什么都是这样)
注意,二值信号量和锁是有区别的!
Mutex “锁”,用于保护共享资源,强调“谁持有,谁释放”。
而二值信号量,无所有者,任何线程均可释放信号量。
什么是“读者-写者问题”?
问题背景:
假设有一个共享资源(比如一块内存、一个文件),多个线程可以访问它:
- 读者线程(Readers):只读取数据,不修改;
- 写者线程(Writers):会修改数据;
冲突点:
- 多个读者同时读取是可以的(不会破坏数据);
- 但一个写者在写时,不能有其他任何线程(包括读者和写者)访问该资源;
- 否则会导致数据竞争或一致性错误。
额外提一下,这个共享资源应当“比较大”,访问有一定的时间开销,如果是一个变量这种,加个锁,多个线程等待访问就可以了,反正访问几乎瞬间完成。
目标
读取不会破坏原本数据,因此,多个读者可以同时读取,但是只要有人开始写入,其它人既不能读取,也不能写入(否则数据就乱套了)
因此我们要设计一种机制,使得:
- 多个读者可以同时读;
- 写者独占访问资源;
解决方法
读者优先
首先说明解决读者写者问题最简单的方法,也就是“读者优先”
读者优先策略含义:只要还有读者在读,新的读者可以继续进来读,而写者必须等待。
算法说明
核心变量说明:
int readers_count = 0; // 当前正在读的读者数量
std::mutex readers_count_mutex; // 保护 readers_count 的互斥锁
std::binary_semaphore resource_semaphore(1);//保护共享资源的二值信号量
流程图说明
👤 读者逻辑:
[开始]↓
获取 mtx_count 锁↓
read_count 增加 1↓
如果是第一个读者 → 获取 wrt 锁(阻塞写者)↓
释放 mtx_count 锁↓
【执行读操作】↓
获取 mtx_count 锁↓
read_count 减少 1↓
如果是最后一个读者 → 释放 wrt 锁(允许写者进入)↓
释放 mtx_count 锁
[结束]
✍️ 写者逻辑:
[开始]↓
获取 wrt 锁(如果已有读者在读,则等待)↓
【执行写操作】↓
释放 wrt 锁
[结束]
参考代码
reader_first.cpp
/* 读者优先 */#include <iostream>
using namespace std;#include <thread>
#include <semaphore>//需要开启 C++ 20 的支持
#include <mutex>#include <chrono>int readers_count = 0;//读者数量
mutex readers_count_mutex;//保护读者数量的互斥锁(使用原子操作 atomic 也可以)uint8_t resource[1024*1024] = { 0 };//共享的资源,这里以一块内存为例
binary_semaphore resource_semaphore(1);//保护共享资源的二值信号量/*** @brief 读者线程的函数* @param readerNumer 读者编号 调试用*/
void reader_function(int readerNumer)
{cout << "读者线程 " << readerNumer << "建立 线程id " << this_thread::get_id() << endl;while (true){/* 读者读取之前应当执行的内容 */readers_count_mutex.lock();//加锁,保护读者计数if (readers_count == 0)//如果是第一个读者{resource_semaphore.acquire();//获取资源信号量}readers_count++;//读者数量增加readers_count_mutex.unlock();//解锁/* 读者读取 */////当比较大时,如果放到栈(stack)区,会出现 栈溢出 的错误//uint8_t readerBuffer[1024 * 1024] = { 0 };uint8_t* readerBuffer = new uint8_t[1024 * 1024];//比较大需要放到堆(heap)中memcpy(readerBuffer, resource, sizeof(resource));//读取cout << " 读者" << readerNumer << " 读取到内容的前8个字节" << endl;for (int i = 0; i < 8; i++){printf("%#X ", readerBuffer[i]);}cout << endl;delete[] readerBuffer;this_thread::sleep_for(chrono::milliseconds(25));//延时,模拟对读取数据以及处理花费时间/*** @note 当它比较大时就可以模拟“读者源源不断”的到达* @note 读者可以同时读,所以读取总时间小于各个读者读取时间之和* @note 对于读者优先当这个时间超过所有写者需要的时间之和,写者就会饥饿*///this_thread::sleep_for(chrono::milliseconds(55));//延时,模拟对读取数据以及处理花费时间/* 读者读取完毕应当执行的内容 */readers_count_mutex.lock();//加锁,保护读者计数 readers_count--;//读者数量减少if (readers_count == 0)//如果是最后一个读者{resource_semaphore.release();//释放资源信号量}readers_count_mutex.unlock();//解锁this_thread::sleep_for(chrono::milliseconds(1));//延时}
}/*** @brief 写者线程* @param writerNumber 写者编号 调试用*/
void writer_function(int writerNumber)
{cout << "写者线程 " << writerNumber << "建立 线程id " << this_thread::get_id() << endl;static int count = 0;//调试用while (true){/* 写者写入之前应当执行的内容 */ resource_semaphore.acquire();//获取资源信号量 /* 写者写入 */count++;for (int i = 0; i < sizeof(resource); i++)//写入数据{resource[i] = (i + count) % 256;}cout << "写者 " << writerNumber << "写入内容的前8个字节是" << endl;for (int i = 0; i < 8; i++){printf("%#X ", resource[i]);}cout << endl;this_thread::sleep_for(chrono::milliseconds(25));//延时,模拟大量数据写入花费的时间/* 写者写入完毕应当执行的内容 */ resource_semaphore.release();//释放资源信号量this_thread::sleep_for(chrono::milliseconds(1));//延时}
}int main()
{std::cout << "开始测试\n";std::thread readers[5], writers[2];for (int i = 0; i < 5; ++i) readers[i] = std::thread(reader_function, i);for (int i = 0; i < 2; ++i) writers[i] = std::thread(writer_function, i);for (auto& t : readers) t.join();for (auto& t : writers) t.join();return 0;
}
分析
读者优先比较简单不再赘述
显然,当有源源不断的读者到达时, resource_semaphore 不会被读者释放,写者进程饥饿。
读写公平
读写公平的核心思想是:遵循到达顺序,无论是读者还是写者,先到者先获得资源,彻底避免饥饿问题。
算法说明
1. 全局变量与同步工具
变量/工具 | 作用 |
---|---|
int readers_count = 0 | 当前正在读取的读者数量 |
std::mutex readers_count_mutex | 保护 readers_count 的互斥锁 |
std::binary_semaphore resource_semaphore(1) | 保护共享资源的二值信号量(读者和写者竞争) |
std::mutex service_mutex | 服务信锁,用来实现“先来先服务”,强制所有线程按顺序请求访问(公平性的关键) |
服务锁 (service_mutex
)
- 所有线程(读者和写者)必须首先获取
service
,确保严格按到达顺序排队。 - 这是公平性的核心机制,防止读者或写者插队。
service_mutex
的作用:强制所有线程按顺序排队,类似于现实中的“取号机”。- 无优先级差异:读者和写者完全平等竞争,先到者先服务。
进一步的讨论请看后面的 分析
2. 流程图说明
👤读者线程流程
读者逻辑
- 第一个读者获取
resource_sem
,最后一个读者释放。 - 多个读者可并发读取,但必须通过
service_queue
排队。
[开始]↓
lock(service) // 进入服务↓
lock(read_mutex) // 保护读者计数↓
if (read_count == 0) // 第一个读者?↓resource_sem.acquire() // 获取资源锁↓
read_count++↓
unlock(read_mutex)↓
unlock(service) // 退出服务(允许后续线程被服务)↓
[读取共享资源] // 临界区(可并发读)↓
lock(read_mutex)↓
read_count--↓
if (read_count == 0) // 最后一个读者?↓resource_sem.release() // 释放资源锁↓
unlock(read_mutex)↓
[结束]
✍写者线程流程
写者逻辑
- 写者直接请求
resource_sem
,但必须通过service_queue
排队。 - 写者执行时会独占资源,阻塞所有后续读者和写者。
[开始]↓
lock(service) // 进入服务↓
resource_sem.acquire() // 直接请求资源锁(独占)↓
unlock(service) // 退出服务↓
[写入共享资源] // 临界区(独占)↓
resource_sem.release()↓
[结束]
参考代码
/* 读写公平
* 严格按到达顺序服务,无论是读者还是写者,先到者先获得资源
*/#include <iostream>
using namespace std;#include <thread>
#include <semaphore>//需要开启 C++ 20 的支持
#include <mutex>#include <chrono>uint8_t resource[1024 * 1024] = { 0 };//共享的资源,这里以一块内存为例
binary_semaphore resource_semaphore(1);//保护共享资源的二值信号量int readers_count = 0;//读者数量
mutex readers_count_mutex;//保护读者数量的互斥锁(使用原子操作 atomic 也可以)mutex service_mutex;//服务锁,用来实现“先来先服务”/*** @brief 读者线程的函数* @param readerNumer 读者编号 调试用*/
void reader_function(int readerNumer)
{cout << "读者线程 " << readerNumer << "建立 线程id " << this_thread::get_id() << endl;while (true){/* 读者读取之前应当执行的内容 */service_mutex.lock();//等待服务readers_count_mutex.lock();//加锁,保护读者计数if (readers_count == 0)//如果是第一个读者{resource_semaphore.acquire();//获取资源信号量}readers_count++;//读者数量增加readers_count_mutex.unlock();//解锁service_mutex.unlock();/* 读者读取 */////当比较大时,如果放到栈(stack)区,会出现 栈溢出 的错误//uint8_t readerBuffer[1024 * 1024] = { 0 };uint8_t* readerBuffer = new uint8_t[1024 * 1024];//比较大需要放到堆(heap)中memcpy(readerBuffer, resource, sizeof(resource));//读取cout << "读者 " << readerNumer << "读取到内容的前8个字节" << endl;for (int i = 0; i < 8; i++){printf("%#X ", readerBuffer[i]);}cout << endl;delete[] readerBuffer;this_thread::sleep_for(chrono::milliseconds(25));//延时,模拟对读取到的数据进行某些处理/* 读者读取完毕应当执行的内容 */readers_count_mutex.lock();//加锁,保护读者计数 readers_count--;//读者数量减少if (readers_count == 0)//如果是最后一个读者{resource_semaphore.release();//释放资源信号量}readers_count_mutex.unlock();//解锁this_thread::sleep_for(chrono::milliseconds(1));//延时}
}/*** @brief 写者线程* @param writerNumber 写者编号 调试用*/
void writer_function(int writerNumber)
{cout << "写者线程 " << writerNumber << "建立 线程id " << this_thread::get_id() << endl;static int count = 0;//调试用while (true){/* 写者写入之前应当执行的内容 */service_mutex.lock();//等待服务resource_semaphore.acquire();//获取资源信号量service_mutex.unlock();/* 写者写入 */count++;for (int i = 0; i < sizeof(resource); i++)//写入数据{resource[i] = (i + count) % 256;}cout << "写者 " << writerNumber << "写入内容的前8个字节是" << endl;for (int i = 0; i < 8; i++){printf("%#X ", resource[i]);}cout << endl;this_thread::sleep_for(chrono::milliseconds(25));//延时,模拟大量数据写入花费的时间/* 写者写入完毕应当执行的内容 */resource_semaphore.release();//释放资源信号量 this_thread::sleep_for(chrono::milliseconds(1));//延时}
}int main()
{std::cout << "开始测试\n";std::thread readers[5], writers[2];for (int i = 0; i < 5; ++i) readers[i] = std::thread(reader_function, i);for (int i = 0; i < 2; ++i) writers[i] = std::thread(writer_function, i);for (auto& t : readers) t.join();for (auto& t : writers) t.join();return 0;
}
分析
这样想比较清晰,读写公平的算法分成了两部分
1、共享资源的获取,由 resource_semaphore 控制
2、调度服务的获取,由 service_mutex 控制
注意:
标准库中 信号量的 acquare 是“一直等,等到后获取”
标准库中 锁的 lock 是“一直等,等到后上锁”
分析以下并发执行情况
读者1 正在读取时,写者1到达,随后读者2到达
这是体现 service_mutex 作用的重要情况!
当 读者1 正在读取时,service_mutex 已经被 读者1 解锁, resource_semaphore 仍然被 读者1 获取。也就是说, 读者1 用完了 调度服务 正在占用 共享资源 。
当 写者1 到达后, 写者1 service_mutex.lock() 加锁,占用 调度服务 ,然后 resource_semaphore.acquare() 等待共享资源
当 读者2 到达后, 读者2 service_mutex.lock() 在这里挂起,等待 调度服务
所以,这种情况,读者1读取完后,写者1写入,然后读者2读取,“遵循到达顺序”体现了“读写公平”
假如没有 service_mutex(其实就是前面的读者优先算法)
读者1正在读取时,写者1到达后等待,然后,读者2到达不需要等待就可以开始读取!
读者2”插入“到了写者1的前面!
写者优先
算法说明
1. 全局变量与同步工具
变量/工具 | 作用 |
---|---|
int read_count = 0 | 当前正在读取的读者数量 |
int write_count = 0 | 当前等待和正在写入的写者总数 |
std::mutex read_mutex | 保护 read_count 的互斥锁 |
std::mutex write_mutex | 保护 write_count 的互斥锁 |
std::binary_semaphore resource_sem(1) | 保护共享资源的二值信号量 |
std::binary_semaphore read_allow_semaphore(1) | 是否允许读取的信号量(写者优先的关键) |
2. 流程图
👤读者线程流程
读者线程需要做:
- 通过
read_allow_semaphore
检查是否允许读取 - 如果是第一个读者,获取
resource_sem
- 执行读取操作
- 如果是最后一个读者,释放
resource_sem
开始
↓
read_allow_semaphore.acquire() 检查是否有写者等待
↓
read_mutex.lock() 保护读者计数
↓
if (read_count == 0) 如果是第一个读者resource_sem.acquire() 获取资源锁
↓
read_count++ 增加读者计数
↓
read_mutex.unlock() 释放读者计数锁
↓
read_allow_semaphore.release() 允许其他读者尝试
↓
[读取共享资源] 临界区操作
↓
read_mutex.lock() 保护读者计数
↓
read_count-- 减少读者计数
↓
if (read_count == 0) 如果是最后一个读者resource_sem.release() 释放资源锁
↓
read_mutex.unlock() 释放读者计数锁
↓
结束
✍写者线程流程
写者线程需要做:
- 如果是第一个写者,获取
read_allow_semaphore
(阻止新读者) - 获取
resource_sem
进行独占写入 - 如果是最后一个写者,释放
read_allow_semaphore
(允许新读者)
开始
↓
write_mutex.lock() 保护写者计数
↓
if (write_count == 0) 如果是第一个写者read_allow_semaphore.acquire() 阻塞新读者
↓
write_count++ 增加写者计数
↓
write_mutex.unlock() 释放写者计数锁
↓
resource_sem.acquire() 获取资源锁(独占)
↓
[写入共享资源] 临界区操作
↓
resource_sem.release() 释放资源锁
↓
write_mutex.lock() 保护写者计数
↓
write_count-- 减少写者计数
↓
if (write_count == 0) 如果是最后一个写者read_allow_semaphore.release() 允许新读者
↓
write_mutex.unlock() 释放写者计数锁
↓
结束
参考代码
/*
* 写者优先
* 当有写者等待时,新到达的读者必须等待,直到所有写者完成。
*/
#include <iostream>
using namespace std;#include <thread>
#include <semaphore>//需要开启 C++ 20 的支持
#include <mutex>#include <chrono>uint8_t resource[1024 * 1024] = { 0 };//共享的资源,这里以一块内存为例/*** @brief 保护共享资源的二值信号量* @note 多个读者可以同时读,读者a获得后可能由读者b释放,因此必须使用信号量*/
binary_semaphore resource_semaphore(1);int readers_count = 0;//正在读取的读者数量
mutex readers_count_mutex;//保护读者数量的互斥锁(使用原子操作 atomic 也可以)/*** @brief 正在写入以及等待写入的写者数量* @note 注意,是正在写和等着写的写者总数*/
int writers_count = 0;
mutex writing_mutex;//保护正在写的互斥锁(使用原子操作 atomic 也可以)binary_semaphore read_allow_semaphore(1);//是否允许读取的信号量/*** @brief 读者线程的函数* @param readerNumer 读者编号 调试用*/
void reader_function(int readerNumer)
{cout << "读者线程 " << readerNumer << "建立 线程id " << this_thread::get_id() << endl;while (true){/* 读者读取之前应当执行的内容 */read_allow_semaphore.acquire();//等待直到所有写者完毕readers_count_mutex.lock();//加锁,保护读者计数if (readers_count == 0)//如果是第一个读者{resource_semaphore.acquire();//获取资源信号量}readers_count++;//读者数量增加readers_count_mutex.unlock();//解锁read_allow_semaphore.release();/* 读者读取 */////当比较大时,如果放到栈(stack)区,会出现 栈溢出 的错误//uint8_t readerBuffer[1024 * 1024] = { 0 };uint8_t* readerBuffer = new uint8_t[1024 * 1024];//比较大需要放到堆(heap)中memcpy(readerBuffer, resource, sizeof(resource));//读取cout << "读者 " << readerNumer << " 读取到内容的前8个字节" << endl;for (int i = 0; i < 8; i++){printf("%#X ", readerBuffer[i]);}cout << endl;delete[] readerBuffer;this_thread::sleep_for(chrono::milliseconds(5));//延时,模拟对读取到的数据进行某些处理/* 读者读取完毕应当执行的内容 */readers_count_mutex.lock();//加锁,保护读者计数 readers_count--;//读者数量减少if (readers_count == 0)//如果是最后一个读者{resource_semaphore.release();//释放资源信号量}readers_count_mutex.unlock();//解锁this_thread::sleep_for(chrono::milliseconds(1));//延时}
}/*** @brief 写者线程* @param writerNumber 写者编号 调试用*/
void writer_function(int writerNumber)
{cout << "写者线程 " << writerNumber << "建立 线程id " << this_thread::get_id() << endl;static int count = 0;//调试用while (true){/* 写者写入之前应当执行的内容 */writing_mutex.lock();if (writers_count == 0)//第一个写入或等待的写者{read_allow_semaphore.acquire();//阻塞读者 }writers_count++;//正在写入或者等待的读者数量自增 writing_mutex.unlock();resource_semaphore.acquire();//获取资源信号量/* 写者写入 */count++;for (int i = 0; i < sizeof(resource); i++)//写入数据{resource[i] = (i + count) % 256;}cout << "写者 " << writerNumber << " 写入内容的前8个字节是" << endl;for (int i = 0; i < 8; i++){printf("%#X ", resource[i]);}cout << endl;this_thread::sleep_for(chrono::milliseconds(20));//延时,模拟大量数据写入的时间消耗/* 写者写入完毕应当执行的内容 */resource_semaphore.release();//释放资源信号量 writing_mutex.lock();writers_count--;if (writers_count == 0)//如果不再有正在写入或等待的写者{read_allow_semaphore.release();//允许读者 } writing_mutex.unlock();this_thread::sleep_for(chrono::milliseconds(85));//延时/*** @note 当这个延时小于所有写者写入时间总和,就可以模拟“写者源源不断到来”* @note 对于写者优先,写者源源不断到来,则读者饥饿*///this_thread::sleep_for(chrono::milliseconds(75));//延时}
}int main()
{std::cout << "开始测试\n";std::thread readers[5], writers[4];for (int i = 0; i < 5; ++i) readers[i] = std::thread(reader_function, i);for (int i = 0; i < 4; ++i) writers[i] = std::thread(writer_function, i);for (auto& t : readers) t.join();for (auto& t : writers) t.join();return 0;
}
分析
以下是一种典型情况
写者1正在写入,然后读者1到达,再然后写者2道到达
写者1 正在写入, read_allow_semaphore 已经被写者获取,也就是不允许读取, resource_semaphore 被 写者 获取,也就是资源被 写者1 获取。
读者1 到达后, read_allow_semaphore.acquire(); 死等,直到允许读取。
写者2 到达后,writers_count自增变为2, resource_semaphore.acquire(); 等待资源,直到资源被释放。
假设某个时刻, 写者1 写入完毕
writers_count减少1,变为1,resource_semaphore 不会被写者释放,读者1 继续等待。
resource_semaphore 被 写者1 释放,写者2 获取后就会开始写入
虽然读者1先于写者2到达,但是写者2先开始操作。
读者-写者问题的三种策略对比
1. 读者优先(Readers-Preferred)
现象:
- 只要有一个读者正在读,后续读者可以直接进入,无需等待。
- 写者必须等待所有读者完成后才能写入。
- 极端情况:如果读者持续到达,写者可能永远无法执行(写者饥饿)。
比喻:
图书馆里,只要有人在看书(读者),新来的人可以直接进去看书;想修改图书的人(写者)必须等所有人离开才能动笔,但不断有新读者进来,导致写者一直等不到机会。
2. 读写公平(Fair / No Starvation)
现象:
- 读者和写者按到达顺序排队
- 新到达的读者不会插队到等待中的写者前面。
- 无饥饿:写者和读者均能公平执行。
比喻:
图书馆门口有一个取号机,读者和写者按号码排队。即使当前有人在看书,新来的写者也会排在后续读者前面。
3. 写者优先(Writers-Preferred)
现象:
- 当至少有一个写者在等待时,新到达的读者会被阻塞,直到所有写者完成。
- 写者能更快获得资源,但读者可能被延迟。
- 极端情况:如果写者持续到达,读者可能长时间等待(读者饥饿)。
比喻:
图书馆规定,只要有人申请修改图书(写者),新来的读者必须等待,直到所有修改完成。但若写者不断到来,读者可能一直无法进入。
三者的核心区别总结
策略 | 优先级 | 饥饿风险 | 典型应用场景 |
---|---|---|---|
读者优先 | 读者 > 写者 | 写者可能饥饿 | 读多写少,容忍写延迟 |
读写公平 | 先到先服务 | 无饥饿 | 需要公平性(如数据库调度) |
写者优先 | 写者 > 读者 | 读者可能饥饿 | 写操作需及时响应(如日志系统) |
四、参考资料
王道计算机考研 操作系统
2.3.5_2 读者-写者问题_哔哩哔哩_bilibili
C++11 多线程编程-小白零基础到手撕线程池
C++11 多线程编程-小白零基础到手撕线程池_哔哩哔哩_bilibili