当前位置: 首页 > news >正文

【C/C++】如何在一个事件驱动的生产者-消费者模型中使用观察者进行通知与解耦

文章目录

  • 如何在一个事件驱动的生产者-消费者模型中使用观察者进行通知与解耦?
    • 1 假设场景设计
    • 2 Codes
    • 3 流程图
    • 4 优劣势
    • 5 风险可能

如何在一个事件驱动的生产者-消费者模型中使用观察者进行通知与解耦?


1 假设场景设计

  • Producer(生产者):生成任务并推送到队列中。
  • TaskQueue(主题/被观察者):任务队列,同时也是一个“可被观察”的对象,它在收到新任务后,会主动通知观察者(消费者)
  • Consumer(观察者):注册到队列中,当有新任务时被通知,并从队列中拉取任务。

避免了消费者主动等待(如传统条件变量 wait),改用回调通知


2 Codes

#include <iostream>
#include <vector>
#include <queue>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <functional>
#include <memory>
#include <atomic>// ========== Observer 接口 ==========
class Observer {
public:virtual void onNotified() = 0;virtual ~Observer() = default;
};// ========== 主题(被观察者) ==========
class TaskQueue {
public:void addObserver(std::shared_ptr<Observer> obs) {std::lock_guard<std::mutex> lock(observerMutex_);observers_.push_back(obs);}void pushTask(int task) {{std::lock_guard<std::mutex> lock(queueMutex_);queue_.push(task);}notifyObservers();}bool popTask(int& task) {std::lock_guard<std::mutex> lock(queueMutex_);if (queue_.empty()) return false;task = queue_.front();queue_.pop();return true;}bool hasTask() {std::lock_guard<std::mutex> lock(queueMutex_);return !queue_.empty();}private:void notifyObservers() {std::lock_guard<std::mutex> lock(observerMutex_);for (auto& obs : observers_) {if (obs) obs->onNotified();  // 回调通知}}private:std::queue<int> queue_;std::mutex queueMutex_;std::vector<std::shared_ptr<Observer>> observers_;std::mutex observerMutex_;
};// ========== 消费者(观察者) ==========
class Consumer : public Observer, public std::enable_shared_from_this<Consumer> {
public:Consumer(std::shared_ptr<TaskQueue> queue, int id): queue_(queue), id_(id), stopFlag_(false) {}void start() {thread_ = std::thread([self = shared_from_this()] {self->run();});}void stop() {stopFlag_ = true;cv_.notify_all();  // 所有线程都唤醒}void onNotified() override {cv_.notify_one();  // 唤醒 run 中等待的线程}private:void run() {while (true) {std::unique_lock<std::mutex> lock(cvMutex_);cv_.wait(lock, [this]() {return stopFlag_ || queue_->hasTask(); });if (stopFlag_ && !queue_->hasTask()) break; int task;while (queue_->popTask(task)) {std::cout << "[Consumer " << id_ << "] Consumed task: " << task << std::endl;}}}private:std::shared_ptr<TaskQueue> queue_;int id_;std::thread thread_;std::atomic<bool> stopFlag_;std::condition_variable cv_;std::mutex cvMutex_;
};// ========== 生产者 ==========
void producer(std::shared_ptr<TaskQueue> queue) {for (int i = 0; i < 10; ++i) {std::this_thread::sleep_for(std::chrono::milliseconds(200));std::cout << "[Producer] Produced task: " << i << std::endl;queue->pushTask(i);}
}int main() {auto queue = std::make_shared<TaskQueue>();// 启动两个消费者auto consumer1 = std::make_shared<Consumer>(queue, 1);auto consumer2 = std::make_shared<Consumer>(queue, 2);queue->addObserver(consumer1);queue->addObserver(consumer2);consumer1->start();consumer2->start();// 启动生产者线程std::thread prodThread(producer, queue);prodThread.join();std::this_thread::sleep_for(std::chrono::seconds(1));consumer1->stop();consumer2->stop();return 0;
}

输出

[Producer] Produced task: 0
[Consumer 2] Consumed task: 0
[Producer] Produced task: 1
[Consumer 2] Consumed task: 1
[Producer] Produced task: 2
[Consumer 2] Consumed task: 2
[Producer] Produced task: 3
[Consumer 2] Consumed task: 3
[Producer] Produced task: 4
[Consumer 2] Consumed task: 4
[Producer] Produced task: 5
[Consumer 2] Consumed task: 5
[Producer] Produced task: 6
[Consumer 2] Consumed task: 6
[Producer] Produced task: 7
[Consumer 2] Consumed task: 7
[Producer] Produced task: 8
[Consumer 2] Consumed task: 8
[Producer] Produced task: 9
[Consumer 2] Consumed task: 9

关键代码解读:
Consumer 类中的 onNotified()run() 方法是如何配合实现消费者监听通知的 ==》背后即“观察者 + 条件变量”的事件驱动机制
TaskQueue::notifyObservers() 调用Consumer::onNotified,唤醒等待的 Consumer::run() 线程。

二者配合流程详解

  1. run() 是消费者线程主循环(由 start() 启动)

    • 每个 Consumer 启动后会在一个独立线程中运行 run() 方法;
    • 它使用 cv_.wait(lock) 进入 阻塞等待状态,直到被通知(由 notify_one() 唤醒);
    • 唤醒后尝试从 TaskQueuepopTask(),直到队列为空;
    • 然后再次进入等待。
  2. onNotified() 是“被观察者”的回调通知函数

    • TaskQueue::notifyObservers() 被调用(例如 pushTask() 中调用)时,会遍历注册的观察者;
    • 每个观察者(即 Consumer)都会被调用 onNotified()
    • onNotified() 会调用 cv_.notify_one(),唤醒 run() 中正在等待的线程。

3 流程图

[Producer]↓ pushTask()
[TaskQueue]↓ notifyObservers()
[Consumer]↓ onNotified()→ cv_.notify_one()↓
[run() loop]→ cv_.wait() 被唤醒↓→ popTask()↓→ 处理任务

4 优劣势

编码可能遇到的问题原因/应对
cv.wait() 可能虚假唤醒可用 cv.wait(lock, condition) 代替裸 wait(),避免无任务时误唤醒。
多个消费者抢任务多个消费者被唤醒时要竞争 queue_ 锁,可通过加任务标签或调度器来分配。
重复唤醒开销大若任务频繁到达,建议合并通知、或按“任务计数”通知。
优点描述
解耦消费者不需要主动轮询,事件驱动机制带来良好模块化。
可扩展支持多个消费者动态注册,符合微服务或事件分发模型。
降低等待利用通知机制唤醒消费者,避免空轮询带来的 CPU 消耗。
灵活性可轻松拓展为异步观察者队列、支持任务优先级、过滤等机制。

5 风险可能

  • 若消费者数量多,且频繁 wakeup,可能存在“惊群效应”。
  • 可以通过线程绑定负载均衡策略来优化通知粒度。
  • 可扩展为事件过滤、类型区分(如不同类型的消费者响应不同事件)。

文章转载自:

http://HUEZWoaU.pqqzd.cn
http://dliqBQBo.pqqzd.cn
http://CLbh3SUk.pqqzd.cn
http://bk14kYP0.pqqzd.cn
http://4CgiiRUL.pqqzd.cn
http://ei6B7JHo.pqqzd.cn
http://6pLqQTDK.pqqzd.cn
http://d0f6Fqcu.pqqzd.cn
http://1kZetPmL.pqqzd.cn
http://419syJbQ.pqqzd.cn
http://EqvrdLoO.pqqzd.cn
http://fkDL03Is.pqqzd.cn
http://pUMztpis.pqqzd.cn
http://NW2GB5jL.pqqzd.cn
http://zCnl9Yq0.pqqzd.cn
http://zREGVYXu.pqqzd.cn
http://9eppesBy.pqqzd.cn
http://CiflRooa.pqqzd.cn
http://hHG49VfG.pqqzd.cn
http://NYojtNjU.pqqzd.cn
http://TSIyW24q.pqqzd.cn
http://GhusUKDU.pqqzd.cn
http://hs9Bj85V.pqqzd.cn
http://HEeUSAWJ.pqqzd.cn
http://LGefVFWH.pqqzd.cn
http://qkmQZ25h.pqqzd.cn
http://nYIvDjIC.pqqzd.cn
http://WP3NhEsS.pqqzd.cn
http://25tPlq7G.pqqzd.cn
http://VSZdcTvi.pqqzd.cn
http://www.dtcms.com/a/214409.html

相关文章:

  • 无人机降落伞设计要点难点及原理!
  • 双臂机器人运动空间与干涉分析仿真技术报告
  • 仅录系统声音,不录外部噪音,详细图文教程
  • FacePoke创意交互实战:Cpolar技术赋能远程人像编辑的趣味实现
  • 鸿蒙OSUniApp 实现的一键清除输入框内容功能#三方框架 #Uniapp
  • PyTorch实现MLP信用评分模型全流程
  • 如何调试CATIA CAA程序导致的CATIA异常崩溃问题
  • 基于私有化 DeepSeek 大模型的工业罐区跑冒滴漏检测技术研究与应用
  • 网络安全之Web渗透加解密
  • 我们是如何为 ES|QL 重建自动补全功能的
  • 创建一个简易的风扇动画界面:基于 WPF 和 XAML 的实现教程
  • Google 发布AI 编程工具Jules
  • 从数据页角度理解B+树查询
  • 虚拟机配置桥接,远程工具直接访问
  • Vue3解决路由缓存问题
  • 基于matlab版本的三维直流电法反演算法
  • 二叉树part01(二)
  • DNS解析流程入门篇
  • java基础(面向对象进阶高级)泛型(API一)
  • 编程日志5.20
  • 深入剖析Java中的伪共享:原理、检测与解决方案
  • uniapp 搭配uviwe u-picker 实现地区联栋
  • OSPF补充信息
  • MathQ-Verify:数学问题验证的五步流水线,为大模型推理筑牢数据基石
  • Neural Blind Deconvolution Using Deep Priors论文阅读
  • Leetcode 3556. Sum of Largest Prime Substrings
  • 《1.1_4计算机网络的分类|精讲篇|附X-mind思维导图》
  • 如何设计ES的冷热数据分离架构?Elasticsearch 集群如何实现高可用?如何避免脑裂问题?如果出现脑裂如何恢复?
  • API Gateway CLI 实操入门笔记(基于 LocalStack)
  • 基于 docker 部署 k8s 集群