C++实现线程池(5)计划线程池
五. ScheduledThreadPool 的实现
5.1 需求
在指定的时间间隔或时间点异步地执行任务。
5.2 SyncQueue 的设计和实现
SyncQueue 类代码
#ifndef SYNCQUEUE5_HPP
#define SYNCQUEUE5_HPP

#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <iostream>
#include <list>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>

// NOTE(review): header-scope using-directives are an anti-pattern, but they are kept
// because code that includes this header may rely on the names they expose.
using namespace std;
using namespace std::chrono;

using Task = std::function<void(void)>;

/// A callable paired with the numeric key that SyncQueue orders it by.
struct PairTask
{
    size_t first;  ///< Ordering key; SyncQueue hands out the smallest key first.
    Task second;   ///< The work to run.

    PairTask() : first(0), second(nullptr) {}

    /// @param tp ordering key
    /// @param s  callable to store (taken by value and moved into place)
    PairTask(size_t tp, Task s) : first(tp), second(std::move(s)) {}

    /// Implicit conversion to the key. std::greater<PairTask> compares two
    /// PairTask objects through this conversion, so it must stay implicit.
    operator size_t() const { return first; }

    // No user-declared destructor on purpose: declaring one would suppress the
    // implicit move operations and force copies whenever the heap reorders.
};

/// Bounded, mutex-protected priority queue of PairTask.
///
/// Producers call Put(), consumers call Take(). Elements are handed out in
/// ascending order of PairTask::first.
class SyncQueue
{
public:
    /// Status codes returned by Put() and Take().
    enum : int
    {
        kOk = 0,       ///< Operation succeeded.
        kTimeout = 1,  ///< Put() gave up because the queue stayed full.
        kStopped = 2   ///< The queue has been stopped.
    };

private:
    std::priority_queue<PairTask, std::vector<PairTask>, std::greater<PairTask>> m_taskQueues;
    size_t m_maxSize;                    // capacity of the queue
    mutable std::mutex m_mutex;          // protects every member below and m_taskQueues
    std::condition_variable m_notEmpty;  // consumers wait on this
    std::condition_variable m_notFull;   // producers (and Stop) wait on this
    size_t m_waitTime;                   // seconds a producer waits on a full queue
    bool m_needStop;                     // true once the queue has stopped working

    // Both helpers expect m_mutex to be held by the caller.
    bool IsFull() const { return m_taskQueues.size() >= m_maxSize; }
    bool IsEmpty() const { return m_taskQueues.empty(); }

    /// Inserts a task, waiting up to m_waitTime seconds for free space.
    /// @return kOk on success, kTimeout if the queue stayed full, kStopped if stopped.
    int Add(const PairTask& task)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        const bool ready = m_notFull.wait_for(
            locker,
            std::chrono::seconds(m_waitTime),
            [this] { return m_needStop || !IsFull(); });
        if (!ready)
        {
            return kTimeout;
        }
        if (m_needStop)
        {
            return kStopped;
        }
        m_taskQueues.push(task);
        m_notEmpty.notify_all();
        return kOk;
    }

public:
    /// @param bucketsize accepted for interface compatibility; not used
    /// @param maxsize    maximum number of queued tasks
    /// @param timeout    seconds Put() waits when the queue is full
    SyncQueue(int bucketsize = 10, int maxsize = 200, size_t timeout = 1)
        : m_maxSize(maxsize),
          m_waitTime(timeout),
          m_needStop(false)  // initializers listed in declaration order
    {
        (void)bucketsize;
    }

    /// Enqueues a task. See Add() for the return codes.
    int Put(const PairTask& task)
    {
        return Add(task);
    }

    /// Blocks until a task is available or the queue is stopped.
    /// @param task receives the task with the smallest key
    /// @return kOk when a task was taken, kStopped if the queue was stopped
    int Take(PairTask& task)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        while (IsEmpty() && !m_needStop)
        {
            m_notEmpty.wait(locker);
        }
        if (m_needStop)
        {
            return kStopped;
        }
        task = m_taskQueues.top();
        m_taskQueues.pop();
        m_notFull.notify_all();
        return kOk;
    }

    /// Waits until the queue has been drained, then marks it stopped and wakes
    /// every waiting producer and consumer.
    /// Take() signals m_notFull after each pop, which is what re-checks the drain here.
    void Stop()
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        while (!m_needStop && !IsEmpty())
        {
            m_notFull.wait(locker);
        }
        m_needStop = true;
        m_notEmpty.notify_all();
        m_notFull.notify_all();
    }

    /// @return true when no task is queued.
    bool Empty() const
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        return m_taskQueues.empty();
    }

    /// @return true when the queue holds at least maxsize tasks.
    bool Full() const
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        return m_taskQueues.size() >= m_maxSize;
    }

    /// @return number of queued tasks.
    size_t Size() const
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        return m_taskQueues.size();
    }

    /// Alias of Size().
    size_t Count() const
    {
        return Size();
    }
};

#endif
5.3 ScheduledThreadPool 的设计和实现
ScheduledThreadPool 类代码
#ifndef SCHEDULEDTHREADPOOL_HPP
#define SCHEDULEDTHREADPOOL_HPP

#include "SyncQueue5.hpp"

#include <atomic>
#include <chrono>
#include <functional>
#include <future>
#include <list>
#include <memory>
#include <mutex>
#include <thread>

// NOTE(review): header-scope using-directives are an anti-pattern, but they are kept
// because code that includes this header may rely on the names they expose.
using namespace std;
using namespace std::chrono;

