Linux多线程——线程池
目录
1. 线程池的概念
1.1 池化技术
1.2 线程池的优点
1.3 线程池的应用场景
2. 线程池的实现
2.1 线程池_V1(朴素版)
2.2 线程池_V2(封装版)
3. 单例模式
3.1 什么是单例模式
3.2 单例模式的特点
3.3 单例模式的简单实现
3.3.1 饿汉模式
3.3.2 懒汉模式
3.3.3 懒汉模式(线程安全版)
3.3.4 懒汉模式(双重检查锁定)
3.3.5 C++11 简化懒汉模式
3.4 线程池_V4(最终版)
4. 周边问题补充
4.1 STL 线程安全问题
4.2 智能指针线程安全问题
4.3 其他常见锁概念
4.4 读者写者问题
🏙️ 正文
1. 线程池的概念
1.1 池化技术
线程池的核心概念是池化技术。所谓的线程池,就是提前创建一批线程,当任务来临时,线程直接从任务队列中获取任务并执行。这样可以提高整体效率,同时避免在每次任务到来时频繁创建和销毁线程,从而降低了系统开销。线程池中的一批线程会被合理地维护,避免了额外的调度开销。
池化技术:
池化技术的本质是提前为未来可能频繁使用的资源(比如线程、数据库连接等)进行预分配,从而在需要时直接使用,而不需要每次都创建和销毁资源。池化技术可以极大地提高性能,最典型的就是线程池,通常用于各种涉及网络连接相关的服务中,例如:MySQL连接池、HTTP连接池、Redis连接池等。
除了线程池之外,还有内存池等技术。例如,STL中的容器在进行空间申请时,并不直接使用系统调用,而是通过空间配置器(allocator)获取内存,这也是一种池化技术。
池化技术的核心思想:空间换时间。
池化技术比喻:
池化技术就像你提前将一部分钱从银行取出并放到支付宝中,这样你随时可以使用,而无需去银行排队取钱。同样,线程池通过提前创建线程,能够随时响应任务请求,提高效率。
1.2 线程池的优点
线程池的主要优点是:
-
高效:线程池中的线程在任务到来前已创建好,任务到来时可以直接交给线程执行,无需再创建新线程。
-
负载均衡:线程池会合理调度线程,确保任务和线程之间能够实现负载均衡。
-
控制线程数量:线程池中的线程数量不是越多越好,过多线程会导致调度复杂,具体线程数要根据实际业务场景调整,如处理器核心数、剩余内存、网络中的连接数量等。
另外,线程池还可以与生产者消费者模型结合使用,从而实现解耦和提高效率。生产者将任务放入任务队列,消费者从队列中取出并处理任务,线程池则负责调度线程执行任务。
1.3 线程池的应用场景
线程池的典型应用场景包括:
-
大量且短小的任务请求:如Web服务器中的网页请求,线程池非常适合处理这种场景。网页点击量大,但每次请求的处理时间短,使用线程池可以高效管理大量短小任务的处理。
-
对性能要求苛刻的应用:如游戏服务器,需要快速响应玩家操作。线程池能够确保请求得到及时处理,提高系统响应速度。
-
突发大量请求的场景:当系统突然面临大量请求时,使用线程池可以避免创建过多线程导致内存溢出。例如,当服务器在短时间内需要创建大量线程时,线程池可以帮助控制线程数,避免系统过载。
2. 线程池的实现
2.1 线程池_V1(朴素版)
朴素版:实现最基本的线程池功能,直接使用系统提供的接口。
所谓朴素版线程池,指的是不做任何优化设计,纯粹实现线程池的基础功能。这种实现方式便于理解线程池的基本概念。
ThreadPool_v1.hpp头文件:
线程池的核心由两部分组成:
-
一批线程,通过容器管理。
-
任务队列,用于存储待执行的任务。
此外,还需要使用互斥锁和条件变量:
-
互斥锁用于保护任务队列,确保线程安全。
-
条件变量用于当任务队列为空时,让线程进入等待状态,避免空转。
#pragma once#include <vector> #include <string> #include <queue> #include <memory> #include <unistd.h> #include <pthread.h>namespace Yohifo { #define THREAD_NUM 10template<class T>class ThreadPool{public:ThreadPool(int num = THREAD_NUM):_threads(num), _num(num){// 初始化互斥锁和条件变量pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_cond, nullptr);}~ThreadPool(){// 销毁互斥锁和条件变量pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_cond);}void init(){// 初始化其他信息(暂时不需要)}void start(){// 启动线程池for(int i = 0; i < _num; i++)pthread_create(&_threads[i], nullptr, threadRoutine, nullptr); // (存疑)}// 线程回调函数static void* threadRoutine(void* args){// 业务处理// ...}private:std::vector<pthread_t> _threads;int _num; // 线程数量std::queue<T> _tasks; // 任务队列pthread_mutex_t _mtx;pthread_cond_t _cond;}; }
注意:
-
vector
需要预先扩容,避免后续越界访问。 -
threadRoutine()
需要设置为静态函数,因为静态函数无法访问类的实例成员。 -
start()
函数:创建并启动线程池中的线程。void start() {// 创建并启动线程for(int i = 0; i < _num; i++)pthread_create(&_threads[i], nullptr, threadRoutine, nullptr); }
线程回调函数:
static void* threadRoutine(void* args) {// 避免等待线程,直接剥离pthread_detach(pthread_self());while (true){std::cout << "Thread Running... " << pthread_self() << std::endl;sleep(1);} }
主程序测试代码:
#include "ThreadPool_V1.hpp" #include <memory>int main() {std::unique_ptr<Yohifo::ThreadPool<int>> ptr(new Yohifo::ThreadPool<int>());ptr->init();ptr->start();return 0; }
任务装载:
void pushTask(const T& task) {pthread_mutex_lock(&_mtx);_tasks.push(task);pthread_cond_signal(&_cond);pthread_mutex_unlock(&_mtx); }
线程回调函数更新:获取并消费任务,若任务队列为空则等待。
static void* threadRoutine(void* args) {auto ptr = static_cast<ThreadPool<T>*>(args);while (true){ptr->lockQueue();while(ptr->isEmpty())ptr->threadWait();T task = ptr->popTask();ptr->unlockQueue();task();ptr->callBack(task); // 回调} }
2.2 线程池_V2(封装版)
为了增加对线程操作的控制,可以引入一个自封装的线程库
Thread.hpp
,提供更多对线程的操作。Thread.hpp
封装的线程库:#pragma once#include <iostream> #include <string> #include <pthread.h>enum class Status {NEW = 0, // 新建RUNNING, // 运行中EXIT // 已退出 };typedef void (*func_t)(void*);class Thread { public:Thread(int num = 0, func_t func = nullptr, void* args = nullptr):_tid(0), _status(Status::NEW), _func(func), _args(args){char name[128];snprintf(name, sizeof name, "thread-%d", num);_name = name;}~Thread() {}pthread_t getTID() const { return _tid; }std::string getName() const { return _name; }Status getStatus() const { return _status; }static void* runHelper(void* args){Thread* myThis = static_cast<Thread*>(args);myThis->_func(myThis->_args);}void run(){int ret = pthread_create(&_tid, nullptr, runHelper, this);if(ret != 0){std::cerr << "create thread fail!" << std::endl;exit(1);}_status = Status::RUNNING;}void join(){int ret = pthread_join(_tid, nullptr);if(ret != 0){std::cerr << "thread join fail!" << std::endl;exit(1);}_status = Status::EXIT;}private:pthread_t _tid;std::string _name;Status _status;func_t _func;void* _args; };
更新后的
ThreadPool_V2.hpp
:使用自封装的Thread
类来创建和管理线程。#pragma once#include <vector> #include <string> #include <queue> #include <memory> #include <functional> #include <unistd.h> #include <pthread.h> #include "Task.hpp" #include "Thread.hpp"namespace Yohifo { #define THREAD_NUM 10template<class T>class ThreadPool{using func_t = std::function<void(T&)>;public:ThreadPool(func_t func, int num = THREAD_NUM):_num(num), _func(func){pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_cond, nullptr);}~ThreadPool(){for(auto& t : _threads)t.join();pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_cond);}void init(){for(int i = 0; i < _num; i++)_threads.push_back(Thread(i, threadRoutine, this));}void start(){for(auto& t : _threads)t.run();}static void threadRoutine(void* args){auto ptr = static_cast<ThreadPool<T>*>(args);while (true){ptr->lockQueue();while(ptr->isEmpty())ptr->threadWait();T task = ptr->popTask();ptr->unlockQueue();task();ptr->callBack(task);}}void pushTask(const T& task){lockQueue();_tasks.push(task);threadWakeUp();unlockQueue();}protected:void lockQueue() { pthread_mutex_lock(&_mtx); }void unlockQueue() { pthread_mutex_unlock(&_mtx); }void threadWait() { pthread_cond_wait(&_cond, &_mtx); }void threadWakeUp() { pthread_cond_signal(&_cond); }bool isEmpty() { return _tasks.empty(); }T popTask() { T task = _tasks.front(); _tasks.pop(); return task; }void callBack(T& task) { _func(task); }private:std::vector<Thread> _threads;std::queue<T> _tasks;pthread_mutex_t _mtx;pthread_cond_t _cond;func_t _func;}; }
总结:通过封装线程管理类
Thread.hpp
,我们对线程进行了更多的控制,使得线程池更为高效和可扩展。
3. 单例模式
3.1 什么是单例模式
单例模式(Singleton Pattern)是一种创建型设计模式,它确保一个类只有一个实例,并提供全局访问该实例的方式。在某些场景中,应用程序只需要一个类的实例,而不需要多个。例如,线程池就是一个典型的使用单例模式的场景,因为多线程池会增加调度成本,因此只需要一个线程池来进行任务的并发处理。
3.2 单例模式的特点
-
唯一性:单例模式保证在整个程序生命周期内,该类只有一个实例。
-
延迟加载:单例对象只有在第一次使用时才会被创建,而不是在程序启动时立即创建(这点与懒汉模式和饿汉模式的区别有关)。
3.3 单例模式的简单实现
单例模式的实现一般有两种方式:饿汉模式 和 懒汉模式,两者都通过私有化构造函数和删除拷贝构造函数来确保外部无法创建对象。
3.3.1 饿汉模式
在程序加载时就创建单例对象,不管后续是否使用它。这种方式简单,但可能造成资源浪费,尤其是单例类比较重时。
#pragma once#include <iostream>namespace Yohifo
{// 饿汉模式class Signal{private:Signal() {}Signal(const Signal&) = delete;public:static Signal* getInstance(){return _sigptr;}void print(){std::cout << "Hello Signal!" << std::endl;}private:static Signal* _sigptr;};Signal* Signal::_sigptr = new Signal();
}
使用示例:
#include <iostream>
#include "Signal.hpp"int main()
{Yohifo::Signal::getInstance()->print();return 0;
}
说明:
-
在程序加载时,
Signal::_sigptr
会被创建。 -
无论多少次调用
getInstance()
,始终返回同一个单例对象。
问题:饿汉模式的缺点是,如果单例对象很大,且后续并未使用它,会浪费启动时间和资源。
3.3.2 懒汉模式
懒汉模式只有在第一次调用时才创建单例对象。此时对象的创建被延迟,只有在实际需要时才会消耗资源。
#pragma once#include <iostream>namespace Yohifo
{class Signal{private:Signal() {}Signal(const Signal&) = delete;public:static Signal* getInstance(){if (_sigptr == nullptr){_sigptr = new Signal();}return _sigptr;}void print(){std::cout << "Hello Signal!" << std::endl;}private:static Signal* _sigptr;};Signal* Signal::_sigptr = nullptr;
}
使用示例:
#include <iostream>
#include "Signal.hpp"int main()
{Yohifo::Signal::getInstance()->print();return 0;
}
问题:在多线程环境中,如果多个线程同时调用 getInstance()
,可能会导致创建多个实例。因此,需要添加线程安全措施。
3.3.3 懒汉模式(线程安全版)
为了确保在多线程环境中只有一个实例,可以使用互斥锁来确保线程安全。
#pragma once#include <iostream>
#include <mutex>namespace Yohifo
{class Signal{private:Signal() {}Signal(const Signal&) = delete;public:static Signal* getInstance(){pthread_mutex_lock(&_mtx);if (_sigptr == nullptr){std::cout << "创建了一个单例对象" << std::endl;_sigptr = new Signal();}pthread_mutex_unlock(&_mtx);return _sigptr;}void print(){std::cout << "Hello Signal!" << std::endl;}private:static Signal* _sigptr;static pthread_mutex_t _mtx;};Signal* Signal::_sigptr = nullptr;pthread_mutex_t Signal::_mtx = PTHREAD_MUTEX_INITIALIZER;
}
问题:虽然线程安全,但每次访问 getInstance()
都需要加锁,这会带来性能损失。为了解决这个问题,可以使用“双重检查锁定”优化。
3.3.4 懒汉模式(双重检查锁定)
为了提高性能,在第一次检查时不加锁,只在需要创建对象时加锁。
#pragma once#include <iostream>
#include <mutex>namespace Yohifo
{class Signal{private:Signal() {}Signal(const Signal&) = delete;public:static Signal* getInstance(){if (_sigptr == nullptr){pthread_mutex_lock(&_mtx);if (_sigptr == nullptr){std::cout << "创建了一个单例对象" << std::endl;_sigptr = new Signal();}pthread_mutex_unlock(&_mtx);}return _sigptr;}void print(){std::cout << "Hello Signal!" << std::endl;}private:static Signal* _sigptr;static pthread_mutex_t _mtx;};Signal* Signal::_sigptr = nullptr;pthread_mutex_t Signal::_mtx = PTHREAD_MUTEX_INITIALIZER;
}
这样可以让一部分线程不在去竞争所资源了。
3.3.5 C++11 简化懒汉模式
在 C++11 及以后的版本中,可以通过静态局部变量来简化懒汉模式的实现,确保线程安全且不需要显式加锁。
#pragma once#include <iostream>namespace Yohifo
{class Signal{private:Signal() {}Signal(const Signal&) = delete;public:static Signal* getInstance(){static Signal _sig; // C++11 保证线程安全return &_sig;}void print(){std::cout << "Hello Signal!" << std::endl;}};
}
3.4 线程池_V4(最终版)
通过将线程池改为单例模式,我们可以确保程序中只有一个线程池实例,从而避免多个线程池实例带来的资源浪费和管理复杂度。
#pragma once#include <vector>
#include <string>
#include <memory>
#include <functional>
#include <unistd.h>
#include <pthread.h>
#include "Task.hpp"
#include "Thread.hpp"
#include "BlockingQueue.hpp" // CP模型namespace Yohifo
{
#define THREAD_NUM 10template<class T>class ThreadPool{using func_t = std::function<void(T&)>; // 包装器private:ThreadPool(func_t func, int num = THREAD_NUM):_num(num), _func(func){}~ThreadPool(){for (auto& t : _threads)t.join();}// 删除拷贝构造ThreadPool(const ThreadPool<T>&) = delete;public:static ThreadPool<T>* getInstance(const func_t& func = [](T& task){ std::cout << task.getResult() << std::endl; }){if (_inst == nullptr){LockGuard lock(&_mtx);if (_inst == nullptr){_inst = new ThreadPool<T>(func);_inst->init();_inst->start();}}_inst->_func = func;return _inst;}public:void init(){for (int i = 0; i < _num; i++)_threads.push_back(Thread(i, threadRoutine, this));}void start(){for (auto& t : _threads)t.run();}static void threadRoutine(void* args){auto ptr = static_cast<ThreadPool<T>*>(args);while (true){T task = ptr->popTask();task();ptr->callBack(task); // 回调}}void pushTask(const T& task){_blockqueue.Push(task);}protected:func_t callBack(T& task){_func(task);}T popTask(){T task;_blockqueue.Pop(&task);return task;}private:std::vector<Thread> _threads;int _num;BlockQueue<T> _blockqueue;func_t _func;static ThreadPool<T>* _inst;static pthread_mutex_t _mtx;};template<class T>ThreadPool<T>* ThreadPool<T>::_inst = nullptr;template<class T>pthread_mutex_t ThreadPool<T>::_mtx = PTHREAD_MUTEX_INITIALIZER;
}
使用示例:
#include <iostream>
#include "ThreadPool_V4.hpp"typedef Yohifo::Task<int> type;void callBack(type& task)
{std::cout << task.getResult() << std::endl;
}int main()
{while (true){int x = 0, y = 0;char op = '+';std::cout << "输入 x: ";std::cin >> x;std::cout << "输入 y: ";std::cin >> y;std::cout << "输入 op: ";std::cin >> op;type task(x, y, op);Yohifo::ThreadPool<type>::getInstance(callBack)->pushTask(task);}
}
通过使用单例模式,确保整个程序中只有一个线程池对象,从而提高效率并避免资源浪费。懒汉模式、饿汉模式以及线程安全的懒汉模式都可以用于实现单例模式。
4. 周边问题补充
4.1 STL 线程安全问题
STL(标准模板库)中的容器并非天生线程安全。STL 的设计目标是高性能和灵活性,因而没有加入锁机制。如果你在多线程环境中使用 STL 容器(如 vector
, queue
, string
等),你需要自行确保线程安全,特别是在并发访问时,需要通过互斥锁或其他同步机制来保证线程安全。
一些常见的做法是:
-
使用
std::mutex
或pthread_mutex_t
对容器进行加锁、解锁操作。 -
根据容器的特性(如哈希表的锁表、锁桶机制等)选择合适的同步策略。
4.2 智能指针线程安全问题
C++ 标准库中的智能指针有三种:unique_ptr
, shared_ptr
, 和 weak_ptr
。它们在多线程环境中的线程安全性表现不同:
-
unique_ptr
:它是独占式的,不支持拷贝操作,因此没有线程安全问题。 -
shared_ptr
:支持多个线程共享,且对引用计数进行了原子操作(如使用 CAS 算法),因此是线程安全的。 -
weak_ptr
:weak_ptr
是shared_ptr
的弱引用,它本身不会管理对象的生命周期,只能与shared_ptr
配合使用,因此也继承了shared_ptr
的线程安全特性。
4.3 其他常见锁概念
悲观锁 和 乐观锁 是常见的两种锁策略:
-
悲观锁:假设数据可能会被其他线程修改,因此在访问数据前必须加锁,确保其他线程无法同时访问。这种方式容易导致性能瓶颈,因为所有线程都需要等待锁。
-
乐观锁:假设数据不会被其他线程修改,因此在访问数据前不加锁,只有在更新数据前检查是否发生了变化。如果数据没有被修改,则直接更新,否则重试。这通常通过版本号机制或 CAS(Compare-And-Swap)操作实现。
自旋锁 是一种简单的锁机制,当线程请求锁时,如果锁不可用,它会反复尝试(“自旋”)获取锁,而不是被挂起。自旋锁适用于临界区小且短暂的情况,不适用于较长时间的锁持有,因为它会消耗大量 CPU 时间。
自旋锁相关接口:
#include <pthread.h>pthread_spinlock_t lock; // 自旋锁类型int pthread_spin_init(pthread_spinlock_t *lock, int pshared); // 初始化自旋锁int pthread_spin_destroy(pthread_spinlock_t *lock); // 销毁自旋锁int pthread_spin_lock(pthread_spinlock_t *lock); // 自旋锁加锁
int pthread_spin_trylock(pthread_spinlock_t *lock); // 自旋锁非阻塞加锁int pthread_spin_unlock(pthread_spinlock_t *lock); // 自旋锁解锁
公平锁与非公平锁:
-
公平锁:锁的获取按顺序进行,防止某些线程长期无法获得锁,确保公平性。
-
非公平锁:更倾向于性能优化,不保证按顺序获取锁,允许一些线程优先获取锁,可能导致一些线程饥饿。
4.4 读者写者问题
读者写者问题 是并发编程中的经典问题,指的是如何管理多个线程同时访问共享资源时的读和写操作。核心思想是:多个读者可以同时访问共享数据,但当写者访问数据时,其他读者和写者必须等待。
读者写者问题的规则:
-
读者与读者之间无关:多个读者可以同时访问资源。
-
写者与写者之间互斥:只有一个写者可以访问资源。
-
读者与写者之间互斥:如果有写者在修改资源,读者必须等待。
解决方案:可以使用 读写锁(pthread_rwlock_t
)来解决。读写锁允许多个读者并行访问,但写者在写数据时需要独占锁。
读写锁接口:
#include <pthread.h>pthread_rwlock_t rwlock; // 读写锁类型// 初始化读写锁
int pthread_rwlock_init(pthread_rwlock_t *__restrict__ __rwlock, const pthread_rwlockattr_t *__restrict__ __attr); // 销毁读写锁
int pthread_rwlock_destroy(pthread_rwlock_t *__rwlock); // 读者加锁
int pthread_rwlock_rdlock(pthread_rwlock_t *__rwlock); // 阻塞式
int pthread_rwlock_tryrdlock(pthread_rwlock_t *__rwlock); // 非阻塞式// 写者加锁
int pthread_rwlock_wrlock(pthread_rwlock_t *__rwlock); // 阻塞式
int pthread_rwlock_trywrlock(pthread_rwlock_t *__rwlock); // 非阻塞式// 解锁
int pthread_rwlock_unlock(pthread_rwlock_t *__rwlock);
示例代码(伪代码):
-
读者:
-
加锁时,读者和读者之间不冲突,但读者与写者之间是互斥的。
-
当第一个读者进入时,它申请写锁,确保没有写者修改数据。
int reader_cnt = 0; // 统计读者数量 pthread_mutex_t lock; // 互斥锁 sem_t w(1); // 信号量// 读者进入 {pthread_mutex_lock(&lock);if (reader_cnt == 0) {P(w); // 第一个读者进入,申请信号量}reader_cnt++;pthread_mutex_unlock(&lock); }// 读取数据 {// 数据读取操作 }// 读者离开 {pthread_mutex_lock(&lock);reader_cnt--;if (reader_cnt == 0) {V(w); // 最后一个读者离开,归还信号量}pthread_mutex_unlock(&lock); }
-
写者:
-
写者需要先申请信号量,确保没有读者访问。
-
一旦写者写入完成,会通知其他读者可以继续读取。
// 写者进入
{P(w); // 申请信号量if (reader_cnt > 0) {V(w); // 归还信号量,等待// 等待其他读者离开}
}// 写入数据
{// 数据写入操作
}// 写者离开
{V(w); // 归还信号量
}
问题:由于读者的数量通常大于写者,写者有时可能无法获得信号量,这可能导致死锁。为了解决这个问题,可以使用 写者优先策略,即优先让写者获取资源,防止死锁。
总结:
-
STL 容器需要开发者手动保证线程安全,特别是在多线程并发访问时。
-
智能指针如
shared_ptr
由于使用原子操作,保证了线程安全。 -
锁机制(如悲观锁、乐观锁、自旋锁等)各有优缺点,需要根据实际情况选择。
-
读写锁适用于读者写者问题,可以实现多读单写的并发控制。