当前位置: 首页 > news >正文

【Linux】线程池与封装线程

目录

一、线程池:

1、池化技术:

2、线程池优点:

3、线程池应用场景:

4、线程池实现:

二、封装线程:

三、单例模式:

四、其他锁:

五、读者写者问题


一、线程池:

1、池化技术:

池化技术是以空间换时间的一种技术,上述的线程池实际上就是先提前创建一批线程用容器存储着,然后当有任务来临,线程直接从任务列表中获取并且执行任务,这样通过空间来换取时间,可以极大地提高效率

2、线程池优点:

 高效,方便

  • 线程池是已经创建好线程了,当有任务来临时,只需交给线程即可
  • 在调度线程时保证合理性,防止线程的过度调度,保证内核的充分利用
  • 避免了在处理短时间任务时创建与销毁线程的代价

但是线程池的数量也不是越多越好,更多的是要与实际开发相结合

3、线程池应用场景:

  • 需要大量的线程来完成任务,并且每一个任务耗时短
  • 对性能要求苛刻,要快速响应,比如打游戏时候放技能
  • 当有可能突然出现大量请求的时候,但是不至于使服务器产生大量线程,也就是短时间产生大量线程使服务器内存达到最大,此时可以用线程池提升该效率问题

4、线程池实现:

#pragma once

#include <iostream>
#include <vector>
#include <queue>
#include <string>
#include <pthread.h>
#include <unistd.h>
#include <ctime>

struct ThreadInfo
{
    pthread_t _tid;
    std::string ThreadName;
};

static int defaultnum = 5;

template <class T>
class ThreadPool
{
public:
    void Lock()
    {
        pthread_mutex_lock(&_mutex);
    }

    void UnLock()
    {
        pthread_mutex_unlock(&_mutex);
    }

    void ThreadSleep()
    {
        pthread_cond_wait(&_cond,&_mutex);
    }

    bool IsEmpty()
    {
        return _task.empty();
    }

    std::string GetThreadName(pthread_t tid)
    {
        for(auto &it : _threads)
        {
            if(it._tid == tid)
            {
                return it.ThreadName;
            }
        }
        return "NONE";
    }

    void Wakeup()
    {
        pthread_cond_signal(&_cond);
    }
public:
    ThreadPool(int num = defaultnum)
        :_threads(num)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_cond,nullptr);
    }

    static void *myhander(void *args)
    {   
        ThreadPool<T> * tp = static_cast<ThreadPool<T> *>(args);
        std::string name = tp->GetThreadName(pthread_self());

        while(true)
        {
            tp->Lock();
            while(tp->IsEmpty())
            {
                tp->ThreadSleep();
            }
            T ret = tp->Pop();
            tp->UnLock();

            ret();
            std::cout << name << "正在运行,结果: " << ret.Getresult() << std::endl;
        }
    }

    T Pop()
    {
        T t = _task.front();
        _task.pop();
        return t;
    }

    void Push(const T &t)
    {
        Lock();
        _task.push(t);
        Wakeup();
        UnLock();
    }

    void Start()
    {
        int num = _threads.size();
        for(int i = 0;i < num;i++)
        {
            _threads[i].ThreadName = "thread-" + std::to_string(i + 1);
            pthread_create(&(_threads[i]._tid),nullptr,myhander,this);
        }
    }


    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

private:
    std::vector<ThreadInfo> _threads;
    std::queue<T> _task;

    pthread_mutex_t _mutex;
    pthread_cond_t _cond;
};

对于上述线程池代码,有三个要注意的点:

1、myhander函数要设计成静态的,因为如果不设计成静态的,myhander就是成员方法,而成员方法有this指针,这样导致其形参不是void*,所以不能在创建线程的时候直接传,要设置成静态的

2、但是设置成静态的函数的话,发现编译不过,因为静态的函数可以访问其静态的函数和变量,无法访问成员函数和成员变量,所以在创建线程的时候要传递当前对象,也就是this指针,这样就能够通过对象访问其成员函数了

3、这里要引入互斥锁,这是因为线程池中的任务队列会被多个执行流访问,要加锁对其进行保护,我们知道线程执行任务的时候要去任务队列中拿任务,此时就需要判断任务队列是否为空,所以加锁要在判断之前

判断就是如果队列为空,就让当前线程去条件变量中休眠,还有为了防止伪唤醒,所以这里的if要改为while

4、当线程拿到任务的时候,这个任务就是当前线程私有的了,所以处理任务就不要放在临界区了,因为要保证在处理任务的线程是并行的,如果一个线程要等待另一个线程处理完任务就不是并行的了,效率会大大降低

