【Linux进阶系列】:线程(下)
🔥 本文专栏:Linux
🌸作者主页:努力努力再努力wz


💪 今日博客励志语录:
生活有时会把你推下悬崖,勇气不是指望能立刻飞翔,而是在坠落的过程中,拼命去寻找并相信自己能长出翅膀。
★★★ 本文前置知识:
线程(上)
上文在探讨了线程的基本概念、线程管理以及线程互斥之后,本文会先从线程互斥的基础上切入,旨在最终构建一个关于线程的完整、系统的概念体系。
死锁
我们知道,当多个 线程 并发访问同一个共享资源时,如果访问操作不是原子的,就会导致 数据不一致 的问题。此时常用的解决方案是使 线程 之间以互斥的方式访问该共享资源。所谓互斥访问,是指当一个线程正在访问共享资源时,其他线程不能同时访问,必须等待当前 线程 访问结束后,才有机会获得访问权限。实现互斥访问的一种典型机制是使用 互斥锁 。
为了帮助理解 互斥锁 的工作原理,我们可以借助一个自习室的模型进行类比:假设图书馆中有一个自习室,该自习室一次只能供一名学生使用。为了确保使用的独占性,学生需要向图书管理员申请一把锁和对应的钥匙,遵循先到先得的原则。谁先申请到锁和钥匙,谁就可以持有锁、进入自习室并将门锁上。这样,其他学生无法进入,只有持有钥匙的学生能够访问和使用自习室资源。使用完毕后,该学生只需要出门解锁,并将钥匙挂在门口,以便后续有需要的学生获取钥匙,继续遵循先到先得的原则。
以上就是 互斥锁 的基本思想。前面所介绍的 互斥锁 应用场景属于最典型和简单的情形,即多个线程并发访问 同一个 共享资源,此时我们只需要申请一把互斥锁对该资源进行保护。
而本文要讨论的情况稍复杂一些:多个线程不再访问同一个共享资源,而是可能访问多个不同的共享资源。为了便于理解,我们仍以自习室模型为例进行说明:
之前描述的场景相当于图书馆中只有一个自习室,一次只能由一个学生使用。而在新场景中,图书馆拥有多个自习室,每个自习室仍然一次只能供一名学生使用。但由于存在多个自习室,学生不必竞争同一个自习室,而是可以竞争不同的自习室。那么问题来了:在这种情况下,学生是向管理员申请一把锁和一把钥匙,还是多把锁和多把钥匙?
如果只申请一把锁和一把钥匙,由于只有一把锁,却存在 n 个自习室,每个自习室都需要保证独占使用,那么可行的办法是将这把锁用于锁住图书馆的大门。任何希望使用自习室的学生必须先申请到钥匙,进入图书馆后将大门反锁,然后使用其中一个自习室。尽管这种方式确实保证了每个自习室的独占性,但它存在明显缺陷:例如,学生 A 想使用自习室 1,学生 B 想使用自习室 2,两者并不冲突,理论上可以并发进行。但由于只有一把锁锁在大门,若学生 A 先获得钥匙,则学生 B 必须等待 A 使用完毕后才能进入,即使他们使用不同的自习室。这导致本可并行执行的操作被迫串行化,降低了效率。
因此,更合理的方式是:如果图书馆有 n 个自习室,就配备 n 把锁和 n 把钥匙,每把锁用于锁住一个自习室的门。学生使用某个自习室时,只需竞争该自习室对应的钥匙。在这种机制下,多个线程(学生)可以并发地访问不同的共享资源(自习室),实现真正的并行访问。
通过这个多自习室模型可以看出,一个进程中可以存在多个不同的共享资源,不同线程可能访问不同资源。每个共享资源的访问应当是互斥的,因此我们需要为每个共享资源分配一把独立的锁。希望访问某个资源的线程,必须竞争获得该资源对应的锁,从而获得访问权限。
通过上文可知,对于每一个会被并发访问的共享资源,只需要一把锁即可实现互斥保护。这类似于为一间自习室配一把锁,无需两把或多把锁。每个访问该共享资源的线程只需竞争该资源对应的锁的钥匙。
然而在某些场景中,一个线程可能需要同时竞争两把甚至更多把锁的钥匙。既然每个共享资源仅需一把锁,那么线程需要同时获取多把锁的钥匙,就意味着它要同时访问多个不同的共享资源。读者可能已经意识到这类场景确实存在,但可能因缺乏具体实例而感到困惑。下面我们将引入一个典型且具体的例子——转账操作。
我们经常使用微信、支付宝等应用进行转账。在同一时刻,A 向 B 转账和 C 向 D 转账这两个操作是可以并发执行的,并不需要 C 等待 A 的转账完成。从程序角度看,转账可被理解为调用一个转账函数,该函数涉及两个对象:发送方和接收方。而线程本质上就是一个用户态函数的执行流。既然转账操作可以并发执行,那么transfer 函数就可以作为一个线程执行。
该函数涉及两个对象:转出账户和接收账户。若以C++实现,我们可以用一个类来表示账户,不同的账户即为该类的不同实例对象。
transfer 函数将接收两个账户对象作为参数。账户类中包含一个成员变量 balance ,表示当前余额。
typedef struct {int balance;
} Account;
假设对象 A 向对象 B 转账 100 元,这个动作对应一个线程(假设为线程1)执行transfer函数的上下文。 transfer函数的内容是:将 A 对象的 balance 减 100,B 对象的 balance 加 100。同时,可能存在另一个并发操作,如 A 向 C 转账 100 元,对应另一个线程(线程2)。此时,线程2也会对 A 对象的 balance 减 100,C 对象的 balance 加 100。
void transfer(Account* from, Account* to, int amount) {if (from->balance >= amount)from->balance -= amount;to->balance += amount;
}
需要注意的是,对balance 的修改并非原子操作。其底层通常对应三条指令:从内存读取余额到寄存器、在寄存器中执行算术运算、将结果写回内存。
假设线程1先被调度执行,在执行完读取 A 的 balance 并完成减法运算后(假设 A 初始余额为 1000,寄存器中结果为 900),时间片耗尽,发生线程切换。此时线程2被调度,同样读取 A 的余额(仍为 1000),完成减法运算后写入内存,A 的余额变为 900。当线程1再次被调度时,它会继续执行之前的运算结果(寄存器中的 900),而此时寄存器的内容已经过时,写回内存后导致 A 的余额仍为 900,而实际上应扣除 200,余额应为 800。这就出现了数据不一致问题。
📊
初始状态:账户A余额1000元,账户B余额1000元线程1(A→B转账100):1. 读取A余额:1000
2. A余额减100 → 900(未写回)⬇️ 线程切换 ⬇️线程2(A→C转账100):1. 读取A余额:1000(线程1未写回)
2. A余额减100 → 900
3. 写入A余额:900
4. 完成C的余额增加切换回线程1:3. 写入A余额:900(覆盖线程2的修改)
4. 完成B的余额增加结果:A余额为900(应为800),数据不一致。
除了转出方,接收方账户同样可能被多个线程并发访问。例如,A向B转账和C向B转账同时发生时,两个线程都会对B的余额执行加法操作。若该加法操作被中断,也可能因类似的原因导致B的余额更新出现错误,例如本应增加200,结果只增加100。
因此,每个账户的balance 都是一个共享资源。如前所述,每个共享资源都需要一把锁来保护,以确保互斥访问,避免数据不一致。我们可以在账户结构中引入一个互斥锁,这样每个账户对象都拥有自己的锁。任何线程在访问该账户的balance 之前,必须先获得对应的锁。
typedef struct {int balance;pthread_mutex_t lock; // 每个账户一把锁
} Account;
由于transfer函数需要同时访问转出方和接收方的balance ,因此执行该函数的线程需要同时获取两把不同的锁:转出账户的锁和接收账户的锁。以A向B转账和A向C转账为例,两个线程会竞争转出方A的锁,只有一个线程能成功获取,另一个线程必须等待。类似地,在A向B转账和C向B转账的场景中,两个线程会竞争接收方B的锁。通过这种方式,我们保证了涉及同一账户的转账操作串行执行,从而避免了数据不一致。
线程1(A→B转账100):
1. 锁定A账户
2. 锁定B账户
3. 执行转账:A=900, B=1100
4. 释放锁线程2(A→C转账100):
1. 尝试锁定A账户 → 等待(因为被线程1持有)
2. 线程1完成后,获取A锁,然后获取C锁
3. 执行转账:A=800, C=1100
// 转账函数 - 需同时锁定两个账户
void* transfer(Account* from, Account* to, int amount) {pthread_mutex_lock(&from->lock);pthread_mutex_lock(&to->lock);if (from->balance >= amount) {from->balance -= amount;to->balance += amount;
}pthread_mutex_unlock(&to->lock);
pthread_mutex_unlock(&from->lock);
return (void*)0
}
但这里存在一个关键问题:由于所有线程都执行相同的transfer函数,加锁顺序是固定的(例如先转出方后接收方)。这种固定的顺序可能导致死锁(deadlock)。为了引出死锁问题,我们进行了上述铺垫。
而注意一个账户既可以作为转出方,也可以作为接收方。考虑账户A向B转账(线程1)与B向A转账(线程2)同时发生的情况。两个线程都需要获取两个账户的锁,但加锁顺序固定。假设线程1先锁定A,线程2先锁定B,之后线程1尝试锁定B(已被线程2持有),线程2尝试锁定A(已被线程1持有)。两个线程都会互相等待对方释放锁,从而陷入永久阻塞,即死锁。
我们可以通过一个比喻理解该场景:两个人A和B要各自进入一个房间,他们要各自进入的房间前有两道门,每道门有一把锁,要进入该房间需要解开两道门对应的锁,而每道门需要对应的一把钥匙才能打开,而A要打开的第一道房门的锁和B要打开的第二道房门的锁是相同的,而B要打开的第一道房门的锁和A要打开的第二道房门的锁是相同的。但两人各自持有其中一把打开各自第一道房门锁的钥匙,双方都需要对方手中的钥匙才能打开下一道房门从而进入房间,由于双方都不愿先交出各自的钥匙但又想对方交出各自的钥匙,最终陷入僵持状态。
🔴 潜在的死锁问题:
线程1(A→B转账) 线程2(B→A转账)1. 锁定A账户 1. 锁定B账户
2. 等待B账户锁 2. 等待A账户锁
3. ⛔ 死锁! 3. ⛔ 死锁!
而这里不管是先加锁转账方还是先加锁接收方,那么都是会导致同样的结果,也就是死锁
根据前文讨论的现象,我们现在来分析死锁发生的原因。首先,两个线程需要 互斥 访问同一个共享资源。正是因为需要互斥访问,才引入了互斥锁,进而可能引发死锁。第二个关键原因是 请求与保持 。以上文为例,线程1已持有一把锁,并尝试获取另一把锁,但在获取失败后并不会释放自己已持有的资源;线程2的行为类似。最终,双方互不让步,导致死锁。
第三个导致死锁的原因是不可剥夺 。例如,线程1申请锁失败是因为线程2已持有该锁,但线程1不会强行从线程2手中夺取该锁。最后一个原因是循环等待 。仍以上文为例,线程1持有锁A并等待锁B,线程2持有锁B并等待锁A,双方互相等待,陷入僵持,从而产生死锁。
线程1:持有A锁---------→ 等待B锁↑ ↓
线程2:持有B锁---------→ 等待A锁
综上所述,🔄死锁的产生可归纳为以下四个必要条件:
互斥条件(Mutual Exclusion)请求与保持条件(Hold and Wait)不可剥夺条件(No Preemption)循环等待条件(Circular Wait)
需要注意的是,这四个条件是产生死锁的必要条件。所谓必要条件,指的是这四个条件同时成立并不一定导致死锁,但若发生死锁,则这四个条件必须同时存在。有读者可能会问,为什么四个条件都满足时不一定产生死锁?我们可以用上文提到的转账例子来说明。
假设线程1执行A向B转账 ,线程2执行B向A转账 。线程1先获取A锁,再申请B锁;线程2先获取B锁,再申请A锁。如果线程1先执行,且在执行过程中没有发生线程切换,完整执行完整个流程后才调度线程2执行,则两个线程实际上是串行执行,并未并发。或者,线程1在获取两把锁之后才发生线程切换,同样不会产生死锁。由此可见,死锁的出现实际上是一个概率性问题,这也导致一旦程序出现死锁,很难通过调试复现。
因此,死锁发生的充分必要条件是:在加锁过程中发生线程切换(例如在持有一把锁后、获取第二把锁前被切换),并且同时满足上述四个必要条件。
接下来我们探讨如何解决死锁。既然死锁的発生依赖于四个必要条件,那么只要破坏其中任意一个条件,就能避免死锁。
首先需要明确,互斥条件是无法破坏的,因为在多线程并发访问共享资源时,必须通过加锁机制保证互斥访问。因此,我们只能从其余三个条件入手。
- 破坏
请求与保持条件
请求与保持 指的是线程在已持有资源的情况下,若申请其他资源失败,会阻塞等待但仍不释放已持有资源。破坏这一条件的方法是:当线程申请第二个资源失败时,立即释放已持有的资源,之后重新发起申请。
我们可以使用线程库提供的非阻塞加锁函数 pthread_mutex_trylock 实现这一机制:
pthread_mutex_trylock- 头文件:<pthread.h>
- 函数声明:int pthread_mutex_trylock(pthread_mutex_t *mutex);
- 返回值:成功返回0,失败返回错误码(不设置
errno)
该函数在加锁失败时不会阻塞,而是立即返回非零错误码。我们可借此在申请第二把锁失败后释放已持有的锁,然后重新尝试,从而避免死锁。示例代码如下:
void* transfer(Account* from, Account* to, int amount) {int ret = 1;while (ret != 0) {pthread_mutex_lock(&from->lock);ret = pthread_mutex_trylock(&to->lock);if (ret != 0) {pthread_mutex_unlock(&from->lock);} else {if (from->balance >= amount) {from->balance -= amount;to->balance += amount;}break;}}pthread_mutex_unlock(&to->lock);pthread_mutex_unlock(&from->lock);return (void*)0;
}
然而,这种做法可能引发 活锁 (Livelock)问题:假设线程1首先被调度执行,它成功申请到第一把锁,但在尝试申请第二把锁前被切换;随后线程2被调度,同样成功申请到其第一把锁后又被切换回线程1。此时,线程1尝试申请第二把锁失败,若在此刻再次被切换;线程2被调度后,同样会申请第二把锁失败。接着,线程1释放其已持有的锁,线程2也随之释放其锁。这一过程会不断重复:线程1获得第一把锁,线程2获得其第一把锁,随后双方申请第二把锁均失败,继而各自释放已持有的锁……如此循环,形成活锁。
// 初始状态:线程1(A→B)和线程2(B→A)同时开始时间点 T0:
线程1: 锁定A成功 ✓
线程2: 锁定B成功 ✓时间点 T1:
线程1: 尝试锁定B → 失败(被线程2持有)
线程2: 尝试锁定A → 失败(被线程1持有)时间点 T2:
线程1: 释放A锁 ✓
线程2: 释放B锁 ✓// 关键问题出现在这里!
时间点 T3:
线程1: 立即重试 → 锁定A ✓
线程2: 立即重试 → 锁定B ✓时间点 T4:
线程1: 尝试锁定B → 失败
线程2: 尝试锁定A → 失败时间点 T5:
线程1: 释放A
线程2: 释放B时间点 T6:
线程1: 立即重试 → 锁定A
线程2: 立即重试 → 锁定B
// 如此循环...
这种情况下,线程在行为上表现为一种 相互礼让 但 步伐一致 的循环依赖,导致系统整体工作停滞,这就是活锁。它与死锁的关键区别在于:死锁中的线程处于阻塞等待状态(如普通的 pthread_mutex_lock 会使其休眠),而活锁中的线程则处于繁忙的循环执行状态。为解决该问题,可引入重试次数限制或退避策略:
void* transfer(Account* from, Account* to, int amount) {int ret = 1;int retry_count = 0;const int MAX_RETRIES = 10;while (ret != 0 && retry_count < MAX_RETRIES) {pthread_mutex_lock(&from->lock);ret = pthread_mutex_trylock(&to->lock);if (ret != 0) {pthread_mutex_unlock(&from->lock);usleep(1000 * (retry_count + 1)); // 退避策略:等待时间递增retry_count++;
} else {if (from->balance >= amount) {from->balance -= amount;to->balance += amount;}break;
}}if (ret == 0) {pthread_mutex_unlock(&to->lock);pthread_mutex_unlock(&from->lock);return (void*)0; // 成功
} else {return (void*)1; // 失败:超过最大重试次数
}}
- 破坏
不可剥夺条件
不可剥夺 指线程持有的资源不能被强制释放。要破坏该条件,需引入资源强制回收机制。然而,实现这一点较为复杂:一方面,剥夺操作需保证原子性,避免多个线程同时剥夺同一资源;另一方面,被剥夺的线程需能感知资源丢失并妥善处理。因此,该方案在实际中较少使用,本文不展开实现。
- 破坏
循环等待条件(推荐)
循环等待的根本原因是加锁顺序不一致。通过统一加锁顺序可有效避免该问题。例如,为每个资源设置唯一ID,并规定加锁必须按照ID从小到大顺序进行:
typedef struct {int balance;pthread_mutex_t lock;int id;
} Account;void* transfer(Account* from, Account* to, int amount) {if (from->id < to->id) {pthread_mutex_lock(&from->lock);pthread_mutex_lock(&to->lock);} else {pthread_mutex_lock(&to->lock);pthread_mutex_lock(&from->lock);}if (from->balance >= amount) {from->balance -= amount;to->balance += amount;
}pthread_mutex_unlock(&to->lock);
pthread_mutex_unlock(&from->lock);
return (void*)0;}
这种方法简单有效,是实践中常用的死锁预防手段。
除了上述方法,还存在如银行家算法等死锁避免机制,但因篇幅所限,本文不再展开。掌握以上几种方法已能应对多数死锁场景,若读者感兴趣可进一步查阅相关资料。
线程的同步
那么之前我讲解了线程的互斥,那么想必读者对于线程间的互斥并不陌生,那么就会是通过互斥锁来实现线程间的互斥,那么除了线程间的互斥,那么还有一个概念,那么便是线程间的同步,那么为了让读者理解线程间的同步这个概念,那么这里我还是引入一个模型来帮组读者理解什么是线程间的间的同步
那么现在有一个厨房,那么这个厨房和之前我上文提到的自习室是一样,那么就是该厨房只能提供给一个厨师所使用,意味着厨房的使用肯定是得保证互斥的,所以如何保证互斥,我们也并不陌生,那么就是我们可以给厨房门口申请一把锁,然后让厨师持有该锁对应的钥匙,并且遵循先到先得的原则,然后进入厨房将门反锁,这样其他厨师就无法进入该厨房,然后使用厨房内部的资源,那么这部分内容和之前所谈的线程的互斥是一样的,而接下来的区别就来了,那么我们知道厨师使用该厨房的目的就是为了做菜,但是当厨师获取到该厨房的钥匙,也就是获得了该厨房的使用权,然后进入厨房,但是发现该厨房内厨师要做的菜所需要的原材料还没有准备好,那么意味着这里厨师只能等待专门人员将其所需要的原材料送到厨房来
而现在就有一个问题,那么就是厨房一次只能提供给一个厨师使用,那么现在厨师由于原材料还没有就绪,那么该厨师待在这个厨房里面啥事也做不了,因为没有原材料,但是厨房外面却还有很多需要使用该厨房的厨师,所以该厨师不可能就一直等待原材料送到该厨房中来并且还一直占用厨房,所以这里厨师只能解锁,然后离开厨房并且将钥匙放到门口,但是这里要注意,这里厨师离开厨房,并不是说因为原材料还没有就绪,那么厨师就离开厨房不干了
而是因为厨房外面还有需要使用该厨房的厨师,那么为了避免造成其他厨师的等待,所以这里提高效率,让厨师先暂时离开厨房,也就是对应的让线程先挂起
而我们知道厨房只能提供给一个人使用,那么运送原材料的快递员,那么虽然其身份不是厨师,但是它也需要进入到厨房来将原材料放入到厨房然后离开,那么意味着快递员也需要竞争该厨房的锁,获取厨房的访问权,然后将原材料送到厨房内之后,然后离开,那么这里就要注意的是,对于该快递员来说,那么其要做的工作不仅仅是把原材料放到厨房,其还需要通知之前因为该原材料没有就绪而离开厨房陷入阻塞的厨师,让其在厨房外面不要在继续等待了,让其去和其他厨师一起,共同争抢钥匙获取厨房的访问权
那么通过上文的例子,我们便可以引入以及理解所谓线程的同步,那么所谓线程的同步,就是当一个线程互斥访问该共享资源的时候,发现资源缺少某个条件,或者资源没有就绪,从而陷入等待,然后等待资源就绪的时候,在重新尝试获取该资源的访问权来访问该资源
那么这里我们结合刚才说的,才来重新审视上文的例子,那么所谓的资源缺少某个条件或者资源没有就绪,其实就是对应上文的厨房中缺少当前厨师所需要的原材料,并且这里我们要从线程的同步的概念要认识到关键的一点,那么就是线程的同步一定是在互斥的基础之上的,还是以上文的例子为例,我们知道当厨房缺少厨师所需要的资源或者资源还没有就绪的时候,那么厨师会暂时先离开厨房,等待资源就绪
💡 那么这里我们要知道的就是,厨师之所以暂时陷入阻塞并且暂时离开是因为资源没有就绪,而关键是,厨师需要亲自进入到厨房当中去亲自检查究竟资源就没就绪,也就是厨房中是否有自己需要的原材料,而我们反复强调厨房只能提供给一个厨师使用,也就是厨房的访问是互斥的,那么意味着在厨师检查资源是否就绪这个动作之前,一定得先申请厨房的访问权,也就是获取锁,才能够能够进入厨房检查到资源是否就绪,所以这里就验证了线程的同步一定是在线程互斥的基础上,或者说线程的同步必须伴随着或者配合着线程的互斥,这也是我们习惯性说线程的同步与互斥,会将其放在一起来说,那么这是我要阐明的第一个点
而这里我接下来我要来阐明的第二个点:那么我们知道如果线程发现当前资源没有准备就绪,此时会释放锁,然后陷入阻塞等待直到资源就绪,那么这里要理解的一点,假设资源就绪了,那么一定有某种机制能够立刻通知并且唤醒该线程,至于这个机制是什么,我会在后续详细讲解,而此时我们可以以一个新的视角来看待线程,那么就是我们可以将竞争锁的线程分为三类,第一类是因为没有竞争到锁也就是加锁失败而在等待队列中被唤醒的线程,第二类就是刚才说的,发现资源没有就绪而陷入等待然后被唤醒的线程,而第三类就是第一次尝试竞争获取该锁的线程
那么这个资源没有就绪其实是一个很大的概念,这个资源就绪可以具体到很多方面,比如究竟是因为没有醋还是因为没有酱油而导致的该厨师无法做菜,而我们知道因为厨房没有醋而离开厨房陷入等待的厨师肯定不只有一个,而我们上文将互斥锁就提到过,那么会有很多个线程会竞争该锁而陷入等待,那么一旦该锁被释放,那么为了避免惊群效应带来大量的无效的上下文切换,因为只有一个线程能够获取到锁,那么线程库会维护一个等待队列,那么一次只唤醒队头的线程,然后与其他第一次访问尝试加锁的线程来竞争,而不是全部唤醒
同理这里因为某个条件缺失或者资源没就绪而陷入阻塞等待的线程不只有一个,那么这里也会维护一个等待队列将这些因为某个条件而陷入阻塞的线程组织起来,那么一旦条件就绪,那么只会唤醒队头的线程,而不是唤醒所有线程,因为避免惊群效应,而至于线程在等待队列中的位置则取决于他们先后访问共享资源的时刻
而我们知道有些厨师可能是因为醋没有陷入等待,有些厨师是因为酱油没有而陷入等待,那么与之对应的陷入等待的线程可能是因为不同的资源没就绪而陷入的等待,而我们知道因为资源没有就绪的线程会采取等待队列的方式来组织,而这里对于某一个特定资源没有就绪的线程,那么我们会维护特定的等待队列,这是因为当该特定资源的队列就绪之后,那么会唤醒因为该资源没就绪而陷入等待的线程,不可能你醋没就绪,结果你让人家因为酱油没就绪的厨师进入厨房做饭,所以这里会为每一个特定的条件缺失的线程维护对应的等待队列
那么接下来就是理解的一个关键的一点,当某个条件就绪时,系统会唤醒对应等待队列中的队首线程。此时,该线程是否直接获得共享资源的使用权?
若被条件就绪唤醒的线程直接获得共享资源的访问权,则可能引发线程饥饿问题。因为需要访问同一共享资源的线程可能不止一个。如果被条件就绪唤醒的线程具有较高的调度优先级,就可能导致那些因竞争锁失败而阻塞的线程长时间无法被调度,从而持续处于阻塞状态。需要注意的是,竞争该锁的线程主要包括三类:因加锁失败而被阻塞后唤醒的线程、因条件就绪而被唤醒的线程,以及首次尝试加锁的线程。
💡 因此,正确的处理方式是:==当条件就绪时,唤醒对应等待队列的队首线程,但该线程仍需与其他两类线程共同竞争锁资源。如果竞争失败,则该线程会被放入加锁失败的等待队列中继续等待。==这正是本文要阐明的第二个关键点:每个因特定条件未就绪而阻塞的线程会被置于对应的条件等待队列中;一旦条件就绪,系统将唤醒相应队列的队首线程,使其与其他线程公平竞争锁资源。
那么相信通过上文的内容,读者应该能够彻底打通对于线程的同步关于概念层面上的疑惑,那么接下来我们将目光聚焦于如何在代码层面上实现线程的同步,那么要实现线程的同步,那么就得通过条件变量来实现
条件变量
那么条件变量本质上与锁类似,都是一个变量。条件变量的类型为 pthread_cond_t ,而pthread_cond_t 本质上是一个联合体(union)。其典型实现如下:
struct pthread_cond_impl {pthread_mutex_t __lock; // 内部锁,用于保护条件变量状态struct wait_queue *__queue; // 等待队列头指针int __waiters; // 当前等待的线程数int __signals; // 已发送的信号数// ... 其他平台相关字段
};typedef union {struct pthread_cond_impl __impl;char __size[__SIZEOF_PTHREAD_COND_T];
} pthread_cond_t;
条件变量的相关属性记录在内部结构体的字段中。我们无需掌握所有字段的细节,但需要理解其中几个关键成员:一个是内部锁(用于保护条件变量状态),另一个是指向等待队列头部的指针。这些是理解条件变量接口底层实现原理的核心。
值得注意的是,pthread_cond_t 被定义为联合体。联合体的所有成员变量从对象的起始地址开始偏移,其大小由最大的成员决定。这里的 __size 字符数组用于控制联合体的总大小,通常是为了满足不同平台对齐和大小约束而设计的。
定义条件变量即声明一个 pthread_cond_t 类型的变量。了解如何定义后,接下来需要掌握其初始化方式。条件变量的初始化与互斥锁类似,分为静态初始化和动态初始化两种。
静态初始化通过宏 PTHREAD_COND_INITIALIZER 实现。该宏是一个编译时常量,对应一个各字段初始化为零的联合体对象,通常用于初始化全局变量或静态变量。
动态初始化则需调用 pthread_cond_init 函数:
pthread_cond_init- 头文件:<pthread.h>
- 声明:int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
- 返回值:成功返回0,失败返回错误码(不设置
errno)
该函数接受两个参数:第一个参数为指向条件变量的指针,第二个参数用于设置条件变量的属性(如进程共享属性)。若使用动态初始化,则必须在使用完毕后调用 pthread_cond_destroy 进行销毁:
pthread_cond_destroy- 头文件:<pthread.h>
- 声明:int pthread_cond_destroy(pthread_cond_t *cond);
- 返回值:成功返回0,失败返回错误码(不设置
errno)
在了解如何初始化条件变量之后,接下来我们将深入探讨条件变量的核心接口。如前文所述,当线程获取锁并进入临界区后,若发现所需资源尚未就绪,该线程会释放锁并被置入等待队列。这一行为正是通过pthread_cond_wait 接口实现的。
pthread_cond_wait- 头文件:<pthread.h>
- 声明:int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
- 返回值:成功时返回 0,失败时返回错误码(不设置
errno)。
pthread_cond_wait 接受两个参数:条件变量指针和互斥锁指针,二者均为输入参数。第一个参数的理解较为直观,而第二个参数(互斥锁)的存在则与 pthread_cond_wait 的底层机制密切相关。
如前所述,条件变量本质上是一个联合体(union),其内部结构体包含两个关键字段:一把互斥锁和一个等待队列。由于条件变量本身是共享资源,可能被多个线程并发访问,为保护其内部状态的一致性,条件变量会维护一把内部互斥锁。
pthread_cond_wait 首先会获取条件变量的内部锁,然后将当前线程加入等待队列,最后释放传入的第二个参数(即用户提供的互斥锁)。以下是其伪代码实现,帮助理解这一过程的原子性:
//pthread_cond_wait伪代码实现
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex) {if (cond == NULL || mutex== NULL){return EINVAL;}//获取条件变量内部结构
cond_internal_t* cond_internal = (cond_internal_t*) cond;// 1. 锁定条件变量内部锁(保护条件变量状态)
pthread_mutex_lock(&cond_internal->internal_lock);// 2. 验证调用线程持有传入的互斥锁
if (mutex->owner != current_thread) {pthread_mutex_unlock(&cond_internal->internal_lock);return EPERM;
}// 3. 将当前线程加入条件变量的等待队列
current_thread->state = THREAD_WAITING;
current_thread->waiting_mutex = mutex;
current_thread->waiting_cond = cond_internal;
//将线程加入等待队列(简化实现)
enqueue(&cond_internal->wait_queue, current_thread);// 4. 记录需要重新获取的锁
pthread_mutex_t* user_mutex_to_relock = mutex;// 5. 释放条件变量内部锁
pthread_mutex_unlock(&cond_internal->internal_lock);// 6. 关键原子操作:释放用户锁+线程挂起
// 这个操作必须是原子的,防止竞争条件
// 6.1 首先释放用户传入的锁
int unlock_result=pthread_mutex_lock(user_mutex_to_relock);
if (unlock_result != 0) {// 处理错误:需要将线程从等待队列中移除pthread_mutex_lock(&cond_internal->internal_lock);remove_from_queue(&cond_internal->wait_queue, current_thread);pthread_mutex_unlock(&cond_internal->internal_lock);return unlock_result;
}// 6.2 线程主动让出CPU,进入等待状态
// 这里会调用调度器,切换到其他线程
thread_block(); // 系统调用:将线程状态设为阻塞,并触发调度// 7. 当线程被pthread_cond_signal唤醒后,从这里继续执行
// 此时线程需要重新获取用户互斥锁// 8. 重新获取用户锁(可能会阻塞)
int lock_result = pthread_mutex_lock(user_mutex_to_relock);
if (lock_result != 0) {// 获取锁失败,线程状态可能不一致,需要错误处理return lock_result;
}// 9. 线程现在重新持有了用户锁,可以安全返回
return 0;}
当线程被唤醒时,会从pthread_cond_wait 的阻塞点继续执行。此时,线程需要重新获取之前释放的用户互斥锁。因此,pthread_cond_wait 在返回前会调用 pthread_mutex_lock 尝试重新加锁。若加锁失败,线程会再次进入阻塞状态,等待锁的可用性。
在理解了如何使线程等待资源就绪后,我们还需掌握如何唤醒等待的线程。这需要通过pthread_cond_signal 接口实现:
pthread_cond_signal- 头文件:<pthread.h>
- 声明: int pthread_cond_signal(pthread_cond_t *cond);
- 返回值:成功时返回 0,失败时返回错误码(不设置
errno)。
该接口接受一个条件变量指针作为输入参数,其核心功能是唤醒等待队列中的一个线程。条件变量内部维护了一个等待队列, pthread_cond_signal 会从队列头部取出一个线程并将其置为可调度状态。以下是其伪代码示例:
int pthread_cond_signal(pthread_cond_t *cond) {if (cond == NULL) {return EINVAL;}struct cond_internal *cond_internal = cond->internal;// 1. 锁定条件变量内部互斥锁pthread_mutex_lock(&cond_internal->internal_mutex);// 2. 如果等待队列不为空,唤醒一个线程if (!is_queue_empty(&cond_internal->wait_queue)) {thread_t *thread_to_wake = dequeue(&cond_internal->wait_queue);// 3. 将线程状态改为可运行thread_to_wake->state = THREAD_READY;// 4. 将线程加入调度器的就绪队列scheduler_enqueue(thread_to_wake);}// 5. 释放条件变量内部锁pthread_mutex_unlock(&cond_internal->internal_mutex);return 0;
}
此外,还存在另一个唤醒接口 pthread_cond_broadcast,其功能是唤醒等待队列中的所有线程。但需注意,大规模唤醒可能引发“惊群效应”(thundering herd problem),导致系统资源竞争加剧。因此,若无特殊需求,通常优先使用 pthread_cond_signal 以逐个唤醒线程。
以上便是条件变量的核心接口及其底层机制。接下来,我们将通过经典的生产者-消费者模型,进一步探讨条件变量的实际应用场景与使用方法。
生产者-消费者模型
在具体实现生产者与消费者模型之前,我们首先需要明确这一模型的基本概念。这里我们依然通过一个例子来帮助读者快速理解什么是生产者-消费者模型。
现实生活中, 生产者 负责加工原材料并制造产品,然后将其交给 消费者 ;而消费者则获取并使用这些产品。
假设消费者需通过批发商获取生产者制造的产品。批发商设置了一个货架,生产者 完成货物生产后,会直接将货物放置于货架上,随后继续生产下一个货物;消费者 则通过检查货架来获取已摆放的货物,若有货物则直接取走。
现在考虑一个特殊情形:该货架仅有一个槽位,即只能存放一件货物。假设当前为单生产者-单消费者场景,生产者在将货物放入货架后,是否能够立即继续生产下一个货物?
答案是肯定的,生产者 可以继续生产下一件货物。但问题是,如果生产者 已经完成了下一件货物的生产,而消费者尚未取走货架上的货物,导致货架槽位仍被占用,那么此时生产者能否继续无视该情况,转而生产新的货物?
在这种情况下,答案是否定的。由于货架只有一个槽位,且当前已被占用,生产者手中已生产好的货物无法放置,此时生产者只能进入阻塞状态,等待消费者取走货物、腾出槽位。一旦槽位空闲,生产者便可放置货物,并继续后续生产活动。
让我们将视角转移到消费者 , 消费者 需从货架获取货物。若检查货架时发现槽位为空,即生产者 尚未产出货物,则消费者只能等待,直到生产者将货物放置于货架后方可取走。
上述场景正是条件变量 (condition variable)的典型应用。货架作为共享资源,可被生产者和 消费者 并发访问。由于对共享资源的访问操作不具备原子性,必须通过互斥机制来保证其访问的互斥性。
因此,该货架在任何时刻只允许一个实体(无论是生产者还是消费者)访问。若生产者获得货架的访问权,其首先应检查货架槽位是否为空。若为空,则需放置货物并释放访问权,随后继续生产;若不为空,说明条件未满足,生产者不能持续占用访问权,而应释放访问权并进入等待状态,直到接收到槽位空闲的通知。收到通知后,生产者重新获得访问权并放置货物,并且在完成放置后,需通知消费者货物已就绪。
同理,若消费者获得货架访问权,也应先检查槽位中是否有货物。若有,则取走货物并通知生产者槽位已空;若无货物,则释放访问权并等待,直到接收到货物就绪的通知。
由此可见,生产者与消费者之间需要一种同步机制,这正是通过条件变量实现的。在单生产者-单消费者场景中,当条件未就绪时,生产者或消费者将进入等待状态,待条件就绪后被唤醒,完成相应操作。
然而,实际中更常见的场景是多生产者-多消费者。需理解的是,生产者的工作不仅包括将货物放入货架,还包括获取原材料、加工原材料从而生产出消费者需要的货物;消费者也不仅是取走货物,还包括后续对货物的处理。
由于货架是共享资源,其访问必须互斥执行。因此,多个生产者在访问货架时需串行进行,但其准备货物的过程(如原材料加工)可以并发执行。同样,多个消费者在取货时需互斥访问,而对货物的后续处理可以并发执行。
在货架仅有一个槽位的情况下,若生产者数量远大于消费者,即生产能力超过消费能力,将导致多个生产者已生产好货物却因货架占满而无法放置,只能进入等待状态,直到消费者取走货物并通知空闲。反之,若消费者数量远大于生产者,即消费能力超过生产能力,货架经常处于空闲状态,消费者将频繁等待生产者供货。
这种生产能力与消费能力不匹配的情况,将导致较弱的一方长时间处于饥饿状态,迟迟无法被调度执行,降低系统的并发度和执行效率。其根本原因在于货架容量有限,仅有一个槽位。
为解决该问题,可扩展货架规模,使其具备多个槽位。在多槽位货架中,生产者只需找到空闲槽位即可放置货物,并继续后续生产;消费者也只需找到有货物的槽位取走货物。多槽位设计显著提升了系统的并发能力。由于货架具备多个槽位,可用空间更加充裕,生产者通常无需等待即可找到空闲槽位放置货物,从而持续进行生产;消费者也更容易发现已有货物可取的槽位,及时获取并继续执行后续处理任务。
更重要的是,多槽位货架能够缓解生产与消费能力不匹配的问题。若生产能力较强,由于货架不只有一个槽位,生产者可以填满多个槽位,消费者按自己的节奏逐步取货;若消费能力较强,多个空闲槽位可确保生产者能快速放置货物,消费者及时获取,从而在整体上提升系统的吞吐量与响应性。
那么,在理解了抽象的生产者-消费者模型之后,接下来我们需要具体探讨如何在代码层面实现这一模型。
如前文所述,生产者与消费者之间通过一个具有多个槽位的共享区域——即“缓冲区”——进行数据传递。缓冲区可以采用不同的方式实现,而生产者和消费者本质上属于工作线程,只是各自承担的角色不同。生产者线程的主要任务是获取数据,经过加工处理后将其写入缓冲区;消费者线程则从缓冲区中读取数据,并进行后续处理。下文将介绍基于阻塞队列实现缓冲区的生产者-消费者模型。

