轻量级消息总线实现 (C++)
消息总线是一种用于组件间通信的架构模式,它允许系统中的不同部分通过发布/订阅机制进行松耦合的交互。下面是一个轻量级的C++消息总线实现。
基本实现
1. 消息总线核心类
#include <any>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <typeindex>
#include <utility>
#include <vector>

/// Minimal publish/subscribe bus keyed by the static type of the message.
///
/// Handlers are stored type-erased behind `std::any`; the handler table is
/// guarded by `mutex_`.
class MessageBus {
public:
    using Handler = std::function<void(const std::any&)>;

    /// Registers `handler` to be called for every published `MessageType`.
    ///
    /// @tparam MessageType Message type; must be given explicitly when a
    ///         lambda is passed, e.g. `bus.subscribe<Foo>([](const Foo&) {})`.
    /// @param handler Callback invoked with a reference to the message.
    template <typename MessageType>
    void subscribe(std::function<void(const MessageType&)> handler) {
        std::lock_guard<std::mutex> lock(mutex_);
        handlers_[std::type_index(typeid(MessageType))].emplace_back(
            [handler = std::move(handler)](const std::any& message) {
                // Cast to a reference: casting to the value type would copy
                // the message once per handler invocation.
                handler(std::any_cast<const MessageType&>(message));
            });
    }

    /// Delivers `message` to every handler subscribed to `MessageType`, in
    /// subscription order. Does nothing when there is no subscriber.
    ///
    /// @param message Message to deliver; copied once into a `std::any`.
    template <typename MessageType>
    void publish(const MessageType& message) {
        // Snapshot the handlers and release the lock before calling them:
        // std::mutex is not recursive, so a handler that subscribes or
        // publishes while the lock is held would lock it a second time.
        std::vector<Handler> snapshot;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            const auto it = handlers_.find(std::type_index(typeid(MessageType)));
            if (it != handlers_.end()) {
                snapshot = it->second;
            }
        }
        if (snapshot.empty()) {
            return;
        }

        // Wrap the message once instead of once per handler.
        const std::any wrapped(message);
        for (const auto& handler : snapshot) {
            handler(wrapped);
        }
    }

private:
    std::map<std::type_index, std::vector<Handler>> handlers_;
    std::mutex mutex_;
};
2. 使用示例
#include <iostream> #include <string>// 定义消息类型 struct TestMessage {int id;std::string content; };struct AnotherMessage {float value; };int main() {MessageBus bus;// 订阅TestMessagebus.subscribe<TestMessage>([](const TestMessage& msg) {std::cout << "Received TestMessage: " << msg.id << ", " << msg.content << std::endl;});// 订阅AnotherMessagebus.subscribe<AnotherMessage>([](const AnotherMessage& msg) {std::cout << "Received AnotherMessage: " << msg.value << std::endl;});// 发布消息bus.publish(TestMessage{1, "Hello"});bus.publish(AnotherMessage{3.14f});return 0; }
高级特性实现
1. 线程安全的单例模式
/// Singleton flavour of the bus: the only instance is the function-local
/// static inside instance(); construction, destruction and copying are not
/// available to outside code.
class MessageBus {
    // Construction and destruction are reserved for instance().
    MessageBus() = default;
    ~MessageBus() = default;

    // ... remaining member variables as in the core class ...

public:
    // Copying is disabled.
    MessageBus(const MessageBus&) = delete;
    MessageBus& operator=(const MessageBus&) = delete;

    /// Returns the single shared bus, created on first call.
    static MessageBus& instance() {
        static MessageBus theBus;
        return theBus;
    }

    // ... remaining member functions as in the core class ...
};
2. 支持取消订阅
#include <algorithm>  // std::remove_if
#include <typeindex>  // std::type_index
#include <utility>    // std::move

/// Bus variant whose subscriptions can be cancelled: subscribe() hands out an
/// ID that unsubscribe() later uses to remove the handler.
class MessageBus {
public:
    using HandlerId = size_t;
    using Handler = std::function<void(const std::any&)>;

