Linux系统多线程的同步问题
Linux系统多线程的同步问题
在多线程编程里,互斥锁解决了“多个线程不能同时改共享资源”的问题,但光有互斥还不够——比如抢票时,有的线程运气好一直抢到锁,其他线程永远抢不到;再比如生产者往队列里放数据,消费者还没准备好就去拿,结果拿到空数据。这些问题的核心不是“数据不安全”,而是“顺序不对”——这就是同步要解决的核心矛盾:在保证数据安全的前提下,让多线程按预期顺序访问共享资源。
这篇博客会从最容易踩坑的死锁讲起,再深入条件变量、生产消费模型、信号量这些同步工具,最后用两个实战案例(阻塞队列、环形队列)把理论落地,带你彻底搞懂Linux多线程同步的逻辑。
一、先避坑:多线程的“死锁”陷阱
在讲同步之前,必须先解决死锁这个致命问题——一旦死锁,程序会卡住不动,排查起来还特别麻烦。我第一次遇到死锁是在做一个多线程文件处理工具时,两个线程互相拿着对方需要的锁,结果双双卡住,查了半天才发现是加锁顺序反了。
1.1 什么是死锁?
死锁是指一组线程互相持有对方需要的资源(比如锁),又都不释放自己的资源,导致所有线程永久阻塞的状态。举个生活中的例子:
张三和李四都想买1块钱的棒棒糖,但各自只有5毛钱。张三说“你把5毛给我,我买了分你一半”,李四说“你先给我,我买了分你一半”——两人都不放手自己的5毛,也拿不到对方的5毛,最后谁也买不到糖,这就是死锁。
在代码里,死锁的场景更典型:比如线程A持有锁1,想申请锁2;线程B持有锁2,想申请锁1——两者互相等待,永远卡着。
1.2 死锁的4个必要条件
死锁不是随便就能发生的,必须同时满足4个条件(只要破坏其中一个,死锁就不会发生):
1.2.1 互斥条件:资源只能被一个线程占用
这是死锁的前提。比如锁的本质就是“互斥”——同一时间只有一个线程能持有锁。如果资源可以被多个线程同时访问(比如只读数据),就不会有死锁。
1.2.2 请求与保持条件:持有资源不放,还申请新资源
线程拿到一个资源后,不释放,还继续申请其他资源。比如张三拿着自己的5毛(持有资源),还想要李四的5毛(申请新资源);李四同理,这就满足了“请求与保持”。
1.2.3 不剥夺条件:资源不能被强行抢走
线程持有的资源,只能自己释放,其他线程不能强制剥夺。比如张三的5毛不能被李四抢走,只能张三主动给,这就是“不剥夺”。如果能强行抢,比如系统强制让张三把5毛给李四,死锁就解了。
1.2.4 循环等待条件:线程间形成资源等待的环路
多个线程之间形成“你等我、我等他、他等你”的环路。比如张三等李四的5毛,李四等张三的5毛,形成一个环路;如果有3个线程,A等B的锁,B等C的锁,C等A的锁,也会形成环路。
1.3 如何解决死锁?
解决死锁的核心思路就是破坏4个必要条件中的任意一个,下面是具体的落地方法,结合代码场景讲更清楚。
1.3.1 破坏“互斥条件”:尽量不用互斥资源
如果能让资源支持多线程同时访问,就不会有互斥,自然不会死锁。比如读取配置文件的线程,因为配置文件只读,多个线程可以同时读,不需要加锁——这就破坏了互斥条件。
但很多场景必须互斥(比如写数据),所以这个方法不是万能的,只能在特定场景用。
1.3.2 破坏“请求与保持”:要么全要,要么全放
线程申请资源时,如果申请不到新资源,就释放已经持有的资源,避免“抱着资源等”。实现这个逻辑需要用非阻塞加锁接口(比如pthread_mutex_trylock
),而不是阻塞加锁(pthread_mutex_lock
)。
举个例子:线程需要同时申请锁1和锁2,逻辑如下:
// 非阻塞加锁,申请不到就释放已持有资源
bool try_lock_both(pthread_mutex_t* m1, pthread_mutex_t* m2) {// 1. 尝试申请锁1int ret1 = pthread_mutex_trylock(m1);if (ret1 != 0) {return false; // 申请不到锁1,直接返回}// 2. 尝试申请锁2int ret2 = pthread_mutex_trylock(m2);if (ret2 != 0) {// 申请不到锁2,释放已持有的锁1pthread_mutex_unlock(m1);return false;}return true; // 两个锁都申请到
}// 线程逻辑
void* thread_func(void* arg) {while (true) {// 循环尝试申请两个锁,直到成功if (try_lock_both(&lock1, &lock2)) {// 访问共享资源process_data();// 释放两个锁pthread_mutex_unlock(&lock2);pthread_mutex_unlock(&lock1);break;}// 申请失败,休眠一会儿再试,避免忙等usleep(1000);}return nullptr;
}
这样一来,线程申请不到新锁时会释放已有的锁,不会“抱着资源等”,破坏了“请求与保持”条件。
1.3.3 破坏“不剥夺条件”:允许强制释放资源
这个方法在用户态编程里很少用,更多是操作系统层面的机制——比如系统检测到死锁时,强制终止一个线程,释放它持有的资源。
在用户态,我们可以通过“定时锁”实现类似效果:比如用pthread_mutex_timedlock
,如果超时还没申请到锁,就释放已持有的资源,避免永久阻塞。
1.3.4 破坏“循环等待”:固定资源申请顺序
这是最常用、最有效的方法——所有线程申请资源时,都按同一个固定顺序来。比如所有线程都先申请锁1,再申请锁2,就不会形成环路。
举个反例:线程A先申请锁1再申请锁2,线程B先申请锁2再申请锁1——这会形成环路;如果线程B也改成先申请锁1再申请锁2,即使两者同时申请,最多只有一个线程能拿到锁1,另一个阻塞,不会死锁。
代码示例:
// 所有线程都按“锁1 → 锁2”的顺序申请
void* thread_a(void* arg) {pthread_mutex_lock(&lock1); // 先锁1pthread_mutex_lock(&lock2); // 再锁2// 处理逻辑pthread_mutex_unlock(&lock2);pthread_mutex_unlock(&lock1);return nullptr;
}void* thread_b(void* arg) {pthread_mutex_lock(&lock1); // 同样先锁1pthread_mutex_lock(&lock2); // 再锁2// 处理逻辑pthread_mutex_unlock(&lock2);pthread_mutex_unlock(&lock1);return nullptr;
}
固定顺序后,永远不会出现“你等我、我等你”的环路,彻底破坏循环等待条件。
二、同步的核心工具:条件变量
解决了死锁,终于进入同步的正题。同步的核心是“让线程按顺序走”,而Linux里实现同步最常用的工具就是条件变量(Condition Variable)。
2.1 先搞懂:同步和互斥的区别
很多人会把同步和互斥搞混,其实两者的侧重点完全不同:
- 互斥:解决“数据安全”问题,保证同一时间只有一个线程访问临界区(比如抢票时不能同时改票数);
- 同步:解决“顺序”问题,保证线程按预期顺序访问资源(比如消费者要等生产者放数据后再拿,不能一上来就拿)。
简单说:互斥是“不能同时干”,同步是“该谁干谁干”。同步必须基于互斥——如果数据都不安全,谈顺序就没意义。
2.2 条件变量的本质:“等待-唤醒”机制
条件变量的作用,就是让线程在“条件不满足”时等待,在“条件满足”时被唤醒,从而实现顺序控制。举个生活中的例子:
你去银行办业务,没轮到你时,你坐在等待区(条件变量的等待队列);叫到你的号时(条件满足),柜员会喊你(唤醒),你再去窗口办理(执行后续逻辑)。
在代码里,条件变量需要配合互斥锁使用,原因有两个:
- 判断条件时需要数据安全:线程判断“条件是否满足”(比如队列是否为空)时,必须访问共享资源(队列),所以需要加锁;
- 等待时释放锁:线程条件不满足要等待时,会自动释放互斥锁,避免其他线程永远拿不到锁(比如消费者等数据时,要释放锁让生产者能放数据)。
2.3 条件变量的核心接口
Linux的原生线程库(pthread)提供了条件变量的完整接口,核心有4个:初始化、等待、唤醒、销毁。
2.3.1 1. 初始化(pthread_cond_init)
初始化条件变量,有两种方式:
- 全局/静态变量:用宏
PTHREAD_COND_INITIALIZER
初始化,不需要手动销毁; - 局部变量:用
pthread_cond_init
函数初始化,用完后需要pthread_cond_destroy
销毁。
代码示例:
#include <pthread.h>// 1. 全局条件变量(用宏初始化)
pthread_cond_t global_cond = PTHREAD_COND_INITIALIZER;// 2. 局部条件变量(用函数初始化)
void init_cond() {pthread_cond_t local_cond;// 第二个参数是属性,NULL表示默认属性int ret = pthread_cond_init(&local_cond, NULL);if (ret != 0) {perror("pthread_cond_init failed");return;}// 用完销毁pthread_cond_destroy(&local_cond);
}
2.3.2 2. 等待(pthread_cond_wait)
线程调用这个接口时,会做两件关键的事:
- 自动释放传入的互斥锁;
- 把当前线程加入条件变量的等待队列,进入阻塞状态。
直到被其他线程唤醒后,线程会:
- 重新申请互斥锁(成功后才返回);
- 继续执行后续逻辑。
为什么要自动释放锁?举个例子:消费者判断队列为空,要等待生产者放数据——如果不释放锁,生产者永远拿不到锁,无法放数据,消费者会永久等待(死锁)。
代码示例(配合互斥锁):
pthread_mutex_t mutex; // 互斥锁
pthread_cond_t cond; // 条件变量
int count = 0; // 共享资源(比如队列长度)void* consumer(void* arg) {while (true) {// 1. 加锁,访问共享资源pthread_mutex_lock(&mutex);// 2. 判断条件(队列是否为空),注意用while不是if!(后面讲伪唤醒)while (count == 0) {// 条件不满足,等待:释放锁,加入等待队列pthread_cond_wait(&cond, &mutex);}// 3. 条件满足,消费数据count--;printf("消费后,count = %d\n", count);// 4. 解锁pthread_mutex_unlock(&mutex);}return nullptr;
}
2.3.3 3. 唤醒(pthread_cond_signal / pthread_cond_broadcast)
唤醒等待队列中的线程,有两种方式:
pthread_cond_signal
:唤醒等待队列中的一个线程(通常是第一个);pthread_cond_broadcast
:唤醒等待队列中的所有线程。
什么时候用哪种?
- 只需要一个线程处理时用
signal
(比如生产者放了一个数据,只需要唤醒一个消费者); - 需要所有线程重新判断条件时用
broadcast
(比如系统配置更新,所有等待配置的线程都要重新加载)。
代码示例(生产者唤醒消费者):
void* producer(void* arg) {while (true) {// 1. 加锁pthread_mutex_lock(&mutex);// 2. 生产数据count++;printf("生产后,count = %d\n", count);// 3. 唤醒一个等待的消费者(条件满足了)pthread_cond_signal(&cond);// 4. 解锁pthread_mutex_unlock(&mutex);// 模拟生产耗时sleep(1);}return nullptr;
}
2.3.4 4. 销毁(pthread_cond_destroy)
销毁条件变量,只能销毁用pthread_cond_init
初始化的局部条件变量,全局/静态变量不需要手动销毁。
代码示例:
void destroy_cond() {// 销毁条件变量前,确保所有线程都已退出等待pthread_cond_destroy(&local_cond);
}
2.4 实战:用条件变量控制线程顺序
我们来写一个完整的例子:创建5个线程,让它们先等待,主线程3秒后唤醒所有线程,让它们按顺序执行(虽然唤醒后还是竞争锁,但能看到同步效果)。
完整代码:
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>// 全局互斥锁和条件变量
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;// 线程函数:等待被唤醒后执行
void* thread_func(void* arg) {int thread_id = *(int*)arg;free(arg); // 释放传入的参数// 加锁pthread_mutex_lock(&mutex);printf("线程%d:进入等待\n", thread_id);// 等待条件变量唤醒pthread_cond_wait(&cond, &mutex);// 被唤醒后执行printf("线程%d:被唤醒,开始执行\n", thread_id);pthread_mutex_unlock(&mutex);return nullptr;
}int main() {pthread_t tids[5];// 创建5个线程for (int i = 0; i < 5; i++) {int* id = (int*)malloc(sizeof(int));*id = i + 1; // 线程ID:1~5int ret = pthread_create(&tids[i], NULL, thread_func, id);if (ret != 0) {perror("pthread_create failed");return -1;}}// 主线程休眠3秒,让所有子线程先进入等待sleep(3);printf("\n主线程:开始唤醒所有线程\n");// 唤醒所有等待的线程pthread_cond_broadcast(&cond);// 等待所有线程结束for (int i = 0; i < 5; i++) {pthread_join(tids[i], NULL);}// 销毁条件变量(全局互斥锁不需要手动销毁)pthread_cond_destroy(&cond);return 0;
}
运行结果:
线程1:进入等待
线程2:进入等待
线程3:进入等待
线程4:进入等待
线程5:进入等待主线程:开始唤醒所有线程
线程1:被唤醒,开始执行
线程5:被唤醒,开始执行
线程4:被唤醒,开始执行
线程3:被唤醒,开始执行
线程2:被唤醒,开始执行
可以看到:所有线程先进入等待,主线程唤醒后,它们才开始执行——这就是同步的效果。
三、同步的典型应用:生产者-消费者模型
理解了条件变量,就可以落地同步的经典场景——生产者-消费者模型。这个模型几乎是多线程同步的“万能模板”,从消息队列到任务调度,很多系统都基于它实现。
3.1 什么是生产者-消费者模型?
用生活中的场景解释最容易:
- 生产者:供货商,负责生产商品(数据),放到超市(缓冲区);
- 消费者:顾客,负责从超市(缓冲区)拿商品(数据),消费(处理);
- 缓冲区:超市,用来临时存放商品,解耦生产者和消费者(供货商不用直接找顾客,顾客也不用找供货商)。
在代码里,模型的核心是:
- 生产者只关心“缓冲区有没有空间”,有就放数据;
- 消费者只关心“缓冲区有没有数据”,有就拿数据;
- 两者通过同步机制(条件变量/信号量)配合,避免“生产者放满了还放”或“消费者拿空了还拿”。
3.2 模型的“321原则”
为了方便理解,我总结了一个“321原则”,涵盖模型的所有核心要素:
3.2.1 3种关系:线程间的交互规则
生产者和消费者之间、生产者之间、消费者之间,有3种必须维护的关系:
-
生产者之间:互斥
多个生产者竞争缓冲区的空间,同一时间只能有一个生产者放数据(比如两个供货商不能同时往超市的同一个货架放东西)。 -
消费者之间:互斥
多个消费者竞争缓冲区的数据,同一时间只能有一个消费者拿数据(比如两个顾客不能同时拿超市里最后一瓶水)。 -
生产者与消费者:同步+互斥
- 同步:生产者放数据后,要通知消费者“有数据了”;消费者拿数据后,要通知生产者“有空间了”;
- 互斥:生产者放数据和消费者拿数据,不能同时进行(比如供货商往货架放东西时,顾客不能拿)。
3.2.2 2种角色:生产者和消费者
模型里只有两种线程角色:
- 生产者:负责生成数据,放入缓冲区;
- 消费者:负责从缓冲区拿数据,处理数据。
两种角色可以有多个实例(比如10个生产者、5个消费者),也可以只有一个(1个生产者、1个消费者),根据场景灵活调整。
3.2.3 1个交易场所:缓冲区
所有数据交互都通过缓冲区进行,缓冲区可以是队列、链表、数组等数据结构。核心要求是:
- 支持“生产者放数据”和“消费者拿数据”;
- 有容量限制(避免生产者无限放数据,撑爆内存)。
3.3 实战:基于阻塞队列的生产消费模型
我们用C++实现一个阻塞队列(Blocking Queue)作为缓冲区,它的特点是:
- 队列满时,生产者阻塞(等待空间);
- 队列空时,消费者阻塞(等待数据);
- 用条件变量实现同步,互斥锁保证数据安全。
3.3.1 阻塞队列的类设计
阻塞队列需要包含以下核心成员:
- 队列容器:用C++的
std::queue
存储数据; - 最大容量:限制队列的最大长度,避免内存溢出;
- 互斥锁:保护队列的访问,保证线程安全;
- 两个条件变量:
not_empty
:队列不为空时,唤醒消费者;not_full
:队列不为满时,唤醒生产者。
类的完整实现(BlockQueue.hpp):
#ifndef BLOCK_QUEUE_HPP
#define BLOCK_QUEUE_HPP#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include <cassert>template <typename T> // 模板类,支持任意数据类型
class BlockQueue {
public:// 构造函数:初始化队列最大容量、互斥锁、条件变量BlockQueue(int max_cap = 10) : max_capacity_(max_cap) {// 初始化互斥锁int ret = pthread_mutex_init(&mutex_, NULL);assert(ret == 0);// 初始化条件变量ret = pthread_cond_init(¬_empty_, NULL);assert(ret == 0);ret = pthread_cond_init(¬_full_, NULL);assert(ret == 0);}// 析构函数:销毁互斥锁和条件变量~BlockQueue() {pthread_mutex_destroy(&mutex_);pthread_cond_destroy(¬_empty_);pthread_cond_destroy(¬_full_);}// 生产者放数据:队列满则阻塞void Push(const T& data) {// 加锁pthread_mutex_lock(&mutex_);// 队列满,等待"not_full"条件(有空间)while (queue_.size() >= max_capacity_) {printf("队列满,生产者等待...\n");pthread_cond_wait(¬_full_, &mutex_);}// 放数据到队列queue_.push(data);printf("生产者放数据:%d,队列长度:%lu\n", data, queue_.size());// 唤醒等待"not_empty"的消费者(有数据了)pthread_cond_signal(¬_empty_);// 解锁pthread_mutex_unlock(&mutex_);}// 消费者拿数据:队列空则阻塞T Pop() {// 加锁pthread_mutex_lock(&mutex_);// 队列空,等待"not_empty"条件(有数据)while (queue_.empty()) {printf("队列空,消费者等待...\n");pthread_cond_wait(¬_empty_, &mutex_);}// 拿数据T data = queue_.front();queue_.pop();printf("消费者拿数据:%d,队列长度:%lu\n", data, queue_.size());// 唤醒等待"not_full"的生产者(有空间了)pthread_cond_signal(¬_full_);// 解锁pthread_mutex_unlock(&mutex_);return data;}private:std::queue<T> queue_; // 队列容器int max_capacity_; // 队列最大容量pthread_mutex_t mutex_; // 互斥锁pthread_cond_t not_empty_; // 队列非空条件(唤醒消费者)pthread_cond_t not_full_; // 队列非满条件(唤醒生产者)
};#endif // BLOCK_QUEUE_HPP
3.3.2 生产者和消费者线程实现
我们创建2个生产者线程和3个消费者线程,生产者不断生成整数(1~10),消费者不断拿数据并打印。
测试代码(main.cpp):
#include "BlockQueue.hpp"
#include <pthread.h>
#include <unistd.h>// 全局阻塞队列(容量5)
BlockQueue<int> bq(5);// 生产者线程函数:生成1~10的整数,放入队列
void* producer(void* arg) {int producer_id = *(int*)arg;free(arg);for (int i = 1; i <= 10; i++) {bq.Push(i);// 模拟生产耗时(0.5秒)usleep(500000);}printf("生产者%d:生产完毕\n", producer_id);return nullptr;
}// 消费者线程函数:从队列拿数据,打印
void* consumer(void* arg) {int consumer_id = *(int*)arg;free(arg);while (true) {int data = bq.Pop();// 模拟消费耗时(1秒)usleep(1000000);}return nullptr;
}int main() {pthread_t prod_tids[2]; // 2个生产者pthread_t cons_tids[3]; // 3个消费者// 创建生产者线程for (int i = 0; i < 2; i++) {int* id = (int*)malloc(sizeof(int));*id = i + 1;int ret = pthread_create(&prod_tids[i], NULL, producer, id);if (ret != 0) {perror("pthread_create producer failed");return -1;}}// 创建消费者线程for (int i = 0; i < 3; i++) {int* id = (int*)malloc(sizeof(int));*id = i + 1;int ret = pthread_create(&cons_tids[i], NULL, consumer, id);if (ret != 0) {perror("pthread_create consumer failed");return -1;}}// 等待生产者线程结束(消费者是死循环,不用等)for (int i = 0; i < 2; i++) {pthread_join(prod_tids[i], NULL);}// 生产者结束后,休眠5秒让消费者处理完剩余数据,然后退出sleep(5);printf("主线程:所有数据处理完毕,退出\n");return 0;
}
3.3.3 编译运行与结果分析
编译命令(需要链接pthread库):
g++ main.cpp -o prod_cons -lpthread
运行结果(节选):
生产者放数据:1,队列长度:1
消费者拿数据:1,队列长度:0
生产者放数据:1,队列长度:1
生产者放数据:2,队列长度:2
消费者拿数据:1,队列长度:1
生产者放数据:2,队列长度:2
消费者拿数据:2,队列长度:1
生产者放数据:3,队列长度:2
生产者放数据:3,队列长度:3
消费者拿数据:2,队列长度:2
队列满,生产者等待...
消费者拿数据:3,队列长度:1
生产者放数据:4,队列长度:2
...
生产者1:生产完毕
生产者2:生产完毕
消费者拿数据:10,队列长度:0
队列空,消费者等待...
队列空,消费者等待...
主线程:所有数据处理完毕,退出
从结果能看到:
- 队列满时(长度5),生产者会等待;
- 队列空时,消费者会等待;
- 生产者放数据后唤醒消费者,消费者拿数据后唤醒生产者;
- 多个生产者和消费者通过互斥锁保证同一时间只有一个线程操作队列。
3.3.4 扩展:支持任务对象的阻塞队列
上面的阻塞队列只支持整数,实际开发中,我们更需要传递“任务”(比如计算任务、IO任务)。我们可以定义一个Task
类,让阻塞队列存储Task
对象。
Task类的实现(Task.hpp):
#ifndef TASK_HPP
#define TASK_HPP#include <iostream>
#include <string>class Task {
public:Task(int a, int b, char op) : a_(a), b_(b), op_(op), result_(0), exit_code_(0) {}// 执行任务(比如加减乘除)void Run() {switch (op_) {case '+':result_ = a_ + b_;break;case '-':result_ = a_ - b_;break;case '*':result_ = a_ * b_;break;case '/':if (b_ == 0) {exit_code_ = 1; // 除零错误result_ = 0;} else {result_ = a_ / b_;}break;default:exit_code_ = 2; // 操作符错误result_ = 0;}}// 获取任务结果std::string GetResult() const {std::string res = std::to_string(a_) + op_ + std::to_string(b_) + " = ";if (exit_code_ == 0) {res += std::to_string(result_) + " (成功)";} else if (exit_code_ == 1) {res += "错误(除零)";} else {res += "错误(无效操作符)";}return res;}private:int a_; // 操作数1int b_; // 操作数2char op_; // 操作符(+、-、*、/)int result_; // 计算结果int exit_code_; // 退出码:0成功,1除零,2无效操作符
};#endif // TASK_HPP
修改阻塞队列的模板参数为Task
,生产者生成任务,消费者执行任务:
// 生产者生成任务
void* producer(void* arg) {int producer_id = *(int*)arg;free(arg);// 生成10个任务(加减乘除)for (int i = 1; i <= 10; i++) {char ops[] = {'+', '-', '*', '/'};char op = ops[i % 4];Task task(i, i % 5, op); // 第二个操作数是i%5,故意制造除零(i%5=0时)bq.Push(task);usleep(500000);}printf("生产者%d:生产完毕\n", producer_id);return nullptr;
}// 消费者执行任务
void* consumer(void* arg) {int consumer_id = *(int*)arg;free(arg);while (true) {Task task = bq.Pop();task.Run(); // 执行任务printf("消费者%d:%s\n", consumer_id, task.GetResult().c_str());usleep(1000000);}return nullptr;
}
运行结果(节选):
消费者1:1+1 = 2 (成功)
消费者2:2-2 = 0 (成功)
消费者3:3*3 = 9 (成功)
消费者1:4/4 = 1 (成功)
消费者2:5+0 = 5 (成功)
消费者3:6-1 = 5 (成功)
消费者1:7*2 = 14 (成功)
消费者2:8/3 = 2 (成功)
消费者3:9+4 = 13 (成功)
消费者1:10-0 = 10 (成功)
这样就实现了“生产者生成任务、消费者执行任务”的异步模型,这也是很多线程池的核心逻辑。
3.4 坑点:条件变量的“伪唤醒”
在阻塞队列的Push
和Pop
方法里,判断条件时用的是while
循环,而不是if
——这是为了避免“伪唤醒”(Spurious Wakeup)。
3.4.1 什么是伪唤醒?
伪唤醒是指:线程被唤醒后,发现条件其实并不满足(比如多个生产者被唤醒,但队列只有一个空间)。原因可能是:
- 系统内核的调度机制,偶尔会唤醒多个线程;
- 调用了
pthread_cond_broadcast
,唤醒了所有等待线程,但只有部分线程的条件满足。
举个例子:
队列满了,3个生产者都在等待“not_full”条件;消费者拿了一个数据,调用pthread_cond_broadcast
唤醒所有3个生产者。但队列只有1个空间,只有1个生产者能放数据,另外2个生产者被唤醒后,发现队列又满了——这就是伪唤醒。
3.4.2 如何避免伪唤醒?
用while
循环判断条件,而不是if
:
if
:只判断一次,伪唤醒后会直接执行后续逻辑,导致错误;while
:被唤醒后重新判断条件,不满足就继续等待,避免错误。
错误写法(if):
// 错误!伪唤醒后会直接放数据,导致队列溢出
if (queue_.size() >= max_capacity_) {pthread_cond_wait(¬_full_, &mutex_);
}
queue_.push(data); // 伪唤醒后,队列可能已经满了
正确写法(while):
// 正确!伪唤醒后重新判断,不满足继续等
while (queue_.size() >= max_capacity_) {pthread_cond_wait(¬_full_, &mutex_);
}
queue_.push(data); // 此时队列一定有空间
四、另一种同步工具:信号量
除了条件变量,Linux还提供了信号量(Semaphore)来实现同步。信号量的本质是一个计数器,用来描述“可用资源的数量”,比条件变量更灵活,尤其适合多资源的场景。
4.1 信号量的核心概念
信号量的核心是P/V操作(来自荷兰语Proberen/Verhogen,意为“尝试/增加”):
- P操作:申请资源,计数器减1;如果计数器小于0,线程阻塞;
- V操作:释放资源,计数器加1;如果计数器小于等于0,唤醒一个阻塞的线程。
信号量的初始值决定了“初始可用资源的数量”,比如:
- 初始值为1:二元信号量,相当于互斥锁(同一时间只有一个线程能申请到资源);
- 初始值为5:允许5个线程同时申请到资源(比如电影院有5个座位)。
4.2 信号量的接口
Linux的信号量接口在<semaphore.h>
头文件中,核心有4个:
4.2.1 1. 初始化(sem_init)
初始化信号量:
#include <semaphore.h>// 参数:
// sem:信号量指针
// pshared:0表示线程间共享,非0表示进程间共享
// value:信号量初始值
int sem_init(sem_t* sem, int pshared, unsigned int value);
4.2.2 2. P操作(sem_wait)
申请资源,计数器减1,阻塞直到申请成功:
// 申请信号量,失败返回-1
int sem_wait(sem_t* sem);
还有非阻塞版本semp_trywait
,申请不到资源时直接返回错误,不阻塞。
4.2.3 3. V操作(sem_post)
释放资源,计数器加1,唤醒阻塞的线程:
// 释放信号量,失败返回-1
int sem_post(sem_t* sem);
4.2.4 4. 销毁(sem_destroy)
销毁信号量:
// 销毁信号量,失败返回-1
int sem_destroy(sem_t* sem);
4.3 实战:用信号量实现环形队列生产消费模型
环形队列是另一种常用的缓冲区,用数组模拟,通过“模运算”实现环形效果(比如下标到数组末尾后,回到开头)。我们用两个信号量来控制:
sem_space
:描述“空闲空间”的数量,初始值为队列容量;sem_data
:描述“可用数据”的数量,初始值为0。
4.3.1 环形队列的类设计
#include <iostream>
#include <cstring>
#include <semaphore.h>
#include <pthread.h>template <typename T, int CAP>
class RingQueue {
public:RingQueue() {// 初始化数组memset(buf_, 0, sizeof(T) * CAP);head_ = 0; // 生产者下标(放数据的位置)tail_ = 0; // 消费者下标(拿数据的位置)// 初始化信号量:space初始值CAP,data初始值0sem_init(&sem_space_, 0, CAP);sem_init(&sem_data_, 0, 0);}~RingQueue() {sem_destroy(&sem_space_);sem_destroy(&sem_data_);}// 生产者放数据void Push(const T& data) {// P操作:申请空闲空间sem_wait(&sem_space_);// 放数据到数组buf_[head_] = data;printf("生产者放数据:%d,位置:%d\n", data, head_);head_ = (head_ + 1) % CAP; // 环形下标// V操作:释放可用数据sem_post(&sem_data_);}// 消费者拿数据T Pop() {// P操作:申请可用数据sem_wait(&sem_data_);// 拿数据T data = buf_[tail_];printf("消费者拿数据:%d,位置:%d\n", data, tail_);tail_ = (tail_ + 1) % CAP; // 环形下标// V操作:释放空闲空间sem_post(&sem_space_);return data;}private:T buf_[CAP]; // 数组模拟环形队列int head_; // 生产者下标int tail_; // 消费者下标sem_t sem_space_; // 空闲空间信号量sem_t sem_data_; // 可用数据信号量
};
4.3.2 测试代码
// 全局环形队列(容量5)
RingQueue<int, 5> rq;// 生产者
void* producer(void* arg) {int id = *(int*)arg;free(arg);for (int i = 1; i <= 10; i++) {rq.Push(i);usleep(500000);}printf("生产者%d:生产完毕\n", id);return nullptr;
}// 消费者
void* consumer(void* arg) {int id = *(int*)arg;free(arg);while (true) {int data = rq.Pop();usleep(1000000);}return nullptr;
}int main() {pthread_t prod_tids[2];pthread_t cons_tids[3];// 创建生产者for (int i = 0; i < 2; i++) {int* id = (int*)malloc(sizeof(int));*id = i + 1;pthread_create(&prod_tids[i], NULL, producer, id);}// 创建消费者for (int i = 0; i < 3; i++) {int* id = (int*)malloc(sizeof(int));*id = i + 1;pthread_create(&cons_tids[i], NULL, consumer, id);}// 等待生产者结束for (int i = 0; i < 2; i++) {pthread_join(prod_tids[i], NULL);}sleep(5);printf("主线程:退出\n");return 0;
}
4.3.3 结果分析
运行结果和阻塞队列类似,但有个关键区别:信号量不需要显式加锁。因为信号量的P/V操作本身是原子的,且“申请空间”和“放数据”、“申请数据”和“拿数据”的逻辑是连续的,不会被打断。
信号量的优势在于:
- 代码更简洁,不需要手动管理互斥锁和条件变量;
- 适合多资源场景,比如允许N个线程同时访问资源(比如线程池的最大并发数)。
五、总结:同步的核心逻辑与选型
到这里,Linux多线程同步的核心内容就讲完了。最后总结一下关键逻辑和工具选型:
5.1 同步的核心逻辑
同步的本质是“顺序控制”,所有同步工具都是围绕“让线程在正确的时机做正确的事”设计的,核心逻辑有3点:
- 基于数据安全:同步必须建立在互斥的基础上(条件变量要配合锁,信号量P/V操作是原子的);
- 条件判断:线程要判断“资源是否就绪”(队列是否有数据/空间),避免无效操作;
- 等待-唤醒:条件不满足时线程等待,条件满足时被唤醒,避免忙等(比如轮询判断条件,浪费CPU)。
5.2 工具选型:条件变量 vs 信号量
场景 | 推荐工具 | 原因 |
---|---|---|
单资源、需要精细控制顺序 | 条件变量 | 可以灵活控制唤醒单个/所有线程,配合锁能处理复杂的条件判断(比如多条件等待) |
多资源、简单同步 | 信号量 | 代码简洁,不需要手动加锁,适合控制资源数量(比如线程池并发数、环形队列) |
生产消费模型 | 条件变量/信号量 | 阻塞队列用条件变量更直观,环形队列用信号量更简洁 |
5.3 实际开发中的注意事项
- 避免死锁:固定加锁顺序、用非阻塞加锁、及时释放资源;
- 缩小临界区:互斥锁和条件变量保护的代码要尽量少,避免影响并发效率;
- 处理伪唤醒:条件变量等待时用
while
循环判断条件; - 资源泄漏:记得销毁条件变量和信号量,避免内存泄漏。
同步是多线程编程的核心,也是难点——只有理解了“为什么需要同步”“不同工具的适用场景”,才能写出高效、安全的多线程代码。后续可以尝试用这些工具实现线程池、消息队列,进一步加深理解。