Linux线程同步实战:多线程程序的同步与调度
个人主页:chian-ocean
文章专栏-Linux
Linux线程同步实战:多线程程序的同步与调度
- 个人主页:chian-ocean
- 文章专栏-Linux
- 前言:
- 为什么要实现线程同步
- 线程饥饿(Thread Starvation)
- 示例:抢票问题
- 条件变量
- 条件变量的工作原理
- 常用操作:
- `pthread_cond_wait`
- `pthread_cond_signal`
- `pthread_cond_broadcast`
- 基于条件变量的生产消费者模型(阻塞队列)
- 生产-消费者模型(也叫做游有界缓冲区)
- 模型原理
- 工作原理
- 同步机制
- 阻塞队列
- 关键点分析
- 错误与改进:
- 生产消费模型
- 基于信号量的生产-消费者模型(环形队列)
- POSIX信号量
- 模型原理
- 工作原理
- 同步机制
- 环形队列
- 详细分析:
- 类成员变量:
- 构造函数:
- 析构函数:
- `push()` 方法(生产者操作):
- `pop()` 方法(消费者操作):
- 主函数
- 消费者线程函数 (`cosumer`)
- 生产者线程函数 (`productor`)
- 主函数 (`main`)
前言:
Linux 是一个多任务操作系统,它通过提供多种线程同步机制来帮助开发人员有效地管理线程之间的协作与冲突。正确的线程同步不仅能避免这些问题,还能提升程序的可靠性和性能。
为什么要实现线程同步
线程饥饿(Thread Starvation)
线程饥饿是指在多线程程序中,某些线程因为无法获取到所需的资源,长时间被阻塞,导致无法执行,甚至永远无法执行。这种情况通常发生在低优先级线程无法获得 CPU 时间,或者无法获得必要的锁资源时。线程饥饿会导致系统资源无法得到充分利用,程序的性能和响应性也会下降。
示例:抢票问题
- 最初的抢票问题出现了读写数据不一致问题,我们通过加锁解决了问题。
#include <iostream>
#include <unistd.h>
#include <pthread.h>using namespace std;// 定义常量 NUM 表示创建线程的数量
#define NUM 10// 初始化互斥锁 mutex,用于保护共享资源 tickets
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;// 定义共享资源 tickets,用于模拟票的数量
int tickets = 1000;// 线程的工作函数,用于抢购票
void* SanpUpTichets(void* args)
{// 将当前线程与主线程分离,确保线程结束时可以自动回收资源pthread_detach(pthread_self());// 无限循环,模拟持续抢票while(true){// 加锁,保护共享资源 ticketspthread_mutex_lock(&mutex);// 如果还有票if(tickets > 0){// 打印当前线程的信息和剩余票数cout << "pthread: " <<(int64_t) args << " tickets: " << tickets << endl;tickets--; // 抢票,票数减一}else {// 如果票已经抢完,解锁并跳出循环pthread_mutex_unlock(&mutex);break;}// 解锁,允许其他线程访问pthread_mutex_unlock(&mutex);// usleep(20); // 如果需要模拟延迟,可以解开注释}return nullptr;
}int main()
{// 创建 NUM 个线程,模拟多个线程同时抢购票for(int i = 0 ; i < NUM ; i++){pthread_t tid;void* n = (void*)i; // 传递线程编号作为参数pthread_create(&tid, nullptr, SanpUpTichets, n); // 创建新线程并执行抢票函数}// 主线程休眠 10 秒,确保子线程有足够时间抢票sleep(10);return 0;
}
代码分析:
- 共享资源:
tickets
代表剩余票数,所有线程都会访问并修改它。 - 互斥锁:
pthread_mutex_t mutex
用于确保同一时刻只有一个线程能够访问和修改tickets
,避免数据冲突。 - 多线程创建: 创建了 10 个线程,每个线程执行
SanpUpTichets
函数,尝试减少票数。 - 线程同步: 每个线程在修改
tickets
前加锁,修改完后解锁,确保线程安全。 - 问题:但是仍然存在一个问题,就是饥饿问题(发现都是8号进程进行抢票环节)。
条件变量
条件变量(Condition Variable)是多线程编程中的一种同步机制,用于在线程之间传递信号或同步操作。它允许一个线程等待某个条件成立,然后再继续执行。条件变量通常与互斥锁(mutex
)一起使用,来保护共享资源和确保线程同步
条件变量的工作原理
条件变量允许线程在某些条件成立时进行“等待”操作。当某个条件不满足时,线程会进入等待状态,直到被另一个线程通知条件已经满足并可以继续执行。
常用操作:
- 等待:一个线程可以调用
pthread_cond_wait
来等待某个条件成立。调用这个函数时,它会自动释放与条件变量关联的互斥锁,并让线程进入等待状态,直到其他线程通过条件变量通知它。 - 通知:当某个条件满足时,线程可以调用
pthread_cond_signal
或pthread_cond_broadcast
来通知等待的线程。pthread_cond_signal
会唤醒一个等待的线程,而pthread_cond_broadcast
会唤醒所有等待的线程。
pthread_cond_wait
该函数使得线程在满足某个条件之前进入阻塞状态。调用此函数时,线程将等待一个条件变量上的信号通知。调用此函数时会自动释放与条件变量关联的互斥锁,并让线程进入等待状态,直到条件满足或被其他线程通知唤醒。
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
参数:
-
cond
:这是要等待的条件变量。它是一个pthread_cond_t
类型的结构体,用来表示条件变量。 -
mutex
:与条件变量关联的互斥锁,它是一个pthread_mutex_t
类型的结构体。此互斥锁用于保护共享资源的访问,确保只有一个线程可以修改条件变量。
pthread_cond_signal
目的:此函数用于唤醒一个正在等待条件变量 cond
的线程。
语法:
int pthread_cond_signal(pthread_cond_t *cond);
参数:
cond
:指向条件变量的指针。
pthread_cond_broadcast
目的:此函数用于唤醒所有正在等待条件变量 cond
的线程。
语法:
int pthread_cond_broadcast(pthread_cond_t *cond);
参数:
cond
:指向条件变量的指针
基于条件变量的生产消费者模型(阻塞队列)
生产-消费者模型(也叫做游有界缓冲区)
两个进程(线程)共享一块公共的缓冲区,其中一个生产者,将数据放入缓冲区;另外一个是消费者,将数据从缓冲区取走(也可以把这个问题一般化为m
个生产者和n
个消费者)
模型原理
- 生产者(Producer):生成数据、任务或产品,并将它们放入一个共享缓冲区。
- 消费者(Consumer):从共享缓冲区中取出数据并进行处理。
- 缓冲区(Buffer):充当生产者和消费者之间的中介,通常是一个有限的队列或数组。缓冲区容量有限,当缓冲区已满时,生产者需要等待;当缓冲区为空时,消费者需要等待。
工作原理
- 生产者:将生成的数据放入缓冲区。
- 消费者:从缓冲区获取数据并进行处理。
- 缓冲区:充当共享资源,在多线程环境下进行同步。生产者和消费者的速度不同,可能会出现缓冲区满或者为空的情况。
同步机制
在生产者-消费者模型中,通常需要使用同步机制来确保生产者和消费者之间不会发生冲突,常见的同步机制包括:
- 互斥锁(Mutex):确保每次只有一个线程能够访问共享资源。
- 条件变量(Condition Variable):允许线程在特定条件下被挂起和唤醒,例如生产者等待缓冲区有空间时,消费者等待缓冲区有数据时。
- 信号量(Semaphore):控制对共享资源的访问数量,通常用于限制对缓冲区的访问。
阻塞队列
#include<iostream>
#include<queue>
#include<pthread.h>#define defaultnum 5 // 定义队列的默认容量为5// 阻塞队列模板类,支持泛型T(可以存储任意类型的数据)
template<class T>
class blockqueue
{
private:std::queue<T> _q; // 用于存储数据的标准队列int _cap = defaultnum; // 队列的最大容量,默认为5pthread_mutex_t _lock; // 互斥锁,用于保证线程安全pthread_cond_t _c_cond; // 消费者等待的条件变量,当队列为空时,消费者线程会等待pthread_cond_t _p_cond; // 生产者等待的条件变量,当队列满时,生产者线程会等待
public:// 构造函数,接受队列的最大容量blockqueue(int cap = defaultnum):_cap(cap){// 初始化互斥锁和条件变量pthread_mutex_init(&_lock, nullptr);pthread_cond_init(&_c_cond, nullptr);pthread_cond_init(&_p_cond, nullptr);}// 析构函数,销毁互斥锁和条件变量~blockqueue(){pthread_mutex_destroy(&_lock); // 销毁互斥锁pthread_cond_destroy(&_c_cond); // 销毁消费者条件变量pthread_cond_destroy(&_p_cond); // 销毁生产者条件变量}// 生产者线程调用的push方法,插入数据到队列中void push(const T& in){pthread_mutex_lock(&_lock); // 上锁,确保线程安全// 如果队列已满,生产者需要等待if (_cap == _q.size()) {pthread_cond_wait(&_p_cond, &_lock); // 等待生产者条件变量}// 将数据放入队列_q.push(in);// 唤醒一个消费者线程(如果有的话)pthread_cond_signal(&_c_cond);pthread_mutex_unlock(&_lock); // 解锁}// 消费者线程调用的pop方法,从队列中取出数据T pop(){pthread_mutex_lock(&_lock); // 上锁,确保线程安全// 如果队列为空,消费者需要等待while (_q.size() == 0) {pthread_cond_wait(&_c_cond, &_lock); // 等待消费者条件变量}// 从队列中取出数据T data = _q.front();_q.pop();// 唤醒一个生产者线程(如果有的话)pthread_cond_signal(&_p_cond);pthread_mutex_unlock(&_lock); // 解锁return data; // 返回取出的数据}
};
关键点分析
- 互斥锁 (
_lock
):用于保护队列的访问,确保每次只有一个线程能访问队列。这样可以避免多个线程同时修改队列引发的数据竞争问题。 - 条件变量 (
_c_cond
和_p_cond
):_c_cond
用于消费者线程,当队列为空时,消费者会等待该条件变量。_p_cond
用于生产者线程,当队列满时,生产者会等待该条件变量。
- 生产者操作 (
push
):- 如果队列已满,生产者会等待,直到有空位可以插入新的数据。
- 插入数据后,通过
pthread_cond_signal
唤醒一个等待的消费者线程。
- 消费者操作 (
pop
):- 如果队列为空,消费者会等待,直到有新数据可以消费。
- 从队列中取出数据后,通过
pthread_cond_signal
唤醒一个等待的生产者线程。
错误与改进:
- 线程阻塞的合理性:此实现正确处理了生产者和消费者的等待机制,保证了生产者不会在队列满时继续生产,消费者不会在队列空时继续消费。
pthread_cond_wait
调用条件:使用while
而不是if
来等待消费,因为在高并发环境下,可能会出现虚假唤醒(即线程被唤醒后,队列可能仍为空或已满)。
生产消费模型
#include<iostream> // 引入标准输入输出库
#include<unistd.h> // 引入Unix标准库,提供sleep函数等
#include "blockqueue.hpp" // 引入blockqueue头文件,定义了阻塞队列
#include "task.hpp" // 引入task头文件,定义了任务类
using namespace std;void * producter(void* args) // 生产者线程函数,传入的参数为阻塞队列的地址
{blockqueue<task>* bt = static_cast<blockqueue<task>*>(args); // 将参数转换为blockqueue类型while(true){int x = rand() % 10; // 随机生成一个整数x,范围0到9int y = rand() % 10; // 随机生成一个整数y,范围0到9task t(x, y, opers[rand() % 4]); // 创建一个task对象,运算符从opers数组中随机选择t.run(); // 执行任务的run方法bt->push(t); // 将生成的任务t放入阻塞队列cout << "生产一个数据: " << t.Gettask() << endl; // 输出任务信息sleep(1); // 休眠1秒,模拟生产的间隔时间}return nullptr; // 返回空指针,结束线程函数
}void* consumer(void* args) // 消费者线程函数,传入的参数为阻塞队列的地址
{blockqueue<task>* bt = static_cast<blockqueue<task>*>(args); // 将参数转换为blockqueue类型while(true){task data = bt->pop(); // 从阻塞队列中取出一个任务cout << "消费一个数据: " << data.Getresult() << endl; // 输出任务结果}
}int main()
{blockqueue<task>* bq = new blockqueue<task>(); // 创建一个新的阻塞队列对象bqsrand(time(nullptr)); // 使用当前时间作为随机数种子,以确保每次运行的随机数不同pthread_t ctid, ptid; // 定义消费者线程和生产者线程的线程ID// 创建生产者线程,传入bq作为参数pthread_create(&ptid, nullptr, producter, bq); // 创建消费者线程,传入bq作为参数pthread_create(&ctid, nullptr, consumer, bq);// 等待消费者线程结束pthread_join(ctid, nullptr);// 等待生产者线程结束pthread_join(ptid, nullptr);return 0; // 主函数返回
}
- **生产者线程(
producter
):
- 该线程负责生产任务并将其加入到阻塞队列中。
- 每次循环中,生产者随机生成两个整数
x
和y
,并根据一个随机的运算符构造一个task
对象。 - 调用
task
对象的run
方法进行任务处理后,将任务推送到阻塞队列中。 - 输出任务的相关信息,并休眠1秒,模拟生产任务的过程。
- 消费者线程(
consumer
):
- 该线程负责从阻塞队列中获取任务并消费(处理)这些任务。
- 消费者通过
pop
方法从阻塞队列中获取任务,然后输出任务的结果。
- 主函数(
main
):
- 创建一个
blockqueue<task>
类型的对象bq
,用于存放task
对象。 - 设置随机数种子,以确保每次运行时生成的随机数不同。
- 创建并启动生产者线程和消费者线程,传递
bq
作为参数。 - 使用
pthread_join
等待生产者和消费者线程的结束。
基于信号量的生产-消费者模型(环形队列)
POSIX信号量
sem_init()
:初始化信号量。sem_wait()
:执行“等待”操作,降低信号量的值,若信号量为0,则进程会被阻塞。sem_post()
:执行“释放”操作,增加信号量的值,若有进程等待该信号量,会唤醒一个进程。sem_destroy()
:销毁信号量。
模型原理
组件 | 描述 |
---|---|
生产者(Producer) | 不断生产数据,尝试放入缓冲区 |
消费者(Consumer) | 不断消费数据,尝试从缓冲区取出 |
环形缓冲区(Circular Queue) | 共享的有限大小的队列,生产者写入,消费者读取 |
信号量(Semaphore) | 控制生产者和消费者行为,保证同步与互斥 |
工作原理
环形队列是一种先进先出(FIFO)的结构,具备“循环”特性:
- 使用一个固定大小的数组
buffer[N]
。 - 使用两个指针:
head
:消费者从这里取数据。tail
:生产者向这里放数据。
- 通过模运算(
% N
)实现循环结构:
当tail
或head
增加到数组尾部时,再次从头开始。
同步机制
empty_slots
信号量:表示环形队列中空槽的数量。生产者每次生产一个产品时,减少一个空槽;如果空槽为0,生产者将被阻塞,直到消费者取走产品,释放空槽。full_slots
信号量:表示环形队列中已填充的槽的数量。消费者每次消费一个产品时,减少一个满槽;如果满槽为0,消费者将被阻塞,直到生产者生产产品并释放满槽。mutex
信号量:保证生产者和消费者对环形队列的互斥访问,防止多个线程同时修改队列,导致数据冲突。
环形队列
这段代码实现了一个基于 信号量 和 互斥锁 的 环形队列(Ring Queue) 类 Ringqueue
,其中 T
是队列元素的类型。环形队列的大小是可配置的,并且支持生产者和消费者并发访问。
让我们逐行分析这段代码并添加详细注释:
const static int defaultcap = 6; // 默认队列大小为6template<class T> // 泛型队列,支持任意类型的元素
class Ringqueue
{
private:std::vector<T> _ringqueue; // 存储环形队列元素int _cap; // 队列容量int _c_step; // 消费者队列指针,表示下一个消费的位置int _p_step; // 生产者队列指针,表示下一个生产的位置// 信号量,用于同步生产者和消费者的操作sem_t _cdata_sem; // 消费者信号量(数据),表示可供消费者消费的数据数量sem_t _pspace_sem; // 生产者信号量(空间),表示可供生产者生产的空槽数量// 互斥锁,用于保护生产者和消费者对队列的访问,防止竞争条件pthread_mutex_t _mutex_c; // 用于消费者线程的互斥锁pthread_mutex_t _mutex_p; // 用于生产者线程的互斥锁public:// 构造函数,初始化环形队列、信号量和互斥锁Ringqueue(int cap = defaultcap): _ringqueue(cap), _cap(cap), _c_step(0), _p_step(0) // 初始化环形队列,容量和指针{sem_init(&_cdata_sem, 0, 0); // 初始化消费者信号量,初始为0,表示没有数据可消费sem_init(&_pspace_sem, 0, cap); // 初始化生产者信号量,初始为队列容量,表示有空间可以生产pthread_mutex_init(&_mutex_c, nullptr); // 初始化消费者互斥锁pthread_mutex_init(&_mutex_p, nullptr); // 初始化生产者互斥锁}// 析构函数,销毁信号量和互斥锁~Ringqueue(){sem_destroy(&_cdata_sem); // 销毁消费者信号量sem_destroy(&_pspace_sem); // 销毁生产者信号量pthread_mutex_destroy(&_mutex_c); // 销毁消费者互斥锁pthread_mutex_destroy(&_mutex_p); // 销毁生产者互斥锁}// 插入数据到队列(生产者操作)void push(const T& in){// 等待空槽信号量(即空间是否足够,如果没有空槽,生产者阻塞)sem_wait(&_pspace_sem);// 获取生产者互斥锁,确保只有一个生产者可以修改队列pthread_mutex_lock(&_mutex_p);// 将数据插入到队列的生产者指针位置_ringqueue[_p_step++] = in;// 更新生产者指针(确保指针在环形队列中循环)_p_step %= _cap;// 释放生产者互斥锁pthread_mutex_unlock(&_mutex_p);// 增加消费者信号量,表示队列中有新数据可供消费sem_post(&_cdata_sem);}// 从队列取数据(消费者操作)T pop(){// 等待数据信号量(即是否有数据,如果没有数据,消费者阻塞)sem_wait(&_cdata_sem);// 获取消费者互斥锁,确保只有一个消费者可以修改队列pthread_mutex_lock(&_mutex_c);// 从队列的消费者指针位置取出数据T data = _ringqueue[_c_step++];// 更新消费者指针(确保指针在环形队列中循环)_c_step %= _cap;// 释放消费者互斥锁pthread_mutex_unlock(&_mutex_c);// 增加生产者信号量,表示队列中有空槽可以生产数据sem_post(&_pspace_sem);// 返回消费者取出的数据return data;}
};
详细分析:
类成员变量:
_ringqueue
:这是一个std::vector<T>
,用于存储环形队列的数据。_cap
:队列的容量,表示队列的最大长度。_c_step
和_p_step
:分别是消费者和生产者指针,用于指示队列中下一个被访问的元素的位置。它们是环形队列的重要组成部分,通过模运算实现循环。_cdata_sem
和_pspace_sem
:消费者和生产者的信号量,用于同步生产者和消费者的操作,确保生产者不会在队列满时插入数据,消费者不会在队列空时取出数据。_mutex_c
和_mutex_p
:用于保护消费者和生产者线程访问队列的互斥锁,防止并发线程访问队列时发生数据竞争。
构造函数:
- 初始化队列大小、信号量和互斥锁。
sem_init(&_cdata_sem, 0, 0)
将消费者信号量初始为0,表示队列中没有数据可供消费。sem_init(&_pspace_sem, 0, cap)
将生产者信号量初始为队列的容量,表示有足够的空间可以进行生产。
析构函数:
- 销毁信号量和互斥锁,释放资源。
push()
方法(生产者操作):
sem_wait(&_pspace_sem)
:首先,生产者通过等待pspace_sem
信号量来检查是否有空槽可以插入数据。如果没有空槽(即队列满),生产者线程将阻塞,直到有空槽可用。pthread_mutex_lock(&_mutex_p)
:然后,生产者获取生产者的互斥锁,确保在插入数据时不会有其他线程同时访问队列。- 插入数据:生产者将数据插入队列,并更新
p_step
指针。 pthread_mutex_unlock(&_mutex_p)
:释放互斥锁,允许其他线程访问队列。sem_post(&_cdata_sem)
:最后,生产者通过增加cdata_sem
信号量来通知消费者队列中有新的数据可供消费。
pop()
方法(消费者操作):
sem_wait(&_cdata_sem)
:消费者通过等待cdata_sem
信号量来检查队列是否有数据可以消费。如果没有数据(即队列空),消费者线程将阻塞,直到有数据可用。pthread_mutex_lock(&_mutex_c)
:然后,消费者获取消费者的互斥锁,确保在取出数据时不会有其他线程同时访问队列。- 取出数据:消费者从队列中取出数据,并更新
c_step
指针。 pthread_mutex_unlock(&_mutex_c)
:释放互斥锁,允许其他线程访问队列。sem_post(&_pspace_sem)
:最后,消费者通过增加pspace_sem
信号量来通知生产者队列中有空槽可以插入数据。
这段代码展示了一个简单的生产者-消费者模型,其中使用了环形队列(Ringqueue
)来存储任务(task
)。生产者线程生成任务并将其放入队列,消费者线程从队列中取出任务并处理。以下是代码的详细分析和注释:
主函数
消费者线程函数 (cosumer
)
void* cosumer(void* args)
{Ringqueue<task>* rq = static_cast<Ringqueue<task>*>(args);while (true){task data = rq->pop();cout <<"消费了一个数据: " << data.Getresult() <<endl;}return nullptr;
}
cosumer
是消费者线程的入口函数。它会一直从队列中取出任务并进行处理。- 使用
pop()
方法从队列中取出一个task
对象。 Getresult()
假设是task
类中的方法,返回任务的结果。这里通过输出该结果来模拟消费操作。while (true)
使得消费者线程持续运行,直到程序结束。
生产者线程函数 (productor
)
void* productor(void* args)
{Ringqueue<task>* rq = static_cast<Ringqueue<task>*>(args);while (true){int x = rand() % 10;int y = rand() % 10;task t(x, y, opers[rand() % opers.size()]);sleep(1);t.run();rq->push(t);cout << "生产一个数据:" << t.Gettask() << endl;usleep(1);}return nullptr;
}
productor
是生产者线程的入口函数。它持续生成任务并将其插入到环形队列中。rand() % 10
生成两个随机数x
和y
,这两个数作为任务的操作数。task t(x, y, opers[rand() % opers.size()])
创建一个新的任务对象,假设opers
是一个操作符数组(例如,加法、减法等),从中随机选择一个操作符。sleep(1)
模拟任务的生成过程,表示生产一个任务需要1秒钟时间。t.run()
执行任务(假设run()
方法执行任务操作)。rq->push(t)
将任务推送到队列中。cout
用于输出生产的任务信息。usleep(1)
将线程挂起1微秒,减少CPU的占用。
主函数 (main
)
int main()
{srand(time(nullptr)); // 用当前时间作为随机数种子,确保每次运行时生成不同的随机数pthread_t ctid, pptid, ptid; // 声明线程IDRingqueue<task>* rq = new Ringqueue<task>(); // 创建一个环形队列对象,用于存储任务// 创建消费者线程pthread_create(&ctid, nullptr, cosumer, rq);// 创建两个生产者线程pthread_create(&ptid, nullptr, productor, rq);pthread_create(&pptid, nullptr, productor, rq);// 等待所有线程完成pthread_join(ctid, nullptr);pthread_join(ptid, nullptr);pthread_join(pptid, nullptr);return 0;
}
srand(time(nullptr))
:使用当前时间作为随机数种子,确保每次程序运行时生成不同的随机数序列。pthread_t ctid, pptid, ptid
:声明三个线程ID变量,分别用于消费者线程和两个生产者线程。Ringqueue<task>* rq = new Ringqueue<task>();
:创建一个环形队列对象,用于存储生产者和消费者之间传递的任务。pthread_create(&ctid, nullptr, cosumer, rq);
:创建消费者线程,传入环形队列指针rq
。pthread_create(&ptid, nullptr, productor, rq);
和pthread_create(&pptid, nullptr, productor, rq);
:创建两个生产者线程,传入同一个环形队列指针rq
。pthread_join
用于等待线程执行完毕。由于pthread_create
是异步执行的,所以在主线程中使用pthread_join
等待每个线程的结束,确保程