Linux系统多线程总结
Linux系统多线程总结
在Linux多线程编程中,我们始终围绕两个核心问题展开:如何保证数据安全(互斥) 和如何让线程按预期顺序执行(同步)。从最初的互斥锁、条件变量,到今天要深入的环形队列、信号量,再到工程中常用的线程池,每一步都是为了在并发效率和数据安全之间找到平衡。
这篇总结会沿着实战路线,从环形队列与信号量的结合讲起,逐步扩展到多生产多消费模型,再深入线程池的设计与实现,最后通过线程的面向对象封装简化开发。
一、环形队列与信号量:同步的高效实现
在之前的生产者-消费者模型中,我们用阻塞队列(基于std::queue)实现了同步,但阻塞队列是线性结构,在高并发场景下可能存在效率瓶颈。而环形队列通过数组模拟环形结构,用模运算实现下标循环,能更高效地利用内存;配合信号量(本质是计数器),可以简化同步逻辑,避免复杂的条件判断。
1.1 环形队列:数组模拟的“循环容器”
环形队列的核心是“用线性数组模拟环形空间”,通过两个下标(生产者下标prod_idx
、消费者下标cons_idx
)追踪生产和消费的位置,再用模运算让下标“绕回”数组开头,实现“环形”效果。
1.1.1 环形队列的核心原理
假设我们用数组buf
模拟环形队列,容量为cap
,那么:
- 生产者:每次生产数据后,
prod_idx = (prod_idx + 1) % cap
,下标超过数组长度时自动回到0; - 消费者:每次消费数据后,
cons_idx = (cons_idx + 1) % cap
,同理循环; - 核心问题:如何判断队列“空”或“满”?因为当队列空或满时,
prod_idx
和cons_idx
会指向同一个位置。
文档中提到两种解决方法,我们逐一拆解:
方法1:用计数器判断(直观易懂)
在队列中维护一个计数器count
,记录当前队列中的元素个数:
- 空:
count == 0
; - 满:
count == cap
; - 生产时
count++
,消费时count--
。
这种方法的优点是逻辑简单,缺点是多线程下count
需要加锁保护(不过后续会用信号量规避这个问题)。
方法2:空出一个位置(无需计数器)
故意让队列始终空出一个位置,不存储数据:
- 空:
prod_idx == cons_idx
; - 满:
(prod_idx + 1) % cap == cons_idx
;
比如容量为5的队列,最多存储4个元素,当生产者下标prod_idx=4
(数组最后一个位置),下一个位置(4+1)%5=0
,若此时cons_idx=0
,则队列满。
这种方法无需计数器,但需要理解“空一个位置”的设计,适合对内存效率要求不极致的场景。
1.1.2 环形队列的C++实现(基于vector)
我们用vector
作为底层存储,选择“空出一个位置”的判空判满方式,先实现单生产单消费版本:
#include <vector>
#include <pthread.h>
#include <semaphore.h>
#include <iostream>
#include <cassert>template <typename T>
class RingQueue {
public:// 构造函数:指定队列容量,默认10explicit RingQueue(int cap = 10) : cap_(cap), buf_(cap), prod_idx_(0), cons_idx_(0) {// 初始化信号量:空间信号量(初始值=容量)、数据信号量(初始值=0)int ret = sem_init(&sem_space_, 0, cap_);assert(ret == 0);ret = sem_init(&sem_data_, 0, 0);assert(ret == 0);}~RingQueue() {// 销毁信号量sem_destroy(&sem_space_);sem_destroy(&sem_data_);}// 生产者:向队列中放数据void Push(const T& data) {// 1. P操作:申请空间资源(空间信号量-1)sem_wait(&sem_space_);// 2. 放数据到当前生产者下标位置buf_[prod_idx_] = data;std::cout << "生产者:放数据 " << data << " 到位置 " << prod_idx_ << std::endl;// 3. 更新生产者下标(环形递增)prod_idx_ = (prod_idx_ + 1) % cap_;// 4. V操作:释放数据资源(数据信号量+1)sem_post(&sem_data_);}// 消费者:从队列中拿数据T Pop() {// 1. P操作:申请数据资源(数据信号量-1)sem_wait(&sem_data_);// 2. 从当前消费者下标位置拿数据T data = buf_[cons_idx_];std::cout << "消费者:拿数据 " << data << " 从位置 " << cons_idx_ << std::endl;// 3. 更新消费者下标(环形递增)cons_idx_ = (cons_idx_ + 1) % cap_;// 4. V操作:释放空间资源(空间信号量+1)sem_post(&sem_space_);return data;}private:std::vector<T> buf_; // 底层存储数组int cap_; // 队列容量(实际存储cap_-1个元素)int prod_idx_; // 生产者下标int cons_idx_; // 消费者下标sem_t sem_space_; // 空间信号量:记录空闲位置数sem_t sem_data_; // 数据信号量:记录可用数据数
};
1.1.3 信号量:简化同步的“计数器”
在上面的代码中,我们引入了信号量sem_space_
和sem_data_
,这两个信号量的作用是什么?
-
信号量的本质:一个原子计数器,用于描述“可用资源的数量”,提供两个核心操作:
- P操作(申请资源):
sem_wait
,计数器减1;若计数器<0,线程阻塞; - V操作(释放资源):
sem_post
,计数器加1;若计数器<=0,唤醒一个阻塞线程。
- P操作(申请资源):
-
空间信号量(sem_space_):
- 初始值=队列容量
cap_
,表示初始时有cap_
个空闲位置; - 生产者生产前必须P操作:申请一个空闲位置,没位置就阻塞;
- 消费者消费后必须V操作:释放一个空闲位置,让生产者可以继续生产。
- 初始值=队列容量
-
数据信号量(sem_data_):
- 初始值=0,表示初始时没有可用数据;
- 消费者消费前必须P操作:申请一个数据,没数据就阻塞;
- 生产者生产后必须V操作:释放一个数据,让消费者可以继续消费。
通过信号量,我们巧妙地规避了“判空判满”的手动判断——信号量会自动帮我们管理资源状态,申请不到就阻塞,申请到就直接操作,代码比用条件变量简洁多了!
1.1.4 单生产单消费的测试
我们创建一个生产者线程和一个消费者线程,测试环形队列的同步效果:
#include "RingQueue.hpp"
#include <unistd.h>
#include <cstdlib>
#include <ctime>// 全局环形队列(容量5)
RingQueue<int> rq(5);// 生产者线程:生成1~10的随机数
void* producer(void* arg) {int prod_id = *(int*)arg;delete (int*)arg;srand(time(nullptr)); // 初始化随机数种子for (int i = 0; i < 10; ++i) {int data = rand() % 10 + 1; // 1~10的随机数rq.Push(data);usleep(500000); // 模拟生产耗时(0.5秒)}std::cout << "生产者" << prod_id << ":生产完毕!" << std::endl;return nullptr;
}// 消费者线程:消费数据并打印
void* consumer(void* arg) {int cons_id = *(int*)arg;delete (int*)arg;for (int i = 0; i < 10; ++i) {int data = rq.Pop();usleep(1000000); // 模拟消费耗时(1秒)}std::cout << "消费者" << cons_id << ":消费完毕!" << std::endl;return nullptr;
}int main() {pthread_t prod_tid, cons_tid;// 创建生产者线程(传递ID)int* prod_id = new int(1);pthread_create(&prod_tid, nullptr, producer, prod_id);// 创建消费者线程(传递ID)int* cons_id = new int(1);pthread_create(&cons_tid, nullptr, consumer, cons_id);// 等待线程结束pthread_join(prod_tid, nullptr);pthread_join(cons_tid, nullptr);return 0;
}
编译运行(需要链接pthread库):
g++ main.cpp -o ring_queue -lpthread
./ring_queue
运行结果(节选):
生产者:放数据 5 到位置 0
消费者:拿数据 5 从位置 0
生产者:放数据 3 到位置 1
生产者:放数据 8 到位置 2
消费者:拿数据 3 从位置 1
生产者:放数据 2 到位置 3
消费者:拿数据 8 从位置 2
...
你会发现:
- 生产者生产慢(0.5秒/个),消费者消费慢(1秒/个),所以生产者会先填满队列(容量5,实际存4个),然后阻塞;
- 消费者每消费一个,生产者就会唤醒并生产一个,形成“消费一个→生产一个”的节奏。
1.2 从单生产单消费到多生产多消费
单生产单消费能满足简单场景,但实际项目中往往需要多个生产者(比如多个客户端发送请求)和多个消费者(多个线程处理请求)。这时候需要解决一个核心问题:生产者之间的下标竞争和消费者之间的下标竞争。
1.2.1 多生产多消费的问题:下标竞争
在多生产者场景下,多个生产者会同时修改prod_idx
:
- 生产者A刚计算出
prod_idx=2
,还没更新,就被切换; - 生产者B也计算出
prod_idx=2
,更新为3; - 生产者A恢复后,把
prod_idx
更新为3,导致两个生产者把数据放到了同一个位置,数据覆盖!
消费者之间也存在同样的问题,多个消费者会竞争cons_idx
,导致重复消费。
1.2.2 解决方案:两把互斥锁
为了解决竞争,我们需要给生产者和消费者分别加锁:
- 生产者锁(prod_mutex_):保护
prod_idx
的修改,确保同一时间只有一个生产者能更新下标; - 消费者锁(cons_mutex_):保护
cons_idx
的修改,确保同一时间只有一个消费者能更新下标。
为什么用两把锁,而不是一把?因为生产和消费可以并发进行(只要队列不为空且不满),用两把锁能提高并发效率——生产者加生产者锁,消费者加消费者锁,两者互不干扰。
1.2.3 多生产多消费的代码优化
修改RingQueue
类,添加两把互斥锁,并调整Push
和Pop
方法:
template <typename T>
class RingQueue {
public:explicit RingQueue(int cap = 10) : cap_(cap), buf_(cap), prod_idx_(0), cons_idx_(0) {// 初始化信号量int ret = sem_init(&sem_space_, 0, cap_);assert(ret == 0);ret = sem_init(&sem_data_, 0, 0);assert(ret == 0);// 初始化两把互斥锁ret = pthread_mutex_init(&prod_mutex_, nullptr);assert(ret == 0);ret = pthread_mutex_init(&cons_mutex_, nullptr);assert(ret == 0);}~RingQueue() {sem_destroy(&sem_space_);sem_destroy(&sem_data_);pthread_mutex_destroy(&prod_mutex_);pthread_mutex_destroy(&cons_mutex_);}void Push(const T& data) {// 1. P操作:申请空间(信号量原子操作,无需加锁)sem_wait(&sem_space_);// 2. 加生产者锁:保护prod_idx的修改pthread_mutex_lock(&prod_mutex_);// 3. 放数据+更新下标buf_[prod_idx_] = data;std::cout << "生产者" << pthread_self() << ":放数据 " << data << " 到位置 " << prod_idx_ << std::endl;prod_idx_ = (prod_idx_ + 1) % cap_;// 4. 解锁pthread_mutex_unlock(&prod_mutex_);// 5. V操作:释放数据sem_post(&sem_data_);}T Pop() {// 1. P操作:申请数据sem_wait(&sem_data_);// 2. 加消费者锁:保护cons_idx的修改pthread_mutex_lock(&cons_mutex_);// 3. 拿数据+更新下标T data = buf_[cons_idx_];std::cout << "消费者" << pthread_self() << ":拿数据 " << data << " 从位置 " << cons_idx_ << std::endl;cons_idx_ = (cons_idx_ + 1) % cap_;// 4. 解锁pthread_mutex_unlock(&cons_mutex_);// 5. V操作:释放空间sem_post(&sem_space_);return data;}private:std::vector<T> buf_;int cap_;int prod_idx_;int cons_idx_;sem_t sem_space_;sem_t sem_data_;pthread_mutex_t prod_mutex_; // 生产者互斥锁pthread_mutex_t cons_mutex_; // 消费者互斥锁
};
1.2.4 加锁时机:为什么在信号量之后?
这里有个关键细节:互斥锁加在信号量P操作之后,而不是之前。为什么?
我们用生活中的例子类比:
- 信号量P操作 = 买电影票(申请资源,原子操作,无需排队);
- 互斥锁 = 电影院检票(进入影院前排队,确保有序);
如果先加锁再申请信号量,相当于“先排队检票,再买票”——没票的人也得排队,浪费时间;而先申请信号量(买票),再加锁(检票),只有有票的人才需要排队,效率更高。
在代码中,信号量P操作是原子的,多个生产者可以同时申请信号量(买 ticket),申请成功后再排队加锁(检票),这样能减少线程阻塞在锁上的时间,提高并发度。
1.2.5 多生产多消费的测试
创建3个生产者和2个消费者,测试并发效果:
int main() {pthread_t prod_tids[3], cons_tids[2];// 创建3个生产者for (int i = 0; i < 3; ++i) {int* prod_id = new int(i + 1);pthread_create(&prod_tids[i], nullptr, producer, prod_id);}// 创建2个消费者for (int i = 0; i < 2; ++i) {int* cons_id = new int(i + 1);pthread_create(&cons_tids[i], nullptr, consumer, cons_id);}// 等待所有线程结束for (int i = 0; i < 3; ++i) {pthread_join(prod_tids[i], nullptr);}for (int i = 0; i < 2; ++i) {pthread_join(cons_tids[i], nullptr);}return 0;
}
运行结果(节选):
生产者140703347826432:放数据 7 到位置 0
生产者140703339433728:放数据 3 到位置 1
生产者140703331041024:放数据 9 到位置 2
消费者140703322648320:拿数据 7 从位置 0
消费者140703314255616:拿数据 3 从位置 1
生产者140703347826432:放数据 2 到位置 3
...
你会看到多个生产者和消费者并发执行,数据没有覆盖或重复消费,因为互斥锁保护了下标的修改。
1.3 任务化改造:从“传递数据”到“传递任务”
之前的例子中,环形队列传递的是整数,但实际项目中,我们更需要传递“任务”(比如计算任务、IO任务)。我们可以定义一个Task
类,让环形队列存储Task
对象,实现“生产者生成任务,消费者执行任务”的模式。
1.3.1 Task类的实现(支持加减乘除)
#include <string>class Task {
public:Task(int a, int b, char op) : a_(a), b_(b), op_(op), result_(0), exit_code_(0) {}// 执行任务void Run() {switch (op_) {case '+':result_ = a_ + b_;break;case '-':result_ = a_ - b_;break;case '*':result_ = a_ * b_;break;case '/':if (b_ == 0) {exit_code_ = 1; // 除零错误result_ = 0;} else {result_ = a_ / b_;}break;default:exit_code_ = 2; // 无效操作符result_ = 0;}}// 获取任务结果字符串std::string GetResult() const {std::string res = std::to_string(a_) + op_ + std::to_string(b_) + " = ";if (exit_code_ == 0) {res += std::to_string(result_) + "(成功)";} else if (exit_code_ == 1) {res += "错误(除零)";} else {res += "错误(无效操作符)";}return res;}private:int a_; // 操作数1int b_; // 操作数2char op_; // 操作符(+、-、*、/)int result_; // 计算结果int exit_code_; // 退出码:0成功,1除零,2无效操作符
};
1.3.2 任务化的环形队列测试
修改生产者和消费者,生成并执行任务:
// 全局环形队列(存储Task对象)
RingQueue<Task> rq(5);// 生产者:生成任务
void* producer(void* arg) {int prod_id = *(int*)arg;delete (int*)arg;srand(time(nullptr) + prod_id); // 不同生产者用不同种子,避免重复char ops[] = {'+', '-', '*', '/'};int op_cnt = sizeof(ops) / sizeof(ops[0]);for (int i = 0; i < 5; ++i) {int a = rand() % 10 + 1;int b = rand() % 5; // 可能为0,测试除零错误char op = ops[rand() % op_cnt];Task task(a, b, op);rq.Push(task);std::cout << "生产者" << prod_id << ":生成任务 " << task.GetResult() << std::endl;usleep(500000);}std::cout << "生产者" << prod_id << ":任务生成完毕!" << std::endl;return nullptr;
}// 消费者:执行任务
void* consumer(void* arg) {int cons_id = *(int*)arg;delete (int*)arg;for (int i = 0; i < 7; ++i) { // 3个生产者共15个任务,2个消费者各执行7~8个Task task = rq.Pop();task.Run(); // 执行任务std::cout << "消费者" << cons_id << ":执行任务 " << task.GetResult() << std::endl;usleep(1000000);}std::cout << "消费者" << cons_id << ":任务执行完毕!" << std::endl;return nullptr;
}
运行结果(节选):
生产者1:生成任务 5+3 = (未执行)
生产者2:生成任务 8-0 = (未执行)
生产者3:生成任务 4*2 = (未执行)
消费者1:执行任务 5+3 = 8(成功)
消费者2:执行任务 8-0 = 8(成功)
生产者1:生成任务 7/0 = (未执行)
消费者1:执行任务 4*2 = 8(成功)
消费者2:执行任务 7/0 = 错误(除零)
...
这样就实现了“生产者生成任务、消费者执行任务”的异步模型,这也是线程池的核心雏形。
二、线程池:多线程的工程化实践
在实际项目中,频繁创建和销毁线程会带来很大的系统开销(比如内核创建PCB、分配栈空间等)。线程池的核心思想是“预先创建一批线程,复用线程处理多个任务”,以空间换时间,提高系统响应速度。
2.1 池化技术:为什么需要“池”?
在计算机领域,“池化技术”无处不在,比如:
- 内存池:预先申请一块大内存,避免频繁
malloc/free
; - 进程池:预先创建一批进程,处理客户端请求;
- 线程池:预先创建一批线程,处理任务队列中的任务。
它们的本质都是“以空间换时间”——提前占用一部分内存资源,避免频繁与内核交互(创建线程/进程、申请内存),从而减少开销。
举个例子:如果每次有任务来才创建线程,处理完销毁,假设创建/销毁线程需要1ms,任务处理需要0.1ms,那么1000个任务的总耗时是1000*(1+0.1) = 1100ms
;而用线程池(预先创建10个线程),总耗时是1(创建线程) + 1000*0.1 = 101ms
,效率提升10倍!
2.2 线程池的核心组件
一个基础的线程池需要包含以下组件:
- 任务队列:存储待处理的任务,由外部线程提交;
- 线程数组:预先创建的线程,循环从任务队列中取任务执行;
- 互斥锁:保护任务队列的访问,避免多线程竞争;
- 条件变量:当任务队列为空时,线程阻塞;有新任务时,唤醒线程;
- 控制变量:比如线程池是否运行的标志。
2.3 线程池的C++实现
我们用C++实现一个支持任务提交的线程池,支持任意类型的任务(基于模板)。
2.3.1 线程池的类设计
#include <vector>
#include <queue>
#include <pthread.h>
#include <iostream>
#include <cassert>
#include <functional> // 用于std::functiontemplate <typename T>
class ThreadPool {
public:// 构造函数:指定线程数,默认5explicit ThreadPool(int thread_num = 5) : thread_num_(thread_num), is_running_(false) {// 初始化互斥锁和条件变量int ret = pthread_mutex_init(&mutex_, nullptr);assert(ret == 0);ret = pthread_cond_init(&cond_, nullptr);assert(ret == 0);}~ThreadPool() {// 停止线程池Stop();// 销毁互斥锁和条件变量pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&cond_);}// 启动线程池:创建预先的线程void Start() {if (is_running_) return;is_running_ = true;// 创建thread_num_个线程threads_.resize(thread_num_);for (int i = 0; i < thread_num_; ++i) {// 线程函数必须是静态的(无this指针),传递this指针作为参数int ret = pthread_create(&threads_[i], nullptr, ThreadFunc, this);assert(ret == 0);std::cout << "线程池:创建线程 " << threads_[i] << std::endl;}}// 停止线程池:唤醒所有线程,等待退出void Stop() {if (!is_running_) return;is_running_ = false;// 唤醒所有阻塞的线程pthread_cond_broadcast(&cond_);// 等待所有线程退出for (auto& tid : threads_) {pthread_join(tid, nullptr);std::cout << "线程池:线程 " << tid << " 退出" << std::endl;}threads_.clear();}// 提交任务到任务队列void Submit(const T& task) {// 加锁保护任务队列pthread_mutex_lock(&mutex_);// 向队列中添加任务tasks_.push(task);std::cout << "线程池:提交任务 " << task.GetResult() << std::endl;// 唤醒一个阻塞的线程(有新任务了)pthread_cond_signal(&cond_);// 解锁pthread_mutex_unlock(&mutex_);}private:// 线程函数(静态成员函数,无this指针)static void* ThreadFunc(void* arg) {// 将参数强转为ThreadPool指针ThreadPool* pool = static_cast<ThreadPool*>(arg);assert(pool != nullptr);// 循环处理任务,直到线程池停止while (pool->is_running_) {// 加锁pthread_mutex_lock(&pool->mutex_);// 任务队列为空且线程池运行中,阻塞等待while (pool->tasks_.empty() && pool->is_running_) {pthread_cond_wait(&pool->cond_, &pool->mutex_);}// 如果线程池已停止,解锁并退出if (!pool->is_running_) {pthread_mutex_unlock(&pool->mutex_);break;}// 从队列中取出任务T task = pool->tasks_.front();pool->tasks_.pop();// 解锁(任务处理不需要持有锁)pthread_mutex_unlock(&pool->mutex_);// 执行任务task.Run();std::cout << "线程 " << pthread_self() << ":执行任务 " << task.GetResult() << std::endl;}return nullptr;}private:std::vector<pthread_t> threads_; // 线程数组std::queue<T> tasks_; // 任务队列int thread_num_; // 线程数量bool is_running_; // 线程池是否运行pthread_mutex_t mutex_; // 保护任务队列的互斥锁pthread_cond_t cond_; // 唤醒线程的条件变量
};
2.3.1 关键细节:类内线程函数的问题
C++类的非静态成员函数默认带有this
指针,而pthread_create
要求线程函数的签名是void* (*)(void*)
,无法直接传递非静态成员函数。解决方案是:
- 定义静态成员函数
ThreadFunc
:静态函数没有this
指针,符合pthread_create
的要求; - 传递
this
指针作为参数:在Start
中创建线程时,把this
传给ThreadFunc
,函数内部再强转为ThreadPool*
,从而访问类的成员。
2.3.2 任务处理的逻辑
线程池中的线程会循环执行以下逻辑:
- 加锁,检查任务队列是否为空;
- 若为空且线程池运行中,调用
pthread_cond_wait
阻塞,释放互斥锁; - 有新任务时,被唤醒,取出任务,解锁;
- 执行任务(任务处理不需要持有锁,提高并发度);
- 线程池停止时,退出循环。
2.3.3 线程池的测试
用之前的Task
类测试线程池:
#include "ThreadPool.hpp"
#include "Task.hpp"
#include <unistd.h>
#include <cstdlib>
#include <ctime>int main() {// 创建线程池(5个线程)ThreadPool<Task> pool(5);pool.Start();// 提交10个任务srand(time(nullptr));char ops[] = {'+', '-', '*', '/'};int op_cnt = sizeof(ops) / sizeof(ops[0]);for (int i = 0; i < 10; ++i) {int a = rand() % 20 + 1;int b = rand() % 10;char op = ops[rand() % op_cnt];Task task(a, b, op);pool.Submit(task);usleep(200000); // 模拟任务提交间隔}// 等待所有任务执行完毕(实际项目中需更优雅的等待方式)sleep(5);// 停止线程池pool.Stop();return 0;
}
运行结果(节选):
线程池:创建线程 140698473785088
线程池:创建线程 140698465392384
线程池:创建线程 140698457000704
线程池:创建线程 140698448608000
线程池:创建线程 140698440215296
线程池:提交任务 15+3 = (未执行)
线程 140698473785088:执行任务 15+3 = 18(成功)
线程池:提交任务 8-5 = (未执行)
线程 140698465392384:执行任务 8-5 = 3(成功)
...
线程池:线程 140698473785088 退出
线程池:线程 140698465392384 退出
你会看到线程池中的线程被复用,处理多个任务,避免了频繁创建销毁线程的开销。
2.4 线程池的优化方向
基础线程池可以满足简单场景,但实际项目中还需要优化:
- 任务优先级:给任务添加优先级,任务队列用优先队列(
std::priority_queue
); - 线程数量动态调整:根据任务队列长度动态增加/减少线程(避免线程过多导致CPU调度开销);
- 任务执行结果返回:用
std::future
和std::promise
让外部线程获取任务执行结果; - 异常处理:在任务执行中捕获异常,避免单个任务崩溃导致整个线程退出。
三、线程的面向对象封装:简化多线程开发
Linux原生线程库(pthread)是面向过程的,使用起来不够直观,比如创建线程需要手动管理pthread_t
、传递参数等。我们可以将线程封装成C++类,隐藏底层细节,提供更友好的接口,类似C++11的std::thread
。
3.1 线程封装的核心目标
- 隐藏底层接口:不需要手动调用
pthread_create
、pthread_join
等函数; - 支持任务回调:可以传递任意可调用对象(函数、函数对象、lambda)作为线程任务;
- 管理线程属性:比如线程ID、名字、启动时间、运行状态等;
- 支持线程控制:提供
start
、join
、isRunning
等方法。
3.2 线程类的实现
我们封装一个Thread
类,支持任务回调和线程属性管理:
#include <pthread.h>
#include <iostream>
#include <cassert>
#include <string>
#include <ctime>
#include <functional> // 用于std::functionclass Thread {
public:// 任务类型:无返回值,无参数(可扩展为带参数)using Task = std::function<void()>;// 构造函数:传递任务和线程名Thread(Task task, const std::string& name = "Unknown") : task_(std::move(task)), name_(name), tid_(0), start_time_(0), is_running_(false) {}~Thread() {// 线程未分离且未join,自动分离(避免僵尸线程)if (is_running_ && pthread_joinable(tid_)) {pthread_detach(tid_);std::cout << "线程 " << name_ << ":自动分离" << std::endl;}}// 启动线程void start() {if (is_running_) return;is_running_ = true;// 记录启动时间start_time_ = time(nullptr);// 创建线程,传递this指针int ret = pthread_create(&tid_, nullptr, threadFunc, this);assert(ret == 0);std::cout << "线程 " << name_ << ":启动(ID=" << tid_ << ")" << std::endl;}// 等待线程结束void join() {if (!is_running_ || !pthread_joinable(tid_)) return;pthread_join(tid_, nullptr);is_running_ = false;std::cout << "线程 " << name_ << ":join成功" << std::endl;}// 获取线程IDpthread_t getTid() const { return tid_; }// 获取线程名std::string getName() const { return name_; }// 获取启动时间(时间戳)time_t getStartTime() const { return start_time_; }// 判断线程是否运行bool isRunning() const { return is_running_; }private:// 静态线程函数static void* threadFunc(void* arg) {Thread* thread = static_cast<Thread*>(arg);assert(thread != nullptr);// 执行任务if (thread->task_) {thread->task_();}thread->is_running_ = false;return nullptr;}private:Task task_; // 线程任务std::string name_; // 线程名pthread_t tid_; // 线程IDtime_t start_time_; // 启动时间戳bool is_running_; // 是否运行
};
3.2.1 关键细节:任务类型与参数传递
我们用std::function<void()>
作为任务类型,支持任意可调用对象:
- 普通函数;
- 函数对象;
- lambda表达式;
- 绑定了参数的函数(用
std::bind
)。
如果需要传递参数,可以扩展任务类型为std::function<void(Args...)>
,并在构造函数中传递参数,用std::bind
绑定到任务中。
3.2.2 线程类的测试
用lambda表达式和普通函数测试线程类:
#include "Thread.hpp"
#include <unistd.h>// 普通函数任务
void printHello() {for (int i = 0; i < 3; ++i) {std::cout << "普通函数:Hello " << i << std::endl;usleep(500000);}
}int main() {// 1. 测试普通函数任务Thread t1(printHello, "Thread-1");t1.start();t1.join();// 2. 测试lambda任务(带参数)int count = 3;Thread t2([&count]() {for (int i = 0; i < count; ++i) {std::cout << "Lambda:Count " << i << std::endl;usleep(500000);}}, "Thread-2");t2.start();t2.join();// 3. 测试线程属性std::cout << "Thread-2 启动时间:" << ctime(&t2.getStartTime());std::cout << "Thread-2 是否运行:" << (t2.isRunning() ? "是" : "否") << std::endl;return 0;
}
运行结果:
线程 Thread-1:启动(ID=140702542288640)
普通函数:Hello 0
普通函数:Hello 1
普通函数:Hello 2
线程 Thread-1:join成功
线程 Thread-2:启动(ID=140702533895936)
Lambda:Count 0
Lambda:Count 1
Lambda:Count 2
线程 Thread-2:join成功
Thread-2 启动时间:Wed Oct 11 15:30:00 2024
Thread-2 是否运行:否
通过封装,线程的创建和管理变得非常直观,不需要关注底层的pthread
接口。
四、补充概念:多线程开发的关键细节
除了上述核心内容,还有一些关键概念和问题需要掌握,避免在实际项目中踩坑。
4.1 STL容器的线程安全问题
STL大部分容器不是线程安全的,比如std::vector
、std::queue
、std::map
等,原因是为了效率——如果每个操作都加锁,会严重影响并发性能。
4.1.1 为什么STL容器不是线程安全的?
以std::vector::push_back
为例,它可能包含以下步骤:
- 检查容量是否足够;
- 若不足,扩容(分配新内存、拷贝旧数据、释放旧内存);
- 将新元素放到数组末尾。
多线程同时调用push_back
时,可能导致:
- 扩容过程中数据拷贝被中断,导致数据损坏;
- 多个线程把元素放到同一个位置,数据覆盖。
4.1.2 如何保证STL容器的线程安全?
- 手动加锁:在访问容器的临界区前后加互斥锁,确保同一时间只有一个线程访问;
- 使用线程安全的容器:比如Boost库的
boost::thread_safe_queue
,或C++20的std::jthread
配合std::queue
的线程安全包装; - 避免共享容器:尽量让每个线程使用独立的容器,减少共享。
4.2 自旋锁:轻量级的互斥机制
在之前的例子中,我们用的是互斥锁(pthread_mutex_t),线程申请不到锁时会阻塞,切换到其他线程,存在上下文切换的开销。而自旋锁的核心思想是“忙等”——线程申请不到锁时,不阻塞,而是循环检查锁是否可用,直到申请到。
4.2.1 自旋锁的适用场景
- 临界区执行时间短:比如只修改一个变量,循环几次就能完成;
- CPU密集型场景:上下文切换开销大于忙等的开销;
- 多CPU环境:单CPU环境下,自旋锁会导致CPU 100%占用,其他线程无法运行。
4.2.2 自旋锁的实现(基于pthread)
Linux提供了自旋锁的接口pthread_spinlock_t
:
#include <pthread.h>
#include <iostream>
#include <unistd.h>pthread_spinlock_t spinlock;
int count = 0;void* increment(void* arg) {int id = *(int*)arg;delete (int*)arg;for (int i = 0; i < 100000; ++i) {// 申请自旋锁(忙等)pthread_spin_lock(&spinlock);count++;// 释放自旋锁pthread_spin_unlock(&spinlock);}std::cout << "线程 " << id << ":count = " << count << std::endl;return nullptr;
}int main() {pthread_spin_init(&spinlock, PTHREAD_PROCESS_PRIVATE); // 线程间共享pthread_t t1, t2;int* id1 = new int(1);int* id2 = new int(2);pthread_create(&t1, nullptr, increment, id1);pthread_create(&t2, nullptr, increment, id2);pthread_join(t1, nullptr);pthread_join(t2, nullptr);pthread_spin_destroy(&spinlock);std::cout << "最终 count = " << count << std::endl;return 0;
}
4.2.3 自旋锁与互斥锁的区别
特性 | 互斥锁(pthread_mutex_t) | 自旋锁(pthread_spinlock_t) |
---|---|---|
申请不到锁时的行为 | 阻塞,切换线程 | 忙等,不切换线程 |
上下文切换开销 | 有 | 无 |
CPU占用 | 低(阻塞时不占用CPU) | 高(忙等时占用CPU) |
适用临界区 | 执行时间长 | 执行时间短 |
4.3 单例模式的线程安全
单例模式是一种常用的设计模式,确保一个类只有一个实例。但在多线程环境下, naive的单例模式会存在线程安全问题。
4.3.1 不安全的单例模式(懒汉式)
class Singleton {
public:static Singleton* getInstance() {if (instance_ == nullptr) { // 第一次检查// 线程A和B同时进入,都会创建实例instance_ = new Singleton();}return instance_;}private:Singleton() {} // 私有构造static Singleton* instance_;
};Singleton* Singleton::instance_ = nullptr;
问题:多个线程同时进入if (instance_ == nullptr)
,会创建多个实例,违反单例原则。
4.3.2 线程安全的单例模式(双重检查锁定)
#include <pthread.h>class Singleton {
public:static Singleton* getInstance() {if (instance_ == nullptr) { // 第一次检查(无锁,提高效率)pthread_mutex_lock(&mutex_); // 加锁if (instance_ == nullptr) { // 第二次检查(有锁,确保唯一)instance_ = new Singleton();}pthread_mutex_unlock(&mutex_); // 解锁}return instance_;}private:Singleton() {}static Singleton* instance_;static pthread_mutex_t mutex_;
};// 初始化
Singleton* Singleton::instance_ = nullptr;
pthread_mutex_t Singleton::mutex_ = PTHREAD_MUTEX_INITIALIZER;
双重检查锁定的优点:
- 第一次检查无锁,避免每次调用
getInstance
都加锁,提高效率; - 第二次检查有锁,确保只有一个线程创建实例。
4.3.3 C++11后的更简单实现
C++11规定,局部静态变量的初始化是线程安全的,因此可以简化单例模式:
class Singleton {
public:static Singleton& getInstance() {static Singleton instance; // 局部静态变量,线程安全初始化return instance;}private:Singleton() {}
};
这种方式无需手动加锁,简洁且线程安全,推荐在C++11及以上环境使用。
五、总结:多线程开发的核心思想
Linux多线程开发涉及的知识点很多,但核心思想可以总结为以下几点:
5.1 平衡效率与安全
- 互斥:保证数据安全,避免竞争条件(如用互斥锁、自旋锁);
- 同步:保证线程按预期顺序执行(如用条件变量、信号量);
- 最小临界区:临界区越小,线程阻塞时间越短,并发效率越高;
- 避免过度同步:不需要同步的地方不要加锁,避免性能损耗。
5.2 复用与池化
- 线程复用:用线程池避免频繁创建销毁线程的开销;
- 资源复用:用池化技术(内存池、连接池)减少与内核的交互,提高响应速度。
5.3 封装与抽象
- 隐藏底层细节:用面向对象封装简化复杂接口(如线程类、线程池类);
- 解耦:用生产者-消费者模型解耦生产和消费逻辑,提高代码可维护性。
5.4 实战中的注意事项
- 线程安全:STL容器、全局变量、静态变量需要加锁保护;
- 死锁:避免循环等待、重复加锁,固定加锁顺序;
- 性能:根据场景选择互斥锁或自旋锁,线程池大小要合理(一般为CPU核心数的1~2倍)。
多线程开发没有银弹,关键是理解每个技术的适用场景,结合实际项目选择合适的方案。希望这篇总结能帮你梳理清楚Linux多线程的核心知识点,在实际开发中少踩坑,写出高效、安全的多线程代码!# Linux系统多线程总结:从底层原理到工程化实战
在Linux多线程编程的学习中,我们始终围绕两个核心矛盾展开:如何保证共享数据的安全性(互斥) 与如何让线程按预期顺序协同工作(同步)。从最初的互斥锁、条件变量,到环形队列、信号量,再到工程中不可或缺的线程池,每一步都是对“效率”与“安全”的平衡。
这篇总结会沿着实战路线,从环形队列与信号量的深度结合讲起,逐步扩展到多生产多消费模型,再深入线程池的设计与实现,最后通过线程的面向对象封装简化开发。每个部分都会结合具体场景、代码示例和底层原理,帮你搞懂“为什么这么做”而非单纯记API——毕竟,只有理解原理,才能在复杂场景中灵活应对。
一、环形队列与信号量:同步的高效实现
在之前的生产者-消费者模型中,我们用阻塞队列(基于std::queue
)实现了线程同步,但线性结构的阻塞队列在高并发下可能存在内存碎片和效率瓶颈。而环形队列通过数组模拟环形结构,用模运算实现下标循环,能更高效地利用内存;配合信号量(本质是原子计数器),可大幅简化同步逻辑,避免手动判空判满的复杂条件判断。
1.1 环形队列:数组模拟的“循环容器”
环形队列的核心是“用线性数组模拟环形空间”,通过两个下标(生产者下标prod_idx
、消费者下标cons_idx
)追踪生产和消费位置,再用模运算让下标“绕回”数组开头,实现“环形”效果。
1.1.1 环形队列的核心原理
假设用数组buf
模拟环形队列,容量为cap
,那么:
- 生产者行为:每次生产数据后,
prod_idx = (prod_idx + 1) % cap
——下标超过数组长度时自动回到0; - 消费者行为:每次消费数据后,
cons_idx = (cons_idx + 1) % cap
——同理循环; - 核心问题:当队列空或满时,
prod_idx
和cons_idx
会指向同一个位置,如何区分“空”和“满”?
文档中提到两种经典解决方案,我们逐一拆解:
方法1:用计数器判断(直观易懂)
在队列中维护一个计数器count
,记录当前元素个数:
- 空:
count == 0
; - 满:
count == cap
; - 生产时
count++
,消费时count--
。
这种方法的优点是逻辑简单,缺点是多线程下count
需要加锁保护(后续会用信号量规避这个问题)。
方法2:空出一个位置(无需计数器)
故意让队列始终空出一个位置不存储数据,通过下标关系判断:
- 空:
prod_idx == cons_idx
; - 满:
(prod_idx + 1) % cap == cons_idx
;
比如容量为5的队列,最多存储4个元素——当生产者下标prod_idx=4
(数组最后一位),下一个位置(4+1)%5=0
,若此时cons_idx=0
,则队列满。
这种方法无需计数器,但需要接受“浪费一个位置”的设计,适合对内存效率要求不极致的场景。
1.1.2 环形队列的C++实现(基于vector)
我们选择“空出一个位置”的判空判满方式,用vector
作为底层存储,先实现单生产单消费版本:
#include <vector>
#include <pthread.h>
#include <semaphore.h>
#include <iostream>
#include <cassert>template <typename T>
class RingQueue {
public:// 构造函数:指定队列容量,默认10explicit RingQueue(int cap = 10) : cap_(cap), buf_(cap), prod_idx_(0), cons_idx_(0) {// 初始化信号量:空间信号量(初始值=容量)、数据信号量(初始值=0)int ret = sem_init(&sem_space_, 0, cap_);assert(ret == 0); // 断言确保初始化成功,实际项目可抛异常ret = sem_init(&sem_data_, 0, 0);assert(ret == 0);}~RingQueue() {// 销毁信号量(避免资源泄漏)sem_destroy(&sem_space_);sem_destroy(&sem_data_);}// 生产者:向队列中放数据void Push(const T& data) {// 1. P操作:申请空间资源(空间信号量-1,无空间则阻塞)sem_wait(&sem_space_);// 2. 放数据到当前生产者下标位置buf_[prod_idx_] = data;std::cout << "[生产者] 放数据 " << data << " 到位置 " << prod_idx_ << std::endl;// 3. 更新生产者下标(环形递增:模运算实现循环)prod_idx_ = (prod_idx_ + 1) % cap_;// 4. V操作:释放数据资源(数据信号量+1,唤醒等待的消费者)sem_post(&sem_data_);}// 消费者:从队列中拿数据T Pop() {// 1. P操作:申请数据资源(数据信号量-1,无数据则阻塞)sem_wait(&sem_data_);// 2. 从当前消费者下标位置拿数据T data = buf_[cons_idx_];std::cout << "[消费者] 拿数据 " << data << " 从位置 " << cons_idx_ << std::endl;// 3. 更新消费者下标(环形递增)cons_idx_ = (cons_idx_ + 1) % cap_;// 4. V操作:释放空间资源(空间信号量+1,唤醒等待的生产者)sem_post(&sem_space_);return data;}private:std::vector<T> buf_; // 底层存储数组int cap_; // 队列容量(实际存储cap_-1个元素)int prod_idx_; // 生产者下标int cons_idx_; // 消费者下标sem_t sem_space_; // 空间信号量:记录空闲位置数sem_t sem_data_; // 数据信号量:记录可用数据数
};
1.1.3 信号量:简化同步的“原子计数器”
上述代码中,我们引入了两个信号量sem_space_
和sem_data_
,这是环形队列同步的核心。很多人会问:“信号量到底是什么?为什么能替代条件变量的复杂判断?”
-
信号量的本质:一个原子计数器,用于描述“可用资源的数量”,提供两个核心操作:
- P操作(申请资源):
sem_wait
,计数器减1;若计数器<0,线程阻塞(进入等待队列); - V操作(释放资源):
sem_post
,计数器加1;若计数器<=0,唤醒一个阻塞线程。
- P操作(申请资源):
-
空间信号量(sem_space_):
- 初始值=队列容量
cap_
:表示初始时有cap_
个空闲位置; - 生产者生产前必须执行P操作:申请一个空闲位置,没位置就阻塞;
- 消费者消费后必须执行V操作:释放一个空闲位置,让生产者继续生产。
- 初始值=队列容量
-
数据信号量(sem_data_):
- 初始值=0:表示初始时没有可用数据;
- 消费者消费前必须执行P操作:申请一个数据,没数据就阻塞;
- 生产者生产后必须执行V操作:释放一个数据,让消费者继续消费。
通过信号量,我们巧妙地规避了“手动判空判满”——信号量会自动管理资源状态:申请不到就阻塞,申请到就直接操作,代码比用条件变量简洁至少50%!
1.1.4 单生产单消费的测试
创建一个生产者线程和一个消费者线程,测试环形队列的同步效果:
#include "RingQueue.hpp"
#include <unistd.h>
#include <cstdlib>
#include <ctime>// 全局环形队列(容量5,实际存储4个元素)
RingQueue<int> rq(5);// 生产者线程:生成1~10的随机数
void* producer(void* arg) {int prod_id = *(int*)arg;delete (int*)arg; // 释放传入的ID内存srand(time(nullptr)); // 初始化随机数种子for (int i = 0; i < 10; ++i) {int data = rand() % 10 + 1; // 生成1~10的随机数rq.Push(data);usleep(500000); // 模拟生产耗时(0.5秒/个)}std::cout << "[生产者" << prod_id << "] 生产完毕!" << std::endl;return nullptr;
}// 消费者线程:消费数据并打印
void* consumer(void* arg) {int cons_id = *(int*)arg;delete (int*)arg;for (int i = 0; i < 10; ++i) {int data = rq.Pop();usleep(1000000); // 模拟消费耗时(1秒/个)}std::cout << "[消费者" << cons_id << "] 消费完毕!" << std::endl;return nullptr;
}int main() {pthread_t prod_tid, cons_tid;// 创建生产者线程(传递ID=1)int* prod_id = new int(1);pthread_create(&prod_tid, nullptr, producer, prod_id);// 创建消费者线程(传递ID=1)int* cons_id = new int(1);pthread_create(&cons_tid, nullptr, consumer, cons_id);// 等待线程结束(避免主线程提前退出)pthread_join(prod_tid, nullptr);pthread_join(cons_tid, nullptr);return 0;
}
编译运行(需链接pthread
库):
g++ main.cpp -o ring_queue -lpthread
./ring_queue
运行结果(节选):
[生产者] 放数据 5 到位置 0
[消费者] 拿数据 5 从位置 0
[生产者] 放数据 3 到位置 1
[生产者] 放数据 8 到位置 2
[消费者] 拿数据 3 从位置 1
[生产者] 放数据 2 到位置 3
[消费者] 拿数据 8 从位置 2
[生产者] 放数据 7 到位置 4 # 队列满((4+1)%5=0 == cons_idx=3?不,此时cons_idx=3,队列还能放,直到prod_idx=0时满)
[消费者] 拿数据 2 从位置 3
[生产者] 放数据 9 到位置 0 # 消费者释放位置3后,生产者才能放数据到0
...
你会观察到一个关键现象:
生产者生产速度快(0.5秒/个),消费者消费慢(1秒/个),所以生产者会先填满队列(容量5,实际存4个),然后阻塞;消费者每消费一个,生产者就被唤醒并生产一个,形成“消费一个→生产一个”的节奏——这正是信号量同步的效果。
1.2 从单生产单消费到多生产多消费
单生产单消费能满足简单场景,但实际项目中往往需要多个生产者(比如多个客户端发送请求)和多个消费者(多个线程处理请求)。这时候会遇到一个核心问题:生产者之间的下标竞争和消费者之间的下标竞争。
1.2.1 多生产多消费的“隐形坑”:下标竞争
在多生产者场景下,多个生产者会同时修改prod_idx
:
- 生产者A计算出
prod_idx=2
,还没更新就被线程调度切换; - 生产者B也计算出
prod_idx=2
,更新为3; - 生产者A恢复后,把
prod_idx
更新为3——两个生产者把数据放到了同一个位置,数据覆盖!
消费者之间也存在同样问题:多个消费者会竞争cons_idx
,导致重复消费。
1.2.2 解决方案:两把互斥锁,而非一把
为了解决竞争,我们需要给生产者和消费者分别加锁:
- 生产者锁(prod_mutex_):保护
prod_idx
的修改,确保同一时间只有一个生产者能更新下标; - 消费者锁(cons_mutex_):保护
cons_idx
的修改,确保同一时间只有一个消费者能更新下标。
为什么用两把锁,而不是一把?
因为生产和消费可以并发进行(只要队列不为空且不满):生产者加生产者锁,消费者加消费者锁,两者互不干扰,能大幅提高并发效率。比如生产者在放数据时,消费者可以同时拿数据,无需等待对方释放锁。
1.2.3 多生产多消费的代码优化
修改RingQueue
类,添加两把互斥锁,并调整Push
和Pop
方法:
template <typename T>
class RingQueue {
public:explicit RingQueue(int cap = 10) : cap_(cap), buf_(cap), prod_idx_(0), cons_idx_(0) {// 初始化信号量int ret = sem_init(&sem_space_, 0, cap_);assert(ret == 0);ret = sem_init(&sem_data_, 0, 0);assert(ret == 0);// 初始化两把互斥锁ret = pthread_mutex_init(&prod_mutex_, nullptr);assert(ret == 0);ret = pthread_mutex_init(&cons_mutex_, nullptr);assert(ret == 0);}~RingQueue() {sem_destroy(&sem_space_);sem_destroy(&sem_data_);pthread_mutex_destroy(&prod_mutex_);pthread_mutex_destroy(&cons_mutex_);}void Push(const T& data) {// 1. P操作:申请空间(信号量是原子操作,无需加锁)sem_wait(&sem_space_);// 2. 加生产者锁:保护prod_idx的修改pthread_mutex_lock(&prod_mutex_);// 3. 放数据+更新下标buf_[prod_idx_] = data;std::cout << "[生产者" << pthread_self() << "] 放数据 " << data << " 到位置 " << prod_idx_ << std::endl;prod_idx_ = (prod_idx_ + 1) % cap_;// 4. 解锁(释放生产者锁,让其他生产者可以修改下标)pthread_mutex_unlock(&prod_mutex_);// 5. V操作:释放数据资源sem_post(&sem_data_);}T Pop() {// 1. P操作:申请数据资源(先确保有数据,再竞争锁)sem_wait(&sem_data_);// 2. 加消费者锁:保护cons_idx的修改pthread_mutex_lock(&cons_mutex_);// 3. 拿数据+更新下标T data = buf_[cons_idx_];std::cout << "[消费者" << pthread_self() << "] 拿数据 " << data << " 从位置 " << cons_idx_ << std::endl;cons_idx_ = (cons_idx_ + 1) % cap_;// 4. 解锁(释放消费者锁,让其他消费者可以修改下标)pthread_mutex_unlock(&cons_mutex_);// 5. V操作:释放空间资源sem_post(&sem_space_);return data;}private:std::vector<T> buf_;int cap_;int prod_idx_;int cons_idx_;sem_t sem_space_;sem_t sem_data_;pthread_mutex_t prod_mutex_; // 生产者互斥锁(保护prod_idx)pthread_mutex_t cons_mutex_; // 消费者互斥锁(保护cons_idx)
};
1.2.4 关键细节:加锁时机为什么在信号量P操作之后?
这里有个容易被忽略但至关重要的设计:互斥锁加在信号量P操作之后,而非之前。很多人会疑惑:“先加锁再申请信号量,不是更安全吗?”其实恰恰相反,这样做会严重影响效率。
我们用生活中的场景类比:
- 信号量P操作 = 买电影票(申请资源,原子操作,无需排队);
- 互斥锁 = 电影院检票(进入影院前排队,确保有序)。
如果先加锁再申请信号量,相当于“先排队检票,再买票”——没买到票的人也得跟着排队,浪费所有人的时间;而先申请信号量(买票),再加锁(检票),只有买到票的人才需要排队,效率更高。
在代码中,信号量P操作是原子的,多个生产者可以同时申请信号量(比如3个生产者同时申请空间,只要有空间就会成功),申请成功后再排队加锁修改下标。这样能减少线程阻塞在锁上的时间:没申请到信号量的线程不会进入锁的等待队列,避免“无效排队”。
举个具体例子:
假设队列只剩1个空间,3个生产者同时执行sem_wait(&sem_space_)
,只有1个生产者能申请成功,另外2个会阻塞在信号量上;申请成功的生产者加锁修改下标后释放锁,此时另外2个生产者仍阻塞在信号量上,不会竞争锁——如果先加锁,3个生产者会先竞争锁,1个拿到锁后发现信号量申请失败,阻塞时还持有锁,导致另外2个生产者也阻塞在锁上,效率极低。
1.2.5 多生产多消费的测试验证
创建3个生产者和2个消费者,测试并发场景下的数据安全性:
int main() {pthread_t prod_tids[3], cons_tids[2];// 创建3个生产者线程(传递不同ID,避免随机数重复)for (int i = 0; i < 3; ++i) {int* prod_id = new int(i + 1);pthread_create(&prod_tids[i], nullptr, producer, prod_id);}// 创建2个消费者线程for (int i = 0; i < 2; ++i) {int* cons_id = new int(i + 1);pthread_create(&cons_tids[i], nullptr, consumer, cons_id);}// 等待所有线程结束for (int i = 0; i < 3; ++i) {pthread_join(prod_tids[i], nullptr);}for (int i = 0; i < 2; ++i) {pthread_join(cons_tids[i], nullptr);}return 0;
}// 修正生产者函数:添加ID打印,避免随机数重复
void* producer(void* arg) {int prod_id = *(int*)arg;delete (int*)arg;// 不同生产者用不同种子,避免生成相同随机数srand(time(nullptr) + prod_id);for (int i = 0; i < 5; ++i) { // 每个生产者生成5个数据,共15个int data = rand() % 20 + 1;rq.Push(data);std::cout << "[生产者" << prod_id << "] 生成数据 " << data << std::endl;usleep(300000); // 0.3秒/个,模拟生产耗时}std::cout << "[生产者" << prod_id << "] 生产完毕!" << std::endl;return nullptr;
}// 修正消费者函数:添加ID打印
void* consumer(void* arg) {int cons_id = *(int*)arg;delete (int*)arg;for (int i = 0; i < 8; ++i) { // 2个消费者共处理15个数据,1个处理8个,1个处理7个int data = rq.Pop();std::cout << "[消费者" << cons_id << "] 处理数据 " << data << std::endl;usleep(500000); // 0.5秒/个,模拟消费耗时}std::cout << "[消费者" << cons_id << "] 消费完毕!" << std::endl;return nullptr;
}
运行结果(节选):
[生产者1] 生成数据 15
[生产者1] 放数据 15 到位置 0
[生产者2] 生成数据 8
[生产者2] 放数据 8 到位置 1
[生产者3] 生成数据 22
[生产者3] 放数据 22 到位置 2
[消费者1] 拿数据 15 从位置 0
[消费者1] 处理数据 15
[生产者1] 生成数据 7
[生产者1] 放数据 7 到位置 3
[消费者2] 拿数据 8 从位置 1
[消费者2] 处理数据 8
...
[生产者3] 生产完毕!
[生产者2] 生产完毕!
[生产者1] 生产完毕!
[消费者1] 消费完毕!
[消费者2] 消费完毕!
从结果能看到:
3个生产者并发生成数据,2个消费者并发处理数据,没有出现数据覆盖(同一位置被多次写入)或重复消费(同一数据被多次读取)——因为两把互斥锁分别保护了prod_idx
和cons_idx
的修改,信号量保证了空间和数据的可用性。
1.3 任务化改造:从“传递数据”到“传递任务”
之前的例子中,环形队列传递的是整数,但实际项目中,我们更需要传递“任务”(比如计算任务、IO任务、网络请求任务)。比如在后端服务中,生产者可能是接收客户端请求的线程,消费者是处理请求的线程,此时传递的应该是“请求任务”而非单纯的整数。
我们可以定义一个Task
类,封装任务的“数据”和“处理逻辑”,让环形队列存储Task
对象,实现“生产者生成任务,消费者执行任务”的异步模型——这也是线程池的核心雏形。
1.3.1 Task类的实现(支持加减乘除与异常处理)
#include <string>
#include <iostream>class Task {
public:// 构造函数:传入操作数和操作符Task(int a, int b, char op) : a_(a), b_(b), op_(op), result_(0), exit_code_(0) {}// 执行任务:处理加减乘除,处理异常(如除零)void Run() {switch (op_) {case '+':result_ = a_ + b_;break;case '-':result_ = a_ - b_;break;case '*':result_ = a_ * b_;break;case '/':if (b_ == 0) {exit_code_ = 1; // 除零错误标记result_ = 0;} else {result_ = a_ / b_;}break;default:exit_code_ = 2; // 无效操作符标记result_ = 0;}}// 获取任务结果描述(含错误信息)std::string GetResultDesc() const {std::string desc = std::to_string(a_) + " " + op_ + " " + std::to_string(b_) + " = ";if (exit_code_ == 0) {desc += std::to_string(result_) + "(成功)";} else if (exit_code_ == 1) {desc += "错误:除零(分母为0)";} else {desc += "错误:无效操作符(仅支持+、-、*、/)";}return desc;}private:int a_; // 操作数1int b_; // 操作数2char op_; // 操作符(+、-、*、/)int result_; // 计算结果(仅exit_code_=0时有效)int exit_code_; // 退出码:0成功,1除零,2无效操作符
};
Task
类的核心是Run
方法:
- 按照操作符执行计算,处理除零和无效操作符的异常;
- 用
exit_code_
标记任务执行状态,避免异常崩溃; GetResultDesc
方法返回人类可读的结果描述,方便调试。
1.3.2 任务化环形队列的测试
修改生产者和消费者,让它们生成和执行Task
对象:
// 全局环形队列(存储Task对象,容量5)
RingQueue<Task> rq(5);// 生产者:生成计算任务
void* task_producer(void* arg) {int prod_id = *(int*)arg;delete (int*)arg;srand(time(nullptr) + prod_id);char ops[] = {'+', '-', '*', '/'}; // 支持的操作符int op_cnt = sizeof(ops) / sizeof(ops[0]);for (int i = 0; i < 5; ++i) {// 生成随机操作数(a:1~20,b:0~10,故意让b可能为0,测试除零错误)int a = rand() % 20 + 1;int b = rand() % 11;char op = ops[rand() % op_cnt];// 构造任务并放入队列Task task(a, b, op);rq.Push(task);std::cout << "[任务生产者" << prod_id << "] 生成任务:" << task.GetResultDesc() << std::endl;usleep(400000); // 0.4秒/个,模拟任务生成耗时}std::cout << "[任务生产者" << prod_id << "] 任务生成完毕!" << std::endl;return nullptr;
}// 消费者:执行计算任务
void* task_consumer(void* arg) {int cons_id = *(int*)arg;delete (int*)arg;for (int i = 0; i < 8; ++i) {// 从队列获取任务Task task = rq.Pop();// 执行任务task.Run();// 输出执行结果std::cout << "[任务消费者" << cons_id << "] 执行任务:" << task.GetResultDesc() << std::endl;usleep(600000); // 0.6秒/个,模拟任务执行耗时}std::cout << "[任务消费者" << cons_id << "] 任务执行完毕!" << std::endl;return nullptr;
}int main() {pthread_t prod_tids[2], cons_tids[2];// 创建2个任务生产者for (int i = 0; i < 2; ++i) {int* prod_id = new int(i + 1);pthread_create(&prod_tids[i], nullptr, task_producer, prod_id);}// 创建2个任务消费者for (int i = 0; i < 2; ++i) {int* cons_id = new int(i + 1);pthread_create(&cons_tids[i], nullptr, task_consumer, cons_id);}// 等待线程结束for (int i = 0; i < 2; ++i) {pthread_join(prod_tids[i], nullptr);}for (int i = 0; i < 2; ++i) {pthread_join(cons_tids[i], nullptr);}return 0;
}
运行结果(节选):
[任务生产者1] 生成任务:15 + 3 = (未执行)
[任务生产者1] 放数据 Task(15,3,+) 到位置 0
[任务生产者2] 生成任务:8 / 0 = (未执行)
[任务生产者2] 放数据 Task(8,0,/) 到位置 1
[任务消费者1] 拿数据 Task(15,3,+) 从位置 0
[任务消费者1] 执行任务:15 + 3 = 18(成功)
[任务消费者2] 拿数据 Task(8,0,/) 从位置 1
[任务消费者2] 执行任务:8 / 0 = 错误:除零(分母为0)
[任务生产者1] 生成任务:7 * 5 = (未执行)
[任务生产者1] 放数据 Task(7,5,*) 到位置 2
...
从结果能看到任务化的优势:
- 解耦生产和消费:生产者只负责生成任务(比如接收请求),不关心如何执行;消费者只负责执行任务(比如处理请求),不关心任务来源;
- 支持异常处理:即使任务存在除零错误,也不会导致消费者线程崩溃,只会标记错误状态;
- 扩展性强:如果需要新增任务类型(比如字符串处理、数据库操作),只需继承
Task
类并重写Run
方法,无需修改环形队列或线程逻辑。
二、线程池:多线程的工程化落地
在实际项目中,频繁创建和销毁线程会带来巨大的系统开销——创建线程需要内核分配PCB、栈空间,销毁线程需要回收资源,这些操作的耗时远大于简单任务的执行时间(比如一个计算任务可能只需要1ms,而创建线程需要10ms)。
线程池的核心思想就是“预先创建一批线程,复用线程处理多个任务”,以“空间换时间”,减少线程创建销毁的开销,提高系统响应速度。
2.1 为什么需要“池化技术”?
在计算机领域,“池化技术”无处不在,比如:
- 内存池:预先申请一块大内存,避免频繁
malloc/free
导致的内存碎片; - 连接池:预先创建一批数据库连接,避免频繁建立TCP连接的耗时;
- 线程池:预先创建一批线程,避免频繁创建销毁线程的开销。
它们的本质都是“提前占用一部分资源,减少与内核的交互次数”。举个具体例子:
如果每次有任务来才创建线程,处理完销毁,1000个任务的总耗时可能是1000*(10ms创建+1ms处理+10ms销毁) = 21000ms
;而用线程池(预先创建10个线程),总耗时是100ms创建线程 + 1000*1ms处理 = 1100ms
,效率提升20倍!
2.2 线程池的核心组件
一个基础的线程池需要包含以下5个核心组件,缺一不可:
- 任务队列:存储待处理的任务,由外部线程提交;
- 线程数组:预先创建的线程,循环从任务队列中取任务执行;
- 互斥锁:保护任务队列的访问,避免多线程竞争(比如多个线程同时取任务);
- 条件变量:当任务队列为空时,线程阻塞等待;有新任务时,唤醒线程;
- 控制变量:标记线程池是否运行(比如
is_running_
),用于安全停止线程池。
2.3 线程池的C++实现(支持任务提交与线程复用)
我们实现一个支持任意任务类型的线程池,基于模板和std::function
实现任务回调,兼容函数、函数对象、lambda表达式等可调用对象。
2.3.1 线程池的类设计
#include <vector>
#include <queue>
#include <pthread.h>
#include <iostream>
#include <cassert>
#include <functional> // 用于std::function
#include <unistd.h>template <typename T>
class ThreadPool {
public:// 任务类型:接受任意可调用对象(无返回值,可扩展为带返回值)using Task = std::function<void(T&)>;// 构造函数:指定线程数,默认5个explicit ThreadPool(int thread_num = 5) : thread_num_(thread_num), is_running_(false) {// 初始化互斥锁和条件变量int ret = pthread_mutex_init(&mutex_, nullptr);assert(ret == 0);ret = pthread_cond_init(&cond_, nullptr);assert(ret == 0);}~ThreadPool() {// 停止线程池,避免内存泄漏Stop();// 销毁互斥锁和条件变量pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&cond_);}// 启动线程池:创建预先定义的线程void Start() {if (is_running_) return; // 避免重复启动is_running_ = true;// 初始化线程数组,创建thread_num_个线程threads_.resize(thread_num_);for (int i = 0; i < thread_num_; ++i) {// 线程函数必须是静态的(无this指针),传递this作为参数int ret = pthread_create(&threads_[i], nullptr, ThreadFunc, this);assert(ret == 0);std::cout << "[线程池] 创建线程 " << threads_[i] << "(编号" << i+1 << ")" << std::endl;}}// 停止线程池:唤醒所有线程,等待退出void Stop() {if (!is_running_) return; // 避免重复停止is_running_ = false;// 唤醒所有阻塞在条件变量上的线程(让它们退出循环)pthread_cond_broadcast(&cond_);// 等待所有线程退出,回收资源for (auto& tid : threads_) {pthread_join(tid, nullptr);std::cout << "[线程池] 线程 " << tid << " 退出" << std::endl;}threads_.clear(); // 清空线程数组}// 提交任务到任务队列void Submit(const T& task) {// 加锁保护任务队列(避免多线程同时提交任务)pthread_mutex_lock(&mutex_);// 向队列中添加任务tasks_.push(task);std::cout << "[线程池] 提交任务:" << task.GetResultDesc() << std::endl;// 唤醒一个阻塞的线程(有新任务了,让线程去执行)pthread_cond_signal(&cond_);// 解锁pthread_mutex_unlock(&mutex_);}private:// 静态线程函数:线程池内线程的执行逻辑static void* ThreadFunc(void* arg) {// 将参数强转为ThreadPool指针(获取线程池对象)ThreadPool* pool = static_cast<ThreadPool*>(arg);assert(pool != nullptr);// 循环处理任务,直到线程池停止while (pool->is_running_) {// 加锁:访问任务队列前必须加锁pthread_mutex_lock(&pool->mutex_);// 任务队列为空且线程池运行中:阻塞等待新任务// 注意用while而非if:避免伪唤醒(线程被唤醒后队列仍为空)while (pool->tasks_.empty() && pool->is_running_) {pthread_cond_wait(&pool->cond_, &pool->mutex_);}// 如果线程池已停止(is_running_为false),解锁并退出if (!pool->is_running_) {pthread_mutex_unlock(&pool->mutex_);break;}// 从队列头部取出任务(队列非空)T task = pool->tasks_.front();pool->tasks_.pop(); // 从队列中移除任务// 解锁:任务执行不需要持有锁,让其他线程可以提交/取任务pthread_mutex_unlock(&pool->mutex_);// 执行任务(核心逻辑:调用任务的Run方法)pool->task_handler_(task);std::cout << "[线程 " << pthread_self() << "] 执行任务完毕:" << task.GetResultDesc() << std::endl;}return nullptr;}// 任务处理函数:封装任务执行逻辑(可扩展)void task_handler_(T& task) {task.Run(); // 调用任务的Run方法执行计算}private:std::vector<pthread_t> threads_; // 线程数组:存储线程IDstd::queue<T> tasks_; // 任务队列:存储待处理任务int thread_num_; // 线程数量bool is_running_; // 线程池运行状态:true运行,false停止pthread_mutex_t mutex_; // 互斥锁:保护任务队列和is_running_pthread_cond_t cond_; // 条件变量:唤醒阻塞的线程Task task_handler_; // 任务处理回调(默认调用task.Run())
};
2.3.2 关键细节解析
线程池的实现有几个容易踩坑的细节,必须重点解释:
1. 静态线程函数的问题
C++类的非静态成员函数默认带有this
指针,而pthread_create
要求线程函数的签名是void* (*)(void*)
,无法直接传递非静态成员函数。解决方案是:
- 定义静态成员函数
ThreadFunc
:静态函数没有this
指针,符合pthread_create
的要求; - 传递
this
指针作为参数:在Start
中创建线程时,把thread_pool
的this
传给ThreadFunc
,函数内部再强转为ThreadPool*
,从而访问类的成员(如任务队列、互斥锁)。
2. 条件变量的“伪唤醒”问题
pthread_cond_wait
可能会出现“伪唤醒”——线程被唤醒后,任务队列仍然为空(比如多个线程被同时唤醒,但队列只有一个任务)。因此必须用while
循环判断队列是否为空,而非if
:
while (tasks_.empty() && is_running_)
:即使被伪唤醒,循环会再次检查队列,为空则继续阻塞;- 如果用
if
:伪唤醒后线程会直接取任务,导致访问空队列崩溃。
3. 任务执行的解锁时机
任务执行(task.Run()
)必须在解锁之后进行,原因是:
- 任务执行可能耗时较长(比如IO任务),如果持有锁执行,会导致其他线程无法提交任务或取任务,严重影响并发效率;
- 解锁后,其他线程可以同时提交任务或取任务,充分利用CPU资源。
2.4 线程池的测试(基于Task任务)
用之前的Task
类测试线程池,验证线程复用和任务处理能力:
#include "ThreadPool.hpp"
#include "Task.hpp"
#include <ctime>int main() {// 1. 创建线程池(3个线程)ThreadPool<Task> pool(3);pool.Start();// 2. 提交10个任务srand(time(nullptr));char ops[] = {'+', '-', '*', '/'};int op_cnt = sizeof(ops) / sizeof(ops[0]);for (int i = 0; i < 10; ++i) {int a = rand() % 20 + 1;int b = rand() % 10; // b:0~9,测试除零char op = ops[rand() % op_cnt];Task task(a, b, op);pool.Submit(task);usleep(200000); // 0.2秒/个,模拟任务提交间隔}// 3. 等待所有任务执行完毕(实际项目中需用更优雅的等待方式,如计数信号量)sleep(5);// 4. 停止线程池pool.Stop();return 0;
}
运行结果(节选):
[线程池] 创建线程 140703347826432(编号1)
[线程池] 创建线程 140703339433728(编号2)
[线程池] 创建线程 140703331041024(编号3)
[线程池] 提交任务:15 + 3 = (未执行)
[线程 140703347826432] 执行任务完毕:15 + 3 = 18(成功)
[线程池] 提交任务:8 / 0 = (未执行)
[线程 140703339433728] 执行任务完毕:8 / 0 = 错误:除零(分母为0)
[线程池] 提交任务:7 * 5 = (未执行)
[线程 140703331041024] 执行任务完毕:7 * 5 = 35(成功)
[线程池] 提交任务:12 - 4 = (未执行)
[线程 140703347826432] 执行任务完毕:12 - 4 = 8(成功)
...
[线程池] 线程 140703347826432 退出
[线程池] 线程 140703339433728 退出
[线程池] 线程 140703331041024 退出
从结果能看到线程池的核心优势:
- 线程复用:3个线程处理了10个任务,没有频繁创建销毁线程;
- 自动调度:新任务提交后,线程池自动唤醒空闲线程执行,无需手动管理线程;
- 安全停止:调用
Stop
后,所有线程会在执行完当前任务后退出,不会强制终止。
2.5 线程池的优化方向
基础线程池能满足简单场景,但实际项目中还需要针对以下场景优化:
1. 任务优先级
如果任务有紧急程度(比如“支付任务”比“日志任务”优先级高),可以将任务队列改为优先队列(std::priority_queue
),给任务添加优先级字段,让高优先级任务先执行。
2. 动态线程数量
固定线程数量可能导致资源浪费:
- 任务太多时,线程不够用,任务排队耗时;
- 任务太少时,线程空闲,浪费CPU资源。
可以根据任务队列长度动态调整线程数量:
- 队列长度超过阈值(如线程数的2倍),增加线程;
- 队列长度为0且线程空闲时间超过阈值,减少线程。
3. 任务执行结果返回
基础线程池无法获取任务执行结果,实际项目中可能需要知道任务是否执行成功(比如支付任务是否完成)。可以用std::future
和std::promise
:
- 提交任务时返回
std::future
; - 任务执行完毕后,用
std::promise
设置结果; - 外部线程通过
future.get()
获取结果(会阻塞直到结果就绪)。
4. 异常捕获
任务执行过程中可能抛出异常(如空指针、数组越界),如果不捕获,会导致线程崩溃。可以在task_handler_
中添加异常捕获:
void task_handler_(T& task) {try {task.Run();} catch (const std::exception& e) {std::cerr << "[任务处理异常] " << e.what() << std::endl;// 记录日志、重试任务等后续处理}
}
三、线程的面向对象封装:简化多线程开发
Linux原生线程库(pthread
)是面向过程的,使用起来不够直观——比如创建线程需要手动管理pthread_t
、传递参数时要处理指针类型转换,不利于代码的可读性和可维护性。
我们可以将线程封装成C++类,隐藏底层细节,提供类似C++11std::thread
的友好接口,让多线程开发更符合面向对象的思维习惯。
3.1 线程封装的核心目标
- 隐藏底层接口:不需要手动调用
pthread_create
、pthread_join
等函数; - 支持灵活任务:可以传递任意可调用对象(函数、函数对象、lambda)作为线程任务;
- 管理线程属性:比如线程ID、名字、启动时间、运行状态,方便调试和监控;
- 提供安全控制:支持
start
(启动)、join
(等待)、detach
(分离)等操作,避免僵尸线程。
3.2 线程类的实现(支持任务回调与属性管理)
#include <pthread.h>
#include <iostream>
#include <cassert>
#include <string>
#include <ctime>
#include <functional>
#include <unistd.h>class Thread {
public:// 任务类型:支持任意无参数、无返回值的可调用对象using Task = std::function<void()>;// 构造函数:传递任务和线程名(默认名"Unknown")Thread(Task task, const std::string& name = "Unknown") : task_(std::move(task)), name_(name), tid_(0), start_time_(0), is_running_(false) {}~Thread() {// 如果线程正在运行且可join:自动分离(避免僵尸线程)if (is_running_ && pthread_joinable(tid_)) {pthread_detach(tid_);std::cout << "[线程 " << name_ << "] 析构时自动分离(未join)" << std::endl;}}// 启动线程:创建底层pthread线程void Start() {if (is_running_) return; // 避免重复启动is_running_ = true;// 记录启动时间戳(用于监控线程运行时长)start_time_ = time(nullptr);// 创建线程:传递this指针作为参数int ret = pthread_create(&tid_, nullptr, ThreadFunc, this);assert(ret == 0); // 实际项目中可抛异常,此处用断言确保成功std::cout << "[线程 " << name_ << "] 启动(ID=" << tid_ << ")" << std::endl;}// 等待线程结束:回收线程资源void Join() {if (!is_running_ || !pthread_joinable(tid_)) return;pthread_join(tid_, nullptr);is_running_ = false;std::cout << "[线程 " << name_ << "] Join成功" << std::endl;}// 分离线程:线程结束后自动回收资源(无需join)void Detach() {if (!is_running_ || !pthread_joinable(tid_)) return;pthread_detach(tid_);is_running_ = false; // 分离后线程不再受当前对象控制std::cout << "[线程 " << name_ << "] Detach成功" << std::endl;}// 获取线程IDpthread_t GetTid() const { return tid_; }// 获取线程名std::string GetName() const { return name_; }// 获取启动时间(时间戳,可转为本地时间)time_t GetStartTime() const { return start_time_; }// 判断线程是否正在运行bool IsRunning() const { return is_running_; }private:// 静态线程函数:底层pthread线程的执行入口static void* ThreadFunc(void* arg) {// 将参数强转为Thread指针Thread* thread = static_cast<Thread*>(arg);assert(thread != nullptr);// 执行用户传递的任务if (thread->task_) {thread->task_();}// 任务执行完毕,标记线程为非运行状态thread->is_running_ = false;return nullptr;}private:Task task_; // 线程要执行的任务std::string name_; // 线程名(用于调试)pthread_t tid_; // 底层pthread线程IDtime_t start_time_; // 线程启动时间戳bool is_running_; // 线程运行状态
};
3.3 线程类的测试(支持多种任务类型)
我们用三种任务类型测试线程类:普通函数、函数对象、lambda表达式,验证封装的灵活性。
3.3.1 测试代码
#include "Thread.hpp"
#include <iostream>
#include <unistd.h>// 1. 普通函数任务
void PrintHello(const std::string& name) {for (int i = 0; i < 3; ++i) {std::cout << "[普通函数任务] " << name << ":Hello " << i << std::endl;usleep(500000); // 0.5秒/次}
}// 2. 函数对象任务(重载()运算符)
class PrintCount {
public:explicit PrintCount(int count) : count_(count) {}void operator()() const {for (int i = 0; i < count_; ++i) {std::cout << "[函数对象任务] 计数:" << i << std::endl;usleep(500000);}}private:int count_;
};int main() {// 测试1:普通函数(用bind绑定参数)std::cout << "=== 测试普通函数任务 ===" << std::endl;Thread t1(std::bind(PrintHello, "Thread-1"), "Thread-1");t1.Start();t1.Join();// 测试2:函数对象std::cout << "\n=== 测试函数对象任务 ===" << std::endl;PrintCount print_count(3);Thread t2(print_count, "Thread-2");t2.Start();t2.Join();// 测试3:lambda表达式(捕获外部变量)std::cout << "\n=== 测试lambda任务 ===" << std::endl;int num = 3;Thread t3([num]() {for (int i = 0; i < num; ++i) {std::cout << "[lambda任务] 捕获变量:" << i << "(num=" << num << ")" << std::endl;usleep(500000);}}, "Thread-3");t3.Start();t3.Join();// 测试4:线程属性获取std::cout << "\n=== 测试线程属性 ===" << std::endl;Thread t4([]() { usleep(100000); }, "Thread-4");t4.Start();std::cout << "线程名:" << t4.GetName() << std::endl;std::cout << "线程ID:" << t4.GetTid() << std::endl;std::cout << "启动时间:" << ctime(&t4.GetStartTime());std::cout << "是否运行:" << (t4.IsRunning() ? "是" : "否") << std::endl;t4.Join();std::cout << "是否运行:" << (t4.IsRunning() ? "是" : "否") << std::endl;return 0;
}
3.3.2 运行结果
=== 测试普通函数任务 ===
[线程 Thread-1] 启动(ID=140703347826432)
[普通函数任务] Thread-1:Hello 0
[普通函数任务] Thread-1:Hello 1
[普通函数任务] Thread-1:Hello 2
[线程 Thread-1] Join成功=== 测试函数对象任务 ===
[线程 Thread-2] 启动(ID=140703339433728)
[函数对象任务] 计数:0
[函数对象任务] 计数:1
[函数对象任务] 计数:2
[线程 Thread-2] Join成功=== 测试lambda任务 ===
[线程 Thread-3] 启动(ID=140703331041024)
[lambda任务] 捕获变量:0(num=3)
[lambda任务] 捕获变量:1(num=3)
[lambda任务] 捕获变量:2(num=3)
[线程 Thread-3] Join成功=== 测试线程属性 ===
[线程 Thread-4] 启动(ID=140703322648320)
线程名:Thread-4
线程ID:140703322648320
启动时间:Wed Oct 11 16:30:00 2024
是否运行:是
[线程 Thread-4] Join成功
是否运行:否
从结果能看到封装的优势:
- 灵活支持多种任务:普通函数(用
std::bind
绑定参数)、函数对象、lambda表达式都能直接作为任务; - 属性管理清晰:可以方便地获取线程名、ID、启动时间,便于调试和监控;
- 安全控制:
Join
和Detach
操作简单,析构时自动处理未Join
的线程,避免僵尸线程。
四、多线程开发的关键补充概念
除了上述核心内容,还有几个关键概念和问题需要掌握,这些是实际项目中避坑的关键。
4.1 STL容器的线程安全问题
很多初学者会误以为STL容器是线程安全的,实际恰恰相反——STL大部分容器不是线程安全的,原因是为了效率:如果每个容器操作都加锁,会严重影响并发性能。
4.1.1 为什么STL容器不是线程安全的?
以std::vector::push_back
为例,它包含多个步骤:
- 检查当前容量是否足够;
- 若容量不足,扩容(分配新内存、拷贝旧数据、释放旧内存);
- 将新元素写入数组末尾。
多线程同时调用push_back
时,可能出现:
- 扩容过程中数据拷贝被中断,导致数据损坏;
- 多个线程将元素写入同一个位置,导致数据覆盖。
类似地,std::queue::pop
、std::map::insert
等操作也存在类似问题——多个线程同时操作会导致容器内部结构损坏。
4.1.2 如何保证STL容器的线程安全?
-
手动加锁保护:在访问容器的临界区前后加互斥锁,确保同一时间只有一个线程操作容器。比如:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; std::vector<int> vec;void AddToVector(int data) {pthread_mutex_lock(&mutex);vec.push_back(data); // 临界区:操作容器pthread_mutex_unlock(&mutex); }
-
使用线程安全的容器库:如果不想手动加锁,可以使用第三方库的线程安全容器,比如:
- Boost库:
boost::thread_safe_queue
、boost::thread_safe_vector
; - C++20:
std::jthread
配合std::queue
的线程安全包装(需启用C++20及以上标准); - 自研容器:基于互斥锁/读写锁实现线程安全的容器(适合对性能有特殊要求的场景)。
- Boost库:
-
避免容器共享:尽量让每个线程使用独立的容器,减少共享——比如每个线程处理自己的任务,结果存入自己的容器,最后合并结果,避免多线程竞争。
4.2 自旋锁:轻量级的互斥机制
在之前的例子中,我们用的是互斥锁(pthread_mutex_t
),线程申请不到锁时会阻塞并切换到其他线程,存在上下文切换的开销。而自旋锁(pthread_spinlock_t
) 的核心思想是“忙等”——线程申请不到锁时,不阻塞,而是循环检查锁是否可用,直到申请到。
4.2.1 自旋锁的适用场景
自旋锁适合以下场景:
- 临界区执行时间极短:比如只修改一个全局变量(耗时10ns),此时忙等的开销(比如100ns)远小于上下文切换的开销(比如1000ns);
- 多CPU环境:单CPU环境下,自旋锁会导致CPU 100%占用,其他线程无法运行,完全没有意义;
- CPU密集型任务:IO密集型任务不适合用自旋锁——IO操作耗时较长(比如1ms),忙等1ms会浪费大量CPU资源。
4.2.2 自旋锁的实现与测试
Linux提供了自旋锁的接口,使用方式与互斥锁类似:
#include <pthread.h>
#include <iostream>
#include <unistd.h>pthread_spinlock_t spinlock; // 自旋锁
int global_count = 0; // 共享变量// 线程函数:用自旋锁保护共享变量的修改
void* IncrementCount(void* arg) {int thread_id = *(int*)arg;delete (int*)arg;// 循环100000次:每次修改共享变量for (int i = 0; i < 100000; ++i) {// 申请自旋锁(忙等,直到申请到)pthread_spin_lock(&spinlock);global_count++; // 临界区:修改共享变量(耗时极短)pthread_spin_unlock(&spinlock); // 释放自旋锁}std::cout << "[线程 " << thread_id << "] 执行完毕,count = " << global_count << std::endl;return nullptr;
}int main() {// 初始化自旋锁(线程间共享)int ret = pthread_spin_init(&spinlock, PTHREAD_PROCESS_PRIVATE);assert(ret == 0);pthread_t t1, t2;int* id1 = new int(1);int* id2 = new int(2);pthread_create(&t1, nullptr, IncrementCount, id1);pthread_create(&t2, nullptr, IncrementCount, id2);// 等待线程结束pthread_join(t1, nullptr);pthread_join(t2, nullptr);// 销毁自旋锁pthread_spin_destroy(&spinlock);std::cout << "[主线程] 最终 count = " << global_count << std::endl;return 0;
}
运行结果:
[线程 1] 执行完毕,count = 156789
[线程 2] 执行完毕,count = 200000
[主线程] 最终 count = 200000
结果正确,说明自旋锁保护了共享变量的修改。需要注意的是,若临界区耗时较长(比如加usleep(1)
),自旋锁的效率会远低于互斥锁——因为线程会一直忙等,浪费CPU。
4.2.3 自旋锁与互斥锁的核心区别
特性 | 互斥锁(pthread_mutex_t ) | 自旋锁(pthread_spinlock_t ) |
---|---|---|
申请不到锁时的行为 | 阻塞,切换到其他线程 | 忙等(循环检查锁),不切换线程 |
上下文切换开销 | 有(切换线程时保存/恢复上下文) | 无(线程一直运行) |
CPU占用 | 低(阻塞时不占用CPU) | 高(忙等时占用CPU) |
适用临界区 | 执行时间长(如IO操作) | 执行时间短(如变量修改) |
适用环境 | 单/多CPU | 仅多CPU |
4.3 单例模式的线程安全
单例模式是一种常用的设计模式,确保一个类只有一个实例(比如配置管理类、日志类)。但在多线程环境下, naive的单例模式会存在线程安全问题。
4.3.1 不安全的单例模式(懒汉式)
// 懒汉式:需要时才创建实例
class Singleton {
public:static Singleton* GetInstance() {if (instance_ == nullptr) { // 第一次检查(无锁)// 线程A和B同时进入,都会创建实例,违反单例原则instance_ = new Singleton();}return instance_;}private:Singleton() {} // 私有构造,禁止外部创建static Singleton* instance_; // 唯一实例指针
};// 初始化静态成员
Singleton* Singleton::instance_ = nullptr;
问题:多个线程同时进入if (instance_ == nullptr)
,会创建多个实例,违反“单例”的核心原则。
4.3.2 线程安全的单例模式(双重检查锁定)
为了解决竞争问题,需要加锁保护实例创建过程,但为了效率,我们用“双重检查锁定”(Double-Checked Locking):
#include <pthread.h>class Singleton {
public:static Singleton* GetInstance() {// 第一次检查(无锁):避免每次调用都加锁,提高效率if (instance_ == nullptr) {// 加锁:保护实例创建过程pthread_mutex_lock(&mutex_);// 第二次检查(有锁):确保只有一个线程创建实例if (instance_ == nullptr) {instance_ = new Singleton();}pthread_mutex_unlock(&mutex_); // 解锁}return instance_;}private:Singleton() {} // 私有构造static Singleton* instance_; // 唯一实例static pthread_mutex_t mutex_; // 保护实例创建的互斥锁
};// 初始化静态成员
Singleton* Singleton::instance_ = nullptr;
pthread_mutex_t Singleton::mutex_ = PTHREAD_MUTEX_INITIALIZER;
双重检查锁定的优势:
- 第一次检查无锁:大部分情况下
instance_
已创建,无需加锁,直接返回,效率高; - 第二次检查有锁:确保只有一个线程创建实例,避免竞争。
4.3.3 C++11后的更简洁实现
C++11标准规定:局部静态变量的初始化是线程安全的——即多个线程同时访问局部静态变量的初始化语句时,只有一个线程会执行初始化,其他线程会阻塞直到初始化完成。
利用这个特性,可以实现更简洁的线程安全单例:
class Singleton {
public:// 静态方法:返回唯一实例static Singleton& GetInstance() {// 局部静态变量:C++11后初始化是线程安全的static Singleton instance;return instance;}// 禁止拷贝和赋值(确保实例唯一)Singleton(const Singleton&) = delete;Singleton& operator=(const Singleton&) = delete;private:Singleton() {} // 私有构造,禁止外部创建
};
这种方式的优点:
- 无需手动加锁,代码简洁;
- 自动释放资源:局部静态变量在程序退出时会自动析构;
- 绝对线程安全:依赖C++11标准的保证,无需担心竞争。
注意:需启用C++11及以上标准(编译时加-std=c++11
),否则局部静态变量的初始化可能不是线程安全的。
五、总
Linux多线程开发涉及的知识点繁多,但核心思想可以浓缩为以下几点,掌握这些就能在大部分场景下游刃有余:
5.1 平衡“安全”与“效率”
- 互斥是基础:任何共享资源的访问都需要互斥保护(如互斥锁、自旋锁),避免竞争条件导致数据损坏;
- 同步是协同:用条件变量、信号量实现线程间的顺序协同(如生产者等待空间、消费者等待数据);
- 最小临界区:临界区越小,线程阻塞时间越短,并发效率越高——不要把无关操作(如IO、日志)放入临界区;
- 避免过度同步:不需要共享的资源尽量私有化(如线程局部存储),减少同步开销。
5.2 复用与池化是工程化关键
- 线程复用:用线程池避免频繁创建销毁线程的开销,线程数量一般设为“CPU核心数的1~2倍”(CPU密集型)或“CPU核心数+IO等待数”(IO密集型);
- 资源池化:除了线程池,内存池、连接池等池化技术能大幅减少与内核的交互次数,提升系统响应速度;
- 任务化思维:将复杂逻辑封装为任务,用生产者-消费者模型解耦生产和消费,提高代码的可维护性和扩展性。
5.3 封装与抽象简化开发
- 隐藏底层细节:用面向对象封装简化复杂接口(如线程类、线程池类),让代码更易读、易维护;
- 接口设计友好:设计接口时遵循“最小惊讶原则”——比如线程的
Start
、Join
、Detach
,线程池的Submit
、Start
、Stop
,符合直觉; - 兼容多种场景:用模板和
std::function
支持灵活的任务类型,避免为每种任务单独写代码。
5.4 实战避坑建议
- 永远检查共享资源:任何多线程访问的变量、容器,都要先思考是否需要加锁保护;
- 避免死锁:固定加锁顺序(如按锁的地址排序)、避免重复加锁、及时释放锁(用RAII封装锁,如
std::lock_guard
); - 慎用自旋锁:仅在临界区极短、多CPU环境下使用,避免CPU资源浪费;
- 日志与监控:多线程问题难以调试,建议在关键节点打印日志(如线程启动/退出、任务执行前后),便于定位问题;
- 优先使用标准库:C++11及以上的
std::thread
、std::mutex
、std::condition_variable
已足够强大,尽量避免直接使用pthread
接口,提高代码的可移植性。
多线程开发没有“银弹”,关键是理解每个技术的适用场景,结合实际项目选择合适的方案。比如:
- 简单同步用条件变量,复杂资源管理用信号量;
- 短任务用线程池复用线程,长任务(如IO)可单独创建线程;
- 数据量小用环形队列,数据量大用链表或动态数组。
希望这篇总结能帮你梳理清楚Linux多线程的核心脉络,从底层原理到工程化实践,逐步构建起自己的知识体系。多线程开发的精髓在于“实践”——只有动手写代码、踩坑、调试,才能真正掌握!