基于阻塞队列的生产者-消费者模型
原理
那么,缓冲区的第一种实现方式是阻塞队列。所谓阻塞队列,实质上是采用队列这一数据结构来实现缓冲区功能。队列的底层通常是一个链表,链表的每个节点用于存放生产者线程向消费者线程发送的数据。
对于生产者线程而言,其主要操作包括获取数据、对数据进行加工、创建新节点并将加工后的数据存入节点的数据域,最后将新节点链接到链表(即队列)的尾部。而消费者线程则从队列头部读取数据,并随后删除该头节点。

在实现阻塞队列时,我们可以定义一个 Blockqueue 类。鉴于 C++ 标准模板库(STL)已提供队列容器
queue ,本设计中的 Blockqueue 类将封装一个 queue 对象。此处需要特别强调的是,由于
Blockqueue 内部包含一个共享的 queue 容器,且运行环境为多生产者、多消费者场景,这意味着将有多个线程并发访问Blockqueue 类中的 queue 容器。
生产者线程的核心逻辑是调用 queue 容器所提供的push 函数向队尾插入元素。然而,push 操作并非原子操作,执行过程中可能发生线程切换。因此,多个生产者线程之间必须互斥地访问阻塞队列,以确保数据一致性。同理,消费者线程的核心逻辑是读取队首元素后调用 queue 容器所提供的 pop 函数移除该元素。
pop 函数同样是非原子操作,因此多个消费者线程之间也必须互斥地访问队列。
至此,我们已经明确了两类互斥关系:生产者与生产者之间,以及消费者与消费者之间。接下来探讨第三类关系:生产者与消费者之间能否并发访问阻塞队列?即是否允许一个线程执行 push 操作的同时,另一个线程执行 pop 操作?
从客观角度分析,STL 中的 queue 容器通常基于容器适配器模式实现,底层可选用 deque 或 list 等容器。假设底层采用带哨兵节点的双向循环链表(list ),则 push 操作修改的是尾节点的后继指针及其哨兵节点的前驱指针,而 pop 操作修改的是头节点的后继节点的前驱指针及其哨兵节点的后继指针。如果这两个操作修改的是不同的内存区域因为修改的实不同的指针变量,由于并发访问的共享资源不同,那么即使操作非原子,也可能不会引发数据竞争,所以这里是否意味着生产者和消费者之间能够并发访问阻塞队列呢?
然而,答案是否定的,生产者与消费者不能并发访问队列。考虑队列仅含一个节点的边界情况:此时 pop 操作此时需要修改哨兵节点的前驱和后继指针,而 push 操作也要修改哨兵节点的前驱指针。若两者并发执行,将导致对哨兵节点前驱指针的并发修改,进而引发数据不一致。此外,若队列底层由动态数组(如 vector )实现,push 操作可能触发数组扩容,导致 pop 操作访问已失效的旧数组,同样会造成数据错误。
因此,我们得出结论:生产者与生产者、消费者与消费者、以及生产者与消费者之间,均需保证互斥访问。这意味着整个 Blockqueue 类只需维护一把互斥锁,所有线程在访问队列前必须竞争该锁。
除互斥外,阻塞队列还需实现同步机制。回顾生产者-消费者模型的基本原理:当生产者线程获取队列访问权(即锁)后,应首先检查条件是否满足——即队列是否已满(我们需预设队列容量上限,避免因生产速率高于消费速率导致队列持续增长,最终耗尽堆内存)。若队列已满,则该生产者线程应进入等待状态,并加入条件变量对应的等待队列并且释放锁,直至条件满足后被唤醒。同理,消费者线程在获得锁后,也需先检查队列是否为空;若为空,则进入等待状态,并且释放锁,直至被唤醒。
注意,生产者和消费者因不同条件未满足而等待,因此需要两个独立的条件变量:一个用于生产者(等待“队列未满”条件),另一个用于消费者(等待“队列非空”条件)。这样,当消费者弹出队首元素后,可调用pthread_cond_signal 通知生产者条件变量,精准唤醒生产者线程;反之,生产者插入新元素后,也可精准唤醒消费者线程。
综上所述,Blockqueue 类需包含以下成员:一个 queue 容器、一把互斥锁、两个条件变量,以及一个表示队列容量上限的整型变量。由于生产者所生产的数据类型未知,该类应设计为模板类。
template<typename T>
class Blockqueue
{
public:Blockqueue(int max_size = max_num): Max_size(max_size){pthread_mutex_init(&mutex, NULL);pthread_cond_init(&c_wait, NULL);pthread_cond_init(&p_wait, NULL);}~Blockqueue(){pthread_mutex_destroy(&mutex);pthread_cond_destroy(&c_wait);pthread_cond_destroy(&p_wait);}// .....
private:std::queue<T> q;pthread_mutex_t mutex;pthread_cond_t c_wait; // 消费者等待的条件变量(队列非空)pthread_cond_t p_wait; // 生产者等待的条件变量(队列未满)int Max_size;
};
在了解 Blockqueue 类的基本框架之后,接下来我们实现其关键成员函数:push 和 pop 。
首先看 push 函数。该函数仅由生产者线程调用,用于向阻塞队列尾部插入元素。在插入之前,生产者线程需要先获取互斥锁。加锁成功后,线程会检查条件是否满足——即队列是否已满。若队列已满,则调用pthread_cond_wait 将当前线程置于条件变量的等待队列中,并释放锁;若未满,则将数据插入队列尾部(调用底层queue 的 push 方法)。插入完成后,调用 pthread_cond_wait 唤醒可能因队列为空而等待的消费者线程。
push 函数的实现逻辑较为简单,但有一个关键细节需要注意:虚假唤醒(Spurious Wakeup)。通常,当线程因条件不满足而进入等待状态时,预期只有在条件就绪时才会被唤醒。然而,在 Linux 系统中,即使条件未就绪,线程也可能被唤醒,这种现象称为虚假唤醒。
虚假唤醒与 pthread_cond_wait 的底层实现有关。如前文所述,pthread_cond_wait 在执行时会先获取条件变量内部的互斥锁,将当前线程加入等待队列,然后释放已持有的锁并使线程进入阻塞状态。其中,阻塞操作通过futex_wait 系统调用实现,其核心是调用schedule() 使线程让出 CPU 进入睡眠。
// 内核中 futex_wait 的简化逻辑
static int futex_wait(..., int *uaddr, int val) {// 1. 检查条件,避免竞争if (*uaddr != val)return -EAGAIN;// 2. 将线程状态设置为 TASK_INTERRUPTIBLE
set_current_state(TASK_INTERRUPTIBLE);// 3. 将线程加入等待队列
add_wait_queue(&futex_q->wq, &wait);// 4. 让出 CPU,进入等待状态
schedule(); // 线程在此处进入睡眠// 5. 若被信号唤醒,从此处继续执行
remove_wait_queue(&futex_q->wq, &wait);// 检查是否被信号中断
if (signal_pending(current))return -EINTR; // 返回中断错误return 0;}
线程在阻塞期间可能收到信号。每个线程对应一个task_struct 结构,其中维护了信号位图和阻塞位图。当内核向线程发送信号时,会设置对应信号的位图标志,并检查线程状态。若线程处于TASK_INTERRUPTIBLE (可中断睡眠)状态,内核会将其状态改为就绪,并重新放入调度队列;若线程处于TASK_UNINTERRUPTIBLE (不可中断睡眠)状态,则仅设置信号位,待线程被调度时再处理信号。
发送信号 → 设置信号位图 → 检查线程状态 → 如果是TASK_INTERRUPTIBLE → 唤醒线程(改变状态)→ 线程重新调度 → 系统调用返回EINTR → 表现为虚假唤醒
因此,当线程因条件不满足进入 TASK_INTERRUPTIBLE 状态后,若在此期间收到信号,可能会被提前唤醒,即使队列条件并未就绪。此时线程会从 pthread_cond_wait 中恢复执行,但可能并未满足执行条件(如队列仍为满)。若直接继续执行后文,可能导致数据错误。
为解决这一问题,应将 pthread_cond_wait 置于 while 循环中,循环条件为队列是否已满。这样,即使线程被虚假唤醒,也会重新检查条件,若条件仍不满足则再次等待。
void push(const T& data)
{pthread_mutex_lock(&mutex);while (q.size() == Max_size) {pthread_cond_wait(&p_wait, &mutex);}q.push(data);pthread_cond_signal(&c_wait);pthread_mutex_unlock(&mutex);
}
对于 pop 函数,其实现逻辑与 push 类似。消费者线程先加锁,然后检查队列是否为空:若不为空,则取出队首元素并弹出,随后唤醒可能因队列满而等待的生产者线程;若为空,则调用 pthread_cond_wait 进入等待,并同样使用 while 循环避免虚假唤醒。
T pop()
{pthread_mutex_lock(&mutex);while (q.size() == 0) {pthread_cond_wait(&c_wait, &mutex);}T data = q.front();q.pop();pthread_cond_signal(&p_wait);pthread_mutex_unlock(&mutex);return data;
}
以上我们已经完成了阻塞队列 Blockqueue 类的成员函数和成员变量的介绍。接下来,我们将设计生产者线程和消费者线程。
生产者线程对应一个用户态函数 producer ,其返回类型为void ,参数为void* 。主线程通过调用pthread_create 创建生产者线程,并将 Blockqueue 对象作为参数传递给生产者线程,从而使生产者线程能够访问该阻塞队列并调用其 push 成员函数向队列中发送数据。
需要注意的是,生产者线程不仅负责发送数据,还包括对数据的加工处理,并将处理后的数据送入阻塞队列。为模拟数据加工过程,我们设计了一个 Task 类,作为生产者线程向消费者线程传递的数据单元。 Task 类封装了两个整型成员变量,并支持对它们进行加、减、除、取模等运算,运算结果保存在 result 成员变量中。具体执行哪种运算由 Task类中的任务码(taskcode)决定,该任务码为范围在 0 到 3 的整型变量。此外,类中还包含一个整型变量作为标志位(exitcode),用于表示当前计算结果是否正确。
#pragma once
#include <iostream>enum
{Success = 0,div_error,mod_error,unknown,
};class Task
{
public:Task(int _data1 = 1, int _data2 = 1, int _Taskcode = 0): data1(_data1), data2(_data2), Taskcode(_Taskcode){}void run()
{result = 0;exitcode = Success;switch (Taskcode){case 0:result = data1 + data2;break;case 1:result = data1 - data2;break;case 2:if (data2 == 0){exitcode = div_error;break;}result = data1 / data2;break;case 3:if (data2 == 0){exitcode = mod_error;break;}result = data1 % data2;break;default:exitcode = unknown;}
}void gettask()
{switch (Taskcode){case 0:std::cout << data1 << " + " << data2 << " = " << result << " the exitcode is: " << exitcode << std::endl;break;case 1:std::cout << data1 << " - " << data2 << " = " << result << " the exitcode is: " << exitcode << std::endl;break;case 2:std::cout << data1 << " / " << data2 << " = " << result << " the exitcode is: " << exitcode << std::endl;break;case 3:std::cout << data1 << " % " << data2 << " = " << result << " the exitcode is: " << exitcode << std::endl;break;default:std::cout << "unknown task" << std::endl;}
}private:int data1;int data2;int result;int Taskcode;int exitcode;
};
生产者线程的主要工作是生成数据、创建 Task对象,随后,生产者线程将 Task对象送入阻塞队列,并循环执行上述过程。为模拟连续生产行为,我们设定生产者线程在循环中连续发送 5 次数据后退出。
在生成数据时,我们使用随机数生成两个整型操作数,并允许操作数为 0,以便观察标志位在异常情况(如除零)下的表现。
void* producer(void* args)
{Blockqueue<Task>* q = (Blockqueue<Task>*)args;int cnt = 5;while (cnt) {int data1 = rand() % 10;int data2 = rand() % 4;int taskcode = rand() % 5;Task t(data1, data2, taskcode);q->push(t);std::cout << "producer send taskcode " << taskcode << std::endl;sleep(1);cnt--;}std::cout << "producer thread over" << std::endl;return NULL;
}
消费者线程对应一个返回类型为 void 、参数为 void* 的 consumer 函数,同样由主线程通过pthread_create 创建,并接收阻塞队列对象的指针作为参数。消费者线程从阻塞队列中取出Task对象(通过 pop方法),,而 Task类中的 run函数根据任务码执行运算,将结果存入 result成员变量,并根据运算正确性设置标志位(正确为 0,错误为非零)。所以这里随后会调用该对象的 run 方法执行运算,并通过
gettask方法打印运算结果及标志位信息。
void* consumer(void* args)
{Blockqueue<Task>* q = (Blockqueue<Task>*) args;int cnt = 5;while (cnt){Task t = q->pop();t.run();t.gettask();sleep(1);cnt--;}std::cout << "consumer thread over" << std::endl;return NULL;
}
主线程负责创建阻塞队列对象、生产者线程和消费者线程。在本例中,我们创建了 10 个生产者线程和 10 个消费者线程。主线程通过 pthread_create 创建线程时将阻塞队列对象传递给各线程,并维护两个数组分别保存生产者与消费者线程的标识符(tid)。创建完成后,主线程依次调用 pthread_join等待所有线程退出。
#include <vector>
#include <pthread.h>
#include <unistd.h>
#include <cstdlib>
#include <ctime>int main()
{srand(time(NULL));std::vector<pthread_t> producer_thread;std::vector<pthread_t> consumer_thread;Blockqueue<Task> q;for (int i = 0; i < 10; i++)
{pthread_t tid;pthread_create(&tid, NULL, producer, (void*)(&q));producer_thread.push_back(tid);
}for (int i = 0; i < 10; i++)
{pthread_t tid;pthread_create(&tid, NULL, consumer, (void*)(&q));consumer_thread.push_back(tid);
}for (size_t i = 0; i < producer_thread.size(); i++)
{pthread_join(producer_thread[i], NULL);
}for (size_t i = 0; i < consumer_thread.size(); i++)
{pthread_join(consumer_thread[i], NULL);
}return 0;}
源码
Blockqueue.h:
#pragma once
#include<pthread.h>
#include<queue>
#define max_num 10
template<typename T>
class Blockqueue
{
public:Blockqueue(int max_size=max_num):Max_size(max_size) // 使用初始化列表设置队列的最大容量{// 初始化互斥锁,用于线程同步pthread_mutex_init(&mutex,NULL);// 初始化消费者条件变量,用于消费者线程的等待和唤醒pthread_cond_init(&c_wait,NULL);// 初始化生产者条件变量,用于生产者线程的等待和唤醒pthread_cond_init(&p_wait,NULL);}void push(const T& data){// 加锁,确保线程安全pthread_mutex_lock(&mutex);// 当队列已满时,生产者线程等待while(q.size() == Max_size){// 等待消费者发出信号pthread_cond_wait(&p_wait, &mutex);}// 将数据加入队列q.push(data);// 通知等待的消费者线程pthread_cond_signal(&c_wait);// 解锁pthread_mutex_unlock(&mutex);}T pop() // 弹出元素的函数{pthread_mutex_lock(&mutex); // 加锁,确保线程安全// 当队列为空时,等待条件变量,直到队列不为空while(q.size() == 0){pthread_cond_wait(&c_wait, &mutex);}T data = q.front(); // 获取队列头部元素q.pop(); // 从队列中移除头部元素pthread_cond_signal(&p_wait); // 通知可能正在等待的生产者pthread_mutex_unlock(&mutex); // 解锁return data; // 返回获取的元素}
// 析构函数,用于释放互斥锁和条件变量资源~Blockqueue(){// 销毁互斥锁,释放相关资源pthread_mutex_destroy(&mutex);// 销毁消费者等待的条件变量,释放相关资源pthread_cond_destroy(&c_wait);// 销毁生产者等待的条件变量,释放相关资源pthread_cond_destroy(&p_wait);}
private:std::queue<T> q;pthread_mutex_t mutex;pthread_cond_t c_wait;pthread_cond_t p_wait;int Max_size;
};
main.cpp:
#include<iostream>
#include<cstdlib>
#include<time.h>
#include<vector>
#include"Blockqueue.h"
#include"Task.h"
#include<unistd.h>void* consumer(void* args)
{// 将参数转换为任务阻塞队列的指针Blockqueue<Task>* q = (Blockqueue<Task>*) args;// 设置计数器,控制循环次数int cnt = 5;// 当计数器大于0时,继续循环while (cnt){// 从队列中弹出一个任务Task t = q->pop();// 执行任务t.run();// 获取任务信息t.gettask();// 休眠1秒sleep(1);// 计数器减1cnt--;}// 打印消费者线程结束信息std::cout<<"consumer thread over"<<std::endl;// 返回NULLreturn NULL;
}
void* producer(void* args)
{// 将传入的参数转换为任务队列指针Blockqueue<Task>* q = (Blockqueue<Task>*)args;// 设置任务计数器,共生成5个任务int cnt = 5;// 循环生成任务,直到cnt减为0while (cnt) {// 生成随机数data1,范围0-9int data1 = rand() % 10;// 生成随机数data2,范围0-3int data2 = rand() % 4;// 生成随机任务类型,范围0-4int taskcode = rand() % 5;// 创建任务对象Task t(data1, data2, taskcode);// 将任务推入队列q->push(t);// 打印已发送的任务类型std::cout << "producer send taskcode " << taskcode << std::endl;// 休眠1秒,模拟任务生成间隔sleep(1);// 任务计数器减1cnt--;}// 打印生产者线程结束信息std::cout<<"producer thread over"<<std::endl;// 返回NULL,线程结束return NULL;
}
int main()
{// 使用当前时间作为随机数种子,确保每次运行程序时随机数不同srand(time(NULL));// 创建存储生产者线程ID的向量std::vector<pthread_t> producer_thread;// 创建存储消费者线程ID的向量std::vector<pthread_t> consumer_thread;// 创建一个任务阻塞队列Blockqueue<Task> q;// 创建10个生产者线程for (int i = 0; i < 10; i++){pthread_t tid; // 线程ID// 创建生产者线程,传入队列q的指针作为参数pthread_create(&tid, NULL, producer, (void*)(&q));producer_thread.push_back(tid); // 将线程ID添加到生产者线程向量中}// 创建10个消费者线程for (int i = 0; i < 10; i++){pthread_t tid; // 线程ID// 创建消费者线程,传入队列q的指针作为参数pthread_create(&tid, NULL, consumer, (void*)(&q));consumer_thread.push_back(tid); // 将线程ID添加到消费者线程向量中}// 等待所有生产者线程完成执行for (int i = 0; i < producer_thread.size(); i++){pthread_join(producer_thread[i], NULL);}// 等待所有消费者线程完成执行for (int i = 0; i < consumer_thread.size(); i++){pthread_join(consumer_thread[i], NULL);}return 0; // 程序正常退出
}
运行截图:

基于环形队列的生产者-消费者模型
上文介绍了第一种缓冲区实现方式,即基于阻塞队列的方法。接下来我们讨论第二种实现方式——唤醒队列 。在阻塞队列的实现中,我们注意到一个明显的局限性:由于只使用一把互斥锁,任何线程(无论是生产者还是消费者)都必须先竞争该锁,才能访问队列。这就导致生产者和消费者对队列的访问完全是串行化 的。
然而,生产者和消费者在实际操作中除了访问队列(如弹出队首元素或插入队尾元素)之外,还各自执行其他任务。例如,生产者可能需要加工数据,消费者则需要处理数据。这些操作本身并不属于临界区,理论上可以并发执行。但由于阻塞队列的锁机制,这些本可并发的操作也被强制串行化,从而降低了整体效率 。
从数据结构角度看,如果阻塞队列基于链表实现,并且队列中包含多个节点,那么生产者和消费者可能同时访问不同的节点(生产者操作队尾,消费者操作队首),此时并发执行并不会引发数据竞争。但为了考虑更一般的情况(例如队列基于数组实现,或队列中只有一个节点),我们必须保证生产者与消费者之间的互斥访问。这种保守的同步策略虽然安全,却不可避免地带来性能损失。
为了解决上述问题,我们引入环形队列作为一种更高效的缓冲区实现方式。在深入其实现原理之前,首先需要了解一个关键的前置知识——信号量 (Semaphore)机制。
信号量
在上文中,我们了解到 阻塞队列 存在一个明显的缺陷,即生产者与消费者之间的访问无法完全并行化。其根本原因在于,我们将 阻塞队列 这一共享资源视为一个 整体 进行处理。既然被视为 整体,对该资源的访问就只能以 串行方式进行。接下来,我将介绍信号量 的核心思想,并继续通过比喻帮助读者快速建立对信号量 的理解。
回顾之前讨论的 阻塞队列 ,我们可以将其类比为一个自习室。该自习室仅能供一名学生使用。为确保访问的独占性,我们在门口设置一把锁,由学生竞争这把锁的钥匙,遵循先到先得原则。先获得钥匙的学生即可进入自习室并使用它。
现在考虑另一种场景:自习室不再是一个整体、一次仅供一人使用,而是被划分为多个规格相同的 小隔间 ,每个 隔间 仅允许一人进入并使用。在这种情况下,原有的管理机制——即在门口设置一把锁,由学生竞争唯一的一把钥匙——就不再适用。
原因很明显:每个学生只想使用其中的某一个隔间,而不是整个自习室。假设学生A希望使用隔间1,学生B希望使用隔间2。如果学生A获得了钥匙,则只有A能够进入自习室,而B被拒之门外。然而实际上,A和B使用的是不同的隔间,理论上应允许他们同时进入。
因此,我们需要调整管理策略。我们不再维护一把锁和对应的钥匙,而是设置一个 计数器 ,用于记录当前自习室中剩余的空闲隔间数量。此时,学生竞争的不再是锁,而是这个 计数器 ,同样遵循先到先得原则。若 计数器 不为零,表示仍有空闲隔间,竞争成功的学生只需将 计数器 减一,即可进入自习室,并任意选择一个空闲隔间进行使用。
若 计数器 为零,则说明所有隔间已被占用,没有多余空位。此时,后续到达的学生只能在门外等待,直到有学生使用完毕并离开隔间。注意,这里并非“释放锁”,而是将 计数器 加一,表示空闲隔间数量增加了一个。
在这种机制下,我们通过计数器实现资源的管理,而这个 计数器 本就是 信号量 。因此,信号量 的本质是一个 计数器 。
重新审视上述例子,我们可以进一步梳理出几个关键细节:
第一,自习室被划分为多个隔间,相当于将共享资源拆分为更细粒度的单元。每个隔间作为一个共享资源单元,仍需保证互斥访问。与互斥锁的场景相比,此处存在多个资源单元,使得多个线程能够在同一时刻并发且互斥地访问不同的资源单元,从而显著提高系统的并发度和运行效率。相反,互斥锁在任何时刻只允许一个线程访问资源。从信号量的视角来看,互斥锁可被视为信号量的一个特例:在互斥锁适用的场景中,相当于自习室只有一个隔间,或者说这唯一的隔间就是自习室本身。因此,互斥锁本质上也是一个计数器,其取值仅为0或1,故也被称为二元信号量 。理解这一点是掌握信号量 的第一个关键。
第二,信号量所维护的计数器仅记录空闲资源的数量,而不关心具体是哪个隔间。这意味着,自习室中的每个隔间必须具有完全相同的属性(如面积、设施等)。学生只关心是否有空闲隔间,而不在意具体选择哪一个,因为所有隔间是无差别的。这一点也引出一个重要推论:如果自习室中存在属性不同的隔间(例如大小两种类型),则需要分别维护两个信号量,分别记录不同类型隔间的数量。这是掌握信号量的第二个关键细节。
第三,尽管每个隔间(即每个共享资源单元)必须保证互斥访问,但信号量作为计数器,仅控制线程能否获得资源访问权限,而不负责具体分配哪一个资源单元。一旦线程成功申请到信号量,它具体访问哪个资源单元,应由程序员在代码逻辑中进行管理,而非由信号量机制决定。如果线程错误地访问已被占用的资源,所产生的后果与信号量本身无关。理解信号量的这一职责边界,是掌握其使用的第三个关键细节。
在建立起对信号量 基本概念的理解后,接下来我们将进一步探讨如何在代码层面实际使用信号量。
信号量 与锁(mutex)和条件变量(condition variable)类似,本质上是一个同步变量。在Linux中,信号量 对应sem_t 类型,其实现通常是一个联合体(union):
#include <semaphore.h>
#define __SIZEOF_SEM_T 32 /* 确保足够存储整个结构 */typedef union {struct
{int __value; /* 信号量的当前值 */int __pad; /* 填充对齐 */pthread_mutex_t __lock; /* 内部互斥锁 */pthread_cond_t __cond; /* 内部条件变量 */// 其他内部字段...
}__data;char __size[__SIZEOF_SEM_T]; // 确保足够大小的存储空间long __align; // 对齐字段
} sem_t;
实际Linux内核中的实现可能更为复杂,但上述结构已足够帮助我们理解信号量的基本工作原理。信号量的属性存储在该联合体的结构体字段中。其中,__value 是最核心的字段,作为计数器使用。
由于信号量是共享资源,会面临并发访问。为避免数据竞争,信号量内部维护了一把互斥锁(__lock ),用于保证对信号量操作的原子性;同时包含一个条件变量(__cond ),用于管理等待队列——当条件未就绪(即当前无可用资源)时,将线程置于等待状态。理解这几个字段,有助于掌握信号量相关函数的底层机制。
在使用信号量之前,需先进行初始化。初始化分为静态初始化和动态初始化两种方式。
静态初始化通过宏函数实现,该宏接收一个参数用于初始化计数器字段,其他字段通常初始化为0。注意:若使用SEM_INITIALIZER 时不提供参数,将导致编译错误。
#define SEM_INITIALIZER(value) { 0, (value), 0, 0, 0, 0 }// 使用示例(此为宏,非函数调用)
sem_t sem1 = SEM_INITIALIZER(3);
静态初始化通常用于全局或静态信号量,此类信号量无需显式销毁。
第二种方式为动态初始化,通过调用sem_init 函数实现:
sem_init- 头文件:<semaphore.h>
- 声明:int sem_init(sem_t *sem, int pshared, unsigned int value);
- 返回值:成功返回0,失败返回-1(并设置
errno)
-
参数说明:
-
sem_t:指向待初始化信号量的指针(输入参数) -
pshared:指定共享范围。0表示线程间共享,非0表示进程间共享 -
value:信号量计数器的初始值
动态初始化的信号量必须显式销毁,通过调用sem_destroy 函数释放可能关联的资源(如进程间共享信号量涉及的共享内存):
sem_destroy- 头文件:<semaphore.h>
- 声明:int sem_destroy(sem_t *sem);
- 返回值:成功返回0,失败返回-1(并设置
errno)
-
信号量的核心操作可归纳为P(等待/申请资源)和V(释放资源)两种:
-
P操作:计数器减1,对应
sem_wait函数 -
V操作:计数器加1,对应
sem_post函数
sem_wait 函数说明:
sem_wait- 头文件:<semaphore.h>
- 声明:int sem_wait(sem_t *sem);
- 返回值:成功返回0,失败返回-1(并设置
errno)
sem_wait是典型的P操作函数,其内部逻辑如下:
- 对信号量内部互斥锁加锁
- 检查计数器值:
- 若大于0,立即减1并释放锁
- 若等于0,将当前线程加入条件变量的等待队列,进入阻塞状态
- 被唤醒后重新检查条件,满足时完成减1操作并释放锁
// sem_wait 伪代码示意
int sem_wait(sem_t *sem) {pthread_mutex_lock(&sem->__lock); // 获取内部锁while (sem->__value <= 0) { // 循环检查避免虚假唤醒pthread_cond_wait(&sem->__cond, &sem->__lock);
}sem->__value--; // 原子性减少信号量值
pthread_mutex_unlock(&sem->__lock); // 释放内部锁
return 0;}
sem_post函数说明:
- 函数名:
sem_post- 头文件:<semaphore.h>
- 声明:int sem_post(sem_t *sem);
- 返回值:成功返回0,失败返回-1(并设置
errno)
sem_post的执行流程为:
- 获取内部互斥锁
- 将计数器加1
- 若有线程在条件变量上等待,则唤醒其中一个(或全部,取决于实现)
- 释放内部锁
// sem_post 伪代码示意
int sem_post(sem_t *sem) {pthread_mutex_lock(&sem->__lock); sem->__value++;
if (有线程等待于__cond) {pthread_cond_signal(&sem->__cond); // 或使用broadcast取决于策略
}pthread_mutex_unlock(&sem->__lock);
return 0;}
理解信号量的基本概念和操作函数后,即可进一步实现基于环形队列(ring buffer)的生产者-消费者模型。
原理
那么,环形队列 既可以通过链表实现,也可以通过数组实现。本文采用数组来实现环形队列。在这种实现中,生产者和消费者分别通过数组索引进行数据的写入和访问。初始状态下队列为空,生产者和消费者持有的索引均指向数组的起始位置。
首先我们分析生产者与生产者之间能否并发 访问环形队列 。生产者访问环形队列 的过程是:访问其当前索引对应的数组位置,将数据写入该位置,然后移动索引(即执行索引递增)。由于是环形队列 且基于数组实现,索引递增后需对数组大小取模,以防止越界。需要注意的是,索引递增操作并非原子操作,可能被中断,因此生产者之间必须互斥 地访问环形队列 。
对于消费者而言,其访问过程为:访问消费者索引所指向的数组位置,读取数据后同样对索引进行递增并取模。同理,由于索引递增操作的非原子性,消费者之间也必须互斥访问环形队列 。
接下来讨论生产者与消费者之间是否能够并发 访问。我们通过两个边界 条件和一个一般 条件进行分析,这两个边界条件分别是队列为空和队列为满,一般条件为队列既不为空也不为满,这三种情况覆盖了环形队列的所有状态。
首先分析第一个边界条件——队列为空。由于队列中无有效数据,消费者无法读取,此时需要一种同步机制:当队列为空时,消费者应被阻塞,直至生产者写入数据后将其唤醒。
第二个边界条件为队列已满。此时生产者无法继续写入,需等待消费者读取数据腾出空间,因此生产者应被阻塞,待消费者消费后唤醒生产者。这也是一种同步机制。
在一般情况下,队列既非空也非满,生产者和消费者的索引位于数组的不同位置。因为当队列为空或为满时,生产者和消费者的索引会重合;而在一般条件下,两者索引不同,意味着它们访问的是不同的内存区域。数据不一致问题的产生需同时满足两个条件:多线程并发访问、访问同一内存区域且操作非原子。由于此处生产者和消费者访问的区域不同,即使并发执行也不会引发数据一致性问题,因此可以并发访问。
综上所述,在队列为空或为满的边界条件下,生产者与消费者之间存在同步需求。同步机制建立在互斥基础上,即一方需等待另一方满足条件后被唤醒。一旦条件满足(进入一般情况),两者便可并发访问。因此,在队列为空或为满时,生产者与消费者之间是互斥的;其余情况下可并发执行。
由于生产者之间、消费者之间必须互斥访问,我们需要两把互斥锁,分别用于生产者和消费者。此外,在队列为空或为满时还需同步机制,此时信号量便可发挥作用。
当队列已满时,生产者线程应被阻塞,等待消费者线程读取数据后唤醒。信号量本身支持同步机制,因其内部维护了条件变量。我们将为生产者线程设置一个信号量,记录当前空闲位置的数量。若该信号量为0,生产者线程将被阻塞,并置于相应条件变量的等待队列中,直至条件就绪后被唤醒。同样,为消费者线程设置一个信号量,记录当前已写入的元素数量。若该信号量为0,消费者线程将被阻塞,并等待条件就绪后被唤醒。
基于上述分析,我们可以设计Ringqueue 类。该类需包含两把互斥锁和两个信号量。由于环形队列基于数组实现,我们使用vector 容器存储数据,并用一个整型变量记录数组容量。此外,Ringqueue 应设计为模板类,以支持不同的数据类型。
template<typename T>
class Ringqueue
{
public:Ringqueue(int max_num = max_size){Max_size = max_num;rq.resize(Max_size);pthread_mutex_init(&c_mutex, NULL);pthread_mutex_init(&p_mutex, NULL);c_index = 0;p_index = 0;sem_init(&element, 0, 0);sem_init(&space, 0, Max_size);}
~Ringqueue()
{pthread_mutex_destroy(&c_mutex);pthread_mutex_destroy(&p_mutex);sem_destroy(&element);sem_destroy(&space);
}// 其他成员函数...private:std::vector<T> rq;int Max_size;int c_index;int p_index;pthread_mutex_t c_mutex;pthread_mutex_t p_mutex;sem_t element;sem_t space;
};
接下来我们将实现唤醒队列核心的两个成员函数:push和pop 。其中,push函数用于向环形队列中插入数据,而pop 函数用于从环形队列中取出数据。由此可知,生产者线程将调用push 函数,消费者线程将调用
pop 函数。
首先讲解push函数的实现原理。一个关键问题是:应该先申请信号量,还是先申请互斥锁?
部分读者可能认为应先申请互斥锁,而另一部分则认为应先申请信号量。正确答案是先申请信号量,理由如下:
- 信号量本身的 P/V 操作是原子的,无需互斥锁保护。
- 如果线程在申请互斥锁时失败,会进入阻塞状态。若此时先申请互斥锁,即使信号量资源可用,线程也会因无法获得锁而阻塞,从而无法继续执行信号量申请,降低了并发效率。
- 先申请信号量可以缩短锁的持有时间。线程在获取信号量之后才尝试加锁,进入临界区后仅执行数据写入操作,完成后立即释放锁,有助于提高性能。
因此,push函数的执行流程为:
- 调用
sem_wait申请生产者信号量(space)。 - 获取互斥锁(
p_mutex)。 - 将数据写入生产者索引(
p_index)对应的数组位置。 - 更新生产者索引。
- 释放互斥锁。
- 调用
sem_post增加消费者信号量(element),表示有新数据可消费。
代码实现如下:
void push(const T& data)
{sem_wait(&space);pthread_mutex_lock(&p_mutex);rq[p_index] = data;p_index = (p_index + 1) % rq.size();pthread_mutex_unlock(&p_mutex);sem_post(&element);
}
消费者端的pop 函数实现逻辑类似:
- 调用
sem_wait等待消费者信号量(element)。 - 获取消费者互斥锁(
c_mutex)。 - 读取消费者索引(
c_index)处的数据。 - 更新消费者索引。
- 释放互斥锁。
- 调用
sem_post增加生产者信号量(space),表示有空位可继续生产。 - 返回读取的数据。
代码实现如下:
T pop()
{sem_wait(&element);pthread_mutex_lock(&c_mutex);T data = rq[c_index];c_index = (c_index + 1) % rq.size();pthread_mutex_unlock(&c_mutex);sem_post(&space);return data;
}
生产者线程的执行入口为producer 函数,其类型为void*()(void*) ,参数为主线程通过 pthread_create传递的环形队列对象指针。该函数的核心逻辑是生成任务数据,构造Task 对象,并调用
push 将其加入队列。示例中生产者生成两个随机整数和任务码,重复执行 5 次。
void* producer(void* args)
{Ringqueue<Task>* q = (Ringqueue<Task>*)args;int cnt = 5;while (cnt) {int data1 = rand() % 10;int data2 = rand() % 4;int taskcode = rand() % 4;Task t(data1, data2, taskcode);q->push(t);std::cout << "producer send taskcode " << taskcode << std::endl;sleep(1);cnt--;}std::cout << "producer thread over" << std::endl;return NULL;
}
消费者线程的执行入口为consumer 函数,类型同样为 void*()(void*)。其核心逻辑是通过
pop获取任务,执行任务并输出结果,重复 5 次。
void* consumer(void* args)
{Ringqueue<Task>* q = (Ringqueue<Task>*)args;int cnt = 5;while (cnt){Task t = q->pop();t.run();t.gettask();sleep(1);cnt--;}std::cout << "consumer thread over" << std::endl;return NULL;
}
主线程负责创建环形队列对象,并启动多个生产者和消费者线程。本示例中创建了 10 个生产者线程和 10 个消费者线程,将环形队列对象的地址传递给各线程,并等待所有线程执行结束。
int main()
{srand(time(NULL));std::vector<pthread_t> producer_thread;std::vector<pthread_t> consumer_thread;Ringqueue<Task> q;for (int i = 0; i < 10; i++)
{pthread_t tid;pthread_create(&tid, NULL, producer, (void*)(&q));producer_thread.push_back(tid);
}for (int i = 0; i < 10; i++)
{pthread_t tid;pthread_create(&tid, NULL, consumer, (void*)(&q));consumer_thread.push_back(tid);
}for (int i = 0; i < 10; i++)
{pthread_join(producer_thread[i], NULL);
}for (int i = 0; i < 10; i++)
{pthread_join(consumer_thread[i], NULL);
}return 0;}
源码
Ringqueue.h:
#pragma once
#include<pthread.h>
#include<semaphore.h>
#include<vector>
#define max_size 10
template<typename T>
class Ringqueue
{
public:Ringqueue(int max_num=max_size){ // 构造函数,可指定队列最大容量,默认为max_sizeMax_size = max_num; // 设置队列的最大容量rq.resize(Max_size); // 调整队列容器的大小为Max_sizepthread_mutex_init(&c_mutex,NULL); // 初始化消费者互斥锁pthread_mutex_init(&p_mutex,NULL); // 初始化生产者互斥锁c_index = 0; // 初始化消费者索引为0p_index = 0; // 初始化生产者索引为0sem_init(&element, 0, 0); // 初始化信号量element,初始值为0,表示初始没有可用元素sem_init(&space, 0, Max_size); // 初始化信号量space,初始值为Max_size,表示初始有Max_size个可用空间}void push(const T& data) // 压入数据的函数,参数为要压入的数据{sem_wait(&space); // 等待空间信号量,表示队列中还有可用空间pthread_mutex_lock(&p_mutex); // 加锁,确保线程安全rq[p_index] = data; // 将数据存入队列的当前位置p_index=(p_index+1)%rq.size(); // 更新写入位置索引,使用模运算实现循环队列pthread_mutex_unlock(&p_mutex); // 解锁sem_post(&element); // 发送元素信号量,表示队列中有新元素}T pop() // 从队列中弹出元素的函数{sem_wait(&element); // 等待有元素的信号量,表示队列中至少有一个元素pthread_mutex_lock(&c_mutex); // 加锁,确保线程安全T data = rq[c_index]; // 获取当前索引位置的元素c_index=(c_index+1)%rq.size(); // 更新消费者索引,循环使用队列空间pthread_mutex_unlock(&c_mutex); // 解锁sem_post(&space); // 发送空间可用信号,表示队列中又多了一个空位return data; // 返回取出的元素}~Ringqueue(){// 销毁消费者互斥锁pthread_mutex_destroy(&c_mutex);// 销毁生产者互斥锁pthread_mutex_destroy(&p_mutex);// 销毁元素信号量(表示可用元素的数量)sem_destroy(&element);// 销毁空间信号量(表示可用空间的大小)sem_destroy(&space);}
private:std::vector<T> rq;int Max_size;int c_index;int p_index;pthread_mutex_t c_mutex;pthread_mutex_t p_mutex;sem_t element;sem_t space;
};
main.cpp:
#include<iostream>
#include<cstdlib>
#include<time.h>
#include<vector>
#include"Ringqueue.h"
#include"Task.h"
#include<unistd.h>void* consumer(void* args)
{// 将参数转换为环形队列指针Ringqueue<Task>* q = (Ringqueue<Task>*) args;// 设置循环计数器,初始值为5int cnt = 5;// 当计数器大于0时循环执行while (cnt){// 从队列中取出一个任务Task t = q->pop();// 执行任务t.run();// 获取任务信息t.gettask();// 休眠1秒sleep(1);// 计数器减1cnt--;}// 输出消费者线程结束信息std::cout<<"consumer thread over"<<std::endl;return NULL;
}
void* producer(void* args)
{// 将参数转换为Ringqueue<Task>指针Ringqueue<Task>* q = (Ringqueue<Task>*)args;int cnt = 5; // 生产5个任务后结束// 循环生成任务,直到cnt减为0while (cnt) {// 生成随机数据1 (0-9)int data1 = rand() % 10;// 生成随机数据2 (0-3)int data2 = rand() % 4;// 生成随机任务码 (0-3)int taskcode = rand() % 4;// 创建任务对象Task t(data1, data2, taskcode);// 将任务推入队列q->push(t);// 输出生产信息std::cout << "producer send taskcode " << taskcode << std::endl;// 休眠1秒sleep(1);// 任务计数器减1cnt--;}// 输出生产者线程结束信息std::cout<<"producer thread over"<<std::endl;return NULL;
}int main()
{// 初始化随机数种子,用于任务生成srand(time(NULL));// 创建生产者线程和消费者线程的容器std::vector<pthread_t> producer_thread;std::vector<pthread_t> consumer_thread;// 创建一个环形队列实例,用于任务传递Ringqueue<Task> q;// 创建10个生产者线程for (int i = 0; i < 10; i++){pthread_t tid; // 线程ID// 创建生产者线程,传入环形队列的指针作为参数pthread_create(&tid, NULL, producer, (void*)(&q));producer_thread.push_back(tid); // 将线程ID存入容器}// 创建10个消费者线程for (int i = 0; i < 10; i++){pthread_t tid; // 线程ID// 创建消费者线程,传入环形队列的指针作为参数pthread_create(&tid, NULL, consumer, (void*)(&q));consumer_thread.push_back(tid); // 将线程ID存入容器}// 等待所有生产者线程执行完毕for (int i = 0; i < 10; i++){pthread_join(producer_thread[i], NULL);}// 等待所有消费者线程执行完毕for (int i = 0; i < 10; i++){pthread_join(consumer_thread[i], NULL);}return 0;
}
运行截图:

线程池
原理
接下来我将实现一个Linux环境下的线程池项目。基于前文对生产者-消费者模型的实现经验,线程池的实现将会更加容易。这是因为线程池本质上可以视为生产者-消费者模型的一个特例。
我们要实现的线程池属于“单生产者-多消费者”模型。其核心思想是将缓冲区与多个消费者线程集中管理,形成一个统一的结构。在这个设计中,生产者线程是主线程,负责创建线程池并将任务提交到池中。线程池内部则负责维护一个任务缓冲区以及多个消费者线程,这些线程会主动从缓冲区中获取任务并执行。
我们可以通过一个生活化的类比来理解线程池的工作原理:想象一个农村的养猪场景。养猪人(生产者)将饲料(任务)倒入饲料池(缓冲区),之后即可离开。猪(消费者线程)在饲料可用时会自行前来进食(处理任务)。养猪人只需在饲料被吃完后再次添加,无需干预具体的进食过程。这个过程形象地体现了线程池中任务提交与执行的解耦特性。
从设计角度来看,线程池需要承担以下职责:管理消费者线程的生命周期,维护线程执行上下文,以及提供一个任务缓冲区。在本实现中,缓冲区仍然采用环形队列结构。
基于以上分析,我们可以勾勒出 threadpool 类的基本框架。该类将包含以下成员:
- 一个
vector<Task>类型的容器,作为环形队列缓冲区; - 一个
vector<Thread_info>容器,用于记录所创建的消费者线程信息; - 用于线程同步的互斥锁和信号量:由于是单生产者-多消费者模型,只需一个互斥锁保护消费者端的竞争;
- 生产者和消费者的位置索引(
p_index和c_index); - 其他辅助变量,如线程数量、缓冲区容量等。
此外,我们定义一个 Thread_info 类,用于保存每个线程的标识符和名称信息,便于后续监控与调试。
以下是初步的类定义与结构代码:
class Thread_info
{
public:Thread_info(pthread_t _tid = 0, int num = 0): tid(_tid){threadname = threadname = "I am child thread :" + std::to_string(num);}
public:std::string threadname;pthread_t tid;
};class threadpool
{
public:threadpool(int max_thread_num = max_size, int max_task_size = max_size): Max_size(max_thread_num), c_index(0), p_index(0), Max_task_size(max_task_size){q.resize(Max_task_size);thread.resize(Max_size);pthread_mutex_init(&mutex, NULL);sem_init(&element, 0, 0);sem_init(&space, 0, Max_task_size);}~threadpool()
{pthread_mutex_destroy(&mutex);sem_destroy(&element);sem_destroy(&space);
}// 后续将补充 push等成员函数...private:std::vector<Task> q; // 任务队列(环形缓冲区)int Max_size; // 最大线程数std::vector<Thread_info> thread; // 线程信息记录pthread_mutex_t mutex; // 消费者互斥锁int Max_task_size; // 缓冲区容量int c_index; // 消费者索引int p_index; // 生产者索引sem_t element; // 任务资源信号量sem_t space; // 空闲空间信号量
};
接下来,我们将进一步实现任务提交(push)等关键成员函数,完善线程池的整体逻辑。
上文提到,线程池负责管理消费者线程,这意味着所有消费者线程的创建均由线程池完成。为此,线程池提供了一个 start 函数,供生产者线程(即主线程)调用。
start 函数内部通过调用 pthread_create函数创建一批线程。由于 threadpool 类内部维护了一个表示最大线程数的属性 Max_size ,因此此处将创建 Max_size 个消费者线程。
需要注意的是,线程池不仅需要创建消费者线程,还需为这些线程提供执行上下文。这里需注意pthread_create 函数的规定:线程的执行函数(即用户态函数)必须具有 void* 返回类型和 void* 参数类型。如果向 pthread_create 传递一个非静态成员函数,由于非静态成员函数会隐式传递 this 指针,其参数列表不符合要求。因此,线程池必须提供一个静态成员函数作为线程的执行上下文。
但这样做会带来另一个问题:消费者线程内部需要调用线程池的 pop 函数,以从任务列表(即环形队列)中读取任务。若将执行函数定义为静态成员函数,则由于静态成员函数无法访问类的非静态成员变量(缺少 this 指针),在调用 pthread_create 时,需将 this 指针作为参数传递给该静态成员函数。
static void* handlertask(void* args)
{threadpool* tp = (threadpool*)args;while (1) {Task task = tp->pop();task.run();tp->printinfo(pthread_self());task.gettask();}return NULL;
}void start()
{for (int i = 0; i < thread.size(); i++) {pthread_t tid;pthread_create(&tid, NULL, handlertask, this);thread[i] = Thread_info(tid, i);}
}
接下来是 pop 函数的实现。其基本原理是:首先通过信号量等待可用任务,然后加锁。加锁成功后,通过消费者索引读取数据并保存,随后解锁,并更新生产者信号量(表示空闲空间增加),最后返回保存的数据。
Task pop()
{sem_wait(&element);pthread_mutex_lock(&mutex);Task data = q[c_index];c_index = (c_index + 1) % Max_task_size;pthread_mutex_unlock(&mutex);sem_post(&space);return data;
}
在实现 pop 函数后,为了便于观察线程执行情况,我们在 threadpool 类中定义了一个 printinfo 函数。该函数通过遍历存储线程信息的数组,根据当前线程的 ID 匹配并打印对应的线程名称信息。
void printinfo(pthread_t _tid)
{for (int i = 0; i < thread.size(); i++) {if (thread[i].tid == _tid) {std::cout << thread[i].threadname.c_str() << std::endl;return;}}
}
因此,消费者线程的执行逻辑是:不断检查是否有待处理的任务(通过调用 pop 函数获取 Task 对象),然后调用 Task对象的 run 方法处理任务,最后通过 printinfo 和 gettask 函数输出相关信息。
最后是 push 函数的实现。由于该函数仅由主线程调用,其功能是将数据写入环形队列中生产者索引对应的位置,并更新消费者信号量以通知有新任务可用。
void push(const Task& T)
{sem_wait(&space);pthread_mutex_lock(&mutex);q[p_index] = T;p_index = (p_index + 1) % Max_task_size;pthread_mutex_unlock(&mutex);sem_post(&element);
}
在主线程中,首先创建一个 threadpool 对象,并调用其 start 函数初始化消费者线程。随后,主线程进入循环,生成随机数据并创建对应的 Task 对象,通过 push 函数将任务提交到线程池的环形队列中。
int main()
{threadpool tp;srand(time(NULL));tp.start();while (true) {int data1 = rand() % 10;int data2 = rand() % 10;int taskcode = rand() % 4;Task t(data1, data2, taskcode);std::cout << "main thread send Task:" << taskcode << std::endl;tp.push(t);usleep(100000);}return 0;
}
源码
Threadpool.h
#include<pthread.h>
#include<semaphore.h>
#include<string>
#include<vector>
#include"Task.h"
#define max_size 10
class Thread_info
{
public: Thread_info(pthread_t _tid=0,int num=0):tid(_tid){ threadname = "I am child thread :" + std::to_string(num);}
public: std::string threadname;pthread_t tid;
};
class threadpool
{
public:threadpool(int max_num = max_size, int max_task_size = max_size):Max_size(max_num) // 初始化最大线程数,c_index(0) // 初始化消费者索引,用于任务队列的消费,p_index(0) // 初始化生产者索引,用于任务队列的生产,Max_task_size(max_task_size) // 初始化任务队列的最大容量{// 调整任务队列的大小为Max_task_sizeq.resize(Max_task_size);// 调整线程容器的大小为Max_sizethread.resize(Max_size);// 初始化互斥锁,用于线程同步pthread_mutex_init(&mutex,NULL);// 初始化信号量element,用于表示任务队列中的任务数量sem_init(&element,0,0);// 初始化信号量space,用于表示任务队列中的剩余空间sem_init(&space,0,Max_task_size);}static void* handlertask(void* args){// 将传入的参数转换为线程池指针threadpool* tp = (threadpool*)args;// 无限循环,持续从任务队列中获取并执行任务while(1){// 从线程池中获取一个任务Task task=tp->pop();// 执行任务task.run();// 打印当前线程信息tp->printinfo(pthread_self());// 获取任务信息task.gettask();}// 理论上不会执行到这里,因为线程在无限循环中运行return NULL;}Task pop() // 定义一个返回类型为Task的pop函数{sem_wait(&element); // 等待信号量element,表示等待队列中有元素可取pthread_mutex_lock(&mutex); // 加互斥锁,保证线程安全Task data=q[c_index]; // 获取当前索引位置的任务c_index=(c_index+1)%Max_task_size; // 更新消费索引,循环使用队列空间pthread_mutex_unlock(&mutex); // 解互斥锁sem_post(&space); // 释放信号量space,表示队列中有可用空间return data; // 返回获取的任务}void push(const Task& T){sem_wait(&space); // 等待空间信号量,确保队列有可用空间q[p_index]=T; // 将任务存入队列的当前位置p_index=(p_index+1)%Max_task_size; // 更新写入位置,使用模运算实现循环队列sem_post(&element); // 释放元素信号量,通知消费者有新任务可用}
/*** 启动多个线程执行任务* 该函数会创建指定数量的线程,并将线程信息存储到thread容器中*/void start(){// 遍历thread容器,为每个元素创建一个线程for (int i = 0; i < thread.size(); i++){// 创建线程ID变量pthread_t tid;// 创建新线程,handlertask为线程处理函数,this作为参数传递给线程函数pthread_create(&tid, NULL, handlertask, this);// 将创建的线程信息(线程ID和索引)存储到thread容器中thread[i] = Thread_info(tid, i);
}}
// 析构函数,用于销毁线程池~threadpool(){// 销毁互斥锁,释放相关资源pthread_mutex_destroy(&mutex);// 销毁信号量element,用于通知元素可用sem_destroy(&element);// 销毁信号量space,用于通知空间可用sem_destroy(&space);}
private:void printinfo(pthread_t _tid){ // 函数:打印线程信息// 遍历线程容器for (int i = 0; i < thread.size(); i++){// 检查当前线程是否与目标线程ID匹配if (thread[i].tid == _tid){// 打印匹配到的线程名称std::cout << thread[i].threadname.c_str() << std::endl;return; // 找到后立即返回}}}std::vector<Task> q;int Max_size;std::vector<Thread_info> thread;pthread_mutex_t mutex;int Max_task_size;int c_index;int p_index;sem_t element;sem_t space;
};
main.cpp:
#include<iostream>
#include<cstdlib>
#include<time.h>
#include<vector>
#include"Task.h"
#include"Threadpool.h"
#include<unistd.h>int main()
{// 创建线程池对象threadpool tp;// 设置随机数种子,用于生成随机任务srand(time(NULL));// 启动线程池tp.start();// 无限循环,持续生成任务while (true){// 生成两个随机数(0-9)作为任务数据int data1 = rand() % 10;int data2 = rand() % 10;// 生成任务类型代码(0-3)int taskcode = rand() % 4;// 创建任务对象Task t(data1, data2, taskcode);// 输出任务信息std::cout<<"main thread send Task:"<<taskcode<<std::endl;// 将任务推入线程池tp.push(t);// 暂停100毫秒(0.1秒)usleep(100000);}return 0;
}
运行截图:

补充知识
单例模式的线程安全
我们知道单例模式是 C++ 设计模式中的一种,其核心目标是确保一个类只能被实例化一次,即全局范围内仅存在该类的唯一实例。为实现这一目标,首先需要将类的构造函数私有化。因为无论是创建栈对象、使用new
运算符创建堆对象,还是声明静态对象,都会调用构造函数。将构造函数私有化后,外部无法直接创建该类的任何对象。
接下来,需要向类外提供一个公有的静态成员函数作为接口。由于该静态成员函数位于类内部,它可以访问类中的私有静态成员变量。同时,由于静态成员函数不依赖于对象实例(即无需传递this 指针),因此可以在类外直接调用。在该静态成员函数中,我们返回一个指向类内部私有静态成员变量的引用,而该有私有静态成员变量就是该类对应的静态对象。此外,为避免通过拷贝构造、移动构造或赋值操作创建新的实例,必须显式禁用这些函数,从而保证实例的唯一性。以上即为单例模式中“饿汉模式”的基本实现原理。
class myclass {
public:static myclass& GetInstance() { // 静态成员函数,返回实例的引用return singleton;}size_t GetMember() {return a;}myclass(const myclass&) = delete; // 禁用拷贝构造myclass& operator=(const myclass&) = delete; // 禁用拷贝赋值运算符~myclass() {std::cout << "Hunger::~myclass()" << std::endl;}
private:size_t a;static myclass singleton; // 声明静态成员变量myclass(int _a = 4) : a(_a) {} // 私有构造函数
};
myclass myclass::singleton; // 在全局作用域定义(在 main 函数之前初始化)
除了饿汉模式,单例模式还有另一种常见实现:“懒汉模式”。懒汉模式在类内部声明一个私有静态成员变量,该变量为指向本类类型的指针,同时将构造函数私有化,以防止外部创建对象。随后,在类中定义一个公有的静态成员函数,该函数首先检查静态指针是否为空。若为空,则在堆上动态创建一个对象,并将指针指向该对象;若不为空,则直接返回现有对象的引用。由于静态成员函数不依赖于对象实例,因此可在类外直接调用。同样地,为避免通过拷贝或赋值操作创建新实例,需禁用拷贝构造函数、移动构造函数及对应的赋值运算符。
class myclass {
public:myclass(const myclass&) = delete;myclass& operator=(const myclass&) = delete;static myclass& GetInstance() {if (_ptr == nullptr) {_ptr = new myclass;}return *_ptr; // 返回引用,避免拷贝}// ...(其余成员函数保持不变)~myclass() {delete _ptr;std::cout << "lazy1::~myclass()" << std::endl;}
private:myclass() = default;static myclass* _ptr;
};
myclass* myclass::_ptr = nullptr; // 显式初始化为 nullptr
饿汉模式与懒汉模式的主要区别在此不再赘述,可参考我之前的博客文章以获取更详细的对比分析。在先前介绍单例模式时,我们尚未引入多线程环境下的考量。而在多线程场景下,多个线程可能并发调用单例类的静态成员函数以获取实例,这就可能引发线程安全问题。
对于懒汉模式而言,其静态成员函数中的实例创建过程并非原子操作。如果某个线程在执行指针判空操作后、实际创建对象前被切换,另一个线程也可能同时进入该函数并且也通过了指针判空操作。当线程被重新调度并恢复执行时,它会继续执行后续的对象创建代码,从而相继创建各自的实例,这就违反了单例模式对实例唯一性的要求。因此,必须通过互斥锁(Mutex)对实例创建的关键代码段进行同步保护,确保同一时间仅有一个线程能够执行创建实例的操作。
由于静态成员函数只能访问静态成员变量,因此互斥锁也需声明为静态成员,并在类外进行初始化。在 C++ 中,可使用PTHREAD_MUTEX_INITIALIZER 对 POSIX 互斥锁进行静态初始化。
#include <pthread.h>
class myclass {
public:myclass(const myclass&) = delete;myclass& operator=(const myclass&) = delete;static myclass& GetInstance() {pthread_mutex_lock(&mutex);if (_ptr == nullptr) {_ptr = new myclass;}pthread_mutex_unlock(&mutex);return *_ptr;}// ...(其余成员函数保持不变)~myclass() {delete _ptr;std::cout << "lazy1::~myclass()" << std::endl;}
private:myclass() = default;static myclass* _ptr;static pthread_mutex_t mutex;
};
myclass* myclass::_ptr = nullptr;
pthread_mutex_t myclass::mutex = PTHREAD_MUTEX_INITIALIZER;
引入互斥锁后,可在多线程环境下保证单例类的唯一性。然而,上述实现仍存在一处可优化点:一旦单例实例已被创建,后续线程在调用GetInstance() 时无需再尝试加锁,因为实例已存在,加锁操作只会带来不必要的性能开销。为此,可采用双重检查锁定(Double-Checked Locking) 机制,在加锁前先检查实例是否已存在。若已存在,则直接返回实例,避免锁竞争;若不存在,再进行加锁及实例创建。
需特别注意,必须使用双重条件判断。因为外层的条件判断(if (_ptr == nullptr) )并非原子操作,多个线程可能同时通过该判断并进入临界区竞争锁。若在加锁后内部没有再次进行条件判断,仍可能导致多个线程依次创建实例。因此,内层的条件判断是必要的,它确保了即使在多个线程同时通过外层判断的情况下,仅有一个线程能成功创建实例。
#include <pthread.h>
class myclass {
public:myclass(const myclass&) = delete;myclass& operator=(const myclass&) = delete;static myclass& GetInstance() {if (_ptr == nullptr) { // 第一次检查:避免不必要的锁竞争pthread_mutex_lock(&mutex);if (_ptr == nullptr) { // 第二次检查:确保实例唯一性_ptr = new myclass;}pthread_mutex_unlock(&mutex);}return *_ptr;}// ...(其余成员函数保持不变)~myclass() {delete _ptr;std::cout << "lazy1::~myclass()" << std::endl;}
private:myclass() = default;static myclass* _ptr;static pthread_mutex_t mutex;
};
myclass* myclass::_ptr = nullptr;
pthread_mutex_t myclass::mutex = PTHREAD_MUTEX_INITIALIZER;
自旋锁
接下来,我将介绍一种新型的锁——自旋锁。自旋锁本质上是一种特殊的互斥锁。其特殊性在于,当线程竞争锁失败时,行为与常规互斥锁不同。对于互斥锁而言,一旦加锁失败,线程会立即进入阻塞状态,并等待锁持有者释放锁后再被唤醒。而自旋锁在加锁失败后,并不会放弃 CPU,而是通过循环不断检查锁的状态,直至锁被释放。
为帮助读者更直观地理解自旋锁的工作机制,我们通过一个生活场景进行类比。
假设你到好友的寝室楼下,约他一同吃饭。你打电话通知他下楼,但对方表示手头有事需要处理,稍后才能下来。此时,你选择在楼下安静等待,直到好友主动联系你后再一同出发——这种方式类似于互斥锁的行为:在获取锁失败后,线程进入阻塞状态,等待被唤醒。
而在自旋锁的场景下,你的行为会有所不同:在得知好友需要稍等之后,你不会原地等待,而是每隔30秒就打电话询问对方是否就绪,直至好友下楼。这种不断主动询问的方式,就类似于自旋锁的“忙等待”机制。
从上述例子我们可以总结:互斥锁在加锁失败时,线程会进入阻塞状态,同时释放 CPU 资源,并被移出调度队列;而自旋锁在加锁失败后,线程仍持续占用 CPU 资源,不断轮询锁的状态。
理解了自旋锁的基本概念后,我们来看其具体实现与使用方法。与互斥锁、条件变量类似,自旋锁在 POSIX 线程库中通过 pthread_spinlock_t 类型表示。该类型通常定义为一个结构体,其中关键字段为表示锁状态的整型变量,0 表示未上锁,1 表示已上锁:
// 常见定义位于 /usr/include/pthread.h 或相关头文件
typedef volatile int pthread_spinlock_t;// 或更为常见的结构体形式
typedef struct {int __lock; // 锁状态:0 表示未锁定,1 表示已锁定
} pthread_spinlock_t;
使用自旋锁前需先进行初始化,初始化的方式分为静态初始化和动态初始化。静态初始化适用于全局或静态变量,通过宏 PTHREAD_SPINLOCK_INITIALIZER 实现,其作用是将锁内部状态初始化为 0。静态初始化的锁无需显式销毁。
另一种方式为动态初始化,通过调用 pthread_spin_init函数实现:
pthread_spin_init- 头文件: <pthread.h>
- 函数声明:int pthread_spin_init(pthread_spinlock_t *lock, int pshared);
- 返回值:成功返回 0,失败返回错误编号(不设置
errno)
该函数的第一个参数为指向自旋锁变量的指针,第二个参数 pshared表示锁的共享属性:0 表示线程间共享,非 0 表示进程间共享。动态初始化的锁在使用结束后需调用 pthread_spin_destroy 进行销毁:
pthread_spin_destroy- 头文件:<pthread.h>
- 函数声明:int pthread_spin_destroy(pthread_spinlock_t *lock);
- 返回值:成功返回 0,失败返回错误编号(不设置
errno)
接下来介绍自旋锁的核心操作函数。加锁操作通过 pthread_spin_lock 实现:
pthread_spin_lock- 头文件: <pthread.h>
- 函数声明:int pthread_spin_lock(pthread_spinlock_t *lock);
- 返回值:成功返回 0,失败返回错误编号(不设置
errno)
该函数的底层实现通常依赖于一条原子交换指令。线程先将寄存器值设为 1,然后与内存中的锁状态进行原子交换。若交换前锁状态为 0,则获取锁成功;否则线程进入循环,重复执行该指令,直至成功获取锁。以下为伪汇编代码示意:
; 伪汇编代码
spin_lock:mov eax, 1 ; 将 1 存入寄存器
retry:lock xchg [lock_var], eax ; 原子交换 eax 与锁变量的值test eax, eax ; 测试原锁状态jnz spin_wait ; 若原值非 0,继续循环等待; 成功获取锁,进入临界区ret
解锁操作通过 pthread_spin_unlock 实现:
pthread_spin_unlock头文件: <pthread.h>
函数声明:int pthread_spin_unlock(pthread_spinlock_t *lock);
返回值:成功返回 0,失败返回错误编号(不设置
errno)
该函数将锁状态重置为 0,表示锁已被释放。需要注意的是,在多核 CPU 架构下,为确保修改对所有核心可见,解锁操作不仅需将值写回内存,还需通过缓存一致性协议(如 MESI)使其他核心的缓存副本失效,从而保证可见性。
基于上述原理可知,自旋锁在加锁失败时会进入忙等待状态,持续占用 CPU 资源。这引出一个关键问题:何时应使用自旋锁,而非互斥锁?
我们仍以等好友吃饭的场景为例:若好友告知需等待一小时,则应选择互斥锁方式,使当前线程阻塞,让出 CPU;若好友仅需系鞋带(耗时极短),则可使用自旋锁,通过数次“打电话”快速获知状态变更。因此,自旋锁适用于锁持有时间极短的场景。
此外,CPU 架构也会影响自旋锁的适用性。在单核 CPU 上,若某线程持有自旋锁并执行临界区代码,另一线程因加锁失败而自旋,由于同一时刻仅有一个线程运行,自旋线程会浪费整个时间片,且持有锁的线程无法被调度,导致锁无法及时释放。而在多核 CPU 上,自旋线程可在其他核心上执行,不会影响持有锁线程的运行,从而能快速感知锁的释放。以下为单核环境下可能的问题示意:
// 单核CPU使用自旋锁可能的问题:
线程A(持有锁) → 线程B(自旋等待) → 线程A(无法运行)
尽管存在上述限制,自旋锁仍具独特优势。与互斥锁相比,自旋锁避免了线程的睡眠与唤醒过程,从而省去了两次线程上下文切换的开销。在锁竞争不激烈、持有时间短的场景中,自旋锁往往能提供更高性能。
读写锁
本文将补充的最后一种锁是 读写锁 。为帮助读者快速建立对 读写锁 的理解,我们仍通过一个例子进行说明。
在学生时代,我们常常会制作黑板报。通常有两类人会接触黑板报:一类是绘制黑板报的人,另一类是观看黑板报的人。这里的黑板报可视为一个共享资源。
对于绘制者而言,在同一时间只允许一个人进行绘制。如果允许多人同时绘制,就会引发数据不一致的问题。例如,前一秒某人在黑板上绘制的内容,可能在后一秒被其他人覆盖。因此,绘制者在访问黑板报时必须保证互斥写入,以避免内容冲突。
对于观看者而言,假设黑板报已经绘制完成,他们便可以欣赏其内容。需要注意的是,观看者仅对黑板报进行读取操作,而不进行任何写入。因此,在同一时刻允许多个人同时观看是合理的。如果要求观看者必须排队,只有队首的人才能观看,其余人则被禁止观看,这显然不符合实际场景。
在上述比喻中,黑板报代表共享资源,绘制者相当于 写线程 (writer),观看者相当于 读线程 (reader)。首先, 写线程 之间必须互斥地访问共享资源,因为写入操作不是原子的,并发写入会导致数据不一致。而 读线程 之间则可以并发访问,因为多个线程仅以只读方式访问共享资源,只要不修改资源内容,就不会引发数据不一致的问题。
接下来需要探讨的是 读线程 与 写线程 之间的关系。
我们仍以黑板报为例,思考绘制者与观看者是否可以同时访问黑板报。假设绘制者尚未完成作品,仅完成了部分内容。若在绘制过程中,观看者前来浏览,由于内容不完整,观看者可能会误解绘制者想要表达的主题。例如,绘制者实际想表达主题B,但观看者根据当前不完整的内容理解为主题A。这一误解并非绘制者的责任,而是由于观看者在内容未完成时提前访问所致。因此,观看者必须等待绘制者完成绘制后,才能正确理解内容。
同理,如果观看者正在阅读完整的黑板报内容,而此时绘制者突然开始修改,将会破坏内容的完整性与连续性,导致观看者无法正确理解。
通过以上分析,我们可以明确读线程与 写线程 之间的关系是互斥的。即当 写线程 访问共享资源时,读线程不能访问;当 读线程 访问共享资源时, 写线程 也不能访问。只有当写线程 完成写入后, 读线程 才能读取;或当读线程完成读取后, 写线程 才能写入。
通过上文分析,我们了解到写者与写者之间存在互斥关系,同时读者与写者之间也存在互斥关系。如果使用互斥锁来保证写者与写者之间的互斥以及读者与写者之间的互斥,对于写者而言,其写操作本质上是通过调用 write 函数实现的,该函数会访问临界资源并对临界区进行写入。对于读者,其读操作通过调用 read 函数实现,该函数负责读取临界资源。由于写者与写者之间互斥,我们可以推断写者所调用的write 函数内部必然通过加锁机制来保证写者之间的互斥。然而,读者的read 函数通常不会加锁。在这种情况下,我们仅保证了写者之间的互斥,但尚未实现读者与写者之间的互斥。
进一步分析,我们需要确保在写者进行写操作期间,读者不能访问共享资源;而在读者进行读操作期间,写者也不能访问。这实际上是一种同步机制。条件变量虽然可以用于实现同步,但核心问题在于:写者如何判断当前是否有读者正在读取?由于读者线程仅读取共享资源而不进行修改,且每个线程都是独立的执行流,从写者的视角无法直接获知读者当前的读取状态。同理,读者也无法直接判断写者是否正在写入。因此,仅依靠传统的互斥锁和条件变量难以解决这一问题,而读写锁(Read-Write Lock)正是为应对此类场景而设计的。读写锁的核心特性在于:允许读者在读取时阻止写者写入,同时确保写者在写入时阻止读者读取。接下来,我们将深入探讨读写锁的实现原理及其如何解决上述同步问题。
读写锁本质上是一个变量,与互斥锁和条件变量类似,其类型为
pthread_rwlock_t 。以下是一个典型的读写锁数据结构定义(具体实现可能因系统而异):
/* 在 /usr/include/pthread.h 或相关头文件中 *//* 读写锁数据结构 */
typedef union
{struct pthread_rwlock_impl __data;char __size[__SIZEOF_PTHREAD_RWLOCK_T];long int __align;
} pthread_rwlock_t;struct pthread_rwlock_impl {// 核心状态变量unsigned int reader_count; // 当前活跃读者数量unsigned int writer_waiting; // 等待的写者数量unsigned int reader_waiting; // 等待的读者数量int writer_active; // 写者活跃标志(0/1)// 同步原语
pthread_mutex_t mutex; // 保护内部状态的互斥锁
pthread_cond_t read_cond; // 读者条件变量
pthread_cond_t write_cond; // 写者条件变量// 可重入支持(可选)
pthread_t writer_thread_id; // 当前写者线程ID
int write_recursive_count; // 写锁重入计数};
读写锁的属性由联合体中的结构体成员字段记录。为理解其原理,我们重点关注以下几个关键字段:读者数量(reader_count )、读者等待数量(reader_waiting )、写者等待数量(writer_waiting )、写者活跃标志(writer_active ),以及保护内部状态的互斥锁(mutex )和读写条件变量(read_cond 、write_cond )。掌握这些字段的作用即可深入理解读写锁的底层机制。
定义读写锁后,需进行初始化。读写锁的初始化分为静态和动态两种方式。静态初始化使用宏PTHREAD_RWLOCK_INITIALIZER ,适用于全局变量或静态变量,无需显式销毁。该宏将读写锁的各字段初始化为默认值(通常为0)。
动态初始化则通过调用 pthread_rwlock_init 函数实现:
pthread_rwlock_init- 头文件:<pthread.h>
- 函数声明: int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
- 返回值:成功返回0,失败返回错误编号(不设置
errno)。
第一个参数为读写锁指针,第二个参数用于设置属性(如进程共享),通常设为NULL 。动态初始化的读写锁必须通过pthread_rwlock_destroy 函数销毁:
pthread_rwlock_destroy- 头文件:<pthread.h>
- 函数声明: int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
- 返回值:成功返回0,失败返回错误编号(不设置
errno)。
初始化完成后,读写锁的核心操作包括加锁和解锁。写者线程通过调用pthread_rwlock_wrlock 函数获取写锁:
pthread_rwlock_wrlock- 头文件:<pthread.h>
- 函数声明:int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
- 返回值:成功返回0,失败返回错误编号(不设置
errno)。
pthread_rwlock_wrlock 的内部原理如下:首先获取读写锁内部的互斥锁(mutex )以保护共享状态。接着,检查写者活跃标志(writer_active )和读者数量(reader_count )。若当前有写者活跃(writer_active > 0 )或有读者正在读取(reader_count > 0 ),则写者进入等待状态:增加等待写者计数(writer_waiting ),并在写者条件变量(write_cond )上阻塞(同时释放内部互斥锁)。被唤醒后,减少等待写者计数,并重新检查条件。仅当无活跃写者且无活跃读者时,写者才能获取锁,设置写者活跃标志为1,并释放内部互斥锁。
pthread_rwlock_wrlock(pthread_rwlock_t *rwlock) {// 首先获取保护读写锁内部状态的互斥锁pthread_mutex_lock(&rwlock->mutex);// 检查是否需要等待:有活跃写者 或 有活跃读者
while (rwlock->writer_active > 0 || rwlock->reader_count > 0) {// 增加等待写者计数rwlock->waiting_writers++;// 在写者条件变量上等待,同时释放内部互斥锁pthread_cond_wait(&rwlock->write_cond, &rwlock->mutex);// 被唤醒后,减少等待写者计数rwlock->waiting_writers--;
}// 条件满足:设置写者活跃标志rwlock->writer_active = 1;// 释放内部互斥锁pthread_mutex_unlock(&rwlock->mutex);return 0;
}
读者线程通过调用pthread_rwlock_rdlock 函数获取读锁:
pthread_rwlock_rdlock- 头文件:<pthread.h>
- 函数声明: int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
- 返回值:成功返回0,失败返回错误编号(不设置
errno)。
pthread_rwlock_rdlock 的原理类似:先获取内部互斥锁。随后检查写者活跃标志( writer_active )和等待写者数量( writer_waiting )。若有活跃写者或等待写者,则读者进入等待状态:增加等待读者计数( reader_waiting ),并在读者条件变量( read_cond )上阻塞。被唤醒后减少等待计数,并重新检查条件。仅当无活跃写者且无等待写者时,读者才能获取读锁,增加活跃读者计数( reader_count ),并释放内部互斥锁。
pthread_rwlock_rdlock(pthread_rwlock_t *rwlock){pthread_mutex_lock(&rwlock->mutex); // 保护读写锁内部状态// 检查两个条件:
// 1. 是否有活跃写者(writer_active != 0)
// 2. 是否有等待的写者(waiting_writers != 0)
while (rwlock->writer_active > 0 || rwlock->waiting_writers > 0) {// 在阻塞前增加等待读者计数rwlock->waiting_readers++;// 在读者条件变量上阻塞(自动释放互斥锁)pthread_cond_wait(&rwlock->read_cond, &rwlock->mutex);// 被唤醒后减少等待读者计数rwlock->waiting_readers--;// 循环继续检查条件,确保真的可以获取锁
}
// 条件满足:增加活跃读者计数
rwlock->reader_count++;
// 释放内部互斥锁
pthread_mutex_unlock(&rwlock->mutex);
return 0; // 成功获取读锁}
通过上述加锁逻辑,读写锁实现了写者访问时阻塞读者、读者访问时阻塞写者的同步机制。
最后,读者和写者均通过pthread_rwlock_unlock 函数释放锁:
pthread_rwlock_unlock- 头文件:<pthread.h>
- 函数声明: int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
- 返回值:成功返回0,失败返回错误编号(不设置
errno)。
该函数需区分调用线程的身份(读者或写者)。通过检查写者活跃标志( writer_active )进行判断:若标志为1,则为写者;否则为读者。写者解锁时,清除活跃标志,并优先唤醒等待的写者(若有)。读者解锁时,减少活跃读者计数。若当前读者为最后一个活跃读者,且存在等待写者,则唤醒一个写者。此机制确保在无读者时写者能获得访问权,避免写者饥饿。
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock) {// 加锁内部互斥锁pthread_mutex_lock(&rwlock->mutex);if (rwlock->writer_active) {/* 写者解锁 */rwlock->writer_active = 0; // 清除写者标志// 检查是否有写者等待,有则唤醒一个写者if (rwlock->waiting_writers > 0) {pthread_cond_signal(&rwlock->write_cond);}// 如果没有写者等待,直接退出(不检查读者等待队列)} else {/* 读者解锁 */rwlock->active_readers--; // 减少读者计数// 只有最后一个读者需要检查唤醒条件if (rwlock->active_readers == 0) {// 检查是否有写者等待if (rwlock->waiting_writers > 0) {pthread_cond_signal(&rwlock->write_cond);}}// 如果不是最后一个读者,只减少计数,不进行唤醒}// 释放内部互斥锁pthread_mutex_unlock(&rwlock->mutex);return 0;
}
通过上述分析,我们揭示了读写锁如何通过内部状态管理和条件变量同步,高效实现读者-写者问题的互斥与同步需求。
需要注意的是,上文所讨论的读写锁内部实现原理是基于写者优先的策略。具体来说,在pthread_rwlock_rdlock 函数中,如果当前有写者正在等待,则系统会优先唤醒写者,而不是让后续的读操作继续获取锁。之所以采用写者优先的设计,是因为在典型的读写锁应用场景中,读操作远多于写操作。在这种情况下,写者线程属于“弱势”一方,通常持有锁的时间较短,因此即使优先执行写者,也不太会导致读者线程长时间饥饿。
不过需要指出的是,Linux 系统中默认的读写锁行为确实是写者优先。如果希望调整这一策略,我们可以在调用 pthread_rwlock_init 函数进行动态初始化时,通过其第二个参数(attr)来设定相应的锁属性,从而改变其调度方式。
结语
那么这就是本文关于线程的全部内容,那么十分感谢耐心看到这里的读者,那么恭喜你们,已经成功跨越了Linux系统的第三座大三,前面两座分别是进程以及文件系统,那么至此,我们Linux系统部分的学习变告一段落,我们接下来会进入Linux网络 部分的学习,那么我会持续更新,希望你能够多多关注,如果本文有帮组到你的话,还请三连加关注,你的支持就是我创作的最大动力!

