【Linux手册】多线程编程的关键支撑:线程池与线程安全

文章目录
- 一. 线程池
- 二. 线程安全
在计算机技术日新月异的发展进程中,多线程编程已成为充分利用多核处理器性能、提升程序响应速度的核心手段。然而,多线程环境下的资源调度、数据一致性维护等问题,始终是开发者面临的严峻挑战。
线程池作为一种有效的资源管理机制,通过预先创建一定数量的线程并对其进行复用,极大地减少了线程创建与销毁的开销,显著提升了程序的运行效率;
线程安全则是多线程编程的基石,它确保了多个线程在并发访问共享资源时,数据的完整性与正确性不被破坏;
深入探究这线程池实现的内在原理、实现方式及关联知识。本文将围绕线程池、线程安全展开详细探讨。
本文将分为2个部分:
- 线程池的实现;
- 线程安全;
一. 线程池
线程池的使用类似于生产消费者模型(可以进行跳转阅读->生产消费者模型),预先准备好多个线程存储起来,作为消费者,生产者将数据放入后,预先准备的进程就可以从中获取数据,对数据进行加工和处理。
此处需要使用到线程安全方面的知识,之前已经有一篇详细文章进行该方面的介绍了:【Linux手册】解决多线程共享资源访问冲突:互斥锁与条件变量的使用及底层机制
但是在定义线程池之前,为了方便使用,我们先对互斥锁和条件变量进行封装使用:
class Mutex_Base
{
public:Mutex_Base(const Mutex_Base&) = delete; // 不允许进行拷贝和赋值const Mutex_Base& operator=(const Mutex_Base& ) = delete;Mutex_Base(){pthread_mutex_init(&mutex_ , 0);}void Lock(){pthread_mutex_lock(&mutex_);}void Unlock(){pthread_mutex_unlock(&mutex_);}~Mutex_Base() {pthread_mutex_destroy(&mutex_);}pthread_mutex_t& GetMutexOriginal() // 返回原生锁,为条件变量使用{return mutex_;}
private:pthread_mutex_t mutex_;
};
通过这种形式让我们在使用线程池的时候更方便。
还可以再进行一次封装,使用RAII的方式,当出作用域的时候自动释放锁,不需要主动进行释放,更安全:
// 可以再进行一层封装,使用RAII的方式使用锁
class Mutex
{
public:Mutex(Mutex_Base& mutex):mutex_(mutex){mutex_.Lock();}~Mutex(){mutex_.Unlock();}
private:Mutex_Base& mutex_;
}
上面对锁进行封装之后,还可以再对条件变量也进行封装以下:
class Cond
{
public:Cond(){pthread_cond_init(&cond_ , nullptr );}void Wait(pthread_mutex_t& mutex){pthread_cond_wait(&cond_ , &mutex);}void Notify(){pthread_cond_signal(&cond_);}void NotifyAll(){pthread_cond_broadcast(&cond_);}
private:pthread_cond_t cond_;
};
以上都封装完成后,就可以开始实现线程池了:
实现线程池:
- 需要预先将线程存储起来,准备后续从缓冲区中读取数据。因此需要一个容器来管理所有的线程,此处使用
unordered_map
来进行管理,因为我们要记录线程的ID和线程的名称,并且希望快速找到它; - 需要一个队列作为缓冲区,来实现进程间通信;
- 队列属于临界资源,不能并发式的访问,因此要使用互斥锁,对临界区进行加锁;
- 临界区访问需要是有序的,否则一个线程的竞争能力过强,导致进程饥饿问题,所以要使用条件变量来让多线程同步;
- 还需要一个变量保存线程池中线程的个数。
根据上面的需求我们就可以定义出一个线程池了。
#include "Mutex.hpp"
#include "Cond.hpp"Mutex_Base mutex_base_;
Cond cond_;
const int defaultnum = 10;template<class T>
class thread_poll // 线程池
{
public:thread_poll(const int num = defaultnum):num_(num){}~thread_poll(){}
private: std::unordered_map<pthread_t , std::string> thpoll_; // 将线程ID与线程名对应std::queue<T> thq_; // 任务队列int num_; // 线程池中线程的个数
};
提供一些基础函数:
- 任务队列中的任务数量;
- 根据线程ID获取线程名称,方便后续测试;
public:size_t size() // 队列中资源个数{return thq_.size();}std::string GetThread_name(pthread_t tid) // 根据ID,获取线程名称{return thpoll_[tid];}
我们不直接在初始化时进行线程池中线程的构建,而是向外提供一个run()
接口,让外部能够选择性的打开线程池:
void run() // 运行线程池{for (size_t i = 0; i < num_; i++) // 创建num_个线程{pthread_t tid; std::string thstr = "Process-" + std::to_string(i + 1);pthread_create(&tid , nullptr , pthread_func , this);thpoll_[tid] = thstr; // 放入到map中进行管理}}
注意此处的线程调用函数:
static void* pthread_func(void* args){thread_poll* tp = (thread_poll*)args;while(1){mutex_base_.Lock();while(tp->size() == 0)cond_.Wait(mutex_base_.GetMutexOriginal());T mes = tp->pop();mutex_base_.Unlock();std::cout << tp->GetThread_name(pthread_self()) << " : get a message : " << mes << std::endl; }return nullptr;}T pop(){T ret = thq_.front();thq_.pop();return ret;}
我们需要间其设置为静态成员函数,因为如果是类的普通成员函数,就会导致在参数的位置包含一个隐含的this指针,而我们调用的函数参数只能是void*
,因此要用静态成员函数。
但是在线程的方法中我们有必须使用类中的成员变量和函数,所以在参数位置要传递this
指针,保证让线程能够访问到类内的成员,而且也不能访问私有成员,所以要提供一些接口。
最后一个函数,先队列中插入元素:该函数实现就比较直接了。
void push(const T& data){Mutex m(mutex_base_);thq_.push(data);cond_.NotifyAll();}
以上就是线程池的设计,可以直接通过该类实现先缓冲区中加入数据,让各个线程执行。
二. 线程安全
- STL中的容器不是线程安全的,需要使用者执行保证线程安全;
unique_ptr
是不保证线程安全的,shared_ptr
保证线程安全;
单例模式:只能创建一个对象的类
- 饿汉模式:用的时候可以直接使用,程序运行的时候就创建;
- 懒汉模式:“延迟加载”,用的时候才创建。
还要将赋值和拷贝删除,以及将构造私有化。
下面使用懒汉模式,将上面的线程池变成单例模式:
class thread_poll
{static std::unique_ptr<thread_poll<T>>& GetInstance(){if(tp_ == nullptr){tp_.reset(new thread_poll<T>());}return tp_;} private:std::unordered_map<pthread_t , std::string> thpoll_;std::queue<T> thq_;int num_;static std::unique_ptr<thread_poll<T>> tp_;
};template<class T>
std::unique_ptr<thread_poll<T>> thread_poll<T>::tp_;
该单例模式设计的有没有问题,会不会出现多线程访问冲突???
会的,可能有多个线程都在访问该单例类,就会导致可能存在多个线程都进入if判断内部,导致创建多个类对象。
因此我们要进行加锁,确保只有一个执行流申请空间,创建对象
public: static std::unique_ptr<thread_poll<T>> &GetInstance(){mutex_base_.Lock();if (tp_ == nullptr){tp_.reset(new thread_poll<T>());}mutex_base_.Unlock();return tp_;}
private:
static thread_poll<T>* tp_;
上面的加锁方式好不好???
在创建完一个对象后,后面的线程都不会进入if条件内部了,所以后面锁是多余的,并且会影响效率,因为加锁就导致在该临界区中线程是串联运行的。
通过两个if条件来进行优化:
public: static std::unique_ptr<thread_poll<T>> &GetInstance(){if (tp_ == nullptr){ mutex_base_.Lock();if (tp_ == nullptr){tp_.reset(new thread_poll<T>());}mutex_base_.Unlock();}return tp_;}
private:
static thread_poll<T>* tp_;
通过两个if判断就可以保证,在创建完对象之后,所有线程并发的进行id判断;在没有创建完对象之前只有一个线程进入到内层if中。
即提升了效率,有保证了线程安全。