任务设计

我们的任务在之前章节就已经完成了,直接拿过来用即可

#pragma once
#include <iostream>
#include <string>

std::string opers = "+-*/%";

enum
{
    Divzero = 1,
    Modzero,
    Unknow
};

class Task
{
public:
    Task()
    {
    }
    Task(int data1, int data2, char oper)
        : _data1(data1), _data2(data2), _oper(oper), _exitcode(0),_result(0)
    {
    }
    void run()
    {
        switch (_oper)
        {
        case '+':
            _result = _data1 + _data2;
            break;
        case '-':
            _result = _data1 - _data2;
            break;
        case '*':
            _result = _data1 * _data2;
            break;
        case '/':
        {
            if (_data2 == 0)
                _exitcode = Divzero;
            else
                _result = _data1 / _data2;
        }
        break;
        case '%':
        {
            if (_data2 == 0) _exitcode = Modzero;
            else _result = _data1 % _data2;
        }
        break;
        default:
            _exitcode = Unknow;
            break;
        }
    }

    void operator ()()
    {
        run();
    }

    std::string Getresult()
    {
        std::string ret = std::to_string(_data1);
        ret += _oper;
        ret += std::to_string(_data2);
        ret += "=";
        ret += std::to_string(_result);
        ret += "[exitcode=";
        ret += std::to_string(_exitcode);
        ret += "]";
        return ret;
    }

    std::string GetTask()
    {
        std::string ret = std::to_string(_data1);
        ret += _oper;
        ret += std::to_string(_data2);
        ret += "=?";
        return ret;
    }

    ~Task()
    {
    }

private:
    int _data1;
    int _data2;
    char _oper;

    int _exitcode;
    int _result;
};

主函数逻辑

主线程就不断地向线程池中push任务即可,然后线程池就会从任务队列中拿任务并处理,怎么处理不用关心,所以这里对我们来说相当于一个黑盒

#include "ThreadPool.hpp"
#include "Task.hpp"

int main()
{
    srand(time(nullptr)^getpid());
    ThreadPool<Task> *tp = new ThreadPool<Task>();
   
    tp->Start();
    while(true)
    {
        int data1 = rand()%10+1;
        usleep(10);
        int data2 = rand()%5;
        char op = opers[rand()%opers.size()];
        Task t(data1,data2,op);
        usleep(10);
        tp->Push(t);
        std::cout<<"主线程启动:" << t.GetTask() <<std::endl;
        sleep(1);
    }
    return 0;
}

实验结果:

如下,并且我们发现这里的线程是有顺序性的,这是因为主线程每秒只会push一个任务,然后线程池中每次都只有一个线程获取到任务,其他线程就会在队列中等待,当这个线程处理完任务后,就会到队列的最后等待,此时若主线程有push任务,就会唤醒队列第一个线程,完成任务后又会到队列尾部,这样我们就能够看到顺序性

二、封装线程:

接下来我们尝试封装线程,就像C++中的线程库那样使用而不是使用原生接口

对于一个线程,我们可以给他设置很多属性,其中_data是传进来的参数,_cb是一个回调函数

private:
    pthread_t _tid;
    std::string _name;
    bool _isrunning;
    uint64_t _start_time;

    T _data;
    callback_t _cb;

将这些在构造函数进行初始化:

    Thread(callback_t cb,T data)
        :_tid(0),_name(""),_isrunning(false),_start_time(0),_cb(cb),_data(data)
    {}

然后设计对外开放的接口,能够像C++库中的线程使用起来

    void Run()
    {
        _name = "thread-" + std::to_string(num++);
        _isrunning = true;
        _start_time = time(nullptr);

        pthread_create(&_tid,nullptr,Running,this);
    }

    void Entry()
    {
        _cb(_data);
    }

    void Join()
    {
        _isrunning = false;
        pthread_join(_tid,nullptr);
    }

    std::string GetName()
    {
        return _name;
    }

    uint64_t Start_Time()
    {
        return _start_time;
    }

    bool IsRunning()
    {
        return _isrunning;
    }

其中,Running方法要设计成静态的,这是因为和线程池那里一样,如果不是静态方法就是成员函数,这样的话就会多一个形参,所以要设置成静态的,但是静态方法不能直接访问成员变量,所以在创建线程的时候要将this指针也就是当前对象传过去,这样就能够通过对象访问成员函数或成员变量了

    static void* Running(void *args)
    {
        Thread* thread = static_cast<Thread*>(args);
        thread->Entry();
        return nullptr;
    }

