当前位置: 首页 > news >正文

C++ - 仿 RabbitMQ 实现消息队列--C++11 异步操作实现线程池

目录

std::future

介绍

应用场景

 用法示例

使用 std::async 关联异步任务

 使用 std::packaged_task 和 std::future 配合

 使用 std::promise 和 std::future 配合

c++11 线程池实现


std::future

介绍

         std::future 是 C++11 标准库中的一个模板类,它表示一个异步操作的结果。当我们在多线程编程中使用异步任务时,std::future 可以帮助我们在需要的时候获取任务的执行结果。std::future 的一个重要特性是能够阻塞当前线程,直到异步操作完成,从而确保我们在获取结果时不会遇到未完成的操作。  

应用场景

  1. 异步任务: 当我们需要在后台执行一些耗时操作时,如网络请求或计算密集型任务等,std::future 可以用来表示这些异步任务的结果。通过将任务与主线程分离,我们可以实现任务的并行处理,从而提高程序的执行效率。
  2. 并发控制: 在多线程编程中,我们可能需要等待某些任务完成后才能继续执行其他操作。通过使用 std::future,我们可以实现线程之间的同步,确保任务完成后再获取结果并继续执行后续操作。
  3. 结果获取:std::future 提供了一种安全的方式来获取异步任务的结果。我们可以使用 std::future::get()函数来获取任务的结果,此函数会阻塞当前线程,直到异步操作完成。这样,在调用 get()函数时,我们可以确保已经获取到了所需的结果。

 用法示例

使用 std::async 关联异步任务

        std::async 是一种将任务与 std::future 关联的简单方法。它创建并运行一个异步任务,并返回一个与该任务结果关联的 std::future 对象。默认情况下,std::async 是否启动一个新线程,或者在等待 future 时,任务是否同步运行都取决于你给的 参数。这个参数为 std::launch 类型:

  1. ○ std::launch::deferred 表明该函数会被延迟调用,直到在 future 上调用 get()或者 wait()才会开始执行任务。
  2. std::launch::async 表明函数会在自己创建的线程上运行。
  3. std::launch::deferred | std::launch::async 内部通过系统等条件自动选择策略。

deferred策略:

#include <iostream>
#include <thread>
#include <future>
#include <unistd.h>int add(int num1, int num2)
{std::cout << "加法:" << num1+num2 << std::endl;return num1+num2;
}int main()
{std::cout << "------------------------1---------------------------" << std::endl;// std::launch::deferred: 调用get才会执行加法函数std::future<int> result = std::async(std::launch::deferred, add, 11, 22);// std::future<int> result = std::async(std::launch::async, add, 11, 22);sleep(1);std::cout << "------------------------2---------------------------" << std::endl;int sum = result.get(); std::cout << sum << std::endl;return 0;
}

运行结果:

 async策略:

#include <iostream>
#include <thread>
#include <future>
#include <unistd.h>int add(int num1, int num2)
{std::cout << "加法:" << num1+num2 << std::endl;return num1+num2;
}int main()
{std::cout << "------------------------1---------------------------" << std::endl;// std::launch::deferred: 调用get才会执行加法函数// std::future<int> result = std::async(std::launch::deferred, add, 11, 22);std::future<int> result = std::async(std::launch::async, add, 11, 22);sleep(1);std::cout << "------------------------2---------------------------" << std::endl;int sum = result.get(); std::cout << sum << std::endl;return 0;
}

运行结果:

 使用 std::packaged_task 和 std::future 配合

        std::packaged_task 就是将任务和 std::future 绑定在一起的模板,是一种对任务的封装。我们可以通过 std::packaged_task 对象获取任务相关联的 std::future 对象,通过调用 get_future()方法获得。std::packaged_task 的模板参数是函数签名。

        可以把 std::future 和 std::async 看成是分开的, 而 std::packaged_task 则是一个整体。

演示demo:

#include <iostream>
#include <thread>
#include <future>
#include <unistd.h>
#include <memory>int add(int num1, int num2)
{sleep(1);return num1+num2;
}int main()
{std::packaged_task<int(int, int)> pt(add);// pt 可以看做是一个可调用对象, 但不能作为一个完全的函数来使用std::future<int> fu = pt.get_future();pt(11, 22);int result = fu.get();std::cout << result << std::endl;return 0;
}

运行结果:

异步执行 std::packaged_task 任务:

#include <iostream>
#include <thread>
#include <future>
#include <unistd.h>
#include <memory>int add(int num1, int num2)
{sleep(1);return num1+num2;
}int main()
{// pt虽然重载了()运算符,但pt并不是一个函数,所以导致它作为线程的入口函数时,语法上看没有问题,但是实际编译的时候会报错// 而 packaged_task 禁止了拷贝构造,// 且因为每个 packaged_task 所封装的函数签名都有可能不同,因此也无法当作参数一样传递// 传引用不可取,毕竟任务在多线程下执行存在局部变量声明周期的问题,因此不能传引用// 因此想要将一个 packaged_task 进行异步调用,// 简单方法就只能是 new packaged_task,封装函数传地址进行解引用调用// 而类型不同的问题,在使用的时候可以使用类型推导来解决// 将pt定义为一个智能指针auto ptask = std::make_shared<std::packaged_task<int(int, int)>>(add);std::future<int> fu = ptask->get_future();std::thread th([ptask](){(*ptask)(11, 22);});int result = fu.get();std::cout << result << std::endl;th.join();return 0;
}

