高性能分布式消息队列系统(三)
七、服务端模块
7.1、交换机数据管理
交换机的数据管理部分分为三个模块进行实现,分别是数据结构层(交换机数据类)、数据库操作层(持久化类)、业务逻辑层(交换机管理类)
7.1.1、数据结构层(定义交换机数据类)
这个类只需要进行承载数据,不需要进行保存数据,也不需要进行业务层面的操作
a.交换机名称
b.交换机类型 定义到proto 文件中的
c.是否持久化标志
d.是否自动删除标志
e.其他参数
//定义交换机类型struct Exchange{using ptr = std::shared_ptr<Exchange>;//1、交换机名称std::string name;//2、交换机类型ys::ExchangeType type; //3、交换机持久化标志bool durable;//4、是否进行自动删除bool auto_delete;//5、其他参数std::unordered_map<std::string,std::string> args;Exchange(){}Exchange(const std::string& ename,ys::ExchangeType etype,bool edurable,bool eauto_delete,std::unordered_map<std::string,std::string>& eargs):name(ename),type(etype),durable(edurable),auto_delete(eauto_delete),args(eargs){}//args存储键值对,在存储数据库的时候,会组织一个格式字符串进行存储 key=val&key=val....//内部进行解析str_args字符串,将内容进行存储到成员中 --- 反序列化形成键值对void setArgs(const std::string& str_args){std::vector<std::string> sub_str;brush::StrHelper::split(str_args,"&",sub_str);for(auto& str:sub_str){size_t pos=str.find("=");const std::string& key=str.substr(0,pos);const std::string& value=str.substr(pos+1);args[key]=value;}}//将args的内容进行序列化,然后进行返回一个字符串 --- 键值对进行序列化std::string getArgs(){std::string ret;for(auto i=args.begin();i!=args.end();i++){ret+=i->first;ret+="=";ret+=i->second;ret+="&";}//ret.pop_back();return ret;}};
这个类中为什么会存在序列化的函数和反序列化的函数?
之所以进行序列化和反序列化,是因为后续涉及到数据的持久化存储,也就是需要将数据进行存储到数据库中,将数据进行存储到数据库中需要通过键值对的方式,这也就是序列化和反序列化的原因
为什么选择将序列化和反序列化进行放到交换机的数据结构类中呢?
按照合理的逻辑来进行看待的化,进行序列化和反序列化应该是放到持久化管理类中反而应该是更合适,是的这种想法是相当正确的,但是放到ExchangeMapper中,要是使用序列化的函数还需要将Exchange对象进行传入到序列化函数中,因为需要进行获取该对象中的置于哈希表中的其他属性,反之要进行使用序列化时需要从数据库中进行读取数据然后在通过传入的对象进行转化,这样相对于放到Exchange中是麻烦的。
7.1.2、数据库操作层(定义交换机数据持久化类)
通过数据库对磁盘进行管理,将数据进行放入磁盘中,既然是要将交换机有关的数据进行通过数据库进行持久化存储,首先就需要有一张数据库表进行记录交换机的数据,然后就是对交换机数据进行增删查的操作,也就是总结成下面这几种操作。
a.创建/删除交换机数据表
b.新增交换机数据
c.移除交换机数据
d.查询所有交换机数据 返回的是一个智能指针的对象
e.查询指定交换机数据(根据名称) 需要进行类型统一
//定义交换机数据持久化管理类----数据存储在sqlite库中class ExchangeMapper{public:ExchangeMapper(const std::string &dbfile):_sql_helper(dbfile){std::string parent_path=brush::FileHelper::parentDirectory(dbfile);brush::FileHelper::createDirectory(parent_path);assert(_sql_helper.open());createTable();}//创建数据库交换机表void createTable(){#define CREATE_TABLE "create table if not exists exchange_table\(name varchar(32) primary key,\type int,\durable int,\auto_delete int,\args varchar(128));"bool ret=_sql_helper.exec(CREATE_TABLE,nullptr,nullptr);if(ret==false){ELOG("创建交换机数据库表失败");abort(); //直接进行退出异常程序}}//删除数据库交换机表void removeTable(){#define DROP_TABLE "drop table if exists exchange_table"bool ret=_sql_helper.exec(DROP_TABLE,nullptr,nullptr);if(ret==false){ELOG("删除交换机数据库表失败");abort(); //直接进行退出异常程序} }//将一个Exchange对象进行插入到数据库的表中bool insert(Exchange::ptr& exchange){std::stringstream ss;ss<<"insert into exchange_table values (";ss<<"'"<<exchange->name<<"',";ss << exchange->type << ", ";ss << exchange->durable << ", ";ss << exchange->auto_delete << ", ";ss << "'" << exchange->getArgs() << "');";return _sql_helper.exec(ss.str(), nullptr, nullptr);}//通过名称进行删除某一个条例void remove(const std::string& name){std::stringstream ss;ss << "delete from exchange_table where name=";ss << "'" << name << "';";_sql_helper.exec(ss.str(), nullptr, nullptr);}//从数据库中进行恢复所有的Exchange数据到内存结构中using ExchangeMap=std::unordered_map<std::string,Exchange::ptr>;ExchangeMap recovery(){ExchangeMap result;std::string sql = "select name, type, durable, auto_delete, args from exchange_table;";_sql_helper.exec(sql, selectCallback, &result);return result;}private:static int selectCallback(void* arg,int numcol,char** row,char** fields) {ExchangeMap *result = (ExchangeMap*)arg;auto exp = std::make_shared<Exchange>();exp->name = row[0];exp->type = (ys::ExchangeType)std::stoi(row[1]);exp->durable = (bool)std::stoi(row[2]);exp->auto_delete = (bool)std::stoi(row[3]);if (row[4]) exp->setArgs(row[4]);result->insert(std::make_pair(exp->name, exp));return 0;}private:SqliteHelper _sql_helper;};
构造函数的设计
通过函数参数(想要进行创建数据库的文件位置)进行构建数据库对象,紧接着进行创建数据库文件的目录,然后进行创建打开数据库文件,打开数据库文件成功后进行创建数据库表,至此完成构造函数的模块。
创建/删除数据库表函数以及删除数据表中的某一条语句
这些函数其实本质就是执行数据库的操作语句了,唯一需要进行注意的就是sqlite的语句不要出现错误即可。
在数据库表中进行插入数据函数
将Exchange对象中的一些信息进行插入到sqlite 数据表中进行持久化存储,但是Exchange 对象中有一些其他参数是通过哈希表进行定义的,这就需要将哈希表的数据进行转化成字符串进行存储到sqlite数据库中。
恢复数据库中的所有数据到内存
从数据库中恢复所有的交换机数据、构建并返回内存中的哈希表、实现交换机模块的冷启动数据同步,换句话说:它是数据库 ↔ 内存 数据同步的桥梁,确保系统重启后,所有持久化的交换机信息都能被完整加载到内存缓存中。
将数据库中的数据进行恢复需要一个容器进行存储恢复的数据,这里之所以选择unordered_map而不是使用vector是因为下面内存结构的存储也是使用的unordered_map,之后进行执行的时候直接可以赋值,就不用在进行转换了。
7.1.3、业务逻辑层(定义交换机教据管理类)
a.声明交换机,并添加管理(存在则OK,不存在则创建)
b.删除交换机
c.获取指定交换机
d.销毁所有交换机数据
//定义交换机内存管理类class ExchageManager{public:using ptr=std::shared_ptr<ExchageManager>;ExchageManager(const std::string &dbfile):_mapper(dbfile){//通过数据库文件加载缓存_exchange=_mapper.recovery();}//声明交换机void declareExchage(const std::string& name,ys::ExchangeType type,bool durable,bool auto_delete,std::unordered_map<std::string,std::string>& args){//构建交换机的对象auto exp=std::make_shared<Exchange>(name,type,durable,auto_delete,args);//查看交换机是否声明//-保证线程安全进行加锁std::unique_lock<std::mutex> lock(_mutex);auto it=_exchange.find(name);if(it!=_exchange.end()){return;}if(durable==true){_mapper.insert(exp);}_exchange.insert(std::make_pair(name,exp));}//删除交换机void deleteExchange(const std::string& name){std::unique_lock<std::mutex> lock(_mutex);auto it=_exchange.find(name);if(it==_exchange.end()){return;}if(it->second->durable == true){_mapper.removeTable();}_exchange.erase(name);}//获取指定的交换机对象Exchange::ptr selectExchange(const std::string& name){std::unique_lock<std::mutex> lock(_mutex);auto it=_exchange.find(name);if(it==_exchange.end()){return nullptr;}return it->second;}//查看指定的对象是否存在bool exists(const std::string& name){std::unique_lock<std::mutex> lock(_mutex);auto it=_exchange.find(name);if(it==_exchange.end()){return false;}return true;}//清理所有的交换机数据void clear(){std::unique_lock<std::mutex> lock(_mutex);_mapper.removeTable();_exchange.clear();}//内存缓存表中的数量size_t size(){std::unique_lock<std::mutex> lock(_mutex);return _exchange.size();}private:std::mutex _mutex;ExchangeMapper _mapper; //操作数据库的组件std::unordered_map<std::string,Exchange::ptr> _exchange; //运行时的缓存};
7.2、队列数据管理
7.2.1、队列管理的信息
定义队列的属性信息,包括队列名称、是否持久化、是否被唯一占有、是否自动删除和可扩展的附加属性(灵活适配业务),并且提供对args的序列化和反序列化
getArgs() → 序列化为 SQL 可存储的字符串格式(k1=v1&k2=v2&...)
setArgs() → 从数据库读取的字符串反序列化为可用的 Map
struct MsgQueue{using mqp=std::shared_ptr<MsgQueue>;std::string name;bool durable;bool exclusive;bool auto_delete;google::protobuf::Map<std::string,std::string> args;MsgQueue(){}MsgQueue(const std::string qname,bool qdurable,bool qexclusive,bool qauto_delete,const google::protobuf::Map<std::string,std::string>& qargs):name(qname),durable(qdurable),exclusive(qexclusive),auto_delete(qauto_delete),args(qargs){}//将数据库中的数据进行反序列void setArgs(const std::string& sql_str){//k1=v1&k2=v2&std::vector<std::string> ret;brush::StrHelper::split(sql_str,"&",ret);for(auto& sub_str:ret){size_t pos=sub_str.find("=");std::string key=sub_str.substr(0,pos);std::string value=sub_str.substr(pos+1);args[key]=value;}}//将容器中的键值对进行序列化std::string getArgs(){std::string ret;for(auto it=args.begin();it!=args.end();it++){ret+=it->first;ret+="=";ret+=it->second;ret+="&";}return ret; }};using QueueMap=std::unordered_map<std::string,MsgQueue::mqp>;
7.2.2、队列的持久化操作
MsgQueueMapper 专注于数据库的交互工作,负责将内存中的 MsgQueue 对象映射到 SQLite 表的行,实现持久化存储;反过来,它也能从数据库中加载队列配置,恢复内存数据结构。这一层让持久化操作对上层模块完全透明,保障数据可靠性并实现持久化与内存状态的一致性。
class MsgQueueMapper{public:MsgQueueMapper(const std::string& dpfile):_sql(dpfile){//进行创建数据库的目录std::string path=brush::FileHelper::parentDirectory(dpfile);brush::FileHelper::createDirectory(path);//进行打开数据库文件assert(_sql.open());//创建数据库表create_table();}void create_table(){#define QUEUE_CREATE_TABLE "create table if not exists queue_table(\name varchar(32) primary key,\durable int,\exclusive int,\auto_delete int,\args varchar(128));"int ret=_sql.exec(QUEUE_CREATE_TABLE,nullptr,nullptr);if(ret==false){ELOG("创建交换机表失败");abort();}}void remove_table(){#define QUEUE_DROP_TABLE "drop table if exists queue_table"int ret=_sql.exec(QUEUE_DROP_TABLE,nullptr,nullptr);if(ret==false){ELOG("删除交换机表失败");abort();}}bool insert(MsgQueue::mqp& ptr){std::stringstream ss;ss<<"insert into queue_table values(";ss<<"'"<<ptr->name<<"',";ss<<ptr->durable<<",";ss<<ptr->exclusive<<",";ss<<ptr->auto_delete<<",";ss<<"'"<<ptr->getArgs()<<"');";return _sql.exec(ss.str(),nullptr,nullptr);}void remove(const std::string& name){std::stringstream ss;ss<<"delete from queue_table where name=";ss<<"'"<<name<<"';";_sql.exec(ss.str(),nullptr,nullptr);}QueueMap recovery(){QueueMap result;std::string str="select name,durable,exclusive,auto_delete,args from queue_table;";_sql.exec(str.c_str(),selectCallback,&result);return result;}private:static int selectCallback(void* arg,int numcol,char** row,char** fields) {QueueMap* result=(QueueMap*)arg;auto queue=std::make_shared<MsgQueue>();queue->name=row[0];queue->durable=(bool)atoi(row[1]);queue->exclusive=(bool)atoi(row[2]);queue->auto_delete=(bool)atoi(row[3]);if(row[4]){queue->setArgs(row[4]);}result->insert(std::make_pair(queue->name,queue));return 0;}private:SqliteHelper _sql;};
7.2.3、对队列的管理
MsgQueueManager是对外提供队列管理能力的核心模块,具备线程安全的队列生命周期管理能力。它既支持内存中高效的队列操作,也根据队列的持久化属性决定是否落盘保存,统一封装对外接口,使得队列管理既安全又便捷,同时为上层模块或业务提供一致性和高可用性保障。
class MsgQueueManager{public:using ptr=std::shared_ptr<MsgQueueManager>;MsgQueueManager(const std::string& dbfile):_mapper(dbfile){_msg_queue=_mapper.recovery();}bool declareQueue(const std::string& qname,bool qdurable,bool qexclusive,bool qauto_delete,const google::protobuf::Map<std::string,std::string> qargs){std::unique_lock<std::mutex> lock(_mutex);auto pos=_msg_queue.find(qname);if(pos!=_msg_queue.end()){return true;}MsgQueue::mqp ptr=std::make_shared<MsgQueue>();ptr->name=qname;ptr->durable=qdurable;ptr->exclusive=qexclusive;ptr->auto_delete=qauto_delete;ptr->args=qargs;if(ptr->durable==true){int ret=_mapper.insert(ptr);if(ret==false){return false;}}_msg_queue.insert(std::make_pair(ptr->name,ptr));return true;}void deleteQueue(const std::string& qname){std::unique_lock<std::mutex> lock(_mutex);auto pos=_msg_queue.find(qname);if(pos==_msg_queue.end()){return;}_msg_queue.erase(qname);if(pos->second->durable==true){_mapper.remove(qname);}}MsgQueue::mqp selectQueue(const std::string& name){std::unique_lock<std::mutex> lock(_mutex);auto it = _msg_queue.find(name);if(it==_msg_queue.end()){return MsgQueue::mqp();}return it->second;}bool exists(const std::string& name){std::unique_lock<std::mutex> lock(_mutex);auto it = _msg_queue.find(name);if(it==_msg_queue.end()){return false;}return true;}QueueMap allQueues(){return _msg_queue;}size_t size(){std::unique_lock<std::mutex> lock(_mutex);return _msg_queue.size();}void clear(){std::unique_lock<std::mutex> lock(_mutex);_mapper.remove_table();_msg_queue.clear();}private:std::mutex _mutex;MsgQueueMapper _mapper;QueueMap _msg_queue;};
7.3、绑定数据管理
这个模块主要用于管理消息中间件中“交换机(exchange)”与“队列(queue)”之间的绑定关系。
它维护内存中的绑定关系表,并将持久化配置存储到 SQLite 数据库中,以便服务器重启时能恢复状态。哪怕服务器宕机重启,绑定信息也会从数据库恢复,保证消息系统仍能正常工作。
7.3.1、绑定管理的信息
struct Binding{using bp=std::shared_ptr<Binding>;std::string exchange_name;std::string queue_name;std::string binding_key;Binding(){}Binding(const std::string& ename,const std::string& qname,const std::string& key):exchange_name(ename),queue_name(qname),binding_key(key){}};
7.3.2、绑定的持久化操作
持久化操作模块这个类专注于与 SQLite 的交互,提供绑定关系在数据库中的存储、删除、恢复等操作。
主要功能
- 数据库表的创建与删除
- 绑定关系的持久化 (insert)
- 删除操作(可按 exchange / queue / exchange+queue)
- 恢复所有绑定(recovery)
using MsgQueueBindingMap=std::unordered_map<std::string,Binding::bp>;using BindingMap=std::unordered_map<std::string,MsgQueueBindingMap>;class BindingMapper{public://std::unordered_map<std::string,Binding::bp>; 交换机和绑定之间的映射//std::unordered_map<std::string,Binding::bp>; 队列和绑定之间的映射BindingMapper(const std::string& dbfile):_sql(dbfile){const std::string&path=FileHelper::parentDirectory(dbfile);FileHelper::createDirectory(path);assert(_sql.open());create_table();}void create_table(){#define BINDING_CREATE_TABLE "create table if not exists binding_table (\exchange_name varchar(32),\queue_name varchar(32),\binding_key varchar(128));"_sql.exec(BINDING_CREATE_TABLE,nullptr,nullptr);}void remove_table(){#define BINDING_DROP_TABLE "drop table if exists binding_table"_sql.exec(BINDING_DROP_TABLE,nullptr,nullptr);}bool insert(Binding::bp& ptr){std::stringstream ss;ss<<"insert into binding_table values (";ss<<"'"<<ptr->exchange_name<<"',";ss<<"'"<<ptr->queue_name<<"',";ss<<"'"<<ptr->binding_key<<"');";_sql.exec(ss.str(),nullptr,nullptr);return true;}void remove(const std::string& ename,const std::string& qname){std::stringstream ss;ss<<"delete from binding_table where ";ss<<"exchange_name = '"<<ename<<"'";ss<<" and ";ss<<"queue_name = '"<<qname<<"';";_sql.exec(ss.str(),nullptr,nullptr);}void removeExchangeBinding(const std::string& ename){std::stringstream ss;ss<<"delete from binding_table where ";ss<<"exchange_name = '"<<ename<<"';";_sql.exec(ss.str(),nullptr,nullptr);}void removeQueueBinding(const std::string& qname){std::stringstream ss;ss<<"delete from binding_table where ";ss<<"queue_name = '"<<qname<<"';";_sql.exec(ss.str(),nullptr,nullptr);}BindingMap recovery(){BindingMap result;std::stringstream ss;ss<<"select exchange_name,queue_name,binding_key from binding_table;";_sql.exec(ss.str(),selectCallback,&result);return result;}private:static int selectCallback(void* arg,int numcol,char** row,char** fields) {BindingMap* result=(BindingMap*)arg;auto bp=std::make_shared<Binding>();bp->exchange_name=row[0];bp->queue_name=row[1];bp->binding_key=row[2];MsgQueueBindingMap &qmap = (*result)[bp->exchange_name]; //必须使用引用,否则进行插入是无效的qmap.insert(std::make_pair(bp->queue_name, bp));return 0;}private:SqliteHelper _sql; };
recovery函数的整体流程
1、创建空的内存 BindingMap
2、执行 select 语句查询所有绑定
3、SQLite 遍历结果,回调 selectCallback
- 为每行结果创建 Binding 对象
- 把它插入到 BindingMap(按交换机 -> 队列组织)
4、查询完毕,返回完整 BindingMap
recovery函数的作用
通过查询数据库,把之前持久化到磁盘的绑定配置全部恢复到内存。
保证即使服务器重启,之前的绑定信息也不会丢失,保持消息队列系统一致性。
7.3.3、对绑定的管理
这部分是运行期绑定管理模块,管理内存中的绑定信息 (_bindings),调用 BindingMapper 完成持久化操作,使用 std::mutex 确保多线程安全。
并且是外部 API 的主要接口,向上提供:
- 绑定操作(bind)
- 解除绑定(unbind、removeExchangeBinding、removeQueueBinding)
- 查询(getExchangeBindings、getBinding、exists、size)
- 清理所有绑定(clear)
class BindingManager{public:using ptr=std::shared_ptr<BindingManager>;BindingManager(const std::string& dbfile):_mapper(dbfile){_bindings=_mapper.recovery();}//声明绑定bool bind(const std::string& ename,const std::string& qname,const std::string& key,bool durable){std::unique_lock<std::mutex> lock(_mutex);auto pos=_bindings.find(ename);if(pos!=_bindings.end()&&pos->second.find(qname)!=pos->second.end()){return true;}auto object=std::make_shared<Binding>(ename,qname,key);MsgQueueBindingMap& qmap=_bindings[ename];qmap.insert(std::make_pair(qname,object));if(durable){int ret=_mapper.insert(object);if(ret==false){return false;}}return true;}//解除绑定--解除指定交换机的对应的指定队列的绑定void unbind(const std::string& ename,const std::string& qname){std::unique_lock<std::mutex> lock(_mutex);auto epos=_bindings.find(ename);if(epos==_bindings.end()){return;}auto qpos=epos->second.find(qname);if(qpos==epos->second.end()){return;}_bindings[ename].erase(qname);_mapper.remove(ename,qname);}//删除指定交换机的绑定信息void removeExchangeBinding(const std::string& ename){std::unique_lock<std::mutex> lock(_mutex);auto epos=_bindings.find(ename);if(epos==_bindings.end()){return;}_bindings.erase(ename);_mapper.removeExchangeBinding(ename);}//删除指定队列的绑定信息void removeQueueBinding(const std::string& qname){//遍历每一个交换机进行查找是否有指定的队列绑定std::unique_lock<std::mutex> lock(_mutex);_mapper.removeQueueBinding(qname); //errfor(auto it=_bindings.begin();it!=_bindings.end();it++){if(it->second.find(qname)!=it->second.end()){it->second.erase(qname);}}}//获取交换机的所有绑定信息MsgQueueBindingMap getExchangeBindings(const std::string& ename){std::unique_lock<std::mutex> lock(_mutex);auto eit=_bindings.find(ename);if(eit==_bindings.end()){return MsgQueueBindingMap();}return eit->second;}//获取指定的绑定Binding::bp getBinding(const std::string& ename,const std::string& qname){std::unique_lock<std::mutex> lock(_mutex);auto eit=_bindings.find(ename);if(eit==_bindings.end()){return Binding::bp();}auto qit=eit->second.find(qname);if(qit==eit->second.end()){return Binding::bp();}return qit->second;}//查看指定的绑定是否存在bool exists(const std::string& ename,const std::string& qname){std::unique_lock<std::mutex> lock(_mutex);auto eit=_bindings.find(ename);if(eit==_bindings.end()){return false;}auto qit=eit->second.find(qname);if(qit==eit->second.end()){return false;}return true;}//查看所有绑定size_t size(){size_t count=0;std::unique_lock<std::mutex> lock(_mutex);for(auto eit=_bindings.begin();eit!=_bindings.end();eit++){count+=eit->second.size();}return count;}//清理所有的绑定void clear(){std::unique_lock<std::mutex> lock(_mutex);_bindings.clear();_mapper.remove_table();}private:std::mutex _mutex;BindingMapper _mapper;BindingMap _bindings;};
7.4、消息管理
7.4.1、消息的要素
消息从客户端进行发送到服务端,中间需要进行网络传输,所以就需要涉及到消息的网络传输消息的传输要素和消息在服务器上进行管理所需要的额外因素。
消息的存储是没有进行使用数据库进行存储的,而是直接在文件中进行存储,因为每条消息的长度都是不一致的,不适合进行存储到数据库中。
网络传输的消息要素
- 消息的属性:消息的ID、routing_key、消息的投递方式(是否要进行持久化存储)
- 消息的内容
服务器上对消息进行管理所需要的额外要素
- 消息的有效标志
当消息进行删除时,正常的逻辑是需要将被删除的这条消息的后面的消息全部进行向前进行覆盖式吧写入,但是这种处理方式的效率是非常差的,这里采取的策略就是当消息被删除,直接将该消息的有效标志变成false,表示这条消息所占用的文件中的位置是无效的,显著提升效率
- 消息的实际存储位置(相对于文件的偏移量)
要想进行删除这条消息首先就是要进行先找到这条消息,这就需要这条消息相对于文件的偏移量。
- 消息的长度
当进行恢复历史消息或者进行读取消息的时候,解决粘包问题
syntax="proto3";
package ys;enum ExchangeType
{UNKNOWTYPE=0;DIRECT=1;FANOUT=2;TOPIC=3;
};enum DeliveryMode
{UNKONWMODE=0;UNDURABLE=1;DURABLE=2;
};message BasicProperties
{string id=1;DeliveryMode delivery_mode=2;string routing_key=3;
};message Message
{message Payload {BasicProperties properties = 1;string body = 2;string valid = 3;};Payload payload = 1;uint32 offset = 2;uint32 length = 3;
};
7.4.2、消息的持久化管理
管理的数据
- 队列名
- 根据队列名生成数据文件的名称:队列名.mqd
- 根据队列名生成的临时文件名称:队列名.mqd.tmp
向外提供的操作
- 消息的创建与删除
- 消息的新增持久化/删除消息的持久化
- 历史消息的恢复/垃圾回收
- 什么情况下进行垃圾回收
- 进行垃圾会受到思想
什么时候进行垃圾回收?
每次进行删除数据都不是真正的删除,而是将有效标记位置为false,因此文件数据中会有越来越多的无效数据,当文件中的有效数据超过2000,且其中有效比例低于50%进行触发垃圾回收机制。
垃圾回收的思想
加载文件中所有有效消息(内存缓冲区),删除源文件,生成新的数据文件,将数据写入(存在风险)
加载文件中所有有效消息,先写入到一个临时文件中,然后再去删除源文件,将临时文件名称改为源文件名称
返回所有的有效消息,(每条消息中都记录当前的新的存储位置---用于更新内存中数据的内容)持久化数据管理的思想
- 以队列为单元进行消息的持久化管理
如果所有的队列共享同一个文件,当消息文件进行触发垃圾回收机制时,需要进行加载文件中的所有有效消息,重新进行生成新的数据文件,有效数据在文件中的存储位置就发生了变化,需要将内存中的消息进行更新,这时候就需要将所有队列进行加锁,锁冲突严重,效率低,因此,以队列为单元进行消息的持久化管理,每个队列都有自己的独立数据文件,每次只需要对操纵的队列进行加锁即可。
- 消息既然是存储在文件中因此需要特定的格式
using MsgPtr = std::shared_ptr<ys::Message>;class MessageMapper{public:MessageMapper(std::string &basedir, const std::string &qname): _qname(qname){if (basedir.back() != '/'){basedir.push_back('/');}_datafile = basedir + qname + DATAFILE_SUBFIX;_tempfile = basedir + qname + TMPFILE_SUBFIX;if (FileHelper(basedir).exists() == false){assert(FileHelper::createDirectory(basedir));}createMsgFile();}bool createMsgFile(){if (FileHelper(_datafile).exists() == true){return true;}bool ret = FileHelper::createFile(_datafile);if (ret == false){ELOG("创建队列数据文件失败 %s 失败", _datafile.c_str());return false;}return true;}void removeMsgFile(){FileHelper::removeFile(_datafile);FileHelper::removeFile(_tempfile);}bool insert(MsgPtr &msg) // 必须进行使用引用{return insert(_datafile, msg);}bool remove(const MsgPtr &msg){// 1、将msg中的有效标志位进行修改成0msg->mutable_payload()->set_valid("0");// 2、将消息进行序列化std::string body = msg->payload().SerializeAsString();// 3、将消息进行写入磁盘,覆盖原来的消息if (body.size() != msg->length()){ELOG("新生成的数据和原来的数据长度不一致");return false;}FileHelper file(_datafile);bool ret = file.write(body.c_str(), msg->offset(), msg->length());if (ret == false){ELOG("进行覆盖写入数据失败");return false;}return true;}std::list<MsgPtr> gc() // Garbage Collection{// 1、加载出文件中的所有数据// 存储格式 4字节长度|数据|4字节长度|数据|4字节长度|数据std::list<MsgPtr> result;bool ret = load(result);if (ret == false){ELOG("加载历史有效数据失败");return result;}//DLOG("垃圾回收得到的有效消息数量:%d", result.size());// 2、将有效数据进行序列化存储到临时文件中FileHelper::createFile(_tempfile);for (auto &msg : result){ret = insert(_tempfile, msg);if (ret == false){ELOG("向临时文件进行写入数据失败");return result;}}//DLOG("垃圾回收后,向临时文件中写入数据完毕,临时文件的大小:%ld", FileHelper(_tempfile).size());// 3、删除源文件ret = FileHelper::removeFile(_datafile);if (ret == false){ELOG("删除源文件失败");return result;}// 4、将临时文件进行修改成源文件名称ret = FileHelper(_tempfile).rename(_datafile);if (ret == false){ELOG("修改临时文件名失败");return result;}// 5、返回新的有效路径return result;}private:bool load(std::list<MsgPtr> &result){FileHelper file(_datafile);size_t offset = 0;size_t msg_size = 0;size_t fsize = file.size();while (offset < fsize){// 读数据的长度bool ret = file.read((char *)&msg_size, offset, sizeof(size_t));if (ret == false){ELOG("读取消息的长度失败");return false;}offset += sizeof(size_t);std::string body(msg_size, '\0');ret = file.read(&body[0], offset, msg_size);if (ret == false){ELOG("读取消息的内容失败");return false;}offset += msg_size;MsgPtr msg = std::make_shared<ys::Message>();msg->mutable_payload()->ParseFromString(body);// 验证消息的有效性if (msg->payload().valid() == "0"){DLOG("加载到无效消息:%s", msg->payload().body().c_str());continue;}result.push_back(msg);}return true;}bool insert(const std::string &filename, MsgPtr &msg) // 必须进行使用引用{// 新增数据都是进行添加到文件的末尾// 1、进行消息序列化std::string body = msg->payload().SerializeAsString();// 2、获取文件长度FileHelper file(filename);size_t fsize = file.size();size_t msg_size = body.size();// 3、将数据进行写入文件的指定位置bool ret = file.write((char *)&msg_size, fsize, sizeof(size_t));if (ret == false){DLOG("向队列数据文件写入数据长度失败!");return false;}ret = file.write(body.c_str(), fsize+sizeof(size_t), body.size());if (ret == false){ELOG("向队列数据文件中写入失败");return false;}// 4、更新msg中的实际存储位置msg->set_offset(fsize + sizeof(size_t));msg->set_length(body.size());return true;}private:std::string _qname;std::string _datafile;std::string _tempfile;};
7.4.3、消息的数据管理
内存中维护每个队列的消息队列。负责单个队列的消息的入队、出队、消费确认(ACK/NACK)、偏移量等管理。作用:为每个逻辑队列,提供独立的消息缓存和管理,
向外提供的操作
- 创建/打开数据文件(恢复历史消息的数据)
- 新增消息/删除消息(订阅客户端进行确认消息后)
- 获取队列的队首消息(方便进行向订阅客户端进行推送)
- 删除队列的所有消息
- 获取待推送的消息数量
- 获取待确认的的消息数量
- 获取持久化的消息数量
进行管理的数据
- 持久化管理句柄
- 待推送的消息链表
- 持久化消息的hashmap
- 确认消息的hashmap
- 持久化文件中的有效消息数量
说明:这里选择使用链表对消息数据进行管理(以头插尾删实现队列的功能),而不是使用队列的原因是:过期消息清理或逻辑删除消息(通过标志位)可能涉及进行删除中间部分的消息,list 相对于queue 效率高的多。
持久化消息和确认消息都采用哈希表的原因是:当进行查询某条消息时,是O(1)的复杂度的效率进行执行的。
在使用智能指针(如 std::shared_ptr)管理消息对象时,不同的数据结构(如链表、哈希表等)可以共享同一个消息对象的实例。所有结构中持有的智能指针都指向同一块内存,因此一处对消息对象内容的修改,其他持有该智能指针的结构也会观察到相同的变化。这种方式避免了对象复制,提高了内存利用率和一致性。
class QueueMessage{public:using QMPtr = std::shared_ptr<QueueMessage>;QueueMessage(std::string &basedir, const std::string &qname): _qname(qname), _mapper(basedir, qname), _valid_count(0), _total_count(0){}void recovery(){// 恢复历史消息(服务器刚启动的时候需要进行将磁盘中的消息进行加载到内存)std::unique_lock<std::mutex> lock(_mutex);_msgs = _mapper.gc();for (auto &msg : _msgs){_durable_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));}_valid_count = _total_count = _msgs.size();}bool insert(const ys::BasicProperties *bp, const std::string &body, bool queue_is_durable){//消息是否持久化首先由队列进行决定,然后再由消息本身进行决定std::unique_lock<std::mutex> lock(_mutex);// 1、构造消息对象auto msg = std::make_shared<ys::Message>(); // 这个对象是序列化的,需要进行手动进行向对象中进行填充数据msg->mutable_payload()->set_body(body);if (bp != nullptr){ys::DeliveryMode mode=queue_is_durable==true?bp->delivery_mode():ys::DeliveryMode::UNDURABLE;msg->mutable_payload()->mutable_properties()->set_id(bp->id());msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);msg->mutable_payload()->mutable_properties()->set_routing_key(bp->routing_key());}else{ys::DeliveryMode mode=queue_is_durable==true?ys::DeliveryMode::DURABLE:ys::DeliveryMode::UNDURABLE;msg->mutable_payload()->mutable_properties()->set_id(UUIDHelper::uuid());msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);msg->mutable_payload()->mutable_properties()->set_routing_key("");}// 2、判断消息是否需要进行持久化if (msg->mutable_payload()->properties().delivery_mode() == ys::DeliveryMode::DURABLE){msg->mutable_payload()->set_valid("1");// 3、进行消息的持久化存储bool ret = _mapper.insert(msg);if (ret == false){ELOG("消息进行持久化失败: %s 失败了", body.c_str());return false;}_durable_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));_valid_count++;_total_count++;}// 4、进行放入内存中//_waitack_msg.insert(std::make_pair(msg->payload().properties().id(),msg));_msgs.push_back(msg);return true;}MsgPtr front(){std::unique_lock<std::mutex> lock(_mutex);if (_msgs.size() == 0){return MsgPtr();}// 1、从队列中进行获取一个对象MsgPtr msg = _msgs.front();_msgs.pop_front();// 2、将这个消息对象进行放到等待推送的队列中_waitack_msg.insert(std::make_pair(msg->payload().properties().id(), msg));return msg;}bool remove(const std::string &msg_id){std::unique_lock<std::mutex> lock(_mutex);// 1、看待确认的队列中是否存在auto wpos = _waitack_msg.find(msg_id);if (wpos == _waitack_msg.end()){DLOG("没有找到要删除的消息 %s", msg_id);return true;}// 2、看这个消息是否进行持久化存储了auto dpos = _durable_msgs.find(msg_id);if (dpos != _durable_msgs.end()){_mapper.remove(dpos->second);_durable_msgs.erase(msg_id);_valid_count--;}// 3、从内存中进行删除_waitack_msg.erase(msg_id);gc();return true;}size_t getable_count() // 待推送的消息数量{std::unique_lock<std::mutex> lock(_mutex);return _msgs.size();}size_t durable_count(){std::unique_lock<std::mutex> lock(_mutex);return _valid_count;}size_t waitack_count(){std::unique_lock<std::mutex> lock(_mutex);return _waitack_msg.size();}size_t total_count(){std::unique_lock<std::mutex> lock(_mutex);return _total_count;}void clear(){std::unique_lock<std::mutex> lock(_mutex);_mapper.removeMsgFile();_msgs.clear();_durable_msgs.clear();_waitack_msg.clear();_valid_count = 0;_total_count = 0;}private:bool GcCheck(){if (_total_count >= 2000 && _valid_count * 10 / _total_count < 5){return true;}return false;}// 垃圾回收机制void gc(){// 1、进行垃圾回收获取有效的消息列表if (GcCheck() == false){return;}std::list<MsgPtr> msgs = _mapper.gc();for (auto &msg : msgs){auto pos = _durable_msgs.find(msg->payload().properties().id());if (pos == _durable_msgs.end()){DLOG("垃圾回收后有一条持久化消息在内存中没有进行管理");// 重新进行推动到链表末尾_msgs.push_back(msg);_durable_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));continue;}// 2、更新每一条消息的实际存储位置pos->second->set_offset(msg->offset());pos->second->set_length(msg->length());}// 3、更新当前的有效消息数量&总的持久化消息数量_valid_count = _total_count = msgs.size();}private:std::mutex _mutex;std::string _qname;MessageMapper _mapper;std::list<MsgPtr> _msgs;std::unordered_map<std::string, MsgPtr> _durable_msgs;std::unordered_map<std::string, MsgPtr> _waitack_msg;size_t _valid_count;size_t _total_count;};
7.4.4、对外总的消息管理类
全局管理所有队列的消息管理器,防止重复创建,动态扩展。根据消息队列的管理将每个队列的管理都进行统筹起来,形成一个对外的总管理
管理的成员
- 互斥锁:既然是对外提供的操作,就需要进行考虑到多线程高并发的场景肯定需要锁的保护
- 每个队列的管理句柄:队列名称和队列管理句柄的哈希表
提供的操作
- 初始化消息队列的管理句柄:创建队列时调用
- 销毁队列的管理句柄:删除队列时调用
- 队列的各项消息操作
- 向队列进行新增消息
- 获取队首消息
- 对队列进行消息确认
- 恢复队列的历史消息
- 获取队列的消息数量
- 可获取的消息数量
- 待确认的消息数量
- 持久化的消息数量
- 总的持久化的消息数量
class MessageManager{public:using ptr = std::shared_ptr<MessageManager>;MessageManager(const std::string &basedir): _basedir(basedir){}void clear(){for (auto &qmsg : _queue_msgs){qmsg.second->clear();}}void initQueueMessage(const std::string &qname){QueueMessage::QMPtr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto pos = _queue_msgs.find(qname);if (pos != _queue_msgs.end()){return;}qmp = std::make_shared<QueueMessage>(_basedir, qname);_queue_msgs.insert(std::make_pair(qname, qmp));}qmp->recovery();}void destoryQueueMessage(const std::string &qname){QueueMessage::QMPtr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto pos = _queue_msgs.find(qname);if (pos == _queue_msgs.end()){return;}qmp = pos->second;_queue_msgs.erase(pos);}qmp->clear();}bool insert(const std::string &qname, ys::BasicProperties *bp, const std::string &body, bool queue_is_durable){QueueMessage::QMPtr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto pos = _queue_msgs.find(qname);if (pos == _queue_msgs.end()){DLOG("向队列中进行新增消息失败,没有找到该消息的管理句柄:%s", qname.c_str());return true;}qmp = pos->second;}qmp->insert(bp, body, queue_is_durable);return true;}MsgPtr front(const std::string &qname){QueueMessage::QMPtr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto pos = _queue_msgs.find(qname);if (pos == _queue_msgs.end()){DLOG("获取队首消息失败,没有找到该消息的管理句柄:%s", qname.c_str());return MsgPtr();}qmp = pos->second;}return qmp->front();}void ack(const std::string &qname, const std::string &msg_id){QueueMessage::QMPtr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto pos = _queue_msgs.find(qname);if (pos == _queue_msgs.end()){DLOG("获取%s队列%s等待确认消息失败", qname.c_str(), msg_id.c_str());return;}qmp = pos->second;}qmp->remove(msg_id);}size_t getable_count(const std::string &qname) // 待推送的消息数量{QueueMessage::QMPtr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto pos = _queue_msgs.find(qname);if (pos == _queue_msgs.end()){DLOG("获取%s队首消息失败,没有找到管理句柄", qname.c_str());return 0;}qmp = pos->second;}return qmp->getable_count();}size_t durable_count(const std::string &qname){QueueMessage::QMPtr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto pos = _queue_msgs.find(qname);if (pos == _queue_msgs.end()){DLOG("获取%s队列可持久化消息数量失败,没有找到对应的管理句柄", qname.c_str());return 0;}qmp = pos->second;}return qmp->durable_count();}size_t waitack_count(const std::string &qname){QueueMessage::QMPtr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto pos = _queue_msgs.find(qname);if (pos == _queue_msgs.end()){DLOG("获取%s对垒等待消息数量失败,没有找到对应的管理句柄", qname.c_str());return 0;}qmp = pos->second;}return qmp->waitack_count();}size_t total_count(const std::string &qname){QueueMessage::QMPtr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto pos = _queue_msgs.find(qname);if (pos == _queue_msgs.end()){DLOG("获取%s队列总消息数量失败,没有找到对应的管理句柄", qname.c_str());return 0;}qmp = pos->second;}return qmp->total_count();}private:std::mutex _mutex;std::string _basedir;std::unordered_map<std::string, QueueMessage::QMPtr> _queue_msgs;};
初始化队列消息
初始化队列消息采用 延时初始化+持久化恢复的机制
如果队列 qname 的消息管理器还没有被初始化,就创建一个新的 QueueMessage 实例来管理它。并且,确保新创建的 QueueMessage 能够从持久化存储(例如磁盘)中恢复已经持久化的消息。
实现的难点
void initQueueMessage(const std::string &qname)
{std::unique_lock<std::mutex> lock(_mutex);auto pos = _queue_msgs.find(qname);if (pos != _queue_msgs.end()){return;}QueueMessage::QMPtr qmp = std::make_shared<QueueMessage>(_basedir, qname);_queue_msgs.insert(std::make_pair(qname, qmp));
}
当时进行实现的时候是在构建出QueueMessage的时候就进行调用消息的历史恢复这就需要在构造函数的时候进行加锁,这就导致在对外的总的消息管理的时候非常难实现, 构建出QueueMessage对象的时候会出现锁重复,为了解决这个问题只能将initQueueMessage的锁进行加到构建对象之前,但是这又会出现一个别的问题,initQueueMessage函数的加锁是不完全的,容易出现数据不一致问题。
为了解决这个问题,只好把消息的历史恢复从构造函数中移出,在进行构建对象后单独进行调用,并且在initQueueMessage中进行限制锁的范围。
其他函数的实现也是大概统一的思路,就不再一一解释。
7.5、虚拟机管理
虚拟机管理模块进行实现的管理就是对交换机、消息队列、绑定、队列消息这几个模块的管理进行整合,对外进行提供操作,要想进行对交换机、消息队列、绑定、队列消息这几个模块进行操作直接通过虚拟机进行即可。
虚拟机包含的成员
- 交换机管理句柄
- 消息队列管理句柄
- 绑定管理句柄
- 队列消息管理句柄
虚拟机包含的操作
- 声明交换机(存在则OK,不存在则进行创建)
- 删除交换机 (删除交换机需要进行同时进行删除关联的绑定信息)
- 声明队列(存在则OK,不存在则进行创建,创建同时创建队列关联的消息对象)
- 删除队列(删除队列需要进行同时进行删除关联的绑定信息,删除关联消息管理对象及队列所有消息)
- 交换机→队列的绑定
- 交换机→队列的解绑
- 消息的新增
- 获取指定队列的队首消息
- 消息确认的删除功能
虚拟机管理的操作
增删查
该项目进行实现的只是一个虚拟机,未进行涉及这方面的操作,因此这个模块没有进行实现。
--------------------------------------------------------------------------------------------------------------------------------
在进行实现虚拟机进行包含的的操作时,需要进行好好考虑是否要进行实现相关关系的操作
例如在进行删除交换机的时候,还需要进行删除交换机的绑定、声明队列消息的时候需要先进行队列消息的初始化然后再进行声明队列消息、进行绑定的时候需要进行查看对应的交换机和队列是否存在、进行发布消息的时候也需要进行查看消息是否存在。
class VirtualHost{public:using ptr=std::shared_ptr<VirtualHost>;VirtualHost(const std::string& name,const std::string &basedir,const std::string &dbfile):_name(name),_emp(std::make_shared<ExchageManager>(dbfile)),_mqmp(std::make_shared<MsgQueueManager>(dbfile)),_bmp(std::make_shared<BindingManager>(dbfile)),_mmp(std::make_shared<MessageManager>(basedir)){QueueMap queues=_mqmp->allQueues();for(auto& queue:queues){_mmp->initQueueMessage(queue.first);}}bool declareExchange(const std::string& name,ys::ExchangeType type,bool durable,bool auto_delete,const google::protobuf::Map<std::string,std::string>& args){return _emp->declareExchage(name,type,durable,auto_delete,args);}void deleteExchange(const std::string& name){//进行删除时,需要进行删除交换机的相关关系--和交换机进行绑定的消息队列的信息_bmp->removeExchangeBinding(name);return _emp->deleteExchange(name);}bool existsExchange(const std::string& ename){return _emp->exists(ename);}Exchange::ptr selectExchange(const std::string& ename){return _emp->selectExchange(ename);}bool declareQueue(const std::string& qname,bool qdurable,bool qexclusive,bool qauto_delete,const google::protobuf::Map<std::string,std::string> qargs){_mmp->initQueueMessage(qname);return _mqmp->declareQueue(qname,qdurable,qexclusive,qauto_delete,qargs);}void deleteQueue(const std::string& qname){//删除分为两步 1、删除队列的消息 2、删除队列的绑定消息 3、删除队列_mmp->destoryQueueMessage(qname);_bmp->removeQueueBinding(qname);_mqmp->deleteQueue(qname);}bool existsQueue(const std::string& qname){return _mqmp->exists(qname);}bool bind(const std::string& ename,const std::string& qname,const std::string& key){//不必进行提供持久化的参数,绑定的持久化由交换机和队列共同决定Exchange::ptr ep=_emp->selectExchange(ename);if(ep.get()==nullptr){DLOG("进行绑定的%s交换机不存在",ename.c_str());}MsgQueue::mqp mqp=_mqmp->selectQueue(qname);if(mqp.get()==nullptr){DLOG("进行绑定的%s队列不存在",qname.c_str());}return _bmp->bind(ename,qname,key,ep->durable&&mqp->durable);}void unbind(const std::string& ename,const std::string& qname){return _bmp->unbind(ename,qname);}MsgQueueBindingMap exchangeBindings(const std::string& ename){return _bmp->getExchangeBindings(ename);}bool existsBinding(const std::string& ename,const std::string& qname){return _bmp->exists(ename,qname);}QueueMap allQueues(){return _mqmp->allQueues();}bool basicPublish(const std::string &qname, ys::BasicProperties *bp, //insertconst std::string &body){//消息的持久化先是由队列先决定,然后再由消息本身决定MsgQueue::mqp mqp=_mqmp->selectQueue(qname);if(mqp.get()==nullptr){DLOG("进行发布消息失败,%s消息队列不存在",qname.c_str());return false;}return _mmp->insert(qname,bp,body,mqp->durable);}MsgPtr basicConsume(const std::string &qname) //front{return _mmp->front(qname);}void basicAck(const std::string &qname, const std::string &msg_id) //ack{return _mmp->ack(qname,msg_id);}void clear(){_emp->clear();_mqmp->clear();_bmp->clear();_mmp->clear();}private:std::string _name;ExchageManager::ptr _emp;MsgQueueManager::ptr _mqmp;BindingManager::ptr _bmp;MessageManager::ptr _mmp;};
7.6、路由交换模块
功能:
判断消息的 routing_key 和 binding_key 是否可以进行配对成功
取决要素两个:
- 交换机类型
- routing_key 和 binding_key 的匹配
因此基于功能的需求分析,路由交换模块只需要对传入的数据进行处理即可,不需要进行设置成员变量,这个模块进行实现的就是一个功能接口类
提供的功能
- 判断routing_key是否合法
- 必须是a~z、A~Z、0~9、. 、_这五部分进行组成
- 判断binding_key是否合法
- 必须是a~z、A~Z、0~9、. # * 、_ 这五部分进行组成
- 进行routing_key和binding_key的匹配
- 直接路由:只有routing_key和binding_key完全一致才可以路由成功
- 广播路由:无论如何都能进行路由成功
- 匹配路由:只有满足特定的规则才可以进行路由成功
补充:匹配路由的规则
因为有通配符的存在,不能单单进行字符的比较还应该有具体的算法进行规定新的匹配规则
匹配算法的思想:
- 对binding_key 和 routing_key 进行字符串分割
- 按照元素的个数进行创建大小+1的二维数组,然后将二维数组dp [ 0 ] [ 0 ] 的位置进行置1
- 使用routing_key 和 binding_key 的每个元素进行比对,在二维数组中进行标记
标记的规则
- 当两个单词进行标记成功时,直接从左上方进行继承结果
- 当遇到 # 通配符时,不仅可以从左上方、左方进行继承结果,也可以从上方进行继承结果
- 当binding_key 以 # 进行起始时,需要将 # 对应行的第0列进行置为1
算法实现的步骤
- 对binding_key 和 routing_key 进行字符串分割
- 按照元素的个数进行创建大小+1的二维数组,然后将二维数组dp [ 0 ] [ 0 ] 的位置进行置1
- 当binding_key 以 # 进行起始时,需要将 # 对应行的第0列进行置为1
- 使用routing_key中的每个单词与binding_key中的每个单词进行匹配并进行标记数组
注意:# 代表省略任意个的元素,但是* 只能代表省略的一个元素
* 和 # 必须单独存在(不能和其他部分一起出现在一个单词中)
# 通配符两边不能出现其他通配符
class Router{public:static bool isLegalRoutingKey(const std::string &routing_key){// 必须是a~z、A~Z、0~9、. 、_这五部分进行组成for (auto &e : routing_key){if ((e >= 'a' && e <= 'z') || (e >= 'A' && e <= 'Z') || (e >= '0' && e <= '9') || e == '_' || e == '.'){continue;}return false;}return true;}static bool isLegalBindingKey(const std::string &binding_key){// 1、必须是a~z、A~Z、0~9、. # * 、_ 这五部分进行组成for (auto &e : binding_key){if ((e >= 'a' && e <= 'z') || (e >= 'A' && e <= 'Z') || (e >= '0' && e <= '9') || e == '_' || e == '.' || e == '*' || e == '#'){continue;}return false;}// 2、并且#和*必须进行独立存在std::vector<std::string> words;StrHelper::split(binding_key, ".", words);for (auto &word : words){size_t size = word.size();if (word.size() > 1 && (word.find("*") != std::string::npos ||word.find("#") != std::string::npos)) {return false;}}// 3、#的两侧不能有通配符size_t size = words.size();for (int i = 1; i < size; i++){if (words[i] == "#" && words[i - 1] =="*"){return false;}if (words[i] == "#" && words[i - 1] =="#"){return false;}if (words[i] == "*" && words[i - 1] =="#"){return false;}}return true;}static bool route(ys::ExchangeType type, std::string &routing_key, const std::string &binding_key){//直接路由if(type==ys::ExchangeType::DIRECT){return routing_key==binding_key;}//绑定路由else if(type==ys::ExchangeType::FANOUT){return true;}//主题路由else{//需要进行特定的规则的模式匹配---动态规划的思想//1、对binding_key 和 routing_key 进行字符串分割std::vector<std::string> bkeys;std::vector<std::string> rkeys;int n_rkey=StrHelper::split(routing_key,".",rkeys);int n_bkey=StrHelper::split(binding_key,".",bkeys);//2、按照元素的个数进行创建大小+1的二维数组,然后将二维数组dp [ 0 ] [ 0 ] 的位置进行置1std::vector<std::vector<bool>> dp(n_bkey+1,std::vector<bool>(n_rkey+1,false));dp[0][0]=true;//3、当binding_key 以 # 进行起始时,需要将 # 对应行的第0列进行置为1for(int i=1;i<=bkeys.size();i++){if(bkeys[i-1]=="#"){dp[i][0]=true;continue;}break;}//4、使用routing_key中的每个单词与binding_key中的每个单词进行匹配并进行标记数组for(int i=1;i<=n_bkey;i++){for(int j=1;j<=n_rkey;j++){//如果bkey为* 或者两个单词相同则匹配成功if(bkeys[i-1]=="*"||bkeys[i-1]==rkeys[j-1]){dp[i][j]=dp[i-1][j-1];}else if(bkeys[i-1]=="#"){dp[i][j]=dp[i-1][j-1]|dp[i-1][j]|dp[i][j-1];}}}return dp[n_bkey][n_rkey];}}};
7.7、队列的消费者/订阅者的管理模块
一个客户端占用一个连接太浪费了,因此使用的是一个客户端占用的是一个信道进行网络通信,根据信道的功能定义出描述信道功能的角色——生产者和消费者。
消费者是对于信道描述的角色,消费者和信道的关系是一一对应的,那么要不要以信道为单元进行消费者的管理呢?其实这样是不合适的,消费者只是在信道进行关闭的时候消费者进行删除,但是还有另一种情况,就是当队列中有了消息就需要找到对应的订阅消息的消费者,这种情况显然是最多的,因此消费者的管理是以队列为单元的。
7.7.1、消费者结构层
- 消费者标识
- 订阅队列名称
- 是否就进行自动应答
- 一个消息进行处理的回调函数(实现的是当进行发布一条消息,如何进行选择消费者进行消费,如何进行消费?对于服务端来说就是调用这个回调函数来进行处理,其内部的逻辑就是找到对应的消费者连接,然后将数据进行发送给消费者对应的客户端)
using ConsumerCallback = std::function<void(const std::string&, const ys::BasicProperties *bp, const std::string&)>;struct Consumer{using ptr=std::shared_ptr<Consumer>;std::string _cid;std::string _qname;bool _auto_ack;ConsumerCallback _callback;Consumer(){DLOG("new Consumer:%p",this);}Consumer(const std::string &cid, const std::string &qname, bool auto_ack, const ConsumerCallback &cb): _cid(cid), _qname(qname), _auto_ack(auto_ack), _callback(std::move(cb)){DLOG("new Consumer:%p",this);}~Consumer(){DLOG("del Consumer:%p",this);}};
7.7.2、消费者管理层
提供的操作
- 新增消费者:信道进行提供的服务是订阅队列消息的时候进行创建消费者
- 删除消费者:取消订阅 / 信道关闭 /连接关闭的时候进行删除消费者
- 获取消费者:从队列所有消费者中进行按序取出一个消费者进行消息的推送
- 判断队列的消费者是否为空(测试时使用)
- 判断指定的消费者是否存在(测试时使用)
包含的成员
- 消费者结构:vector
- 轮询序号:一个队列可能会有多个消费者,但是一条消息只需要被一个消费者进行消费即可,因此采用RR轮转的方式
- 互斥锁:保证线程安全
- 队列名称
class QueueConsumer{public:using ptr=std::shared_ptr<QueueConsumer>;QueueConsumer( const std::string &qname):_qname(qname){}//队列新增消费者Consumer::ptr create(const std::string &cid, const std::string &qname, bool auto_ack, const ConsumerCallback &cb){std::unique_lock<std::mutex> lock(_mutex);//1、先进行判断指定队列中是否存在该消费者for(auto& consumer:_consumers){//2、存在则直接进行返回if(consumer->_cid==cid){return Consumer::ptr();}}//3、不存在则直接进行创建消费者Consumer::ptr cp=std::make_shared<Consumer>(cid,qname,auto_ack,cb);_consumers.push_back(cp);return cp;}//队列移除消费者void remove(const std::string &cid){std::unique_lock<std::mutex> lock(_mutex);for(auto it=_consumers.begin();it!=_consumers.end();it++){if((*it)->_cid==cid){_consumers.erase(it);return;}}return;}//队列获取消费者--rr轮转Consumer::ptr choose(){std::unique_lock<std::mutex> lock(_mutex);if(_consumers.size()==0){return Consumer::ptr();}int index=_rr_seq%_consumers.size();_rr_seq++;return _consumers[index];}//消费者是否为空bool empty(){std::unique_lock<std::mutex> lock(_mutex);return _consumers.size()==0;}//指定消费者是否存在bool exists(const std::string &cid){std::unique_lock<std::mutex> lock(_mutex);for(auto it=_consumers.begin();it!=_consumers.end();it++){if((*it)->_cid==cid){return true;}}return false;}//清理队列中的消费者void clear(){std::unique_lock<std::mutex> lock(_mutex);_consumers.clear();_rr_seq=0;}private:std::string _qname;std::mutex _mutex;u_int64_t _rr_seq; //sequencestd::vector<Consumer::ptr> _consumers;};
当发布客户端进行发布了一条消息,存在多个消费者,消息怎么分配?
一般最容易想到的两种策略分别是
直接分配(比如只给第一个消费者)的问题
-
容易导致 第一个消费者负载过重
-
其他消费者 可能永远没机会消费
-
队列整体的消费能力受限
随机分配的问题
-
随机分配虽然避免了固定消费者过载,但随机算法可能不均匀,可能出现:
-
消费者A短时间内连续接收,B、C长时间空闲
-
局部抖动性(随机抖动,导致消息分配不平滑)
-
这里在进行获取消费者时采用的是RR轮转的方式,可以进行避免上述两种算法带来的弊端
RR轮转算法在该模块的应用
-
注册时,消费者会被 依次添加到
_consumers
vector 中,顺序添加的消费者,天然形成了“轮转次序”
- 先检查是否有消费者
- 通过 取模运算,定位下一个该轮转的消费者,然后将轮转序号++
7.7.3、对消费者管理层
- 初始化/删除队列的消费者信息结构(创建/删除队列的时候初始化)
- 向指定队列进行新增消费者(客户端订阅指定队列消息的时候):新增完成的时候进行返回消费者对象
- 从指定队列进行移除消费者(客户端取消订阅的时候)
- 移除指定队列的所有消费者(队列被删除时销毁):删除消费者的队列管理对象
- 从指定的队列获取一个消费者(轮询调度--消费者轮换起到负载均衡的作用)
class ConsumerManager{public:using ptr=std::shared_ptr<ConsumerManager>;ConsumerManager(){}void initQueueConsumer(const std::string& qname){std::unique_lock<std::mutex> lock(_mutex);//1、是否存在该队列的消费者管理器auto pos=_qconsumers.find(qname);//2、存在则返回if(pos!=_qconsumers.end()){return;}//3、不存在直接进行创建队列消费者管理器QueueConsumer::ptr qp=std::make_shared<QueueConsumer>(qname);_qconsumers.insert(std::make_pair(qname,qp));return;}void destoryConsumer(const std::string& qname){std::unique_lock<std::mutex> lock(_mutex);_qconsumers.erase(qname);}Consumer::ptr create(const std::string &cid, const std::string &qname, bool auto_ack, const ConsumerCallback &&cb){QueueConsumer::ptr ptr;{std::unique_lock<std::mutex> lock(_mutex);auto pos=_qconsumers.find(qname);if(pos==_qconsumers.end()){DLOG("没有找到队列%s的消费者管理句柄",qname.c_str());return Consumer::ptr();}ptr=pos->second;}return ptr->create(cid,qname,auto_ack,cb);}void remove(const std::string &cid, const std::string &qname){QueueConsumer::ptr ptr;{std::unique_lock<std::mutex> lock(_mutex);auto pos=_qconsumers.find(qname);if(pos==_qconsumers.end()){DLOG("没有找到队列%s的消费者管理句柄",qname.c_str());return ;}ptr=pos->second;}return ptr->remove(cid);}Consumer::ptr choose(const std::string &qname){QueueConsumer::ptr ptr;{std::unique_lock<std::mutex> lock(_mutex);auto pos=_qconsumers.find(qname);if(pos==_qconsumers.end()){DLOG("没有找到队列%s的消费者管理句柄",qname.c_str());return Consumer::ptr();}ptr=pos->second;}return ptr->choose();}bool empty(const std::string &qname){QueueConsumer::ptr ptr;{std::unique_lock<std::mutex> lock(_mutex);auto pos=_qconsumers.find(qname);if(pos==_qconsumers.end()){DLOG("没有找到队列%s的消费者管理句柄",qname.c_str());return true;}ptr=pos->second;}ptr->empty();}bool exists(const std::string &cid,const std::string &qname){QueueConsumer::ptr ptr;{std::unique_lock<std::mutex> lock(_mutex);auto pos=_qconsumers.find(qname);if(pos==_qconsumers.end()){DLOG("没有找到队列%s的消费者管理句柄",qname.c_str());return false;}ptr=pos->second;}return ptr->exists(cid);}void clear(){std::unique_lock<std::mutex> lock(_mutex);_qconsumers.clear();}private:std::mutex _mutex;std::unordered_map<std::string,QueueConsumer::ptr> _qconsumers;};
在这个模块中进行传入的参数qname都是代表为该名称的队列进行建立的消费者池
例如在 initQueueConsumer中
传入的 qname,其实是为了告诉 ConsumerManager:“我现在需要给这个特定队列(名字是 qname)建立消费者管理池。”
然后,ConsumerManager 中的 initQueueConsumer 会为该 qname 创建一个新的 QueueConsumer(除非之前已经创建过了)。
这样,队列 qname 对应的所有消费者就都在 QueueConsumer::_consumers 这个 vector 中被管理。
7.8、信道的管理模块
模块定位
channel模块是服务端的核心组件,代表客户端与虚拟主机之间的连接信道。它是客户端与服务器进行消息交互的上下文,负责将客户端请求转化为后端的实际操作。
信道模块将以下关键模块进行整合和协调:
-
虚拟机模块(VirtualHost):队列、交换机、绑定关系等数据管理和核心操作。
-
路由模块(Router):根据交换机类型和路由键,决定消息路由的目标队列。
-
消费者模块(ConsumerManager):负责消费者的管理与消息推送。
通过信道,客户端可进行:声明/删除交换机、声明/删除队列、绑定/解绑队列、发布消息、消费消息等核心操作。
信道模块的核心职责
客户端请求处理
- 接收并解析客户端的二进制协议请求(由 ProtobufCodec 负责解码为 Protobuf 对象)。
- 根据请求类型(如 declareExchangeRequest, basicPublishRequest 等),分发到对应处理函数。
协调模块操作
- 使用 VirtualHost 进行数据层操作(队列/交换机的声明、绑定、消息持久化等)。
- 借助 Router 进行路由决策,找到目标队列的订阅者。
- 调用 ConsumerManager 创建或销毁消费者,或者推送消息给消费者。
消息传递和路由
- 客户端发布的消息,先路由到交换机,交换机将消息分发到符合路由键条件的队列。
- 信道从队列中消费消息,并通过消费者回调推送给客户端
7.7.1、管理的操作
- 提供声明&删除交换机的操作(删除交换机的同时删除交换机的相关绑定信息)
- 提供声明&删除队列的操作 (删除队列的同时,删除队列的绑定信息、消息、订阅该队列的消费者信息)
- 提供绑定&和解绑队列的操作
- 提供订阅&取消订阅的操作
- 提供发布&和确认消息的操作
在信道模块中调用虚拟机模块、消息模块等时参数很多很麻烦,因此用请求对象封装这些参数,简化函数调用和模块对接。通俗说法就是:参数太多,直接传太麻烦,我打包成一个结构体传过去,模块调用就清爽多了。
客户端通过网络传输将信息进行发布到服务器上,这个过程既然设计到网络传输就需要进行将数据进行序列化和反序列化,但是序列化和反序列化是底层的协议,还需要通过自定义应用层的协议进行解决TCP的粘包问题。(序列化和反序列化都是使用陈硕大佬的muduo库中的protobuf协议进行的,应用层的协议也是进行使用陈硕大佬进行进行设计好的协议,如下图)
7.7.2、管理的信息
- 信道ID:信道的唯一标识
- 信道关联的消费者:用于消费者在信道进行关闭的时候进行取消订阅,删除订阅者消息
- 信道关联的连接:用于客户端进行发送数据(响应,推送的消息)
- protobuf协议处理句柄:网络通信前的协议处理
- 消费者管理句柄:信道关闭或者取消订阅时,通过句柄进行删除订阅者的信息
- 虚拟机句柄:交换机/队列/绑定/小i西数据的管理
- 工作线程池句柄(一条消息被发布到队列后,需要将消息推送到订阅队列的消费者,该过程由线程池进行完成)-- 异步工作池
namespace brush
{using ProtobufCodecPtr=std::shared_ptr<ProtobufCodec>;using openChannelRequestPtr=std::shared_ptr<ys::openChannelRequest>;using closeChannelRequestPtr=std::shared_ptr<ys::closeChannelRequest>;using declareExchangeRequestPtr=std::shared_ptr<ys::declareExchangeRequest>;using deleteExchangeRequestPtr=std::shared_ptr<ys::deleteExchangeRequest>;using declareQueueRequestPtr=std::shared_ptr<ys::declareQueueRequest>;using deleteQueueRequestPtr=std::shared_ptr<ys::deleteQueueRequest>;using queueBindRequestPtr=std::shared_ptr<ys::queueBindRequest>;using queueUnBindRequestPtr=std::shared_ptr<ys::queueUnBindRequest>;using basicPublishRequestPtr=std::shared_ptr<ys::basicPublishRequest>;using basicAckRequestPtr=std::shared_ptr<ys::basicAckRequest>;using basicConsumeRequestPtr=std::shared_ptr<ys::basicConsumeRequest>;using basicCancelRequestPtr=std::shared_ptr<ys::basicCancelRequest>;class Channel{public:using ptr=std::shared_ptr<Channel>;Channel(const std::string& cid,const VirtualHost::ptr& host,const ConsumerManager::ptr& cmp,const ProtobufCodecPtr& codec,const muduo::net::TcpConnectionPtr& conn,const threadpool::ptr pool):_cid(cid),_host(host),_cmp(cmp),_codec(codec),_conn(conn),_pool(pool){DLOG("new Channel:%p",this);}~Channel(){if(_consumer.get()!=nullptr){_cmp->remove(_consumer->_cid,_consumer->_qname);}DLOG("del Channel:%p",this);}//交换机的声明void declareExchange(const declareExchangeRequestPtr& req){bool ret=_host->declareExchange(req->exchange_name(),req->exchange_type(),req->durable(),req->auto_delete(),req->args());//声明完成进行响应进行返回return basicResponse(ret,req->rid(),req->cid());}//交换机的删除void deleteExchange(const deleteExchangeRequestPtr& req){_host->deleteExchange(req->exchange_name());return basicResponse(true,req->rid(),req->cid());}//队列声明void declareQueue(const declareQueueRequestPtr& req){//与队列有关的 信息 绑定 消费者 虚拟机-信息&绑定bool ret=_host->declareQueue(req->queue_name(),req->durable(),req->exclusive(),req->auto_delete(),req->args());if(ret==false){return basicResponse(false,req->rid(),req->cid());}_cmp->initQueueConsumer(req->queue_name());return basicResponse(true,req->rid(),req->cid());}//队列删除void deleteQueue(const deleteQueueRequestPtr& req){_cmp->destoryConsumer(req->queue_name());_host->deleteQueue(req->queue_name());return basicResponse(true,req->rid(),req->cid());}//绑定队列void queueBind(const queueBindRequestPtr& req){bool ret=_host->bind(req->exchange_name(),req->queue_name(),req->binding_key());if(ret==false){return basicResponse(false,req->rid(),req->cid());}return basicResponse(true,req->rid(),req->cid());}//解除绑定队列void queueUnBind(const queueUnBindRequestPtr& req){_host->unbind(req->exchange_name(),req->queue_name());return basicResponse(true,req->rid(),req->cid());}//消息发布void basicPublish(const basicPublishRequestPtr& req){//1、获取交换机的信息Exchange::ptr ep=_host->selectExchange(req->exchange_name());if(ep.get()==nullptr){return basicResponse(false,req->rid(),req->cid());}//2、进行交换路由,获取交换机的绑定信息,判断消息可以进行发送到哪个交换机的绑定队列中MsgQueueBindingMap mqbm=_host->exchangeBindings(req->exchange_name());ys::BasicProperties* properties=nullptr;std::string routing_key;if(req->has_properties()){properties=req->mutable_properties();routing_key=req->properties().routing_key();}for(auto& binding:mqbm){if(Router::route(ep->type,routing_key,binding.second->binding_key)){//3、将消息进行添加到队列中(添加消息管)_host->basicPublish(binding.first,properties,req->body());//4、向线程池中添加一个消息消费任务(向指定队列的订阅者去推送消息)auto task=std::bind(&Channel::consume,this,binding.first);_pool->push(task);}}return basicResponse(true,req->rid(),req->cid());}//消息确认void basicAck(const basicAckRequestPtr& req){_host->basicAck(req->queue_name(),req->message_id());return basicResponse(true,req->rid(),req->cid());}//订阅队列消息void basicConsumer(const basicConsumeRequestPtr& req){//告诉服务器哪个队列有消息了告诉我一下//1、判断队列是否存在bool ret=_host->existsQueue(req->queue_name());if(ret==false){return basicResponse(false,req->rid(),req->cid());}//2、创建队列的消费者进行处理auto cb=std::bind(&Channel::callback,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3);_consumer=_cmp->create(req->consume_tag(),req->queue_name(),req->auto_ack(),cb);return basicResponse(true,req->rid(),req->cid());}//取消订阅void basicCannel(const basicCancelRequestPtr& req){_cmp->remove(req->consume_tag(),req->queue_name());return basicResponse(true,req->rid(),req->cid());}private:void callback(const std::string& tag, const ys::BasicProperties *bp, const std::string& body){//根据参数进行组织出推动消息的请求,将消息进行推送到channel对应的客户端ys::basicConsumeResponse resp;resp.set_cid(_cid);resp.set_body(body);resp.set_consume_tag(tag);if(bp){resp.mutable_properties()->set_id(bp->id());resp.mutable_properties()->set_delivery_mode(bp->delivery_mode());resp.mutable_properties()->set_routing_key(bp->routing_key());}_codec->send(_conn,resp);}void consume(const std::string& qname){//指定队列进行消费消息//1、从队列中取出一条消息MsgPtr msg=_host->basicConsume(qname);if(msg.get()==nullptr){DLOG("执行消费任务失败,%s队列中没有消息",qname.c_str());return;}//2、从队列的订阅者中进行取出一个订阅者Consumer::ptr consumer=_cmp->choose(qname);if(consumer.get()==nullptr){DLOG("执行消费任务失败,%s队列中没有消费者",qname.c_str());return;}//3、调用订阅者对应的消息处理函数,实现消息的推送consumer->_callback(consumer->_cid,msg->mutable_payload()->mutable_properties(),msg->payload().body());//4、判断如果订阅者是自动确认--不需要等待确认,直接进行删除小i下,否则需要外部进行消息确认后再删除if(consumer->_auto_ack){_host->basicAck(qname,msg->payload().properties().id());}}void basicResponse(bool ok,const std::string& rid,const std::string& cid){ys::basicCommonResponse resp;resp.set_rid(rid);resp.set_cid(cid);resp.set_ok(ok);_codec->send(_conn,resp);}private:std::string _cid;Consumer::ptr _consumer;muduo::net::TcpConnectionPtr _conn;ProtobufCodecPtr _codec;ConsumerManager::ptr _cmp;VirtualHost::ptr _host;threadpool::ptr _pool;};
信道的设计一个关键点是:信道内不直接执行消费操作,而是交由线程池异步执行。
例如,basicPublish 时,将 consume 消费任务通过 threadpool->push 提交到线程池。这样避免了信道直接阻塞在长耗时操作,提升系统整体吞吐能力。消息消费完全由线程池线程异步处理,主 I/O 线程依然可以快速响应新请求,保持高并发处理能力。
7.7.3、对信道的管理
增删查
class ChannelManager{public:using ptr=std::shared_ptr<ChannelManager>;ChannelManager(){}bool openChannel(const std::string& cid,const VirtualHost::ptr& host,const ConsumerManager::ptr& cmp,const ProtobufCodecPtr& codec,const muduo::net::TcpConnectionPtr& conn,const threadpool::ptr& pool){std::unique_lock<std::mutex> lock(_mutex);auto cn=std::make_shared<Channel>(cid,host,cmp,codec,conn,pool);auto pos=_channels.find(cid);if(pos==_channels.end()){return false;}_channels.insert(std::make_pair(cid,cn));return true;}void closeChannel(const std::string& cid){std::unique_lock<std::mutex> lock(_mutex);_channels.erase(cid);}Channel::ptr getChannel(const std::string& cid){std::unique_lock<std::mutex> lock(_mutex);auto pos=_channels.find(cid);if(pos==_channels.end()){return Channel::ptr();}return pos->second;}private:std::mutex _mutex;std::unordered_map<std::string,Channel::ptr> _channels;};
7.9、连接管理模块
背景
连接管理模块进行管理的就是客户端的连接请求,由于客户端到服务器之间涉及网络通信,网络通信连接的句柄其实就是一个原生套接字的描述符,但是我们使用的是陈硕大佬的Muduo库进行的网络通信,不需要进行管理原生套接字,直接进行管理Muduo库提供的Connection 连接对象,但是Muduo库的连接对象还是不太符合我们的需求,因为我们这个项目中的连接还需要进行细分出信道这个概念,所以成员信息中需要进行增加信道的管理句柄,连接操作需要提供创建信道和删除信道的操作
每个客户端连接只负责自身的通信和状态维护,而像用户管理、协议处理、消费者管理、线程池等资源是整个服务器全局共享的,统一由服务器创建并传入连接进行使用。这样可以避免为每个连接重复构造这些高开销组件,保持系统状态一致性,降低资源消耗,提高模块之间的协同效率,因此没必要让每个连接单独持有这些模块的实例,直接使用智能指针即可。
模块定位
服务端连接模块是网络层与业务层之间的桥梁。它以 Connection 为核心,管理每个 TCP 连接的上下文、信道和请求状态;以 ConnectionManager 为全局活跃连接表,提供安全高效的增删查能力。
连接模块 通过处理客户端的请求(如“我要创建队列”),调用 资源整合模块 的接口 ( VirtualHost→declareQueue() ) 来真正地操作这些核心资源。
主要职责
-
连接模块 负责“沟通协调”,让不同的请求、不同的客户端有序访问资源。
-
资源整合模块 负责“背后调度和资源落地”,确保客户端需要的交换机、队列和消息等都能被安全、高效地管理。
7.9.1、管理的信息
- 连接关联的信道管理句柄(实现信道的增删改查)
- protobuf协议处理句柄:网络通信前的协议处理
- 消费者管理句柄:信道关闭或者取消订阅时,通过句柄进行删除订阅者的信息
- 虚拟机句柄:交换机/队列/绑定/小i西数据的管理
- 工作线程池句柄(一条消息被发布到队列后,需要将消息推送到订阅队列的消费者,该过程由线程池进行完成)-- 异步工作池
7.9.2、管理的操作
- 创建信道
- 删除信道
class Connection{public:using ptr=std::shared_ptr<Connection>;Connection(const VirtualHost::ptr& host,const ConsumerManager::ptr& cmp,const ProtobufCodecPtr& codec,const muduo::net::TcpConnectionPtr& conn,const threadpool::ptr pool):_host(host),_cmp(cmp),_codec(codec),_conn(conn),_pool(pool),_channels(std::make_shared<ChannelManager>()){}//收到连接请求进行创建信道void openChannel(const openChannelRequestPtr& req){//创建信道bool ret=_channels->openChannel(req->rid(),_host,_cmp,_codec,_conn,_pool);//给客户端进行回复if(ret==false){return basicResponse(false,req->rid(),req->cid());}return basicResponse(true,req->rid(),req->cid());}void closeChannel(const closeChannelRequestPtr& req){_channels->closeChannel(req->cid());return basicResponse(true,req->rid(),req->cid());}Channel::ptr getChannel(const std::string &cid) {return _channels->getChannel(cid);}private:void basicResponse(bool ok,const std::string& rid,const std::string& cid){ys::basicCommonResponse resp;resp.set_rid(rid);resp.set_cid(cid);resp.set_ok(ok);_codec->send(_conn,resp);}private:muduo::net::TcpConnectionPtr _conn;ProtobufCodecPtr _codec;ConsumerManager::ptr _cmp;VirtualHost::ptr _host;threadpool::ptr _pool;ChannelManager::ptr _channels;};
7.9.3、对连接管理的信息
连接的增删改查
通过muduo库中的连接进行基础的网络通信,但是要进行我们自己的操作,需要通过muduo库中的连接对象来找到我们自己的连接对象,然后进行网络通信。
class ConnectionManager{//实现连接的增删查public:using ptr=std::shared_ptr<ConnectionManager>;ConnectionManager(){}void newConnection(const VirtualHost::ptr& host,const ConsumerManager::ptr& cmp,const ProtobufCodecPtr& codec,const muduo::net::TcpConnectionPtr& conn,const threadpool::ptr pool){std::unique_lock<std::mutex> lock(_mutex);auto pos=_conns.find(conn);if(pos!=_conns.end()){return;}auto self_conn=std::make_shared<Connection>(host,cmp,codec,conn,pool);_conns.insert(std::make_pair(conn,self_conn));}void delConnection(const muduo::net::TcpConnectionPtr& conn){std::unique_lock<std::mutex> lock(_mutex);_conns.erase(conn);}Connection::ptr getConnection(const muduo::net::TcpConnectionPtr& conn){std::unique_lock<std::mutex> lock(_mutex);auto pos=_conns.find(conn);if(pos==_conns.end()){return Connection::ptr();}return pos->second;}private:std::mutex _mutex;std::unordered_map<muduo::net::TcpConnectionPtr,Connection::ptr> _conns;};
}
7.10、服务器模块的整合
模块定位与主要功能
将整个消息队列的服务进行封装,连接模块通过信道(Channel)把客户端请求发送到服务器资源整合模块。资源整合模块处理这些请求,保证消息持久化、队列消费、路由分发等功能。
class Server{#define DBFILE "/meta.db"#define HOSTNAME "MyVirtualHost"public:typedef std::shared_ptr<google::protobuf::Message> MessagePtr;Server(int port,const std::string& basedir):_server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port), "Server", muduo::net::TcpServer::kReusePort),_dispatcher(std::bind(&Server::onUnknownMessage, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),_codec(std::make_shared<ProtobufCodec>(bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),_virtual_host(std::make_shared<VirtualHost>(HOSTNAME,basedir,basedir+DBFILE)),_consumer_manager(std::make_shared<ConsumerManager>()),_connection_manager(std::make_shared<ConnectionManager>()),_threadpool(std::make_shared<threadpool>()){// 在进行初始化的时候,恢复历史消息,此时队列已经进行创建出来了,还需要进行初始化消费者队列的管理结构QueueMap qm=_virtual_host->allQueues();for(auto& q:qm){_consumer_manager->initQueueConsumer(q.first);}// 注册消息类型和处理函数_dispatcher.registerMessageCallback<ys::openChannelRequest>(std::bind(&Server::onOpenChannel, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ys::closeChannelRequest>(std::bind(&Server::onCloseChannel, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ys::declareExchangeRequest>(std::bind(&Server::onDeclareExchange, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ys::deleteExchangeRequest>(std::bind(&Server::onDeleteExchange, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ys::declareQueueRequest>(std::bind(&Server::onDeclareQueue, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ys::deleteQueueRequest>(std::bind(&Server::onDeleteQueue, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ys::queueBindRequest>(std::bind(&Server::onQueueBind, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ys::queueUnBindRequest>(std::bind(&Server::onQueueUnBind, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ys::basicPublishRequest>(std::bind(&Server::onBasicPublish, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ys::basicAckRequest>(std::bind(&Server::onBasicAck, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ys::basicConsumeRequest>(std::bind(&Server::onBasicConsume, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ys::basicCancelRequest>(std::bind(&Server::onBasicCancel, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));}void start(){_server.start();_baseloop.loop();}private://打开信道void onOpenChannel(const muduo::net::TcpConnectionPtr &conn, const openChannelRequestPtr &message, muduo::Timestamp){Connection::ptr mconn=_connection_manager->getConnection(conn);if(mconn.get()==nullptr){DLOG("打开信道时没有找到对应的Connection对象");conn->shutdown();return;}return mconn->openChannel(message);}//关闭信道void onCloseChannel(const muduo::net::TcpConnectionPtr &conn, const closeChannelRequestPtr &message, muduo::Timestamp){Connection::ptr mconn=_connection_manager->getConnection(conn);if(mconn.get()==nullptr){DLOG("关闭信道时没有找到对应的Connection对象");conn->shutdown();return;}return mconn->closeChannel(message);}//声明交换机void onDeclareExchange(const muduo::net::TcpConnectionPtr &conn, const declareExchangeRequestPtr &message, muduo::Timestamp){//1、先进行连接查找Connection::ptr mconn=_connection_manager->getConnection(conn);if(mconn.get()==nullptr){DLOG("声明交换机时没有找到对应的Connection对象");conn->shutdown();return;}//2、通过连接查找再去进行查找信道Channel::ptr cp=mconn->getChannel(message->cid());if(cp.get()==nullptr){DLOG("声明交换机时没有找到对应的信道");return;}return cp->declareExchange(message);}//删除交换机void onDeleteExchange(const muduo::net::TcpConnectionPtr &conn, const deleteExchangeRequestPtr &message, muduo::Timestamp){//1、先进行连接查找Connection::ptr mconn=_connection_manager->getConnection(conn);if(mconn.get()==nullptr){DLOG("删除交换机时没有找到对应的Connection对象");conn->shutdown();return;}//2、通过连接查找再去进行查找信道Channel::ptr cp=mconn->getChannel(message->cid());if(cp.get()==nullptr){DLOG("删除交换机时没有找到对应的信道");return;}return cp->deleteExchange(message);}//声明队列void onDeclareQueue(const muduo::net::TcpConnectionPtr &conn, const declareQueueRequestPtr &message, muduo::Timestamp){//1、先进行连接查找Connection::ptr mconn=_connection_manager->getConnection(conn);if(mconn.get()==nullptr){DLOG("声明队列时没有找到对应的Connection对象");conn->shutdown();return;}//2、通过连接查找再去进行查找信道Channel::ptr cp=mconn->getChannel(message->cid());if(cp.get()==nullptr){DLOG("声明队列时没有找到对应的信道");return;}return cp->declareQueue(message);}//删除队列void onDeleteQueue(const muduo::net::TcpConnectionPtr &conn, const deleteQueueRequestPtr &message, muduo::Timestamp){//1、先进行连接查找Connection::ptr mconn=_connection_manager->getConnection(conn);if(mconn.get()==nullptr){DLOG("删除队列时没有找到对应的Connection对象");conn->shutdown();return;}//2、通过连接查找再去进行查找信道Channel::ptr cp=mconn->getChannel(message->cid());if(cp.get()==nullptr){DLOG("删除队列时没有找到对应的信道");return;}return cp->deleteQueue(message);}//队列绑定void onQueueBind(const muduo::net::TcpConnectionPtr &conn, const queueBindRequestPtr &message, muduo::Timestamp){//1、先进行连接查找Connection::ptr mconn=_connection_manager->getConnection(conn);if(mconn.get()==nullptr){DLOG("队列绑定时没有找到对应的Connection对象");conn->shutdown();return;}//2、通过连接查找再去进行查找信道Channel::ptr cp=mconn->getChannel(message->cid());if(cp.get()==nullptr){DLOG("队列绑定时没有找到对应的信道");return;}return cp->queueBind(message);}//队列解绑void onQueueUnBind(const muduo::net::TcpConnectionPtr &conn, const queueUnBindRequestPtr &message, muduo::Timestamp){//1、先进行连接查找Connection::ptr mconn=_connection_manager->getConnection(conn);if(mconn.get()==nullptr){DLOG("队列解绑时没有找到对应的Connection对象");conn->shutdown();return;}//2、通过连接查找再去进行查找信道Channel::ptr cp=mconn->getChannel(message->cid());if(cp.get()==nullptr){DLOG("队列解绑时没有找到对应的信道");return;}return cp->queueUnBind(message);}//消息发布void onBasicPublish(const muduo::net::TcpConnectionPtr &conn, const basicPublishRequestPtr &message, muduo::Timestamp){//1、先进行连接查找Connection::ptr mconn=_connection_manager->getConnection(conn);if(mconn.get()==nullptr){DLOG("消息发布时没有找到对应的Connection对象");conn->shutdown();return;}//2、通过连接查找再去进行查找信道Channel::ptr cp=mconn->getChannel(message->cid());if(cp.get()==nullptr){DLOG("消息发布时没有找到对应的信道");return;}return cp->basicPublish(message);}//消息确认void onBasicAck(const muduo::net::TcpConnectionPtr &conn, const basicAckRequestPtr &message, muduo::Timestamp){//1、先进行连接查找Connection::ptr mconn=_connection_manager->getConnection(conn);if(mconn.get()==nullptr){DLOG("消息确认时没有找到对应的Connection对象");conn->shutdown();return;}//2、通过连接查找再去进行查找信道Channel::ptr cp=mconn->getChannel(message->cid());if(cp.get()==nullptr){DLOG("消息确认时没有找到对应的信道");return;}return cp->basicAck(message);}//队列消息订阅void onBasicConsume(const muduo::net::TcpConnectionPtr &conn, const basicConsumeRequestPtr &message, muduo::Timestamp){//1、先进行连接查找Connection::ptr mconn=_connection_manager->getConnection(conn);if(mconn.get()==nullptr){DLOG("消息订阅时没有找到对应的Connection对象");conn->shutdown();return;}//2、通过连接查找再去进行查找信道Channel::ptr cp=mconn->getChannel(message->cid());if(cp.get()==nullptr){DLOG("队列消息订阅时没有找到对应的信道");return;}return cp->basicConsumer(message);}//队列消息取消订阅void onBasicCancel(const muduo::net::TcpConnectionPtr &conn, const basicCancelRequestPtr &message, muduo::Timestamp){//1、先进行连接查找Connection::ptr mconn=_connection_manager->getConnection(conn);if(mconn.get()==nullptr){DLOG("消息订阅时没有找到对应的Connection对象");conn->shutdown();return;}//2、通过连接查找再去进行查找信道Channel::ptr cp=mconn->getChannel(message->cid());if(cp.get()==nullptr){DLOG("队列消息订阅时没有找到对应的信道");return;}return cp->basicCannel(message);}// 未知消息处理void onUnknownMessage(const muduo::net::TcpConnectionPtr &conn, const MessagePtr &message, muduo::Timestamp){LOG_INFO << "onUnknownMessage: " << message->GetTypeName();conn->shutdown();}// 连接事件处理void onConnection(const muduo::net::TcpConnectionPtr &conn){if (conn->connected()){_connection_manager->newConnection(_virtual_host,_consumer_manager,_codec,conn,_threadpool);}else{_connection_manager->delConnection(conn);}}private:muduo::net::EventLoop _baseloop;muduo::net::TcpServer _server;ProtobufDispatcher _dispatcher; // 请求分发器对象ProtobufCodecPtr _codec; // protobuf协议处理器ConsumerManager::ptr _consumer_manager;VirtualHost::ptr _virtual_host;ConnectionManager::ptr _connection_manager;threadpool::ptr _threadpool;};
服务器连接模块交互流程
- 在 Server 启动时,它首先会根据传入的端口和数据存储目录创建 VirtualHost(管理交换机、队列及其绑定关系)、ConsumerManager(消费者管理器)以及 ConnectionManager(连接上下文管理器)。初始化时会从 VirtualHost 恢复持久化的队列数据,并为每个队列初始化消费者管理结构。
- 服务器通过 muduo::net::TcpServer 监听客户端的连接请求,每当有新连接建立(onConnection 回调中 conn->connected()),就调用 ConnectionManager 创建并管理一个新的 Connection 上下文对象。这个 Connection 代表一个物理 TCP 连接,封装了与该客户端相关的会话信息。
- 当客户端发送数据时,TcpServer 的消息回调会触发 ProtobufCodec::onMessage,对消息进行 protobuf 解码。解码后的消息交给 ProtobufDispatcher,它会根据消息类型分发到对应的处理函数(比如 onOpenChannel、onDeclareQueue 等)。
- 在各个具体的消息处理函数(如 onDeclareQueue)中,首先会通过 ConnectionManager 根据连接 conn 查找出对应的 Connection。在一个 Connection 中,可能会有多个逻辑信道(Channel)并发处理不同的业务。处理消息时,Connection 会根据消息中的信道 ID(cid)查找或管理相应的 Channel。
- 一旦找到了对应的 Channel,消息的具体业务逻辑就委托给 Channel 中的处理方法(比如 cp->declareQueue(message))来执行。这样,逻辑就形成了从 TCP 连接(Connection) → 逻辑信道(Channel) → 具体队列/交换机的操作 的完整链路。
- 如果在任何步骤中找不到对应的 Connection 或 Channel(比如客户端发送了错误或非法消息),服务器会关闭连接以保护系统安全