这里还要设置回调函数

    typedef void (*callback_t)(T);

其中callback_t是 函数指针类型,他的参数是T,在定义一个Thread类的时候,需要传递一个函数给构造函数,这个cb就是一个函数指针,然后赋值给_cb,后面_cb()调用的就是传递进来的函数

在Entry中的cb(_data)调用的是这个uPrint

主函数逻辑

初始化好要传的参数,准备好回调函数所用的函数,就可以像C++中的线程库进行使用了

class udata
{
public:
    std::string _s;
    int _data;
};

void uPrint(udata x)
{
    while(true)
    {
        std::cout << "我是一个封装后的线程,我得到了参数 : " << x._s << " 和 " << x._data <<std::endl;
        sleep(1);
    }
}


int main()
{
    udata da;
    da._s = "666";
    da._data = 111;
    
    Thread<udata> t(uPrint,da);
    t.Run();
    
    std::cout << "该线程的名字 : " << t.GetName() <<std::endl;
    std::cout << "该线程是否运行 : " << t.IsRunning() <<std::endl;
    std::cout << "该线程创建时间 : " << t.Start_Time() <<std::endl;

    t.Join();
    return 0;
}

三、单例模式:

具体可以看看下面这篇文章

【C++11】特殊类的设计 && 单例模式 && 类型转换-CSDN博客https://blog.csdn.net/2303_80828380/article/details/147028880?spm=1001.2014.3001.5501

在这里我们是将单例模式在代码中实现出来,也就是将线程池设计成单例模式

1、将构造函数私有化,保证不能够在外部随便创建变量

private:
    ThreadPool(int num = defaultnum)
        : _threads(num)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }

2、将拷贝构造和赋值重载删掉,保证不能够通过拷贝来创建新对象

    ThreadPool(const ThreadPool<T>& copy) = delete;
    const ThreadPool<T>& operator=(const ThreadPool<T>& copy) = delete;

3、设计静态的指针指向堆空间,还要有锁,_tp要设计成静态的原因实现单例模式,保证全局唯一实例,_lock为静态的提供全局同步锁,确保多线程下安全创建单例

静态成员变量在类里面声明,在类外面定义,是语法规定的

类内定义,类外初始化,在初始化的时候不要带上static,并且要指定类域,不然的话编译器编译的时候就找不到对应的变量了

template <class T>
class ThreadPool
{
private:
    static ThreadPool<T>* _tp;
    //不可以像下面初始化
    //static ThreadPool<T>* _tp = nullptr;
    static pthread_mutex_t _lock;
};

template <class T>
ThreadPool<T>* ThreadPool<T>::_tp = nullptr;

template <class T>
pthread_mutex_t ThreadPool<T>::_lock = PTHREAD_MUTEX_INITIALIZER;

4、这里采用懒汉模式创建单例,所以在GetInstance接口中new对象

这里这个函数为静态的,为了访问静态变量_tp,如果不是静态函数的话就访问不了_tp了

    static ThreadPool<T>* GetInstance()
    {
        if(_tp == nullptr) 
        {
            pthread_mutex_lock(&_lock);
            if(_tp == nullptr)
            {
                _tp = new ThreadPool<T>;
            }
            pthread_mutex_unlock(&_lock);
        }

        return _tp;
    }

还要进行两次判空:

我们发现,只有第一个线程进入的时候_tp才会为空,这样的话后面如果多次进行加锁判断释放锁会降低效率,所以这里得二次判断,保证后面不为空的时候直接返回,并且即使第一次有多个线程进入了第一个if,但是,依然是只会有一个线程成功申请锁并进入if,new空间,其余线程即使后来申请到锁了,但是if判断失效,所以就会释放锁,然后后来的线程就会在第一个if那里都进不去,进而增加效率

完整代码:

#pragma once

#include <iostream>
#include <vector>
#include <queue>
#include <string>
#include <pthread.h>
#include <unistd.h>
#include <ctime>

struct ThreadInfo
{
    pthread_t _tid;
    std::string ThreadName;
};

static int defaultnum = 5;

template <class T>
class ThreadPool
{
public:
    void Lock()
    {
        pthread_mutex_lock(&_mutex);
    }

    void UnLock()
    {
        pthread_mutex_unlock(&_mutex);
    }

    void ThreadSleep()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }

    bool IsEmpty()
    {
        return _task.empty();
    }

    std::string GetThreadName(pthread_t tid)
    {
        for (auto &it : _threads)
        {
            if (it._tid == tid)
            {
                return it.ThreadName;
            }
        }
        return "NONE";
    }

    void Wakeup()
    {
        pthread_cond_signal(&_cond);
    }