运行结果:

 使用 std::promise 和 std::future 配合

        std::promise 提供了一种设置值的方式,它可以在设置之后通过相关联的 std::future 对象进行读取。换种说法就是之前说过 std::future 可以读取一个异步函数的返回值了, 但是要等待就绪,而 std::promise 就提供一种 方式手动让 std::future 就绪。

#include <iostream>
#include <thread>
#include <future>
#include <unistd.h>void add(int num1, int num2, std::promise<int> &prom)
{sleep(1);prom.set_value(num1+num2);return;
}int main()
{std::promise<int> prom;std::future<int> fu = prom.get_future();std::thread th(add, 11, 22, std::ref(prom));int result = fu.get();std::cout << result << std::endl;th.join();return 0;
}

运行结果:

c++11 线程池实现

        基于线程池执行任务的时候,入口函数内部执行逻辑是固定的,因此选择std::packaged_task 加上std::future 的组合来实现。

线程池的工作思想:
        用户传入要执行的函数,以及需要处理的数据(函数的参数),由线程池中的
工作线程来执行函数完成任务

实现:

  • 管理的成员
  1. 任务池:用 vector 维护的一个函数任务池子▪ 互斥锁 & 条件变量: 实现同步互斥
  2. 一定数量的工作线程:用于不断从任务池取出任务执行任务▪ 结束运行标志:以便于控制线程池的结束。
  • 管理的操作:
  1. 入队任务:入队一个函数和参数
  2. 停止运行:终止线程池
#include <iostream>
#include <thread>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <future>
#include <memory>
#include <vector>
#include <atomic>class ThreadPool
{
public:using Func = std::function<void(void)>;ThreadPool(int thread_num = 1):_stop(false){for (int i = 0; i < thread_num; i++){_threads.emplace_back(&ThreadPool::entry, this);}}~ThreadPool(){stop();}template <class F, class ...Args>auto push(F &&func, Args &&...args) -> std::future<decltype(func(args...))>{using return_type = decltype(func(args...));// 由于不确定参数,就先绑定进函数auto Func = std::bind(std::forward<F>(func), std::forward<Args>(args)...);// 利用智能指针管理一个new出来的packaged_task对象auto task = std::make_shared<std::packaged_task<return_type()>>(Func);std::future<return_type> fu = task->get_future();{// 推入任务池std::unique_lock<std::mutex> lock(_mutex);_tasks.emplace_back([task](){(*task)();});_cv.notify_one();}return fu;}void stop() {if(_stop) return;_stop = true;_cv.notify_all();for (auto &thread : _threads){thread.join();}}private:void entry(){while(!_stop){std::vector<Func> tmp_tasks;{// 加锁std::unique_lock<std::mutex> lock(_mutex);// 等待任务池不为空或线程池停止_cv.wait(lock, [this](){return !_tasks.empty() || _stop;});// 取出任务tmp_tasks.swap(_tasks);}// 执行任务for (auto &task : tmp_tasks){task();}}}private:std::mutex _mutex;std::condition_variable _cv;std::vector<std::thread> _threads;std::vector<Func> _tasks;std::atomic<bool> _stop;
};

http://www.dtcms.com/a/283809.html

相关文章:

  • InfluxDB 3与Apache Parquet:打造高性能时序数据存储与分析解决方案
  • Apache DolphinScheduler介绍与部署
  • UE5 Nanite使用
  • 下班倒计时
  • 链路聚合实训
  • 管家婆价格折扣跟踪管理:查询、新增、修改、删除
  • JAVA中的Map集合
  • 【01背包】P1466 [USACO2.2] 集合 Subset Sums
  • 华为云容器产品分析
  • HTML表格基础
  • 【Linux】第一个小程序—进度条
  • HikariCP数据库连接池高性能优化实战指南
  • Spring Boot 参数校验:@Valid 与 @Validated
  • 线上协同办公时代:以开源AI大模型等工具培养网感,拥抱职业变革
  • 【前沿技术动态】【AI总结】Spring Boot 4.0 预览版深度解析:云原生时代的新里程碑
  • Fair-code介绍(Fair code)(一套新型软件模型:旨在“开源”“商业可持续性”中找到平衡)
  • Spring Boot Jackson 序列化常用配置详解
  • redis速记
  • Jenkins Git Parameter 分支不显示前缀origin/或repo/
  • 【37】MFC入门到精通——MFC中 CString 数字字符串 转 WORD ( CString, WORD/int 互转)
  • 我爱学算法之—— 前缀和(下)
  • 破局 Meme 币永续:跨界融合 Ormer + AI + 舆情监控 的颠覆性框架
  • 日志采集——ZeroMQ的配置
  • MyBatis 之配置与映射核心要点解析
  • 林曦词典|文质彬彬
  • 如何查询pg账号权限 能否创建模式 删表建表
  • Vim多列打开不同文件操作指南
  • 什么是AI-AIGC-AGI-Agent?基本概念与区别的详细解析
  • 【SAP SD】跨公司销售、第三方销售、STO采购(公司间合同配件)
  • 【困难】题解力扣23:合并K个升序链表