线程中信号量与条件变量详解
1. 信号量(semaphore)
信号量用于互斥的方式:
信号量用于互斥,仅需要一个信号量,申请初始化信号量将信号量的初值设为1(资源总数),在共享资源访问前,对信号量做P操作,访问结束后对信号量做V操作。
Posix标准信号量:
1. 申请初始化信号量
1.1 有名信号量 (主要应用于多进程环境)
sem_open
头文件: #include <fcntl.h>
#include <sys/stat.h>
#include <semaphore.h>
函数原型: sem_t* sem_open(const char *name, int oflag);
sem_t *sem_open(const char *name, int oflag,
mode_t mode, unsigned int value);
函数功能: 创建或者打开Posix有名信号量
函数参数: name: 有名信号量名称
oflag:有名信号量标志:
mode: 若创建,mode代表新创建的信号量权限
value:新创建信号量的初值
函数返回值: 成功 : 返回新创建信号量地址
失败 : 返回 SEM_FAILED。错误码放在errno中
示例:
#include <unistd.h> // 提供系统调用
#include <fcntl.h> /* For O_* constants */ // 文件控制常量
#include <sys/stat.h> /* For mode constants */ // 文件模式常量
#include <semaphore.h> // 信号量
#include <stdio.h> // 标准输入输出#define SEM_NAME "/mysem" // 命名信号量的名称int main(int argc, char** argv)
{// 创建或打开一个命名信号量sem_t *sem = sem_open(SEM_NAME, // 信号量名称O_RDWR | O_CREAT, // 标志:读写模式,如果不存在则创建0600, // 权限:用户读写 (rw-------)1); // 初始值:1(二进制信号量)// 检查信号量创建是否成功if(sem == SEM_FAILED){perror("sem_open"); // 打印错误信息return -1;}// 以下操作被注释掉了,但展示了完整的命名信号量使用流程:// sem_wait(sem); // P操作:等待信号量(获取锁)// // 临界区代码...// sem_post(sem); // V操作:释放信号量(释放锁)// sem_close(sem); // 关闭信号量(但不会删除)// sem_unlink(SEM_NAME); // 删除系统中的命名信号量return 0;
}
关键机制详解
1. 命名信号量 vs 未命名信号量
特性 | 命名信号量 (Named) | 未命名信号量 (Unnamed) |
---|---|---|
创建 | sem_open() | sem_init() |
标识 | 通过名称标识 | 通过内存地址标识 |
共享范围 | 进程间共享 | 线程间共享 |
持久性 | 系统级持久 | 进程生命周期 |
销毁 | sem_unlink() + sem_close() | sem_destroy() |
2. sem_open
参数详解
sem_t *sem_open(const char *name, int oflag, mode_t mode, unsigned int value);
name
:信号量名称,必须以/
开头,如"/mysem"
oflag
:O_CREAT
:如果不存在则创建O_EXCL
:与O_CREAT
一起使用,如果已存在则失败O_RDWR
:读写模式
mode
:权限位,如0600
(用户读写)value
:信号量初始值
完整的命名信号量使用流程
1. 创建和初始化
sem_t *sem = sem_open("/mysem", O_CREAT | O_RDWR, 0600, 1);
if (sem == SEM_FAILED) {perror("sem_open");return -1;
}
2. 使用信号量
// 进程1
sem_wait(sem); // 获取信号量
// 临界区代码...
sem_post(sem); // 释放信号量// 进程2(可以是不同的进程)
sem_wait(sem); // 同样可以访问同一个信号量
// 临界区代码...
sem_post(sem);
3. 清理资源
sem_close(sem); // 关闭信号量引用
sem_unlink("/mysem"); // 从系统中删除信号量
实际应用示例
示例1:多进程同步
process1.c:
#include <semaphore.h>
#include <stdio.h>
#include <unistd.h>#define SEM_NAME "/my_named_sem"int main() {sem_t *sem = sem_open(SEM_NAME, O_CREAT | O_RDWR, 0600, 1);if (sem == SEM_FAILED) {perror("sem_open");return -1;}printf("Process 1: Waiting for semaphore...\n");sem_wait(sem);printf("Process 1: Got semaphore! Working...\n");sleep(3);printf("Process 1: Releasing semaphore\n");sem_post(sem);sem_close(sem);return 0;
}
process2.c:
#include <semaphore.h>
#include <stdio.h>
#include <unistd.h>#define SEM_NAME "/my_named_sem"int main() {sem_t *sem = sem_open(SEM_NAME, O_RDWR, 0, 0); // 不创建,只打开if (sem == SEM_FAILED) {perror("sem_open");return -1;}printf("Process 2: Waiting for semaphore...\n");sem_wait(sem);printf("Process 2: Got semaphore! Working...\n");sleep(2);printf("Process 2: Releasing semaphore\n");sem_post(sem);sem_close(sem);return 0;
}
示例2:检查信号量值
#include <semaphore.h>
#include <stdio.h>int main() {sem_t *sem = sem_open("/test_sem", O_CREAT | O_RDWR, 0600, 3);int value;sem_getvalue(sem, &value);printf("Semaphore value: %d\n", value); // 输出: Semaphore value: 3sem_wait(sem);sem_getvalue(sem, &value);printf("After wait, value: %d\n", value); // 输出: After wait, value: 2sem_close(sem);sem_unlink("/test_sem");return 0;
}
系统级查看命名信号量
在Linux系统中,可以查看创建的命名信号量:
# 查看系统中的POSIX信号量
ls -l /dev/shm | grep sem# 或者使用ipcs命令(System V信号量)
ipcs -s
常见错误处理
1. 信号量已存在
// 如果信号量已存在,使用O_EXCL会失败
sem_t *sem = sem_open("/mysem", O_CREAT | O_EXCL | O_RDWR, 0600, 1);
if (sem == SEM_FAILED) {perror("sem_open"); // 可能输出: File exists
}
2. 权限问题
// 权限不足时创建失败
sem_t *sem = sem_open("/mysem", O_CREAT | O_RDWR, 0000, 1); // 无权限
3. 名称格式错误
// 名称必须以/开头
sem_t *sem = sem_open("mysem", O_CREAT | O_RDWR, 0600, 1); // 错误!
资源清理的重要性
必须正确清理:
sem_close(sem); // 关闭当前进程的引用
sem_unlink("/mysem"); // 删除系统级的信号量对象
如果不调用sem_unlink()
,信号量会一直存在于系统中,即使所有进程都退出了。
总结
这个程序演示了:
命名信号量的创建 - 使用
sem_open()
创建系统级信号量进程间同步 - 命名信号量可以在不同进程间共享
权限控制 - 通过文件模式控制访问权限
持久性 - 信号量存在于文件系统中,独立于进程生命周期
命名信号量是进程间同步的重要工具,特别适用于:
多个独立进程需要协调访问共享资源
守护进程需要提供同步服务
需要持久化存在的同步对象
当前程序只是创建了信号量,要看到完整效果需要配合其他进程一起使用。
1.2 无名信号量 (主要应用于多线程环境) *****
sem_init
头文件: #include <semaphore.h>
函数原型: int sem_init(sem_t *sem,int psharead, unsigned int value );
函数功能: 申请初始化无名信号量
函数参数: sem: 待初始化的信号量
pshared: 一般写为0,表示无名信号量是在一个进程的多个线程间共享
value: 信号量初值
函数返回值: 成功返回 0
失败返回 -1 错误码存放在errno
2. p操作
sem_wait
头文件: #include <semaphore.h>
函数原型: int sem_wait(sem_t *sem);
函数功能: P操作无名信号量
函数参数: sem: 待操作的信号量
函数返回值: 成功返回 0
失败返回 -1 错误码存放在errno
3. v操作
sem_post
头文件: #include <semaphore.h>
函数原型: int sem_post(sem_t *sem);
函数功能: V操作无名信号量
函数参数: sem: 待操作的信号量
函数返回值: 成功返回 0
失败返回 -1 错误码存放在errno
4. 回收
sem_destroy
头文件: #include <semaphore.h>
函数原型: int sem_destroy(sem_t *sem);
函数功能: 回收无名信号量
函数参数: sem: 待操作的信号量
函数返回值: 成功返回 0
失败返回 -1 错误码存放在errno
信号量的实际使用:
1. 由主线程负责申请和回收
2. 线程在共享资源访问前,对信号量做P操作, 访问结束后V操作。
使用场景:
1. 资源数量不唯一
示例:
#include <unistd.h> // 提供系统调用
#include <pthread.h> // 多线程编程
#include <stdio.h> // 标准输入输出
#include <sched.h> // 线程调度控制
#include <semaphore.h> // 信号量// 打印数字的线程函数
void* PrintDigital(void* argp)
{sem_t *sem = (sem_t*)argp; // 获取信号量指针for(int i = 0; i < 5; i++) // 循环5次{sem_wait(sem); // P操作:等待信号量(获取锁)// 临界区开始for(int j = 0; j < 10; j++){printf("%c", '0' + j); // 打印数字0-9sched_yield(); // 主动让出CPU} printf("\n"); // 换行// 临界区结束sem_post(sem); // V操作:释放信号量(释放锁)}return NULL;
}// 打印字母的线程函数
void* PrintAlpha(void* argp)
{sem_t *sem = (sem_t*)argp; // 获取信号量指针for(int i = 0; i < 5; i++) // 循环5次{sem_wait(sem); // P操作:等待信号量(获取锁)// 临界区开始for(int j = 0; j < 26; j++){printf("%c", 'A' + j); // 打印字母A-Zsched_yield(); // 主动让出CPU} printf("\n"); // 换行// 临界区结束sem_post(sem); // V操作:释放信号量(释放锁)}return NULL;
}int main(int argc, char** argv)
{pthread_t id[2]; // 两个线程ID// 初始化信号量,初始值为1(作为二进制信号量/互斥锁使用)sem_t sem;sem_init(&sem, 0, 1); // 参数:0表示线程间共享,1表示初始值// 创建两个线程pthread_create(&id[0], NULL, PrintDigital, &sem);pthread_create(&id[1], NULL, PrintAlpha, &sem);// 等待两个线程结束pthread_join(id[0], NULL);pthread_join(id[1], NULL);// 销毁信号量sem_destroy(&sem);return 0;
}
关键机制详解
1. 信号量作为互斥锁
sem_init(&sem, 0, 1); // 初始值为1的二进制信号量
工作原理:
初始值 = 1:表示锁可用
sem_wait(sem)
:如果值>0则减1并继续,否则阻塞(P操作)sem_post(sem)
:值加1,唤醒等待线程(V操作)
2. 与互斥锁的对比
特性 | 互斥锁 (Mutex) | 信号量 (Semaphore=1) |
---|---|---|
初始化 | pthread_mutex_init(&mutex, NULL) | sem_init(&sem, 0, 1) |
加锁 | pthread_mutex_lock(&mutex) | sem_wait(&sem) |
解锁 | pthread_mutex_unlock(&mutex) | sem_post(&sem) |
销毁 | pthread_mutex_destroy(&mutex) | sem_destroy(&sem) |
所有权 | 有(只能由加锁线程解锁) | 无(任何线程可操作) |
程序执行流程
预期的输出模式:
0123456789
ABCDEFGHIJKLMNOPQRSTUVWXYZ
0123456789
ABCDEFGHIJKLMNOPQRSTUVWXYZ
0123456789
ABCDEFGHIJKLMNOPQRSTUVWXYZ
0123456789
ABCDEFGHIJKLMNOPQRSTUVWXYZ
0123456789
ABCDEFGHIJKLMNOPQRSTUVWXYZ
执行时间线示例:
时间点 | 线程0(数字) | 线程1(字母) | 信号量值 | 输出
------|-------------------|-------------------|---------|-------------------
t0 | sem_wait()成功 | | 0→1 |
t1 | 开始输出0-9 | sem_wait()阻塞 | 1 | 0123456789
t2 | sem_post() | | 0→1 |
t3 | | sem_wait()成功 | 1→0 |
t4 | sem_wait()阻塞 | 开始输出A-Z | 0 | ABCDEFGHIJKLMNOPQRSTUVWXYZ
t5 | | sem_post() | 0→1 |
t6 | sem_wait()成功 | | 1→0 |
... | ... | ... | ... | ...
信号量的不同类型
1. 二进制信号量(Binary Semaphore)
sem_init(&sem, 0, 1); // 值只能是0或1,用作互斥锁
2. 计数信号量(Counting Semaphore)
sem_init(&sem, 0, 5); // 值可以是0-5,用于资源池
3. 命名信号量(Named Semaphore)
sem_t *sem = sem_open("/my_sem", O_CREAT, 0644, 1); // 进程间共享
程序特点分析
1. 正确的同步
使用信号量确保输出完整性
不会出现数字字母混合输出
2. sched_yield()
的作用
sched_yield(); // 主动让出CPU,增加线程切换频率
这使竞争情况更明显,便于观察同步效果。
3. 信号量的灵活性
虽然这里用作互斥锁,但信号量更强大的功能在于:
// 控制并发访问数量
sem_init(&sem, 0, 3); // 允许最多3个线程同时访问// 在多个线程中:
sem_wait(&sem); // 如果有空位则进入,否则等待
// 访问受限资源...
sem_post(&sem); // 释放位置
潜在问题与改进
1. 信号量没有所有权概念
// 线程A
sem_wait(&sem);
// 临界区...// 线程B(错误的!但语法允许)
sem_post(&sem); // 可以释放其他线程获取的信号量
改进:对于严格的互斥,使用互斥锁更安全。
2. 更典型的信号量使用场景
生产者-消费者问题:
sem_t empty, full, mutex;
sem_init(&empty, 0, BUFFER_SIZE); // 空槽位数
sem_init(&full, 0, 0); // 满槽位数
sem_init(&mutex, 0, 1); // 互斥锁// 生产者
sem_wait(&empty);
sem_wait(&mutex);
// 生产数据...
sem_post(&mutex);
sem_post(&full);// 消费者
sem_wait(&full);
sem_wait(&mutex);
// 消费数据...
sem_post(&mutex);
sem_post(&empty);
总结
这个程序演示了:
信号量的基本用法 - 作为二进制信号量实现互斥
线程同步 - 保护临界区,确保输出完整性
信号量操作 -
sem_wait()
和sem_post()
的配对使用与互斥锁的对比 - 信号量更灵活但没有所有权概念
虽然在这个简单例子中,使用互斥锁可能更合适,但程序很好地展示了信号量作为同步原语的基本原理。信号量的真正优势在于处理更复杂的同步模式,如资源池、生产者-消费者等问题。
2. 同步问题及解决方案
同步:对共享资源的有序访问。
2.1 条件变量
条件变量工作机制: 条件变量不像互斥锁是竞争访问共享资源的,条件变量是用来阻塞线程的,直到某条件的成立,条件变量是要搭配互斥锁一起使用的。原因是条件的判断是要在互斥锁的保护下进行的。换句话说,在条件判断的同时是不允许其他者修改条件的,条件变量在阻塞线程的同时,会解开用于保护条件的互斥锁,更改条件的线程, 可以向条件变量发送通知,唤醒被条件变量阻塞的线程,线程重新获取互斥锁,重新评估条件,再决定是否继续进行。
条件变量的操作:
1. 申请初始化条件变量
1.1 静态初始化
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
1.2 动态初始化
pthread_cond_init
头文件: #include <pthread.h>
函数原型: int pthread_cond_init(pthread_cond_t *cond,pthread_condattr_t* attr);
函数功能: 初始化条件变量
函数参数: cond: 待初始化的条件变量
attr: 条件变量属性,NULL代表缺省属性:
函数返回值: 成功 : 返回0
失败 : 返回 错误码。
2. 条件变量阻塞
pthread_cond_wait
头文件: #include <pthread.h>
函数原型: int pthread_cond_wait(pthread_cond_t *cond,pthread_mutex_t* mutex);
函数功能: 用条件变量阻塞调用线程,同时解锁mutex
函数参数: cond: 待操作的条件变量
mutex:待解锁的互斥锁:
函数返回值: 成功 : 返回0
失败 : 返回 错误码。
3. 条件变量通知
pthread_cond_signal
pthread_cond_broadcast
头文件: #include <pthread.h>
函数原型: int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
函数功能: 通知条件变量,唤醒被条件变量阻塞的线程
函数参数: cond: 待操作的条件变量
函数返回值: 成功 : 返回0
失败 : 返回 错误码。
4. 回收条件变量
pthread_cond_destroy()
头文件: #include <pthread.h>
函数原型: int pthread_cond_destroy(pthread_cond_t *cond);
函数功能: 回收条件变量
函数参数: cond: 待操作的条件变量
函数返回值: 成功 : 返回0
失败 : 返回 错误码。
条件变量的使用场景: 对共享资源进行有条件访问的时候,优先考虑使用条件变量
条件变量示例:
#include <unistd.h> // 提供系统调用
#include <pthread.h> // 多线程编程
#include <stdio.h> // 标准输入输出
#include <stdlib.h> // 标准库函数
#include <sched.h> // 线程调度控制int a = 2, b = 8; // 全局共享变量// 定义包含互斥锁和条件变量的结构体
typedef struct {pthread_mutex_t mutex; // 互斥锁,保护共享数据pthread_cond_t cond; // 条件变量,用于线程间通信
} param_t;// 写线程函数:修改共享变量
void *writeAcess(void* argp) {param_t* p = (param_t*)argp; // 获取参数pthread_mutex_lock(&p->mutex); // 获取互斥锁// 修改共享变量a++; // a从2变为3b--; // b从8变为7printf("a=%d, b=%d\n", a, b); // 输出:a=3, b=7// 发送条件信号,唤醒等待的线程pthread_cond_signal(&p->cond);pthread_mutex_unlock(&p->mutex); // 释放互斥锁sched_yield(); // 主动让出CPUreturn NULL; // 注意:这里缺少返回值,应该返回具体值
}// 读线程函数:等待特定条件
void* readAcess(void* argp) {param_t* p = (param_t*)argp; // 获取参数pthread_mutex_lock(&p->mutex); // 获取互斥锁// 等待条件满足:a == bwhile(a != b) {// 条件不满足时等待// pthread_cond_wait会:1.释放互斥锁 2.等待信号 3.被唤醒后重新获取互斥锁pthread_cond_wait(&p->cond, &p->mutex);}printf("\ta=b\n"); // 条件满足时的输出pthread_mutex_unlock(&p->mutex); // 释放互斥锁return NULL;
}int main(int argc, char** argv) {// 初始化互斥锁和条件变量(使用静态初始化)param_t param = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER};pthread_t id, id2;// 创建两个线程pthread_create(&id, NULL, writeAcess, ¶m);pthread_create(&id2, NULL, readAcess, ¶m);// 等待两个线程结束pthread_join(id, NULL);pthread_join(id2, NULL);return 0;
}
关键机制详解
1. 条件变量(Condition Variable)机制
pthread_cond_wait
的工作流程:
while(条件不满足) {pthread_cond_wait(&cond, &mutex);// 1. 原子地释放mutex并进入等待// 2. 被唤醒后重新获取mutex// 3. 继续检查条件
}
2. 程序执行流程分析
初始状态: a = 2
, b = 8
可能的执行顺序:
情况1:写线程先执行(大概率)
写线程:获取锁 → a=3, b=7 → 发送信号 → 释放锁
读线程:获取锁 → 检查a!=b(3!=7) → 进入等待(释放锁)
读线程:被唤醒 → 重新检查a!=b(3!=7) → 继续等待
程序卡死!因为条件永远不会满足
情况2:读线程先执行(小概率)
读线程:获取锁 → 检查a!=b(2!=8) → 进入等待(释放锁)
写线程:获取锁 → a=3, b=7 → 发送信号 → 释放锁
读线程:被唤醒 → 重新检查a!=b(3!=7) → 继续等待
程序卡死!
程序存在的问题
1. 逻辑错误:条件永远不会满足
初始:a=2, b=8
写线程:a++, b-- → a=3, b=7
a和b的差值始终保持为4,永远不会相等
2. writeAcess
函数缺少返回值
void *writeAcess(void* argp) { // 声明返回void*// ... // 缺少return语句
}
修正版本
方案1:修改条件使其可能满足
void *writeAcess(void* argp) {param_t* p = (param_t*)argp;for(int i = 0; i < 6; i++) { // 多次修改,使a和b可能相等pthread_mutex_lock(&p->mutex);a++;b--;printf("a=%d, b=%d\n", a, b);// 每次修改后都发送信号pthread_cond_signal(&p->cond);pthread_mutex_unlock(&p->mutex);sched_yield();}return NULL; // 添加返回值
}// 执行结果可能:
// a=3, b=7
// a=4, b=6
// a=5, b=5 ← 条件满足!
// a=b
// a=6, b=4
// a=7, b=3
// a=8, b=2
方案2:使用更合理的条件
void *writeAcess(void* argp) {param_t* p = (param_t*)argp;pthread_mutex_lock(&p->mutex);a = 5; // 直接设置为相等的值b = 5;printf("a=%d, b=%d\n", a, b);pthread_cond_signal(&p->cond);pthread_mutex_unlock(&p->mutex);return NULL;
}
方案3:完整的生产者和消费者模式
#include <unistd.h>
#include <pthread.h>
#include <stdio.h>int a = 2, b = 8;typedef struct {pthread_mutex_t mutex;pthread_cond_t cond;int ready; // 添加就绪标志
} param_t;void *producer(void* argp) {param_t* p = (param_t*)argp;pthread_mutex_lock(&p->mutex);// 修改数据使其满足条件a = 5;b = 5;p->ready = 1; // 设置就绪标志printf("Producer: a=%d, b=%d\n", a, b);pthread_cond_signal(&p->cond); // 通知消费者pthread_mutex_unlock(&p->mutex);return NULL;
}void* consumer(void* argp) {param_t* p = (param_t*)argp;pthread_mutex_lock(&p->mutex);// 等待条件满足while(!p->ready || a != b) {printf("Consumer: waiting... a=%d, b=%d\n", a, b);pthread_cond_wait(&p->cond, &p->mutex);}printf("Consumer: condition satisfied! a=b=%d\n", a);pthread_mutex_unlock(&p->mutex);return NULL;
}int main() {param_t param = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0};pthread_t producer_id, consumer_id;pthread_create(&consumer_id, NULL, consumer, ¶m);sleep(1); // 确保消费者先等待pthread_create(&producer_id, NULL, producer, ¶m);pthread_join(producer_id, NULL);pthread_join(consumer_id, NULL);return 0;
}
条件变量的正确使用模式
// 消费者线程
pthread_mutex_lock(&mutex);
while (condition_is_false) {pthread_cond_wait(&cond, &mutex);
}
// 处理数据
pthread_mutex_unlock(&mutex);// 生产者线程
pthread_mutex_lock(&mutex);
// 修改数据,使条件为真
condition_make_true();
pthread_cond_signal(&cond); // 或 pthread_cond_broadcast(&cond);
pthread_mutex_unlock(&mutex);
总结
这个程序虽然存在逻辑错误,但它很好地演示了:
条件变量的基本用法
线程间同步的机制
互斥锁与条件变量的配合使用
关键教训:使用条件变量时,必须确保等待的条件有可能被满足,否则会导致线程永久等待。
在上述代码中pthread_cond_wait(&p->cond, &p->mutex)
是条件变量机制中最核心且容易误解的函数。让我详细解释:
函数原型
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
原子操作的三步过程
pthread_cond_wait
实际上是一个原子操作,包含三个步骤:
步骤1:释放互斥锁
// 原子操作开始
pthread_mutex_unlock(mutex); // 先释放锁,让其他线程可以工作
步骤2:进入等待状态
// 线程进入休眠,等待条件变量信号
// 线程被挂起,不消耗CPU资源
步骤3:重新获取互斥锁
// 被唤醒后,在返回前重新获取互斥锁
pthread_mutex_lock(mutex); // 重新获得锁,然后函数才返回
// 原子操作结束
为什么需要这样设计?
1. 避免竞争条件
如果没有原子性,可能会出现:
// 错误的非原子操作:
pthread_mutex_unlock(&mutex); // 步骤1:释放锁
// ⚠️ 在这里,其他线程可能修改条件并发送信号,但当前线程还没开始等待!
pthread_cond_wait(&cond, &mutex); // 步骤2:开始等待(信号已丢失!)
2. 保证条件检查的原子性
正确的模式:
pthread_mutex_lock(&mutex);
while (condition_is_false) {// 在检查条件后和进入等待前,没有其他线程能修改条件pthread_cond_wait(&cond, &mutex);// 被唤醒后,条件被重新检查(因为条件可能再次变为false)
}
// 处理数据...
pthread_mutex_unlock(&mutex);
在原始代码中的具体应用
void* readAcess(void* argp) {param_t* p = (param_t*)argp;pthread_mutex_lock(&p->mutex); // ① 获取锁while(a != b) { // ② 检查条件// ③ 条件不满足,进入等待pthread_cond_wait(&p->cond, &p->mutex);// ⑥ 被唤醒后,自动重新获得锁,回到循环检查条件}printf("\ta=b\n"); // ⑦ 条件满足,处理数据pthread_mutex_unlock(&p->mutex); // ⑧ 释放锁return NULL;
}
完整的时间线示例
假设有两个线程:读线程(等待a==b)和写线程(修改a,b)
时间点 | 读线程 | 写线程 | 共享数据
------|------------------------------|------------------------------|-----------
t0 | pthread_mutex_lock() | | a=2, b=8
t1 | 检查 a!=b → true | | a=2, b=8
t2 | pthread_cond_wait()开始 | | a=2, b=8| → 释放mutex | |
t3 | → 进入等待状态 | pthread_mutex_lock()成功 | a=2, b=8
t4 | 等待中... | 修改: a=3, b=7 | a=3, b=7
t5 | 等待中... | pthread_cond_signal() | a=3, b=7
t6 | 等待中... | pthread_mutex_unlock() | a=3, b=7
t7 | 被唤醒 | | a=3, b=7| → 重新获取mutex | |
t8 | → pthread_cond_wait()返回 | | a=3, b=7
t9 | 回到while循环检查 a!=b → true | | a=3, b=7
t10 | 再次进入pthread_cond_wait() | | a=3, b=7
为什么使用 while
而不是 if
这是条件变量使用中最常见的错误!
错误用法(if):
pthread_mutex_lock(&mutex);
if (condition_is_false) { // ❌ 使用ifpthread_cond_wait(&cond, &mutex);
}
// 假设条件现在为true...(但可能已经被其他线程修改)
pthread_mutex_unlock(&mutex);
正确用法(while):
pthread_mutex_lock(&mutex);
while (condition_is_false) { // ✅ 使用whilepthread_cond_wait(&cond, &mutex);
}
// 确定条件为true
pthread_mutex_unlock(&mutex);
为什么需要while循环:
虚假唤醒:某些系统可能在没有信号的情况下唤醒线程
多个等待者:另一个线程可能抢先处理了条件
条件再次变化:在获得锁之前,条件可能又变为false
相关的条件变量函数
1. pthread_cond_signal()
唤醒至少一个等待的线程
如果有多个线程在等待,只唤醒其中一个
2. pthread_cond_broadcast()
唤醒所有等待的线程
适用于多个线程等待同一个条件的情况
完整的生产者-消费者示例
#include <pthread.h>#define BUFFER_SIZE 10typedef struct {pthread_mutex_t mutex;pthread_cond_t cond_not_full; // 缓冲区不满的条件pthread_cond_t cond_not_empty; // 缓冲区不空的条件int buffer[BUFFER_SIZE];int count, in, out;
} buffer_t;// 生产者
void* producer(void* arg) {buffer_t* buf = (buffer_t*)arg;for(int i = 0; i < 100; i++) {pthread_mutex_lock(&buf->mutex);// 等待缓冲区不满while(buf->count == BUFFER_SIZE) {pthread_cond_wait(&buf->cond_not_full, &buf->mutex);}// 生产数据buf->buffer[buf->in] = i;buf->in = (buf->in + 1) % BUFFER_SIZE;buf->count++;// 通知消费者pthread_cond_signal(&buf->cond_not_empty);pthread_mutex_unlock(&buf->mutex);}return NULL;
}// 消费者
void* consumer(void* arg) {buffer_t* buf = (buffer_t*)arg;for(int i = 0; i < 100; i++) {pthread_mutex_lock(&buf->mutex);// 等待缓冲区不空while(buf->count == 0) {pthread_cond_wait(&buf->cond_not_empty, &buf->mutex);}// 消费数据int item = buf->buffer[buf->out];buf->out = (buf->out + 1) % BUFFER_SIZE;buf->count--;// 通知生产者pthread_cond_signal(&buf->cond_not_full);pthread_mutex_unlock(&buf->mutex);printf("Consumed: %d\n", item);}return NULL;
}
总结
pthread_cond_wait(&cond, &mutex)
的关键要点:
原子操作:释放锁 + 等待 + 重新获取锁是一个原子操作
必须与互斥锁配合使用:在调用前必须持有互斥锁
总是使用while循环:检查条件,防止虚假唤醒
自动释放和重新获取锁:程序员不需要手动操作
线程安全:保证了条件检查和进入等待之间的原子性
这是多线程编程中最精妙的设计之一,确保了线程间同步的正确性和效率。
2.2 信号量
信号量用于同步的方式:
信号量用于同步问题,往往需要多个信号量。让其中一个信号量初值设为资源总数, 其他信号量初值设为0。如果想让某个线程先访问共享资源,则在共享资源访问前让其对初值非0的信号量做P操作,其余线程在共享资源访问前让其对初值为0的信号量做P操作,共享资源访问结束后,先访问共享资源的线程对初值为0的信号量做V操作,其余线程在共享资源访问结束后,对初值非0的信号量做V操作。
信号量的典型应用:
1. 哲学家就餐问题 (互斥)
经典的哲学家就餐问题
原始问题描述:
5个哲学家围坐在圆桌旁
每个哲学家需要2根筷子才能就餐
筷子放在哲学家之间,共5根
哲学家交替进行思考和就餐
死锁场景:
如果所有哲学家同时拿起左边的筷子,就会发生死锁:
每个哲学家都持有一根筷子,等待另一根
但所有筷子都被占用,无人能就餐
改进版本:完整的哲学家就餐问题
#include <unistd.h>
#include <pthread.h>
#include <stdio.h>
#include <semaphore.h>
#include <stdlib.h>#define PHILOSOPHER_COUNT 5
#define EAT_COUNT 3// 哲学家状态
typedef enum {THINKING,HUNGRY,EATING
} state_t;state_t states[PHILOSOPHER_COUNT];
pthread_mutex_t mutex;
sem_t chopsticks[PHILOSOPHER_COUNT];// 获取哲学家左右筷子的编号
int left(int philosopher) {return philosopher;
}int right(int philosopher) {return (philosopher + 1) % PHILOSOPHER_COUNT;
}// 检查哲学家是否可以就餐
void test(int philosopher) {if (states[philosopher] == HUNGRY &&states[left(philosopher)] != EATING &&states[right(philosopher)] != EATING) {states[philosopher] = EATING;sem_post(&chopsticks[philosopher]);}
}// 拿起筷子
void take_chopsticks(int philosopher) {pthread_mutex_lock(&mutex);states[philosopher] = HUNGRY;printf("哲学家 %d 饿了\n", philosopher);test(philosopher);pthread_mutex_unlock(&mutex);sem_wait(&chopsticks[philosopher]);
}// 放下筷子
void put_chopsticks(int philosopher) {pthread_mutex_lock(&mutex);states[philosopher] = THINKING;printf("哲学家 %d 放下筷子,开始思考\n", philosopher);// 检查左右邻居是否可以就餐test(left(philosopher));test(right(philosopher));pthread_mutex_unlock(&mutex);
}// 哲学家线程函数
void* philosopher(void* arg) {int id = *((int*)arg);for (int i = 0; i < EAT_COUNT; i++) {// 思考printf("哲学家 %d 正在思考...\n", id);usleep(rand() % 1000000);// 饥饿,尝试拿筷子take_chopsticks(id);// 就餐printf("哲学家 %d 正在就餐 🍽️\n", id);usleep(rand() % 1000000);// 放下筷子put_chopsticks(id);}printf("哲学家 %d 完成了就餐\n", id);return NULL;
}int main() {pthread_t philosophers[PHILOSOPHER_COUNT];int ids[PHILOSOPHER_COUNT];// 初始化互斥锁和信号量pthread_mutex_init(&mutex, NULL);for (int i = 0; i < PHILOSOPHER_COUNT; i++) {sem_init(&chopsticks[i], 0, 0);states[i] = THINKING;ids[i] = i;}// 创建哲学家线程for (int i = 0; i < PHILOSOPHER_COUNT; i++) {pthread_create(&philosophers[i], NULL, philosopher, &ids[i]);}// 等待所有线程结束for (int i = 0; i < PHILOSOPHER_COUNT; i++) {pthread_join(philosophers[i], NULL);}// 清理资源pthread_mutex_destroy(&mutex);for (int i = 0; i < PHILOSOPHER_COUNT; i++) {sem_destroy(&chopsticks[i]);}return 0;
}
解决死锁的方案
方案1:资源分级(破坏循环等待)
// 让哲学家按特定顺序拿筷子
void take_chopsticks_safe(int philosopher) {if (philosopher % 2 == 0) {// 偶数编号哲学家先拿左边再拿右边sem_wait(&chopsticks[left(philosopher)]);sem_wait(&chopsticks[right(philosopher)]);} else {// 奇数编号哲学家先拿右边再拿左边sem_wait(&chopsticks[right(philosopher)]);sem_wait(&chopsticks[left(philosopher)]);}
}
方案2:限制同时就餐人数
sem_t room; // 房间信号量,限制最多4个哲学家同时尝试就餐
sem_init(&room, 0, 4); // 5个哲学家,最多4个同时尝试void take_chopsticks_limited(int philosopher) {sem_wait(&room); // 进入房间sem_wait(&chopsticks[left(philosopher)]);sem_wait(&chopsticks[right(philosopher)]);
}void put_chopsticks_limited(int philosopher) {sem_post(&chopsticks[left(philosopher)]);sem_post(&chopsticks[right(philosopher)]);sem_post(&room); // 离开房间
}
方案3:使用AND信号量
// 一次性获取所有需要的资源
void take_both_chopsticks(int philosopher) {// 伪代码:同时等待左右筷子// 实际实现需要更复杂的同步机制
}
原始程序的修正版本
如果要修正原始程序,可以这样改:
#include <unistd.h>
#include <pthread.h>
#include <stdio.h>
#include <semaphore.h>#define PHILOSOPHER_COUNT 5 // 改为5个哲学家
#define CHOPSTICK_COUNT 5 // 5根筷子int available_chopsticks = CHOPSTICK_COUNT;
sem_t sem;void* philosopher(void* argp) {int num = *((int*)argp);sem_wait(&sem);if (available_chopsticks >= 2) { // 只有至少2根筷子时才分配available_chopsticks -= 2;printf("哲学家 %d 获取了2根筷子,剩余%d根\n", num, available_chopsticks);// 模拟就餐usleep(100000);available_chopsticks += 2;printf("哲学家 %d 释放了2根筷子,剩余%d根\n", num, available_chopsticks);} else {printf("哲学家 %d 无法获取足够的筷子\n", num);}sem_post(&sem);return NULL;
}int main() {pthread_t id[PHILOSOPHER_COUNT];int philosopher_ids[PHILOSOPHER_COUNT];sem_init(&sem, 0, 1); // 二进制信号量for(int i = 0; i < PHILOSOPHER_COUNT; i++) {philosopher_ids[i] = i + 1;pthread_create(&id[i], NULL, philosopher, &philosopher_ids[i]);}for(int i = 0; i < PHILOSOPHER_COUNT; i++) {pthread_join(id[i], NULL);}sem_destroy(&sem);return 0;
}
总结
原始程序存在的问题:
资源竞争:6个线程竞争5个资源
逻辑不完整:没有模拟真正的就餐过程
缺少死锁处理:没有预防经典的死锁场景
哲学家就餐问题是并发编程中的经典案例,主要教会我们:
死锁的识别和预防
资源分配策略
同步原语的正确使用
并发程序的设计原则
2. 生产者消费者问题 (同步)
生产者消费者问题: 也叫有限缓冲问题:
该问题描述了两个线程(生产者/消费者)对共享缓冲区的访问, 生产者是向缓冲区中写入数据,消费者是读取数据该问题的核心是:保证生产者线程在共享缓冲区满容量的时候不允许访问,保证消费者线程在共享缓冲区空容量的时候不允许访问。
#include <unistd.h> // 提供 pipe(), read(), write(), usleep()
#include <fcntl.h> // 文件控制选项
#include <pthread.h> // 多线程编程
#include <stdio.h> // 标准输入输出
#include <stdlib.h> // 提供 rand(), srand()
#include <string.h> // 字符串处理
#include <semaphore.h> // 信号量#define RESOURCE_NUM 6 // 资源数量(缓冲区大小)// 参数结构体,包含所有同步原语和通信管道
typedef struct {sem_t s1, s2; // 信号量:s1-空槽位,s2-已使用槽位pthread_mutex_t mutex; // 互斥锁,保护临界区int fd[2]; // 管道文件描述符
} param_t;// 生产者线程函数
void* productor(void* argp) {param_t *p = (param_t*)argp;const char* fruit[] = {"apple ", "banana", "orange", "tomato"}; // 可生产的水果int n = sizeof fruit / sizeof fruit[0]; // 水果种类数量for(register int i = 0; i < 20; i++) { // 生产20次sem_wait(&p->s1); // 等待空槽位(P操作)pthread_mutex_lock(&p->mutex); // 获取互斥锁int j = rand() % n; // 随机选择一种水果printf("put:%s\n", fruit[j]); // 打印生产信息write(p->fd[1], fruit[j], strlen(fruit[j])); // 写入管道pthread_mutex_unlock(&p->mutex); // 释放互斥锁sem_post(&p->s2); // 增加已使用槽位(V操作)usleep(j * 1e5); // 模拟生产时间}return NULL;
}// 消费者线程函数
void* consumer(void* argp) {param_t *p = (param_t*)argp;for(register int i = 0; i < 20; i++) { // 消费20次sem_wait(&p->s2); // 等待有数据可消费(P操作)pthread_mutex_lock(&p->mutex); // 获取互斥锁char buf[7] = {0}; // 缓冲区,初始化为0read(p->fd[0], buf, 6); // 从管道读取数据printf("\tget:%s\n", buf); // 打印消费信息pthread_mutex_unlock(&p->mutex); // 释放互斥锁sem_post(&p->s1); // 增加空槽位(V操作)int j = rand() % 10; // 随机延迟usleep(j * 1e5); // 模拟消费时间}return NULL;
}int main(int argc, char** argv) {param_t param;// 创建管道if(-1 == pipe(param.fd))return -1;// 初始化信号量和互斥锁sem_init(¶m.s1, 0, RESOURCE_NUM); // s1初始为6(空槽位数)sem_init(¶m.s2, 0, 0); // s2初始为0(已使用槽位数)pthread_mutex_init(¶m.mutex, NULL);// 创建生产者和消费者线程pthread_t id[2];pthread_create(id, NULL, productor, ¶m);pthread_create(id+1, NULL, consumer, ¶m);// 等待线程结束pthread_join(id[0], NULL);pthread_join(id[1], NULL);// 清理资源sem_destroy(¶m.s1);sem_destroy(¶m.s2);pthread_mutex_destroy(¶m.mutex);close(param.fd[0]); // 关闭管道读端close(param.fd[1]); // 关闭管道写端return 0;
}
关键机制详解
1. 信号量设计
sem_t s1, s2;
sem_init(¶m.s1, 0, RESOURCE_NUM); // 空槽位信号量
sem_init(¶m.s2, 0, 0); // 已使用槽位信号量
工作原理:
s1
:表示可用空槽位数,初始为6(缓冲区大小)s2
:表示已存放数据的槽位数,初始为0
2. 生产者-消费者同步流程
生产者:
sem_wait(&s1); // 申请空槽位(如果没有空位则等待)
锁住mutex; // 进入临界区
生产数据并放入缓冲区;
释放mutex; // 离开临界区
sem_post(&s2); // 通知消费者有新数据
消费者:
sem_wait(&s2); // 申请数据(如果没有数据则等待)
锁住mutex; // 进入临界区
从缓冲区取出数据消费;
释放mutex; // 离开临界区
sem_post(&s1); // 通知生产者有空槽位了
3. 管道作为缓冲区
int fd[2];
pipe(fd); // 创建管道
fd[0]
:读端(消费者使用)fd[1]
:写端(生产者使用)管道本身提供FIFO(先进先出)缓冲区
程序执行流程
初始状态:
空槽位信号量 s1 = 6
已使用信号量 s2 = 0
管道为空
典型输出示例:
put:apple get:apple
put:banana
put:orangeget:banana
put:tomatoget:orangeget:tomato
...(继续交替执行)
同步原语的作用
1. 信号量(控制数量)
s1
:防止生产者生产过快,缓冲区溢出s2
:防止消费者消费空缓冲区
2. 互斥锁(保护临界区)
pthread_mutex_lock(&mutex);
// 临界区:对管道的读写操作
pthread_mutex_unlock(&mutex);
确保同一时间只有一个线程访问管道。
3. 管道(数据缓冲区)
提供线程间通信通道
自动处理数据的存储和检索
程序特点
1. 正确的同步顺序
// 生产者:先申请资源,再获取锁
sem_wait(&s1); // 1. 申请空槽位
pthread_mutex_lock(...); // 2. 获取锁
// 生产...
pthread_mutex_unlock(...);
sem_post(&s2); // 3. 释放数据槽位
2. 避免死锁
信号量和互斥锁的使用顺序正确
不会出现循环等待
3. 流量控制
通过信号量值控制生产消费速率
缓冲区大小限制为6,防止内存过度使用
潜在改进
1. 添加错误检查
if (write(p->fd[1], fruit[j], strlen(fruit[j])) == -1) {perror("write error");
}
2. 更精确的缓冲区模拟
// 如果要用数组作为缓冲区而不是管道
char buffer[RESOURCE_NUM][7];
int in = 0, out = 0;
3. 多个生产者和消费者
pthread_t producers[3], consumers[2];
for (int i = 0; i < 3; i++) {pthread_create(&producers[i], NULL, productor, ¶m);
}
for (int i = 0; i < 2; i++) {pthread_create(&consumers[i], NULL, consumer, ¶m);
}
总结
这个程序很好地演示了:
生产者-消费者模式的经典实现
信号量用于资源计数和同步
互斥锁用于保护临界区
管道作为线程间通信机制
多线程同步的正确实践