    /// Registers `handler` for `MessageType`.
    ///
    /// @param handler Callback invoked with a reference to the message.
    /// @return ID of the new subscription; IDs start at 0 and increase by one
    ///         per subscription across all message types.
    template <typename MessageType>
    HandlerId subscribe(std::function<void(const MessageType&)> handler) {
        std::lock_guard<std::mutex> lock(mutex_);
        const HandlerId id = nextHandlerId_++;
        handlers_[std::type_index(typeid(MessageType))].emplace_back(
            id, [handler = std::move(handler)](const std::any& message) {
                // Reference cast: a value cast would copy the message for
                // every handler.
                handler(std::any_cast<const MessageType&>(message));
            });
        return id;
    }

    /// Removes the subscription `id` registered for `typeIndex`.
    /// An unknown type or ID is ignored.
    void unsubscribe(std::type_index typeIndex, HandlerId id) {
        std::lock_guard<std::mutex> lock(mutex_);
        const auto it = handlers_.find(typeIndex);
        if (it == handlers_.end()) {
            return;
        }
        auto& entries = it->second;
        entries.erase(std::remove_if(entries.begin(), entries.end(),
                                     [id](const auto& entry) { return entry.first == id; }),
                      entries.end());
    }

    /// Convenience overload: `bus.unsubscribe<Foo>(id)` is the same as
    /// `bus.unsubscribe(typeid(Foo), id)`.
    template <typename MessageType>
    void unsubscribe(HandlerId id) {
        unsubscribe(std::type_index(typeid(MessageType)), id);
    }

private:
    std::map<std::type_index, std::vector<std::pair<HandlerId, Handler>>> handlers_;
    HandlerId nextHandlerId_ = 0;
    std::mutex mutex_;  // guards handlers_ and nextHandlerId_
    // ... other member variables ...
};
3. 异步消息处理
#include <future> #include <queue>class MessageBus { public:// 异步发布消息template <typename MessageType>std::future<void> publishAsync(const MessageType& message) {return std::async(std::launch::async, [this, message] {this->publish(message);});}// 处理消息队列void processQueue() {while (!messageQueue_.empty()) {auto task = std::move(messageQueue_.front());messageQueue_.pop();task();}}// 延迟发布template <typename MessageType>void publishDelayed(const MessageType& message) {std::lock_guard<std::mutex> lock(queueMutex_);messageQueue_.emplace([this, message] {this->publish(message);});}private:std::queue<std::function<void()>> messageQueue_;std::mutex queueMutex_; };
性能优化建议
使用对象池:对于频繁创建销毁的消息对象,可以使用对象池减少内存分配开销
类型擦除优化:使用更高效的类型擦除技术替代 std::any
无锁队列:在高并发场景下,考虑使用无锁队列替代互斥锁
批量处理:支持批量消息发布以减少锁竞争
完整实现示例
#include <algorithm>
#include <any>
#include <functional>
#include <future>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <typeindex>
#include <utility>
#include <vector>

/// Singleton publish/subscribe bus keyed by the static type of the message.
///
/// The handler table is guarded by `mutex_` and the deferred queue by
/// `queueMutex_`; handlers and queued tasks run with neither lock held.
class MessageBus {
public:
    using HandlerId = size_t;
    using Handler = std::function<void(const std::any&)>;

    /// Returns the single shared bus, created on first call.
    static MessageBus& instance() {
        static MessageBus instance;
        return instance;
    }

    /// Registers `handler` for `MessageType`.
    ///
    /// @tparam MessageType Message type; must be given explicitly when a
    ///         lambda is passed, e.g. `bus.subscribe<Foo>([](const Foo&) {})`.
    /// @param handler Callback invoked with a reference to the message.
    /// @return ID to pass to unsubscribe(); IDs start at 0 and increase by
    ///         one per subscription across all message types.
    template <typename MessageType>
    HandlerId subscribe(std::function<void(const MessageType&)> handler) {
        std::lock_guard<std::mutex> lock(mutex_);
        const HandlerId id = nextHandlerId_++;
        handlers_[std::type_index(typeid(MessageType))].emplace_back(
            id, [handler = std::move(handler)](const std::any& message) {
                // Reference cast: a value cast would copy the message for
                // every handler.
                handler(std::any_cast<const MessageType&>(message));
            });
        return id;
    }

