消息缓存系统
消息缓存系统是一个多线程消息处理框架,采用反射机制实现消息类型与处理逻辑的动态绑定,核心功能是实现消息的异步接收、缓存和处理。
消息缓存系统
特点
- 反射机制:使用
std::type_index
和类型擦除技术实现消息类型与处理函数的映射 - 多线程处理:
- 接收线程:负责接收外部消息并放入队列
- 处理线程:负责从队列取出消息并分派给对应处理器
- 线程安全:使用互斥锁和条件变量保证消息队列的线程安全
- 消息与处理器绑定:通过
MessageRegistry
实现消息类型与处理函数的动态绑定 - 扩展性:可以轻松添加新的消息类型和处理函数,无需修改核心代码
#include <iostream>
#include <string>
#include <memory>
#include <unordered_map>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <functional>
#include <typeindex>
#include <vector>
#include <atomic>// 前置声明消息基类
class Message;// 消息处理器基类,使用类型擦除技术
class MessageHandlerBase {
public:virtual ~MessageHandlerBase() = default;virtual void handle(std::shared_ptr<Message> msg) = 0;
};// 消息处理器模板类,实现具体的消息处理逻辑
template<typename T>
class MessageHandler : public MessageHandlerBase {using HandlerFunc = std::function<void(std::shared_ptr<T>)>;HandlerFunc handler;
public:explicit MessageHandler(HandlerFunc func) : handler(func) {}void handle(std::shared_ptr<Message> msg) override {auto concreteMsg = std::dynamic_pointer_cast<T>(msg);if (concreteMsg) {handler(concreteMsg);}}
};// 消息基类,所有具体消息类都继承自此类
class Message {
public:virtual ~Message() = default;virtual std::type_index getType() const = 0;
};// 反射注册表,用于消息类型和处理器的映射
class MessageRegistry {std::unordered_map<std::type_index, std::shared_ptr<MessageHandlerBase>> handlers;
public:// 注册消息类型及其处理器template<typename T>void registerHandler(std::function<void(std::shared_ptr<T>)> handler) {handlers[std::type_index(typeid(T))] = std::make_shared<MessageHandler<T>>(handler);}// 查找消息处理器std::shared_ptr<MessageHandlerBase> findHandler(const Message& msg) {auto it = handlers.find(msg.getType());if (it != handlers.end()) {return it->second;}return nullptr;}
};// 具体消息类示例
template<typename T>
class TypedMessage : public Message {T data;
public:explicit TypedMessage(const T& data) : data(data) {}std::type_index getType() const override { return std::type_index(typeid(TypedMessage<T>)); }const T& getData() const { return data; }
};// 消息队列,线程安全
class MessageQueue {std::queue<std::shared_ptr<Message>> queue;mutable std::mutex mutex;std::condition_variable cv;std::atomic<bool> running{true};public:// 添加消息到队列void enqueue(std::shared_ptr<Message> msg) {std::unique_lock<std::mutex> lock(mutex);queue.push(msg);cv.notify_one();}// 从队列中取出消息std::shared_ptr<Message> dequeue() {std::unique_lock<std::mutex> lock(mutex);cv.wait(lock, [this]{ return !queue.empty() || !running; });if (!running && queue.empty()) {return nullptr;}auto msg = queue.front();queue.pop();return msg;}// 停止队列void stop() {running = false;cv.notify_all();}
};// 消息总线,管理整个消息系统
class MessageBus {MessageQueue messageQueue;MessageRegistry registry;std::thread receiverThread;std::thread processorThread;std::atomic<bool> running{false};public:// 启动消息总线void start() {if (running) return;running = true;// 接收线程receiverThread = std::thread([this] {receiveMessages();});// 处理线程processorThread = std::thread([this] {processMessages();});}// 停止消息总线void stop() {if (!running) return;running = false;messageQueue.stop();if (receiverThread.joinable()) receiverThread.join();if (processorThread.joinable()) processorThread.join();}// 注册消息处理器template<typename T>void registerHandler(std::function<void(std::shared_ptr<T>)> handler) {registry.registerHandler<T>(handler);}// 发送消息void sendMessage(std::shared_ptr<Message> msg) {messageQueue.enqueue(msg);}private:// 模拟接收消息void receiveMessages() {while (running) {// 这里可以实现真实的消息接收逻辑// 例如从网络、文件等读取消息// 为了演示,我们创建一些示例消息std::this_thread::sleep_for(std::chrono::milliseconds(100));static int counter = 0;if (counter % 2 == 0) {auto msg = std::make_shared<TypedMessage<std::string>>("Hello " + std::to_string(counter));sendMessage(msg);} else {auto msg = std::make_shared<TypedMessage<int>>(counter);sendMessage(msg);}counter++;}}// 处理消息void processMessages() {while (running) {auto msg = messageQueue.dequeue();if (!msg) continue;auto handler = registry.findHandler(*msg);if (handler) {handler->handle(msg);} else {std::cerr << "No handler registered for message type: " << msg->getType().name() << std::endl;}}}
};// 使用示例
int main() {MessageBus bus;// 注册消息处理器bus.registerHandler<TypedMessage<std::string>>([](std::shared_ptr<TypedMessage<std::string>> msg) {std::cout << "Processing string message: " << msg->getData() << std::endl;});bus.registerHandler<TypedMessage<int>>([](std::shared_ptr<TypedMessage<int>> msg) {std::cout << "Processing int message: " << msg->getData() << std::endl;});// 启动消息总线bus.start();// 让主线程运行一段时间std::this_thread::sleep_for(std::chrono::seconds(5));// 停止消息总线bus.stop();return 0;
}
可以通过继承Message
类创建自定义消息类型,并使用MessageBus::registerHandler
方法注册对应的处理函数。系统会自动将消息路由到正确的处理函数。
二、优化
优化点
- 功能增强:
- 为消息添加了时间戳功能,便于跟踪消息生命周期
- 实现了消息的字符串表示方法,方便日志输出和调试
- 为消息队列添加了超时机制,避免线程无限阻塞
- 增加了统计功能,跟踪处理的消息数量和队列状态
- 错误处理:
- 增加了更全面的异常处理,特别是在线程创建和消息处理过程中
- 对空指针、无效参数等情况进行了检查
- 提供了更详细的错误信息输出
- 性能与安全性:
- 优化了互斥锁的使用,减少锁竞争
- 为消息注册表添加了互斥锁,支持动态注册处理器
- 使用原子变量替代普通变量,确保多线程环境下的安全访问
- 改进了条件变量的等待逻辑,避免虚假唤醒
- 扩展性:
- 消息处理器的注册和查找机制更加灵活
- 可以轻松添加新的消息类型和对应的处理器
- 消息总线的启动和停止逻辑更加健壮
#include <iostream>
#include <string>
#include <memory>
#include <unordered_map>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <functional>
#include <typeindex>
#include <vector>
#include <atomic>
#include <chrono>
#include <stdexcept>
#include <sstream>
#include <iomanip>// 前置声明消息基类
class Message;/*** @brief 消息处理器基类* 采用类型擦除技术,为不同类型的消息提供统一的处理接口*/
class MessageHandlerBase {
public:// 虚析构函数,确保派生类能够正确析构virtual ~MessageHandlerBase() = default;/*** @brief 处理消息的纯虚函数* @param msg 待处理的消息(基类指针)*/virtual void handle(std::shared_ptr<Message> msg) = 0;
};/*** @brief 消息处理器模板类* 实现具体类型消息的处理逻辑,将通用接口与具体实现关联* @tparam T 消息类型*/
template<typename T>
class MessageHandler : public MessageHandlerBase {
public:// 定义消息处理函数的类型using HandlerFunc = std::function<void(std::shared_ptr<T>)>;/*** @brief 构造函数* @param func 具体的消息处理函数*/explicit MessageHandler(HandlerFunc func) : handler_(std::move(func)) {if (!handler_) {throw std::invalid_argument("Message handler function cannot be null");}}/*** @brief 处理消息的实现* 将基类指针转换为具体消息类型指针,然后调用对应的处理函数* @param msg 待处理的消息(基类指针)*/void handle(std::shared_ptr<Message> msg) override {// 动态类型转换,确保类型安全auto concrete_msg = std::dynamic_pointer_cast<T>(msg);if (concrete_msg) {try {handler_(concrete_msg);} catch (const std::exception& e) {std::cerr << "Error handling message: " << e.what() << std::endl;}} else {std::cerr << "Failed to cast message to concrete type" << std::endl;}}private:HandlerFunc handler_; // 具体的消息处理函数
};/*** @brief 消息基类* 所有具体消息类都需要继承自此基类,实现消息类型识别接口*/
class Message {
public:// 虚析构函数,确保派生类能够正确析构virtual ~Message() = default;/*** @brief 获取消息类型* @return 消息类型的type_index*/virtual std::type_index getType() const = 0;/*** @brief 获取消息的字符串表示(用于日志和调试)* @return 消息的字符串描述*/virtual std::string toString() const {return "Base Message";}
};/*** @brief 消息注册表* 负责管理消息类型与处理器之间的映射关系,实现反射机制的核心*/
class MessageRegistry {
private:// 消息类型到处理器的映射表std::unordered_map<std::type_index, std::shared_ptr<MessageHandlerBase>> handlers_;// 保护映射表的互斥锁(支持动态注册处理器)mutable std::mutex mutex_;public:/*** @brief 注册消息处理器* @tparam T 消息类型* @param handler 处理该类型消息的函数*/template<typename T>void registerHandler(std::function<void(std::shared_ptr<T>)> handler) {if (!std::is_base_of<Message, T>::value) {throw std::invalid_argument("Type must inherit from Message");}std::lock_guard<std::mutex> lock(mutex_);auto type = std::type_index(typeid(T));// 检查是否已经注册了该类型的处理器if (handlers_.find(type) != handlers_.end()) {std::cerr << "Warning: Overriding existing handler for message type" << std::endl;}handlers_[type] = std::make_shared<MessageHandler<T>>(std::move(handler));}/*** @brief 查找消息对应的处理器* @param msg 消息对象* @return 对应的处理器,如果没有找到则返回nullptr*/std::shared_ptr<MessageHandlerBase> findHandler(const Message& msg) const {std::lock_guard<std::mutex> lock(mutex_);auto it = handlers_.find(msg.getType());return (it != handlers_.end()) ? it->second : nullptr;}/*** @brief 获取已注册的消息类型数量* @return 注册的消息类型数量*/size_t getHandlerCount() const {std::lock_guard<std::mutex> lock(mutex_);return handlers_.size();}
};/*** @brief 通用类型消息模板类* 可以包装任意类型的数据作为消息内容* @tparam T 消息数据的类型*/
template<typename T>
class TypedMessage : public Message {
private:T data_; // 消息携带的数据std::chrono::system_clock::time_point timestamp_; // 消息创建时间戳public:/*** @brief 构造函数* @param data 消息携带的数据*/explicit TypedMessage(T data) : data_(std::move(data)), timestamp_(std::chrono::system_clock::now()) {}/*** @brief 获取消息类型* @return 消息类型的type_index*/std::type_index getType() const override {return std::type_index(typeid(TypedMessage<T>));}/*** @brief 获取消息携带的数据* @return 消息数据的常量引用*/const T& getData() const {return data_;}/*** @brief 获取消息的时间戳* @return 消息创建的时间点*/const std::chrono::system_clock::time_point& getTimestamp() const {return timestamp_;}/*** @brief 获取消息的字符串表示* @return 包含消息数据和时间戳的字符串*/std::string toString() const override {std::stringstream ss;auto time = std::chrono::system_clock::to_time_t(timestamp_);ss << "TypedMessage<" << typeid(T).name() << "> [data: " << data_ << ", timestamp: " << std::put_time(std::localtime(&time), "%F %T") << "]";return ss.str();}
};/*** @brief 线程安全的消息队列* 实现生产者-消费者模型,支持多线程环境下的消息缓存*/
class MessageQueue {
private:std::queue<std::shared_ptr<Message>> queue_; // 消息队列mutable std::mutex mutex_; // 保护队列的互斥锁std::condition_variable cv_; // 条件变量,用于线程同步std::atomic<bool> is_running_; // 队列运行状态标志std::atomic<size_t> message_count_; // 消息计数,用于监控public:/*** @brief 构造函数*/MessageQueue() : is_running_(true), message_count_(0) {}/*** @brief 析构函数*/~MessageQueue() {stop();}/*** @brief 向队列添加消息* @param msg 要添加的消息* @return 添加成功返回true,否则返回false*/bool enqueue(std::shared_ptr<Message> msg) {if (!is_running_ || !msg) {return false;}// 使用unique_lock实现自动加锁和解锁std::unique_lock<std::mutex> lock(mutex_);queue_.push(std::move(msg));message_count_++;// 通知等待的线程有新消息到来cv_.notify_one();return true;}/*** @brief 从队列取出消息* @param timeout 超时时间(毫秒),-1表示无限等待* @return 取出的消息,如果超时或队列停止则返回nullptr*/std::shared_ptr<Message> dequeue(int timeout = -1) {std::unique_lock<std::mutex> lock(mutex_);// 根据超时参数等待条件满足if (timeout < 0) {// 无限等待直到队列非空或停止运行cv_.wait(lock, [this] { return !queue_.empty() || !is_running_; });} else {// 有限时间等待auto status = cv_.wait_for(lock, std::chrono::milliseconds(timeout),[this] { return !queue_.empty() || !is_running_; });if (!status) {// 等待超时return nullptr;}}// 如果队列已停止且为空,则返回nullptrif (!is_running_ && queue_.empty()) {return nullptr;}// 取出消息并更新计数auto msg = queue_.front();queue_.pop();message_count_--;return msg;}/*** @brief 停止队列运行*/void stop() {is_running_ = false;cv_.notify_all(); // 唤醒所有等待的线程}/*** @brief 获取当前队列中的消息数量* @return 消息数量*/size_t size() const {std::lock_guard<std::mutex> lock(mutex_);return queue_.size();}/*** @brief 检查队列是否为空* @return 为空返回true,否则返回false*/bool empty() const {return size() == 0;}/*** @brief 获取消息总数(统计信息)* @return 处理过的消息总数*/size_t getMessageCount() const {return message_count_;}
};/*** @brief 消息总线类* 管理整个消息系统的核心类,协调消息的接收、缓存和处理*/
class MessageBus {
private:MessageQueue message_queue_; // 消息队列MessageRegistry registry_; // 消息注册表std::thread receiver_thread_; // 接收线程std::thread processor_thread_; // 处理线程std::atomic<bool> is_running_; // 总线运行状态std::atomic<size_t> processed_count_; // 已处理消息计数// 接收线程函数的前置声明void receiveMessages();// 处理线程函数的前置声明void processMessages();public:/*** @brief 构造函数*/MessageBus() : is_running_(false), processed_count_(0) {}/*** @brief 析构函数*/~MessageBus() {stop();}/*** @brief 启动消息总线* @return 启动成功返回true,否则返回false*/bool start() {if (is_running_) {std::cerr << "MessageBus is already running" << std::endl;return false;}is_running_ = true;// 启动接收线程try {receiver_thread_ = std::thread(&MessageBus::receiveMessages, this);} catch (const std::exception& e) {std::cerr << "Failed to start receiver thread: " << e.what() << std::endl;is_running_ = false;return false;}// 启动处理线程try {processor_thread_ = std::thread(&MessageBus::processMessages, this);} catch (const std::exception& e) {std::cerr << "Failed to start processor thread: " << e.what() << std::endl;is_running_ = false;if (receiver_thread_.joinable()) {receiver_thread_.join();}return false;}std::cout << "MessageBus started successfully" << std::endl;return true;}/*** @brief 停止消息总线*/void stop() {if (!is_running_) {return;}is_running_ = false;message_queue_.stop();// 等待接收线程结束if (receiver_thread_.joinable()) {try {receiver_thread_.join();} catch (const std::exception& e) {std::cerr << "Error joining receiver thread: " << e.what() << std::endl;}}// 等待处理线程结束if (processor_thread_.joinable()) {try {processor_thread_.join();} catch (const std::exception& e) {std::cerr << "Error joining processor thread: " << e.what() << std::endl;}}std::cout << "\nMessageBus stopped. Statistics:" << std::endl;std::cout << "Total messages processed: " << processed_count_ << std::endl;std::cout << "Remaining messages in queue: " << message_queue_.size() << std::endl;}/*** @brief 注册消息处理器* @tparam T 消息类型* @param handler 处理该类型消息的函数*/template<typename T>void registerHandler(std::function<void(std::shared_ptr<T>)> handler) {try {registry_.registerHandler<T>(std::move(handler));std::cout << "Registered handler for message type: " << typeid(T).name() << std::endl;} catch (const std::exception& e) {std::cerr << "Failed to register handler: " << e.what() << std::endl;}}/*** @brief 发送消息到消息总线* @param msg 要发送的消息* @return 发送成功返回true,否则返回false*/bool sendMessage(std::shared_ptr<Message> msg) {if (!is_running_ || !msg) {return false;}return message_queue_.enqueue(std::move(msg));}/*** @brief 获取当前已注册的处理器数量* @return 处理器数量*/size_t getHandlerCount() const {return registry_.getHandlerCount();}/*** @brief 获取已处理的消息数量* @return 已处理消息数量*/size_t getProcessedCount() const {return processed_count_;}
};/*** @brief 接收消息的线程函数* 模拟从外部源接收消息并放入消息队列*/
void MessageBus::receiveMessages() {std::cout << "Receiver thread started" << std::endl;try {size_t msg_counter = 0;while (is_running_) {// 模拟接收消息的延迟std::this_thread::sleep_for(std::chrono::milliseconds(100));// 交替创建不同类型的消息,模拟接收不同类型的消息if (msg_counter % 3 == 0) {// 创建字符串类型消息auto msg = std::make_shared<TypedMessage<std::string>>("Message " + std::to_string(msg_counter) + " - string type");sendMessage(msg);std::cout << "Received string message: " << msg_counter << std::endl;} else if (msg_counter % 3 == 1) {// 创建整数类型消息auto msg = std::make_shared<TypedMessage<int>>(static_cast<int>(msg_counter));sendMessage(msg);std::cout << "Received int message: " << msg_counter << std::endl;} else {// 创建浮点数类型消息auto msg = std::make_shared<TypedMessage<double>>(msg_counter * 0.123);sendMessage(msg);std::cout << "Received double message: " << msg_counter << std::endl;}msg_counter++;}} catch (const std::exception& e) {std::cerr << "Error in receiver thread: " << e.what() << std::endl;}std::cout << "Receiver thread stopped" << std::endl;
}/*** @brief 处理消息的线程函数* 从消息队列中取出消息并分发给对应的处理器*/
void MessageBus::processMessages() {std::cout << "Processor thread started" << std::endl;try {while (is_running_) {// 从队列中获取消息,超时时间设置为100msauto msg = message_queue_.dequeue(100);if (!msg) {continue;}// 查找对应的消息处理器auto handler = registry_.findHandler(*msg);if (handler) {std::cout << "\nProcessing message: " << msg->toString() << std::endl;handler->handle(msg);processed_count_++;} else {std::cerr << "No handler found for message type: " << msg->getType().name() << std::endl;}}} catch (const std::exception& e) {std::cerr << "Error in processor thread: " << e.what() << std::endl;}std::cout << "Processor thread stopped" << std::endl;
}// 使用示例
int main() {try {// 创建消息总线实例MessageBus bus;// 注册字符串类型消息的处理器bus.registerHandler<TypedMessage<std::string>>([](std::shared_ptr<TypedMessage<std::string>> msg) {std::cout << "String handler - Data: " << msg->getData() << std::endl;});// 注册整数类型消息的处理器bus.registerHandler<TypedMessage<int>>([](std::shared_ptr<TypedMessage<int>> msg) {std::cout << "Int handler - Data: " << msg->getData() << ", Squared: " << msg->getData() * msg->getData() << std::endl;});// 注册双精度浮点数类型消息的处理器bus.registerHandler<TypedMessage<double>>([](std::shared_ptr<TypedMessage<double>> msg) {std::cout << "Double handler - Data: " << msg->getData() << ", Doubled: " << msg->getData() * 2 << std::endl;});// 启动消息总线if (!bus.start()) {return 1;}// 让系统运行5秒钟std::this_thread::sleep_for(std::chrono::seconds(5));// 停止消息总线bus.stop();} catch (const std::exception& e) {std::cerr << "Fatal error in main: " << e.what() << std::endl;return 1;}return 0;
}
工作流程
- 初始化阶段
- 创建
MessageBus
实例 - 通过
registerHandler()
注册各类消息对应的处理函数(如字符串消息处理器、整数消息处理器等)
- 创建
- 运行阶段
- 启动消息总线后,接收线程开始模拟接收外部消息(实际应用中可替换为真实消息源,如网络、文件等)
- 接收线程将消息通过
sendMessage()
放入MessageQueue
缓存 - 处理线程从队列中取出消息,通过
MessageRegistry
查找对应的处理器 - 处理器执行具体的消息处理逻辑
- 停止阶段
- 调用
stop()
后,系统安全终止线程,输出统计信息(处理消息总数、剩余消息数等)
- 调用
这个系统适合作为需要异步处理多种消息类型的应用基础框架,例如网络服务器、日志处理系统、事件驱动型应用等场景。通过分离消息的接收与处理,既能保证输入响应速度,又能灵活扩展消息处理逻辑。