当前位置: 首页 > news >正文

轻量级消息总线实现 (C++)

轻量级消息总线实现 (C++)

消息总线是一种用于组件间通信的架构模式,它允许系统中的不同部分通过发布/订阅机制进行松耦合的交互。下面是一个轻量级的C++消息总线实现。

基本实现

1. 消息总线核心类

#include <functional>
#include <map>
#include <vector>
#include <memory>
#include <mutex>
#include <any>class MessageBus {
public:using Handler = std::function<void(const std::any&)>;// 订阅消息template <typename MessageType>void subscribe(std::function<void(const MessageType&)> handler) {std::lock_guard<std::mutex> lock(mutex_);std::type_index typeIndex(typeid(MessageType));handlers_[typeIndex].emplace_back([handler](const std::any& message) {handler(std::any_cast<MessageType>(message));});}// 发布消息template <typename MessageType>void publish(const MessageType& message) {std::lock_guard<std::mutex> lock(mutex_);std::type_index typeIndex(typeid(MessageType));auto it = handlers_.find(typeIndex);if (it != handlers_.end()) {for (auto& handler : it->second) {handler(message);}}}private:std::map<std::type_index, std::vector<Handler>> handlers_;std::mutex mutex_;
};

2. 使用示例

#include <iostream>
#include <string>// 定义消息类型
struct TestMessage {int id;std::string content;
};struct AnotherMessage {float value;
};int main() {MessageBus bus;// 订阅TestMessagebus.subscribe<TestMessage>([](const TestMessage& msg) {std::cout << "Received TestMessage: " << msg.id << ", " << msg.content << std::endl;});// 订阅AnotherMessagebus.subscribe<AnotherMessage>([](const AnotherMessage& msg) {std::cout << "Received AnotherMessage: " << msg.value << std::endl;});// 发布消息bus.publish(TestMessage{1, "Hello"});bus.publish(AnotherMessage{3.14f});return 0;
}

高级特性实现

1. 线程安全的单例模式

class MessageBus {
public:// 获取单例实例static MessageBus& instance() {static MessageBus instance;return instance;}// 删除拷贝构造函数和赋值运算符MessageBus(const MessageBus&) = delete;MessageBus& operator=(const MessageBus&) = delete;// ... 其他成员函数同上 ...private:MessageBus() = default;~MessageBus() = default;// ... 其他成员变量同上 ...
};

2. 支持取消订阅

class MessageBus {
public:using HandlerId = size_t;template <typename MessageType>HandlerId subscribe(std::function<void(const MessageType&)> handler) {std::lock_guard<std::mutex> lock(mutex_);std::type_index typeIndex(typeid(MessageType));HandlerId id = nextHandlerId_++;handlers_[typeIndex].emplace_back(id,[handler](const std::any& message) {handler(std::any_cast<MessageType>(message));});return id;}void unsubscribe(std::type_index typeIndex, HandlerId id) {std::lock_guard<std::mutex> lock(mutex_);auto it = handlers_.find(typeIndex);if (it != handlers_.end()) {auto& handlers = it->second;handlers.erase(std::remove_if(handlers.begin(), handlers.end(),[id](const auto& pair) { return pair.first == id; }),handlers.end());}}private:std::map<std::type_index, std::vector<std::pair<HandlerId, Handler>>> handlers_;HandlerId nextHandlerId_ = 0;// ... 其他成员变量 ...
};

3. 异步消息处理

#include <future>
#include <queue>class MessageBus {
public:// 异步发布消息template <typename MessageType>std::future<void> publishAsync(const MessageType& message) {return std::async(std::launch::async, [this, message] {this->publish(message);});}// 处理消息队列void processQueue() {while (!messageQueue_.empty()) {auto task = std::move(messageQueue_.front());messageQueue_.pop();task();}}// 延迟发布template <typename MessageType>void publishDelayed(const MessageType& message) {std::lock_guard<std::mutex> lock(queueMutex_);messageQueue_.emplace([this, message] {this->publish(message);});}private:std::queue<std::function<void()>> messageQueue_;std::mutex queueMutex_;
};

性能优化建议

  1. 使用对象池:对于频繁创建销毁的消息对象,可以使用对象池减少内存分配开销

  2. 类型擦除优化:使用更高效的类型擦除技术替代std::any

  3. 无锁队列:在高并发场景下,考虑使用无锁队列替代互斥锁

  4. 批量处理:支持批量消息发布以减少锁竞争

完整实现示例