    /// Removes the subscription `id` registered for `typeIndex`.
    /// An unknown type or ID is ignored.
    void unsubscribe(std::type_index typeIndex, HandlerId id) {
        std::lock_guard<std::mutex> lock(mutex_);
        const auto it = handlers_.find(typeIndex);
        if (it == handlers_.end()) {
            return;
        }
        auto& entries = it->second;
        entries.erase(std::remove_if(entries.begin(), entries.end(),
                                     [id](const auto& entry) { return entry.first == id; }),
                      entries.end());
    }

    /// Convenience overload: `bus.unsubscribe<Foo>(id)` is the same as
    /// `bus.unsubscribe(typeid(Foo), id)`.
    template <typename MessageType>
    void unsubscribe(HandlerId id) {
        unsubscribe(std::type_index(typeid(MessageType)), id);
    }

    /// Synchronously delivers `message` to the handlers subscribed to
    /// `MessageType`, in subscription order.
    ///
    /// The handlers are copied under the lock and called after it is
    /// released, so a handler may itself subscribe, unsubscribe or publish.
    template <typename MessageType>
    void publish(const MessageType& message) {
        std::vector<Handler> handlersToCall;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            const auto it = handlers_.find(std::type_index(typeid(MessageType)));
            if (it != handlers_.end()) {
                handlersToCall.reserve(it->second.size());
                for (const auto& entry : it->second) {
                    handlersToCall.push_back(entry.second);
                }
            }
        }
        if (handlersToCall.empty()) {
            return;
        }

        // Wrap the message once; passing `message` straight to each handler
        // would build a fresh std::any (one copy) per handler.
        const std::any wrapped(message);
        for (const auto& handler : handlersToCall) {
            handler(wrapped);
        }
    }

    /// Publishes a copy of `message` from a task started with
    /// `std::launch::async`.
    ///
    /// @return Future that becomes ready once publish() has returned.
    template <typename MessageType>
    std::future<void> publishAsync(const MessageType& message) {
        return std::async(std::launch::async, [this, message] {
            this->publish(message);
        });
    }

    /// Queues a copy of `message`; it is published by the next
    /// processQueue() call.
    template <typename MessageType>
    void publishDelayed(const MessageType& message) {
        std::lock_guard<std::mutex> lock(queueMutex_);
        messageQueue_.emplace([this, message] {
            this->publish(message);
        });
    }

    /// Runs every publication queued by publishDelayed(), in FIFO order.
    /// The queue is swapped out under the lock, so tasks queued while
    /// draining are left for the next call.
    void processQueue() {
        std::queue<std::function<void()>> pending;
        {
            std::lock_guard<std::mutex> lock(queueMutex_);
            pending.swap(messageQueue_);
        }
        while (!pending.empty()) {
            auto task = std::move(pending.front());
            pending.pop();
            task();
        }
    }

private:
    MessageBus() = default;
    ~MessageBus() = default;
    MessageBus(const MessageBus&) = delete;
    MessageBus& operator=(const MessageBus&) = delete;

    std::map<std::type_index, std::vector<std::pair<HandlerId, Handler>>> handlers_;
    std::queue<std::function<void()>> messageQueue_;
    std::mutex mutex_;       // guards handlers_ and nextHandlerId_
    std::mutex queueMutex_;  // guards messageQueue_
    HandlerId nextHandlerId_ = 0;
};
这个轻量级消息总线实现提供了:
线程安全的发布/订阅机制
同步和异步消息发布
延迟消息处理
单例模式访问
取消订阅功能
可以根据具体需求进一步扩展或优化。