【Linux系统】POSIX信号量
POSIX信号量是什么呢?还记得在前面章节中我们介绍过System V 信号量,那这两个有什么关系和区别呢?
1. 信号量的基本概念回顾
首先,信号量(Semaphore)是一种用于控制多个进程或线程对共享资源访问的同步机制。它由一个整数值和两个原子操作组成:
P操作(wait/proberen):减少信号量值,如果值为0则阻塞
V操作(post/verhogen):增加信号量值,唤醒等待的进程
核心差异全景对比
特性 | POSIX信号量 | System V信号量 |
---|---|---|
标准来源 | IEEE POSIX标准(现代跨平台) | UNIX System V规范(传统系统) |
基本单元 | 单个非负整数 | 信号量集合(数组结构体) |
操作粒度 | 仅支持增减1(sem_wait /sem_post ) | 支持任意增减值(semop 可指定操作数) |
权限控制 | 不支持动态修改权限 | 可修改权限为原始权限的子集 |
初始化原子性 | 创建与初始化原子完成(sem_init ) | 需分步创建(semget )和初始化(semctl ) |
生命周期管理 | 无名信号量随进程结束自动清理 | 需显式删除(semctl(, IPC_RMID) ) |
内存占用 | 轻量级(单信号量结构) | 高开销(支持最多25个信号量的集合) |
典型同步场景 | 线程间同步、简单进程同步(命名信号量) | 复杂进程间同步(如共享内存控制) |
POSIX信号量和SystemV信号量确实在功能上相似,都是用于进程或线程间的同步机制,确保对共享资源的无冲突访问。然而:
- POSIX信号量既可以用于进程间同步,也可以用于线程间同步,具有更广泛的适用性
- SystemV信号量主要用于进程间同步,不直接支持线程间的同步操作
2. POSIX信号量核心函数详解
初始化信号量:sem_init
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数说明:
sem
:指向要初始化的信号量对象的指针pshared
:0
:信号量在线程间共享(同一进程内的线程)非
0
:信号量在进程间共享(需要位于共享内存中)
value
:信号量的初始值(通常表示可用资源数量)
返回值:成功返回0
,失败返回-1
并设置errno
销毁信号量:sem_destroy
int sem_destroy(sem_t *sem);
参数说明:
sem
:要销毁的信号量
返回值:成功返回0
,失败返回-1
并设置errno
重要注意事项:
只有在没有线程等待信号量时才能安全销毁
销毁后的信号量不能再被使用
必须与
sem_init
配对使用
等待信号量(P操作):sem_wait
int sem_wait(sem_t *sem);
功能:执行P操作(等待/获取信号量)
如果信号量值大于0,将其减1并立即返回
如果信号量值为0,阻塞调用线程,直到信号量值变为大于0
返回值:成功返回0
,失败返回-1
并设置errno
发布信号量(V操作):sem_post
int sem_post(sem_t *sem);
功能:执行V操作(释放/发布信号量)
将信号量值加1
如果有线程正在等待该信号量,唤醒其中一个
返回值:成功返回0
,失败返回-1
并设置errno
其他有用的信号量函数
除了基本操作外,POSIX信号量还提供了一些有用的函数:
1. 非阻塞等待:sem_trywait
int sem_trywait(sem_t *sem);
功能:尝试获取信号量,如果信号量值为0,立即返回错误而不是阻塞
返回值:成功返回0
,如果信号量值为0返回-1
并设置errno
为EAGAIN
2. 带超时的等待:sem_timedwait
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
功能:尝试获取信号量,但在指定的绝对时间前超时
参数:
sem
:信号量abs_timeout
:绝对超时时间
返回值:成功返回0
,超时返回-1
并设置errno
为ETIMEDOUT
3. 获取信号量当前值:sem_getvalue
int sem_getvalue(sem_t *sem, int *sval);
功能:获取信号量的当前值
参数:
sem
:信号量sval
:输出参数,存储信号量的当前值
返回值:成功返回0
,失败返回-1
注意:在多线程环境中,获取的值可能立即过时
3. 封装信号量
和前面章节封装互斥量,条件变量一样,比较简单,不做讲解
代码如下:
#include <iostream>
#include <semaphore.h>
#include <pthread.h>namespace SemModule
{const int defaultvalue = 1;class Sem{public:Sem(unsigned int sem_value = defaultvalue){int n = sem_init(&_sem, 0, sem_value);if(n != 0){perror("sem init failed");}}void P(){int n = sem_wait(&_sem);if(n != 0){perror("sem wait failed");}}void V(){int n = sem_post(&_sem);if(n != 0){perror("sem post failed");}}~Sem(){int n = sem_destroy(&_sem);if(n != 0){perror("sem destroy failed");}}private:sem_t _sem;};
}
4. 基于环形队列的生产消费模型
环形队列采用数组模拟,用模运算来模拟环状特性
- 使用固定大小的数组作为底层存储结构
- 通过模运算(index % capacity)实现循环访问
- 示例:当队尾指针到达数组末尾时,通过取模运算回到数组开头
rear = (rear + 1) % capacity;
环形结构的状态判断方案
- 由于首尾状态相同,需要额外机制判断空/满状态
- 常见解决方案:
- 计数器方案:维护元素计数变量
- count == 0 表示空
- count == capacity 表示满
- 标记位方案:使用bool标志区分空/满
- 预留空间方案:始终保留一个空位
- (rear + 1) % capacity == front 表示满
- rear == front 表示空
- 计数器方案:维护元素计数变量
- 各种方案的比较:
方案 优点 缺点 计数器 判断简单 需要额外维护变量 标记位 实现直接 状态切换复杂 预留空间 逻辑清晰 浪费一个存储位
信号量在多线程同步中的应用
- 使用信号量作为计数器天然适合环形队列场景
- 典型实现方式:
- empty_sem:记录空位数量,初始值为capacity
- full_sem:记录数据数量,初始值为0
- 生产者操作流程:
- P(empty_sem) // 申请空位
- 写入数据
- V(full_sem) // 增加数据计数
- 消费者操作流程:
- P(full_sem) // 申请数据
- 读取数据
- V(empty_sem) // 增加空位计数
- 优势:避免忙等待,提高CPU利用率
模拟实现
说白了,信号量就是一个计数器,我们只需要使用一个信号量记录空位的数量,再使用一个信号量记录数据的数量,入队,空位减1数据加1,出队,空位加1数据减1,队列满了就阻塞生产者,队列空了就阻塞消费者,队列不为空或者不为满,生产者生产数据,消费者消费数据两者可以同时进行。
下面我们同样还是先以单生产者单消费者为例,后面再改成多生产多消费
#pragma once#include <iostream>
#include <vector>
#include "Sem.hpp"using namespace SemModule;static const int gcap = 5;template <class T>
class RingQueue
{
public:Ringqueue(int cap = gcap):_cap(cap),_rq(cap),_blank_sem(cap),_p_step(0),_data_sem(0),_c_step(0){}~RingQueue() {}
private:std::vector<T> _rq;int _cap;// 生产者Sem _blank_sem; // 空位置int _p_step; // 下一个空位置的下标// 消费者Sem _data_sem; // 数据int _c_step; // 下一个数据的下标
};
这里我们使用上面封装好的信号量,注意,对于空位置的信号量需要初始化为容量大小,因为一开始队列为空,全是空位置
生产者生产数据:
// 生产者void Enqueue(const T& in){// 1. 空位置信号量大于0,信号量减1并返回;// 空位置信号量等于0,则阻塞直到信号量大于0被唤醒_blank_sem.P();// 2. 生产数据_rq[_p_step] = in;// 3. 更新下标++_p_step;// 4. 维护环形特性_p_step %= _cap;// 5. 数据信号量加1// 如果队列为空,数据信号量则为0,在阻塞等待,此时会唤醒数据信号量_data_sem.V();}
消费者同理,代码如下:
// 消费者void Pop(T* out){// 1. 获取信号量_data_sem.P();// 2. 消费数据*out = _rq[_c_step];// 3. 更新下标++_c_step;// 4. 维护环形特性_c_step %= _cap;// 5. 释放信号量_blank_sem.V();}
主程序
这里和阻塞队列一样,不做过多解释
#include "RingQueue.hpp"
#include <unistd.h>void* consumer(void* args)
{RingQueue<int>* rq = static_cast<RingQueue<int>*>(args);while(true){sleep(1);int data = 0;rq->Pop(&data);std::cout << "消费了一个数据: " << data << std::endl;}
}void* producer(void* args)
{int data = 1;RingQueue<int>* rq = static_cast<RingQueue<int>*>(args);while(true){std::cout << "生产了一个数据: " << data << std::endl;rq->Enqueue(data);data++;}
}int main()
{RingQueue<int>* rq = new RingQueue<int>(); // 构建生产和消费者pthread_t c[1], p[1];pthread_create(c, nullptr, consumer, rq);pthread_create(p, nullptr, producer, rq);pthread_join(c[0], nullptr);pthread_join(p[0], nullptr);return 0;
}
运行结果:
ltx@iv-ye1i2elts0wh2yp1ahah:~/gitLinux/Linux_system/lesson_thread/ThreadSync/Sem$ ./rq
生产了一个数据: 1
生产了一个数据: 2
生产了一个数据: 3
生产了一个数据: 4
生产了一个数据: 5
生产了一个数据: 6
消费了一个数据: 1
生产了一个数据: 7
消费了一个数据: 2
生产了一个数据: 8
消费了一个数据: 3
生产了一个数据: 9
消费了一个数据: 4
生产了一个数据: 10
消费了一个数据: 5
生产了一个数据: 11
^C
可以看到我们让生产者先运行,消费者sleep上1秒,生产者很快就将队列生产满,然后消费者消费旧数据,生产者又继续生产新数据。
但这是单生产单消费,我们使用信号量完成了生产者和消费者之间的互斥和同步,不需要维护生产者与生产者之间的互斥关系,也不需要维护消费者与消费者之间的互斥关系,那如果是多生产多消费呢?那我们就需要维护生产者与生产者之间,消费者与消费者之间的互斥关系,怎么维护?答案是加锁。生产者与生产者之间需要一把锁,消费者与消费者之间需要一把锁。
private:std::vector<T> _rq;int _cap;// 生产者Sem _blank_sem; // 空位置int _p_step; // 下一个空位置的下标// 消费者Sem _data_sem; // 数据int _c_step; // 下一个数据的下标// 维护多生产,多消费Mutex _pmutex;Mutex _cmutex;
那加锁应该怎么加呢?是在获取信号量前加还是之后加?解锁呢?
// 生产者void Enqueue(const T& in){_pmutex.Lock(); // 是在信号量前加锁?// 1. 空位置信号量大于0,信号量减1并返回;// 空位置信号量等于0,则阻塞直到信号量大于0被唤醒_blank_sem.P();_pmutex.Lock(); // 还是在信号量之后加锁?// 2. 生产数据_rq[_p_step] = in;// 3. 更新下标++_p_step;// 4. 维护环形特性_p_step %= _cap;// 5. 数据信号量加1// 如果队列为空,数据信号量则为0,在阻塞等待,此时会唤醒数据信号量_pmutex.Unlock(); // 解锁是在信号量前?_data_sem.V();_pmutex.Unlock(); // 还是在信号量之后?}
正确的加锁策略是:先获取信号量,再获取互斥锁
1. 信号量在锁之前获取
信号量用于控制资源可用性(空位/数据)
先获取信号量可以避免持有锁时等待,减少锁的持有时间
如果先获取锁再获取信号量,其他线程无法访问队列,降低了并发性
2. 锁保护具体的操作
生产者锁保护
_p_step
和_rq[_p_step]
的修改消费者锁保护
_c_step
和_rq[_c_step]
的访问锁的范围应该尽可能小,只保护必要的临界区
3. 信号量在锁之后释放
先完成数据操作,再通知其他线程
确保接收信号的线程能看到完整的数据状态
为什么不能先加锁再获取信号量?
如果先加锁再获取信号量,会导致严重的性能问题甚至死锁
问题在于:
如果队列已满,生产者会持有锁并阻塞在
_blank_sem.P()
消费者无法获取锁来消费数据并释放空位
导致死锁:生产者等待消费者,消费者等待生产者的锁
我们可以举一个例子来帮助理解为什么应该先等待信号量,再加锁
想象一个电影院有:
多个检票口(消费者线程)
大量观众排队(生产者线程)
有限的座位(环形队列容量)
座位号(队列中的位置)
角色对应:
观众 = 生产者(生产"观影需求")
检票员 = 消费者(消费"观影需求")
电影院座位 = 环形队列的槽位
空座位数量 =
_blank_sem
信号量已坐观众数量 =
_data_sem
信号量检票口秩序管理员 =
_cmutex
消费者锁观众排队引导员 =
_pmutex
生产者锁
错误的方式:先加锁再等待信号量
这就像:
观众先抢到一个"排队优先权"(加锁)
然后才开始查看有没有空座位(等待信号量)
如果没空座位,观众就占着排队位置不动,阻塞在那里
其他观众无法排队,检票员也无法帮助腾出空座位
结果:整个系统死锁!大家都动不了
正确的方式:先等待信号量再加锁
这就像:
观众先确认电影院还有空座位(等待信号量)
确定有空座位后,再礼貌地排队(加锁)
快速找到座位坐下,然后离开排队区域(释放锁)
其他观众可以继续排队,检票员可以继续工作
结果:系统流畅运行!
消费者消费数据同样如此
完整代码:
#pragma once#include <iostream>
#include <vector>
#include "Sem.hpp"
#include "Mutex.hpp"using namespace MutexModule;
using namespace SemModule;static const int gcap = 5;template <class T>
class RingQueue
{
public:RingQueue(int cap = gcap):_cap(cap),_rq(cap),_blank_sem(cap),_p_step(0),_data_sem(0),_c_step(0){}// 生产者void Enqueue(const T& in){// 1. 空位置信号量大于0,信号量减1并返回;// 空位置信号量等于0,则阻塞直到信号量大于0被唤醒_blank_sem.P();_pmutex.Lock();// 2. 生产数据_rq[_p_step] = in;// 3. 更新下标++_p_step;// 4. 维护环形特性_p_step %= _cap;// 5. 数据信号量加1// 如果队列为空,数据信号量则为0,在阻塞等待,此时会唤醒数据信号量_pmutex.Unlock(); _data_sem.V();}// 消费者void Pop(T* out){// 1. 获取信号量_data_sem.P();_cmutex.Lock();// 2. 消费数据*out = _rq[_c_step];// 3. 更新下标++_c_step;// 4. 维护环形特性_c_step %= _cap;// 5. 释放信号量_cmutex.Unlock();_blank_sem.V();}~RingQueue() {}
private:std::vector<T> _rq;int _cap;// 生产者Sem _blank_sem; // 空位置int _p_step; // 下一个空位置的下标// 消费者Sem _data_sem; // 数据int _c_step; // 下一个数据的下标// 维护多生产,多消费Mutex _pmutex;Mutex _cmutex;
};
这里就不再测试了
进一步理解信号量:
信号量的本质:
信号量不仅用于实现同步互斥,更关键的是它能以原子操作的方式,在访问临界资源之前就完成对资源状态(如“是否存在”“是否就绪”)的判断。这种预先判断避免了传统条件判断(如if
)可能存在的竞态条件问题。
信号量与互斥锁(mutex)的适用场景:
- 如果资源可以拆分使用(例如多实例资源池),适合用信号量(如计数信号量控制资源数量)。
如果资源必须整体使用(一次仅允许一个线程访问),适合用互斥锁(本质是二元信号量,但更强调所有权和互斥)。