#include <functional>
#include <map>
#include <vector>
#include <memory>
#include <mutex>
#include <any>
#include <typeindex>
#include <queue>
#include <future>class MessageBus {
public:using HandlerId = size_t;using Handler = std::function<void(const std::any&)>;// 获取单例实例static MessageBus& instance() {static MessageBus instance;return instance;}// 订阅消息template <typename MessageType>HandlerId subscribe(std::function<void(const MessageType&)> handler) {std::lock_guard<std::mutex> lock(mutex_);std::type_index typeIndex(typeid(MessageType));HandlerId id = nextHandlerId_++;handlers_[typeIndex].emplace_back(id,[handler](const std::any& message) {handler(std::any_cast<MessageType>(message));});return id;}// 取消订阅void unsubscribe(std::type_index typeIndex, HandlerId id) {std::lock_guard<std::mutex> lock(mutex_);auto it = handlers_.find(typeIndex);if (it != handlers_.end()) {auto& handlers = it->second;handlers.erase(std::remove_if(handlers.begin(), handlers.end(),[id](const auto& pair) { return pair.first == id; }),handlers.end());}}// 同步发布template <typename MessageType>void publish(const MessageType& message) {std::vector<Handler> handlersToCall;{std::lock_guard<std::mutex> lock(mutex_);std::type_index typeIndex(typeid(MessageType));auto it = handlers_.find(typeIndex);if (it != handlers_.end()) {for (auto& [id, handler] : it->second) {handlersToCall.push_back(handler);}}}for (auto& handler : handlersToCall) {handler(message);}}// 异步发布template <typename MessageType>std::future<void> publishAsync(const MessageType& message) {return std::async(std::launch::async, [this, message] {this->publish(message);});}// 延迟发布template <typename MessageType>void publishDelayed(const MessageType& message) {std::lock_guard<std::mutex> lock(queueMutex_);messageQueue_.emplace([this, message] {this->publish(message);});}// 处理延迟队列void processQueue() {std::queue<std::function<void()>> queue;{std::lock_guard<std::mutex> lock(queueMutex_);queue.swap(messageQueue_);}while (!queue.empty()) {auto task = std::move(queue.front());queue.pop();task();}}private:MessageBus() = default;~MessageBus() = default;MessageBus(const MessageBus&) = delete;MessageBus& operator=(const MessageBus&) = delete;std::map<std::type_index, std::vector<std::pair<HandlerId, Handler>>> handlers_;std::queue<std::function<void()>> messageQueue_;std::mutex mutex_;std::mutex queueMutex_;HandlerId nextHandlerId_ = 0;
};

这个轻量级消息总线实现提供了:

  • 线程安全的发布/订阅机制

  • 同步和异步消息发布

  • 延迟消息处理

  • 单例模式访问

  • 取消订阅功能

可以根据具体需求进一步扩展或优化。

http://www.dtcms.com/a/283523.html

相关文章:

  • 适用于高性能封装的TGV视觉检测方案
  • 版本更新 | 华望M-Design 4400版本功能更新详解(下篇)
  • [特殊字符] Electron 中的 `global` 变量
  • LVGL 列表表格控件颜色修改
  • 【时时三省】(C语言基础)通过指针引用多维数组2
  • oracle服务器定时备份Windows Server
  • 怎么用快鲸aiseo提升百度搜索排名?
  • 在 IntelliJ IDEA 中添加框架支持的解决方案(没有出现Add Framework Support)
  • 因果发现PCMCI 算法简述、Tigramite库的简单实践
  • 了解Java21
  • 项目流程管理系统使用建议:推荐13款
  • 【前端】【Echarts】【热力图】ECharts 热力图配置详解:从分割线到数据标签的全面指南
  • SQLSERVER清理日志
  • FreeRTOS学习笔记之任务调度
  • Maple2025 软件安装教程(Win版)
  • C++基础语法/C++语言新特性
  • 嵌入式Linux内存管理子系统控制与实现
  • 视觉SLAM:使用 Sophus 的 SE3 类,自己设计 g2o 的节点与边,实现 PnP 和 ICP 的优化
  • 交易日历接口api,股票/板块日,周,月K线行情接口api,情绪周期api,Level2实时数据api
  • 【python】sys.executable、sys.argv、Path(__file__) 在PyInstaller打包前后的区别
  • 《Java语言程序设计》1.2.3复习题
  • PHP 社区正在讨论变更许可证,预计 PHP 9.0 版本将完全生效
  • LangChain面试内容整理-知识点20:LangChain Expression Language (LCEL)
  • SAP学习笔记 - 开发46 - RAP开发 Managed App Metadata Extension 2 - Booking_M,BookSuppl_M
  • 2.PCL 对于点云的读写
  • 手写和印刷体混合怎么识别
  • ESP32——基于idf框架开发GPIO设备
  • 高性能架构模式——高性能NoSQL
  • 如何打造全场景数字化OA办公平台?
  • 网络(数据库1)