/// Thread pool whose workers pull tasks from a SyncQueue.
///
/// Each task is submitted with an integer `interval` that becomes the
/// PairTask key. This class applies no timed delay itself; the key only
/// decides the order in which SyncQueue hands tasks to the workers.
class ScheduledThreadPool
{
private:
    SyncQueue m_queue;
    std::list<std::shared_ptr<std::thread>> m_threadgroup;
    std::atomic_bool m_running;  // workers keep looping while this is true
    std::once_flag m_flag;       // guards shutdown only, so Stop() runs exactly once

    /// Spawns `numthreads` workers, each running RunInThread().
    void Start(int numthreads)
    {
        m_running = true;
        for (int i = 0; i < numthreads; ++i)
        {
            m_threadgroup.push_back(
                std::make_shared<std::thread>(&ScheduledThreadPool::RunInThread, this));
        }
    }

    /// Worker loop: take a task and run it until the pool or the queue stops.
    void RunInThread()
    {
        while (m_running)
        {
            PairTask task;
            if (m_queue.Take(task) != 0)
            {
                // A non-zero result means the queue was stopped, and it never
                // restarts, so leave instead of spinning on Take().
                break;
            }
            task.second();
        }
    }

    /// Stops the queue (which first waits for it to drain), then joins all workers.
    void StopThreadGroup()
    {
        m_queue.Stop();
        m_running = false;
        for (auto& worker : m_threadgroup)
        {
            if (worker && worker->joinable())
            {
                worker->join();
            }
        }
        m_threadgroup.clear();
    }

public:
    /// @param qusize     forwarded as the first SyncQueue constructor argument
    /// @param numthreads number of worker threads to start
    ///
    /// NOTE(review): SyncQueue's first parameter is `bucketsize`, which it does not
    /// use, so `qusize` currently has no effect on capacity. Confirm the intent
    /// before changing it.
    ScheduledThreadPool(const int qusize = 10, const int numthreads = 8)
        : m_queue(qusize),
          m_running(false)
    {
        // Start directly. Routing this through call_once(m_flag, ...) would consume
        // the flag and turn Stop() into a no-op, leaving joinable threads to be
        // destroyed (std::terminate) when the pool goes away.
        Start(numthreads);
    }

    ~ScheduledThreadPool()
    {
        Stop();
    }

    /// Drains the queue and joins the workers. Safe to call more than once;
    /// only the first call does any work.
    void Stop()
    {
        std::call_once(m_flag, [this]() { StopThreadGroup(); });
    }

    /// Queues `func(args...)` without exposing its result.
    ///
    /// @param interval ordering key; it is converted to size_t, so a negative
    ///                 value becomes a very large key
    /// @param func     callable to run
    /// @param args     arguments bound to `func`
    ///
    /// If the queue rejects the task (full after its timeout, or stopped), the
    /// task runs synchronously on the calling thread.
    template<class Func, class... Args>
    void Execute(int interval, Func&& func, Args&&... args)
    {
        auto task = std::make_shared<std::function<void(void)>>(
            std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
        auto tk = PairTask(interval, [task]() { (*task)(); });
        if (m_queue.Put(tk) != 0)
        {
            (*task)();
        }
    }

    /// Queues `func(args...)` and returns a future for its result.
    ///
    /// @param interval ordering key; it is converted to size_t, so a negative
    ///                 value becomes a very large key
    /// @param func     callable to run
    /// @param args     arguments bound to `func`
    /// @return std::future holding the result of `func(args...)`
    ///
    /// If the queue rejects the task (full after its timeout, or stopped), the
    /// task runs synchronously on the calling thread before this returns.
    template<class Func, class... Args>
    auto Submit(int interval, Func&& func, Args&&... args)
    {
        using RetType = decltype(func(args...));
        auto task = std::make_shared<std::packaged_task<RetType(void)>>(
            std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
        std::future<RetType> result = task->get_future();
        auto tk = PairTask(interval, [task]() { (*task)(); });
        if (m_queue.Put(tk) != 0)
        {
            (*task)();
        }
        return result;
    }
};

#endif
测试代码
#include "ScheduledThreadPool.hpp"

#include <chrono>
#include <iostream>
#include <thread>

using namespace std;

// Prints the integer it is given.
void Func(int a)
{
    cout << "func : " << a << endl;
}

// Submits four tasks with different keys to a single-worker pool, waits two
// seconds, then lets the pool's destructor shut it down.
int main()
{
    ScheduledThreadPool mypool(10, 1);

    // First argument is the key passed to Execute; the second is Func's argument.
    mypool.Execute(3, Func, 3);
    mypool.Execute(5, Func, 5);
    mypool.Execute(1, Func, 1);
    mypool.Execute(8, Func, 8);

    // Fully qualified so this file does not depend on using-directives or
    // transitive includes coming from the pool headers.
    std::this_thread::sleep_for(std::chrono::seconds(2));

    cout << "main end ..." << endl;
    return 0;
}
5.4 适用场景
ScheduledThreadPool 适用于以下场景:
- 定时任务调度:ScheduledThreadPool 可以用于执行按固定时间间隔周期性运行的任务。例如,定期备份数据、清理临时文件、定时发送邮件等。
- 任务调度器:ScheduledThreadPool 可以用作任务调度器,用于安排并执行后续任务。例如,可以在程序启动后安排一些初始化任务执行,或者在程序结束前执行一些清理任务。
- 心跳任务:ScheduledThreadPool 可以用于执行心跳任务,即周期性地检测系统状态或发送心跳信号。例如,检查网络连接、检测硬件设备的状态等。
- 定时数据处理:ScheduledThreadPool 可以用于定时处理一些数据任务,例如在特定时间点对数据库进行优化、对日志进行归档等。