当前位置: 首页 > news >正文

【Linux】线程池

线程池

  • 一.线程池
    • 1.日志和策略模式
    • 2.线程池
      • 1.Task.hpp
      • 2.Thread.hpp
      • 3.ThreadPool.hpp
      • 4.ThreadPool.cc

本节重点:

  • 设计日志和线程池。
  • 理解线程安全和可重入,掌握锁相关概念。

一.线程池

在写线程池之前,我们要做如下准备:

  • 准备线程的封装。
  • 准备锁和条件变量的封装。
  • 引入日志,对线程进行封装。

1.日志和策略模式

  • 日志:记录系统和软件运行中发生事件的文件,主要作用是监控运行状态、记录异常信息,帮助快速定位问题并支持程序员进行问题修复。它是系统维护、故障排查和安全管理的重要工具。
  • 日志格式中的某些指标是必须有:时间戳、日志等级、日志内容。存在几个指标是可选的:文件名行号、进程,线程相关id信息等。
  • 日志有现成的解决方案:spdlog、glog、Boost.Log、Log4cxx等。日志位于 /var/log/ 路径下
  • 设计模式:在软件开发过程中,针对反复出现的问题所总结归纳出的通用解决方案。

策略模式:

  • 抽象策略类(基类):包含一个或多个纯虚函数,用于声明具体策略类需要实现的接口。
  • 具体策略类(派生类):重写了抽象策略类中定义的接口,每个具体策略类代表一个具体的接口。
  • 上下文类:持有一个抽象策略类的指针/引用,负责根据需要选择和使用具体的策略类。

抽象策略类的作用:定义统一接口,运行时多态,提高代码的可维护性和可扩展性。

这里采用 设计模式 - 策略模式 来进行日志的设计,我们想要的日志格式如下:

[可读性很好的时间] [日志等级] [进程pid] [打印对应日志的文件名][行号] - 消息内容, 支持可变参数