public:
    static void *myhander(void *args)
    {
        ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
        std::string name = tp->GetThreadName(pthread_self());

        while (true)
        {
            tp->Lock();
            while (tp->IsEmpty())
            {
                tp->ThreadSleep();
            }
            T ret = tp->Pop();
            tp->UnLock();

            ret();
            std::cout << name << "正在运行,结果: " << ret.Getresult() << std::endl;
        }
    }

    T Pop()
    {
        T t = _task.front();
        _task.pop();
        return t;
    }

    void Push(const T &t)
    {
        Lock();
        _task.push(t);
        Wakeup();
        UnLock();
    }

    void Start()
    {
        int num = _threads.size();
        for (int i = 0; i < num; i++)
        {
            _threads[i].ThreadName = "thread-" + std::to_string(i + 1);
            pthread_create(&(_threads[i]._tid), nullptr, myhander, this);
        }
    }

    //懒汉模式
    static ThreadPool<T>* GetInstance()
    {
        if(_tp == nullptr) //我们发现,只有第一个线程进入的时候_tp才会为空,这样的话后面如果多次进行加锁判断释放锁会降低效率
        {                  //所以这里得二次判断,保证后面不为空的时候直接返回,并且即使第一次有多个线程进入了235行,但是
                           //依然是只会有一个线程成功申请锁并进入if,new空间,其余线程即使后来申请到锁了,但是if判断失效,所以就
                           //会释放锁,然后后来的线程就会在第一个if那里都进不去,进而增加效率
            pthread_mutex_lock(&_lock);
            if(_tp == nullptr)
            {
                _tp = new ThreadPool<T>;
            }
            pthread_mutex_unlock(&_lock);
        }

        return _tp;
    }

private:
    ThreadPool(int num = defaultnum)
        : _threads(num)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }
    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }
    ThreadPool(const ThreadPool<T>& copy) = delete;
    const ThreadPool<T>& operator=(const ThreadPool<T>& copy) = delete;
private:
    std::vector<ThreadInfo> _threads;
    std::queue<T> _task;

    pthread_mutex_t _mutex;
    pthread_cond_t _cond;

    static ThreadPool<T>* _tp;
    //static ThreadPool<T>* _tp = nullptr;
    static pthread_mutex_t _lock;
};

template <class T>
ThreadPool<T>* ThreadPool<T>::_tp = nullptr;

template <class T>
pthread_mutex_t ThreadPool<T>::_lock = PTHREAD_MUTEX_INITIALIZER;

四、其他锁:

  • 悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等),当其他线程想要访问数据时,被阻塞挂起。
  • 乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。
  • CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。
  • 自旋锁:申请锁失败的时候不将自己挂起,而是不断尝试申请锁

自旋就是不把自己挂起,而是由线程不断地去周而复始的去申请锁,如果申请锁成功则进入,失败则返回重新检测锁的状态

自旋锁相关接口:

自旋锁的类型是pthread_spinlock_t

加解自旋锁:

其中:

int pthread_spin_lock(pthread_spinlock_t *lock)失败就不断重试(阻塞式)

int pthread_spin_trylock(pthread_spinlock_t *lock);失败就继续向后运行(非阻塞式)

其实是类似于之前学习的互斥锁,使用和其大差不差

什么时候使用自旋锁:

1、锁持有时间极短
当临界区代码执行时间非常短(如几个指令周期)时,自旋锁的“忙等待”不会显著浪费CPU资源
2、多核/多CPU环境
在多核系统中,等待线程可能在另一个核心上运行,避免因睡眠-唤醒带来的上下文切换开销
3、不可睡眠的上下文
在中断上下文、内核抢占禁用区域等不能睡眠的代码路径中,必须使用自旋锁
4、对实时性要求高
自旋锁的响应延迟低(无上下文切换),适合实时系统或低延迟任务

特性自旋锁互斥锁
等待方式忙等待(循环检测)睡眠等待(进入阻塞状态)
CPU消耗高(持续占用CPU)低(释放CPU给其他线程)
适用场景短临界区、多核、不可睡眠长临界区、通用场景
性能优势低延迟(无上下文切换)高吞吐量(减少CPU浪费)

五、读者写者问题

这个也是遵循"321"原则的

3种关系:

读者和读者,写者和写者,读者和写者

2个角色:

读者和写者

1个交易场所:

阻塞队列或者缓冲区

以上基本是和生产消费者模型是差不多的,不同的是读者和读者之间是没有互斥关系的,最大的原因在于读者是只会读缓冲区中的数据,是不会将数据拿走或者修改的,所以不需要维护读者和读者之间的互斥关系

