用MessageBus优化模块通信:实现订阅/发布模式
- 1、场景介绍
- 2、为什么采用消息总线,订阅/发布模式的特点
- 3、订阅/发布模式的介绍与代码演练
- 总结
1、场景介绍
在复杂的软件系统中,我们常常需要多个模块协同工作。
比如上方有管理套接字的管理器和管理模块的管理器,可能涉及相互调用方法的场景,最常规的就是都创建后,将实例化对象传入到对方中,再相互调用方法。
问题是:
- 如果模块之间直接互相调用,会出现严重的耦合。
- 一个模块改动,可能会影响很多其他模块。
解决办法就是引入 消息总线(MessageBus):
- 所有模块都只和“总线”通信,不直接互相依赖。
- 上位机发来的指令通过总线分发给相关模块。
- 模块的状态也可以通过总线广播给感兴趣的对象。
2、为什么采用消息总线,订阅/发布模式的特点
消息总线采用 订阅/发布(Pub-Sub)模式。
其核心思想是:
- 订阅:模块声明自己对某类消息感兴趣,并注册一个回调函数。
- 发布:某个模块发送消息到总线,总线会自动通知所有订阅者。
优点
-
解耦
- 发布者不需要知道订阅者是谁。
- 订阅者也不需要知道消息从哪来。
-
可扩展性强
- 新增一个模块,只需订阅相关主题即可,无需改动原有模块。
-
灵活的消息流转
- 一个消息可以广播给多个模块,天然支持一对多通信。
就像电台广播:
- 广播站只需要往“频道”里发声音;
- 所有调到该频道的收音机都能听到,不需要一个个单独通知。
3、订阅/发布模式的介绍与代码演练
所谓的订阅发布本质上就是维护一个👉 总线内部的字典:
subscribers[topic1] = {回调函数32, 回调函数52, ...}
subscribers[topic2] = {回调函数112, 回调函数12, ...}
subscribers[topic3] = {回调函数145, 回调函数62, ...}
当某个线程对 topic1 感兴趣时,它会通过调用 subscribe 方法向消息总线注册 topic1 及其对应的回调函数。随后,当系统中其他组件调用 publish 发布 topic1 的消息时,消息总线会在内部的 subscribers 字典中查找与 topic1 关联的所有回调函数,并依次执行这些回调函数,将消息对象传递给订阅者进行处理。
下面通过一个简化版的 MessageBus
演示消息总线的实现。
(1)消息结构
struct Message {std::string topic; // 主题,如 "command" 或 "status"std::string payload; // 消息内容,可以是字符串/JSON/二进制数据
};
(2)消息总线
class MessageBus {
public:using Callback = std::function<void(const Message&)>;// 订阅主题:将回调函数存到 subscribers 表里void subscribe(const std::string& topic, Callback cb) {subscribers[topic].push_back(cb); // 将新注册的回调函数cb压入topic中}// 发布消息:找到所有订阅该主题的回调并依次调用void publish(const Message& msg) {// 发布消息时,遍历topic下所有回调函数,并执行if (subscribers.find(msg.topic) != subscribers.end()) {for (auto& cb : subscribers[msg.topic]) {cb(msg);}}}private:std::unordered_map<std::string, std::vector<Callback>> subscribers;
};
(3)类模块作为订阅者
我们以 转台模块 为例。
class Turret {
public:Turret(std::string name): name(name), angle(0) {}// 处理 command 消息void onCommand(const Message& msg) {std::cout << "[" << name << "] 收到命令: " << msg.payload << std::endl;angle += 10;std::cout << "[" << name << "] 当前角度: " << angle << std::endl;}private:std::string name;int angle;
};
(4)绑定成员函数作为回调
在订阅时,我们可以绑定类成员函数:
MessageBus bus;
Turret turret("转台模块");// 绑定成员函数作为回调
bus.subscribe("command", std::bind(&Turret::onCommand, &turret, std::placeholders::_1));// 发布一条消息
bus.publish({"command", "MOVE_TURRET 10 20"});
输出结果
[转台模块] 收到命令: MOVE_TURRET 10 20
[转台模块] 当前角度: 10
(5)也可以用 Lambda 捕获 this
现代 C++ 更常用 Lambda:
bus.subscribe("command", [&](const Message& msg) {turret.onCommand(msg); // 调用对象的方法
});
这样写更简洁。
总结
- 消息总线是一种发布/订阅模式的实现,核心是“主题→回调函数列表”的映射。
- 订阅就是注册一个回调函数到总线;
- 发布就是找到所有订阅该主题的回调并依次调用。
- 成员函数完全可以作为回调,被调用时能访问对象的所有属性和方法。
通过消息总线,模块之间不再直接依赖,而是通过“消息”沟通,系统结构更清晰、可维护性更强。