[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [9] - hello world
[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [10] - hello world
[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [11] - hello world
[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [12] - hello world
// Log.hpp
#pragma once

#include <iostream>
#include <cstdio>
#include <string>
#include <filesystem> // C++17文件系统
#include <fstream>    // 文件流
#include <sstream>    // 字符串流
#include <memory>
#include <unistd.h>
#include <time.h>
#include "Mutex.hpp"

namespace LogModule
{
    using namespace MutexModule;

    // 获取系统时间
    std::string CurrentTime()
    {
        time_t time_stamp = ::time(nullptr); // 获取时间戳
        struct tm curr;
        localtime_r(&time_stamp, &curr); // 将时间戳转化为可读性强的信息

        char buffer[1024];
        snprintf(buffer, sizeof(buffer), "%4d-%02d-%02d %02d:%02d:%02d",
                 curr.tm_year + 1900,
                 curr.tm_mon + 1,
                 curr.tm_mday,
                 curr.tm_hour,
                 curr.tm_min,
                 curr.tm_sec);

        return buffer;
    }

    // 日志文件: 默认路径和默认文件名
    const std::string defaultlogpath = "./log/";
    const std::string defaultlogname = "log.txt";

    // 日志等级
    enum class LogLevel
    {
        DEBUG = 1,
        INFO,
        WARNING,
        ERROR,
        FATAL
    };

    std::string Level2String(LogLevel level)
    {
        switch (level)
        {
        case LogLevel::DEBUG:
            return "DEBUG";
        case LogLevel::INFO:
            return "INFO";
        case LogLevel::WARNING:
            return "WARNING";
        case LogLevel::ERROR:
            return "ERROR";
        case LogLevel::FATAL:
            return "FATAL";
        default:
            return "NONE";
        }
    }

    // 3. 策略模式: 刷新策略
    class LogStrategy
    {
    public:
        virtual ~LogStrategy() = default; //???
        // 纯虚函数: 无法实例化对象, 派生类可以重载该函数, 实现不同的刷新方式
        virtual void SyncLog(const std::string &message) = 0;
    };

    // 3.1 控制台策略
    class ConsoleLogStrategy : public LogStrategy
    {
    public:
        ConsoleLogStrategy() {}
        ~ConsoleLogStrategy() {}

        void SyncLog(const std::string &message) override
        {
            LockGuard lockguard(_mutex);
            std::cout << message << std::endl;
        }

    private:
        Mutex _mutex;
    };

    // 3.2 文件级(磁盘)策略
    class FileLogStrategy : public LogStrategy
    {
    public:
        FileLogStrategy(const std::string &logpath = defaultlogpath, const std::string &logname = defaultlogname)
            : _logpath(logpath), _logname(logname)
        {
            // 判断_logpath目录是否存在
            if (std::filesystem::exists(_logpath))
            {
                return;
            }
            try
            {
                std::filesystem::create_directories(_logpath);
            }
            catch (std::filesystem::filesystem_error &e)
            {
                std::cerr << e.what() << "\n";
            }
        }
        ~FileLogStrategy() {}

        void SyncLog(const std::string &message) override
        {
            LockGuard lockguard(_mutex);
            std::string log = _logpath + _logname;
            std::ofstream out(log, std::ios::app); // 以追加的方式打开文件
            if (!out.is_open())
            {
                return;
            }
            out << message << "\n"; // 将信息刷新到out流中
            out.close();
        }

    private:
        std::string _logpath;
        std::string _logname;
        Mutex _mutex;
    };

    // 4. 日志类: 构建日志字符串, 根据策略进行刷新
    class Logger
    {
    public:
        Logger()
        {
            // 默认往控制台上刷新
            _strategy = std::make_shared<ConsoleLogStrategy>();
        }
        ~Logger() {}

        void EnableConsoleLog()
        {
            _strategy = std::make_shared<ConsoleLogStrategy>();
        }

        void EnableFileLog()
        {
            _strategy = std::make_shared<FileLogStrategy>();
        }

        // 内部类: 记录完整的日志信息
        class LogMessage
        {
        public:
            LogMessage(LogLevel level, const std::string &filename, int line, Logger &logger)
                : _currtime(CurrentTime()), _level(level), _pid(::getpid())
                , _filename(filename), _line(line), _logger(logger)
            {
                std::stringstream ssbuffer;
                ssbuffer << "[" << _currtime << "] "
                         << "[" << Level2String(_level) << "] "
                         << "[" << _pid << "] "
                         << "[" << _filename << "] "
                         << "[" << _line << "] - ";

                _loginfo = ssbuffer.str();
            }
            ~LogMessage()
            {
                if(_logger._strategy)
                {
                    _logger._strategy->SyncLog(_loginfo);
                }
            }

            template <class T>
            LogMessage &operator<<(const T &info)
            {
                std::stringstream ssbuffer;
                ssbuffer << info;
                _loginfo += ssbuffer.str();
                return *this;
            }

        private:
            std::string _currtime;  // 当前日志时间
            LogLevel _level;       // 日志水平
            pid_t _pid;            // 进程pid
            std::string _filename; // 文件名
            uint32_t _line;        // 日志行号
            Logger &_logger;       // 负责根据不同的策略进行刷新
            std::string _loginfo;  // 日志信息
        };

        // 故意拷贝, 形成LogMessage临时对象, 后续在被<<时,会被持续引用,
        // 直到完成输入,才会自动析构临时LogMessage, 至此完成了日志的刷新,
        // 同时形成的临时对象内包含独立日志数据, 未来采用宏替换, 获取文件名和代码行数
        LogMessage operator()(LogLevel level, const std::string &filename, int line)
        {
            return LogMessage(level, filename, line, *this);
        }

    private:
        // 纯虚类不能实例化对象, 但是可以定义指针
        std::shared_ptr<LogStrategy> _strategy; // 日志刷新策略方案
    };

    // 定义全局logger对象
    Logger logger;

// 编译时进行宏替换: 方便随时获取行号和文件名
#define LOG(level) logger(level, __FILE__, __LINE__)

// 提供选择使用何种日志策略的方法
#define ENABLE_CONSOLE_LOG() logger.EnableConsoleLog()
#define ENABLE_FILE_LOG() logger.EnableFileLog()
}

// Main.cc
#include <iostream>
#include "Log.hpp"
using namespace LogModule;

int main()
{
    // 往显示器中写入
    ENABLE_CONSOLE_LOG();
    LOG(LogLevel::DEBUG) << "hello world";
    LOG(LogLevel::DEBUG) << "hello world";
    LOG(LogLevel::DEBUG) << "hello world";
    LOG(LogLevel::DEBUG) << "hello world";

    // 往文件中写入
    ENABLE_FILE_LOG();
    LOG(LogLevel::DEBUG) << "hello world";
    LOG(LogLevel::DEBUG) << "hello world";
    LOG(LogLevel::DEBUG) << "hello world";
    LOG(LogLevel::DEBUG) << "hello world";

    return 0;
}
xzy@hcss-ecs-b3aa:~$ ./testLog 
[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [9] - hello world
[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [10] - hello world
[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [11] - hello world
[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [12] - hello world

2.线程池

  • 线程池:创建一定数量线程,这些线程处于等待任务的状态。如果没有任务,线程在条件变量下等待,直到有任务到来。当有新的任务到来时,线程池会唤醒一个线程执行任务。当任务执行完毕后,线程不会被销毁,而是返回到线程池中等待下一个任务,从而实现线程的复用。

线程池使用场景:

  • 高并发场景:在处理大量并发请求的场景中,如 Web 服务器、数据库服务器等,使用线程池可以有效地处理并发请求,提高系统的吞吐量。
  • 任务执行频繁的场景:当程序中需要频繁地执行一些小任务时,使用线程池可以避免频繁地创建和销毁线程,提高程序的效率。
  • 需要控制线程数量的场景:在一些对系统资源有限制的场景中,如嵌入式系统、移动设备等,使用线程池可以控制线程的数量,避免系统资源耗尽。

这里我们实现:创建固定数量线程池,循环从任务队列中获取任务对象,获取到任务对象后,执行任务对象中的任务接口。

在这里插入图片描述

1.Task.hpp

#pragma once

#include <iostream>
#include <string>
#include <functional>
#include "Log.hpp"

using namespace LogModule;

void MySql(std::string name)
{
    LOG(LogLevel::DEBUG) << "我是一个数据任务, 我正在被执行" << "[" << name << "]";
}

void UpLoad(std::string name)
{
    LOG(LogLevel::DEBUG) << "我是一个上传任务, 我正在被执行" << "[" << name << "]";
}

void DownLoad(std::string name)
{
    LOG(LogLevel::DEBUG) << "我是一个下载任务, 我正在被执行" << "[" << name << "]";
}

using task_t = std::function<void(std::string name)>;
std::vector<task_t> tasks;

2.Thread.hpp

#pragma once

#include <iostream>
#include <string>
#include <functional>
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>

namespace ThreadModule
{
    using func_t = std::function<void(std::string)>;
    static int number = 1;

    // 强类型枚举: 枚举的成员名称被限定在枚举类型的作用域内
    enum class TSTATUS
    {
        NEW,
        RUNNING,
        STOP
    };

    class Thread
    {
    private:
        // 成员方法: 需要加上static表示不需要this指针, 否则回调函数报错
        // 而要执行_func()函数又需要由this指针, 所以Routine函数传this指针
        static void *Routine(void *args)
        {
            Thread *t = static_cast<Thread *>(args);
            t->_func(t->Name());
            return nullptr;
        }

        void EnableDetach() { _joinable = false; }

    public:
        Thread(func_t func)
            : _func(func), _status(TSTATUS::NEW), _joinable(true)
        {
            _name = "Thread-" + std::to_string(number++);
            _pid = getpid();
        }

        ~Thread() {}

        // 线程创建
        bool Start()
        {
            if (_status != TSTATUS::RUNNING)
            {
                int n = pthread_create(&_tid, nullptr, Routine, this);
                if (n != 0)
                    return false;
                _status = TSTATUS::RUNNING;
                return true;
            }
            return false;
        }

        // 线程退出
        bool Stop()
        {
            if (_status == TSTATUS::RUNNING)
            {
                int n = ::pthread_cancel(_tid);
                if (n != 0)
                    return false;
                _status = TSTATUS::STOP;
                return true;
            }
            return false;
        }

        // 线程等待
        bool Join()
        {
            if (_joinable)
            {
                int n = ::pthread_join(_tid, nullptr);
                if (n != 0)
                    return false;
                _status = TSTATUS::STOP;
                return true;
            }
            return false;
        }

        // 线程分离
        bool Detach()
        {
            EnableDetach();
            int n = ::pthread_detach(_tid);
            if (n != 0)
                return false;
            return true;
        }

        // 线程是否分离
        bool IsJoinable() {  return _joinable; }
        std::string Name() { return _name; }

    private:
        std::string _name;
        pthread_t _tid;
        pid_t _pid;
        bool _joinable; // 线程是否是分离的, 默认不是
        func_t _func;
        TSTATUS _status;
    };
}

3.ThreadPool.hpp

#pragma once

#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include <memory>
#include "Mutex.hpp"
#include "Cond.hpp"
#include "Thread.hpp"
#include "Log.hpp"

namespace ThreadPoolModule
{
    using namespace MutexModule;
    using namespace CondModule;
    using namespace ThreadModule;
    using namespace LogModule;

    using thread_t = std::shared_ptr<Thread>;
    const static int defaultnum = 5;

    template <class T>
    class ThreadPool
    {
    private:
        bool IsEmpty() { return _taskq.empty(); }

        void HandlerTask(std::string name)
        {
            LOG(LogLevel::INFO) << "线程: " << name << ", 进入HandlerTask执行逻辑";
            while (true)
            {
                // 1. 拿任务: 访问共享资源, 需要加锁
                T task;
                {
                    LockGuard lockguard(_mutex);
                    while (IsEmpty() && _isrunning) // while替代if: 防止伪唤醒
                    {
                        _wait_num++;
                        _cond.Wait(_mutex); // 没任务时: 线程在条件变量上阻塞等待
                        _wait_num--;
                    }
                    // 2. 任务队列不为空 && 线程池退出
                    if (IsEmpty() && !_isrunning)
                        break;

                    task = _taskq.front();
                    _taskq.pop();
                }

                // 3. 处理任务: 并发处理, 不需要持有锁
                task(name);
            }
            LOG(LogLevel::INFO) << "线程: " << name << ", 退出";
        }

    public:
        ThreadPool(int num = defaultnum)
            : _num(num), _wait_num(0), _isrunning(false)
        {
            for (int i = 0; i < _num; i++)
            {
                // 在类中: bind类的公有方法, 需要取地址 + 传入this指针
                // 在类外: bind类的公有方法, 需要取地址 + 传入类的匿名对象
                _threads.push_back(std::make_shared<Thread>(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1))); // push_back()会调用移动构造
                LOG(LogLevel::INFO) << "构建线程" << _threads.back()->Name() << "对象...成功";
            }
        }

        ~ThreadPool() {}

        void Equeue(const T &in)
        {
            LockGuard lockguard(_mutex);
            if (!_isrunning) return;
            _taskq.push(in);
            if (_wait_num > 0)
            {
                _cond.Signal(); // 唤醒线程
            }
        }

        void Start()
        {
            if (_isrunning) return;
            _isrunning = true;
            for (auto &thread_ptr : _threads)
            {
                thread_ptr->Start();
                LOG(LogLevel::INFO) << "启动线程" << thread_ptr->Name() << "...成功";
            }
        }

        void Stop()
        {
            LockGuard lockguard(_mutex);
            if (_isrunning)
            {
                // 1. 不能再新增任务了
                _isrunning = false;

                // 2. 让线程自己退出(唤醒所有的线程) && 历史任务被执行完
                if (_wait_num > 0)
                {
                    _cond.Broadcast();
                }
            }
        }

        void Wait()
        {
            for (auto &thread_ptr : _threads)
            {
                thread_ptr->Join();
                LOG(LogLevel::INFO) << "回收线程" << thread_ptr->Name() << "...成功";
            }
        }

    private:
        int _num;                       // 线程的个数
        std::vector<thread_t> _threads; // 线程池
        std::queue<T> _taskq;           // 共享资源: 任务队列
        int _wait_num;                  // 等待的线程数目
        bool _isrunning;                // 线程池是否运行

        Mutex _mutex; // 锁
        Cond _cond;   // 条件变量
    };
}

4.ThreadPool.cc

#include <iostream>
#include <memory>
#include <unistd.h>
#include "ThreadPool.hpp"
#include "Task.hpp"

using namespace ThreadPoolModule;

int main()
{
    ENABLE_CONSOLE_LOG();

    tasks.push_back(MySql);
    tasks.push_back(UpLoad);
    tasks.push_back(DownLoad);

    std::shared_ptr<ThreadPool<task_t>> tp = std::make_shared<ThreadPool<task_t>>(3);

    tp->Start();
    int cnt = 0;
    while(cnt < 6)
    {
        tp->Equeue(tasks[cnt % 3]);
        cnt++;
        sleep(1);
    }
    tp->Stop();
    sleep(3);
    tp->Wait();

    return 0;
}
xzy@hcss-ecs-b3aa:~$ ./thread_pool 
[2025-03-11 17:32:01] [INFO] [1030242] [ThreadPool.hpp] [67] - 构建线程Thread-1对象...成功
[2025-03-11 17:32:01] [INFO] [1030242] [ThreadPool.hpp] [67] - 构建线程Thread-2对象...成功
[2025-03-11 17:32:01] [INFO] [1030242] [ThreadPool.hpp] [67] - 构建线程Thread-3对象...成功
[2025-03-11 17:32:01] [INFO] [1030242] [ThreadPool.hpp] [91] - 启动线程Thread-1...成功
[2025-03-11 17:32:01] [INFO] [1030242] [ThreadPool.hpp] [91] - 启动线程Thread-2...成功
[2025-03-11 17:32:01] [INFO] [1030242] [ThreadPool.hpp] [91] - 启动线程Thread-3...成功
[2025-03-11 17:32:01] [INFO] [1030242] [ThreadPool.hpp] [31] - 线程: Thread-1, 进入HandlerTask执行逻辑
[2025-03-11 17:32:01] [INFO] [1030242] [ThreadPool.hpp] [31] - 线程: Thread-2, 进入HandlerTask执行逻辑
[2025-03-11 17:32:01] [INFO] [1030242] [ThreadPool.hpp] [31] - 线程: Thread-3, 进入HandlerTask执行逻辑
[2025-03-11 17:32:01] [DEBUG] [1030242] [Task.hpp] [12] - 我是一个数据任务, 我正在被执行[Thread-1]
[2025-03-11 17:32:02] [DEBUG] [1030242] [Task.hpp] [17] - 我是一个上传任务, 我正在被执行[Thread-2]
[2025-03-11 17:32:03] [DEBUG] [1030242] [Task.hpp] [22] - 我是一个下载任务, 我正在被执行[Thread-3]
[2025-03-11 17:32:04] [DEBUG] [1030242] [Task.hpp] [12] - 我是一个数据任务, 我正在被执行[Thread-1]
[2025-03-11 17:32:05] [DEBUG] [1030242] [Task.hpp] [17] - 我是一个上传任务, 我正在被执行[Thread-2]
[2025-03-11 17:32:06] [DEBUG] [1030242] [Task.hpp] [22] - 我是一个下载任务, 我正在被执行[Thread-3]
[2025-03-11 17:32:07] [INFO] [1030242] [ThreadPool.hpp] [55] - 线程: Thread-2, 退出
[2025-03-11 17:32:07] [INFO] [1030242] [ThreadPool.hpp] [55] - 线程: Thread-3, 退出
[2025-03-11 17:32:07] [INFO] [1030242] [ThreadPool.hpp] [55] - 线程: Thread-1, 退出
[2025-03-11 17:32:10] [INFO] [1030242] [ThreadPool.hpp] [116] - 回收线程Thread-1...成功
[2025-03-11 17:32:10] [INFO] [1030242] [ThreadPool.hpp] [116] - 回收线程Thread-2...成功
[2025-03-11 17:32:10] [INFO] [1030242] [ThreadPool.hpp] [116] - 回收线程Thread-3...成功

相关文章:

  • 解锁 Ryu API:从 Python 接口到 REST 设计全解析
  • Markdown 语法入门指南(VSCode 版)
  • NVSHMEM介绍、InfiniBand GPUDirect、和NVshmem使用案例说明
  • Scala编程_数组、列表、元组、集合与映射
  • GStreamer —— 2.18、Windows下Qt加载GStreamer库后运行 - “播放教程 6:音频可视化“(附:完整源码)
  • ubuntu挂载新硬盘
  • 5G工业路由器赋能无人码头,港口物流智能化管理
  • 大语言模型-语言模型发展历程
  • 安徽通信施工安全员ABC证备考练习题及答案
  • 项目部署到生产上遇到的网络问题
  • 【鸿蒙开发】MongoDB入门
  • minio数据迁移
  • 利用微软的 HTML 应用程序宿主程序的攻击
  • 【2025】基于python+django的考研自习室预约系统(源码、万字文档、图文修改、调试答疑)
  • 简要分析NETLINK_KOBJECT_UEVENT参数
  • SegMAN模型详解及代码复现
  • 预防痉挛性斜颈的护理方法
  • 华为hcia——Datacom实验指南——以太网帧和IPV4数据包格式(一)
  • DeepSeek-R1 论文阅读总结
  • skywalking部署与应用
  • 夜读丨母亲为燕子打开家门
  • 现场丨在胡适施蛰存等手札与文献间,再读百年光华
  • 联合国第二届运动会闭幕,刘国梁受邀成为“联合国运动会大使”
  • 孙简任吉林省副省长
  • 北京航空航天大学首个海外创新研究院落户巴西
  • 演员黄晓明、金世佳进入上海戏剧学院2025年博士研究生复试名单