相关接口:

注意:读者和写者的锁不是同一个锁

初始化,销毁自旋锁:

读者进行加锁,解锁

写者进行加锁,解锁

解锁,这个读者写者的锁都能解

读者写者的伪代码:

理解:

读者在读数据的时候,写者是不能够写的,但是其他读者可以读

写者在写数据的时候,读者是不能够读数据的

当所有读者读完数据的时候,再让写者写数据

当写者写完数据的时候,再让读者读数据

接下来实现一个读者优先的伪代码:

void Reader()
{
        // 首先就是读者的加锁和解锁操作:
    // 首先对读者进行加锁
    pthread_mutex_lock(&rlock);
    reader_count++; //在这里将读者的数量+1
    // 接着判断读者是否为1,如果为1就证明这是第一个读者,此时申请写者的锁,就会有两种情况
    // 1、写者没有进行写入,所以写者也就没有申请到锁,所以读者申请成功锁,让以后写者无法进行写入
    // 2、写者正在进行写入,所以写者也就持有锁,那么第一个读者也就在这里申请失败锁,进入阻塞队列中等待
    if (reader_count == 1) 
    {
        pthread_mutex_lock(&wlock);
    }
    // 这里是释放的读者自己的锁,如果读者正在读,一定会持有写者的锁的
    pthread_mutex_unlock(&rlock);

    // 在这里进行读取,因为不需要维护读者之间的互斥关系也就不需要锁,直接进行读取即可
    
    //当读取完后,下面就是减少读者的代码
    pthread_mutex_lock(&rlock);

    reader_count--; // 将读者的数量-1
    //这里进行判断,如果读者的数量为0的时候释放写者的锁,此时写者就能够进行写入了
    if (reader_count == 0) 
    {
        pthread_mutex_lock(&wlock);
    }

    pthread_mutex_unlock(&rlock);
}
void Writer()
{
    // 然后是写者的加锁,写入数据,解锁
    pthread_mutex_lock(&wlock);
    
    //写入数据
    
    pthread_mutex_unlock(&wlock);
}
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.dtcms.com/a/127490.html

相关文章:

  • apijson 快速上手
  • 5.DJI-PSDK:Psdk开发负载与Msdk的应用app进行交互:
  • Spring Boot 常用依赖介绍
  • 什么是自动化测试?
  • 既然安装了WSL2和Ubuntu,那么怎么和windows传递文件(1)
  • Redis安装与基础配置(Windows及linux)
  • 《Vue Router实战教程》22.导航故障
  • 前端工程化:构建高效可维护的现代Web应用
  • 【特权FPGA】之数码管
  • vscode报错:unins000.exe 尝试在目标目录创建文件时发生一个错误
  • Pyside6使用QtWebEngine实现GUI嵌入网页内容
  • 【愚公系列】《Python网络爬虫从入门到精通》047-验证码识别(第三方验证码识别)
  • DeepSeek 助力 Vue3 开发:打造丝滑的日历(Calendar),日历_基础功能示例(CalendarView01_01)
  • 【ESP32-C6】Base on esptool commands to enable Flash Encryption and Secure Boot
  • 5G中的DU和CU的作用
  • 【C++篇】C++模板初阶:从泛型编程到函数模板与类模板的全面解析
  • 【closerAI ComfyUI】nunchaku加持下,FLUX四重控制万物迁移秒出图,晋升生产力工具,开源界福音!收藏学习
  • 如何打通虚拟化-容器环境并保障流量安全?SmartX VCCI 方案升级!
  • Google A2A协议,是为了战略性占领标准?
  • 【端到端】端到端自动驾驶依赖Occupancy进行运动规划?还是可以具有生成局部地图来规划?
  • 在新一代人工智能技术引领下的,相互联系、层层递进的明厨亮灶开源了
  • 加载js/mjs模块时服务器返回的 MIME 类型不对导致模块被拒绝执行
  • DataFrame操作(扩充)
  • 【数据结构】红黑树超详解 ---一篇通关红黑树原理(含源码解析+动态构建红黑树)
  • 用python实现了一个推箱子游戏的环境,后面准备以此为基础实现强化学习。
  • Linux 第二讲 --- 基础指令(二)
  • MySQL--基础知识点--81.1--存储过程 vs 存储函数
  • GPT - GPT(Generative Pre-trained Transformer)模型框架
  • 安宝特案例 | Fundació Puigvert 医院应用AR技术开创尿石症治疗新纪元
  • 【AI+Java学习】AI时代Spring AI学习路径