【仿RabbitMQ的发布订阅式消息队列】--- 服务端模块
Welcome to 9ilk's Code World

(๑•́ ₃ •̀๑) 个人主页: 9ilk
(๑•́ ₃ •̀๑) 文章专栏: 项目
本篇博客主要是对消息队列服务端模块进行梳理。
交换机数据管理
该模块主要是对交换机相关数据的管理,主要的是先在内存中管理,再根据是否持久化进行持久化。
1. 交换机数据类:
a. 交换机名称
b. 交换机类型
c. 交换机是否持久标志
d. 是否自动删除标志
e. 其他参数
2. 交换机数据持久化类
a. 声明/删除交换机表
b. 新增交换机数据
c. 移除交换机数据
d. 查询所有交换机数据 / 恢复历史数据(为内存管理提供的接口)
e. 查询指定交换机数据(根据名称)
3. 交换机数据内存管理类
a. 声明交换机并添加管理(存在则OK,不存在则创建)
b. 删除交换机
c. 获取指定交换机
d. 交换机是否存在的判断
e. 清理所有交换机数据
f. 交换机数量
交换机类
struct Exchange{using ptr = shared_ptr<Exchange>;//1.交换机名称string _name;//2.交换机类型ExchangeType _type;//3.交换机持久化标志bool _durable;//4.是否自动删除标志bool _auto_delete;//5.其他参数google::protobuf::Map<std::string, std::string> _args;//提供一个默认构造Exchange(){}Exchange(const string& ename,ExchangeType etype,bool edurable,bool eauto_delete,const google::protobuf::Map<std::string, std::string>& eargs):_name(ename),_type(etype),_durable(edurable),_auto_delete(eauto_delete),_args(eargs){}//args存储键值对,在存储数据库的时候会组织一个格式字符串进行存储 key=val&key=val...void setArgs(const string& str_args)//内部解析str_args字符串将内容存储到成员中{//1.按照&分割每一组参数kvvector<string> result;StringHelper::SplitStr(str_args,"&",result);//2.将每一组按照=分割for(auto& str : result){ //key=valsize_t pos = str.find("=");string key = str.substr(0,pos);string val = str.substr(pos+1);_args[key] = val; }}string getArgs() //将args中的内容进行序列化后,返回一个字符串{stringstream ss;for(auto& pr : _args)ss << pr.first << "=" << pr.second << "&"; return ss.str(); }};
交换机数据持久化类
数据持久化我们这里采取的是对交换机的数据持久化到数据库中,同时既然持久化了,到时服务器重启的时候,我们需要借助持久化的数据恢复内存中的数据继续提供服务,因此我们还需要提供一个recovery()接口来恢复历史数据
using ExchangeMap = std::unordered_map<string,Exchange::ptr>;class ExchangeMapper{public:ExchangeMapper(const string& dbfile):_sql_helper(dbfile){//1.目录string parentDir = FileHelper::parentDirectory(dbfile);FileHelper::createDirectory(parentDir);//2.文件assert(_sql_helper.open());//3.创建表createTable();}//声明交换机表void createTable(){ //1.编写创建表语句#define CREATE_TABLE "create table if not exists exchange_table(\name varchar(32) primary key,\type int,\durable int,\auto_delete int,\args varchar(128));"bool ret = _sql_helper.exec(CREATE_TABLE,nullptr,nullptr);if(ret == false) //创建表失败没必要再继续看{ERR_LOG("创建交换机数据表失败!");abort();}return;}//删除交换机表void removeTable(){#define REMOVE_TABLE "drop table if exists exchange_table"bool ret = _sql_helper.exec(REMOVE_TABLE,nullptr,nullptr);if(ret == false) //创建表失败没必要再继续看{ERR_LOG("删除交换机数据表失败!");abort();}return;}//新增交换机数据void insert(Exchange::ptr& exchange){stringstream ss;ss << "insert into exchange_table values (";ss << "'" << exchange->_name << "',";ss << exchange->_type << ",";ss << exchange->_durable << ",";ss << exchange->_auto_delete << ",";ss << "'" << exchange->getArgs() << "');";if(!_sql_helper.exec(ss.str(),nullptr,nullptr))ERR_LOG("插入交换数据表失败!");}//移除交换机数据void remove(const string&name){string sql = "delete from exchange_table where name='" ;sql += name;sql += "';";if(!_sql_helper.exec(sql,nullptr,nullptr))ERR_LOG("删除交换机%s失败!",name.c_str());}//恢复历史数据ExchangeMap recovery(){ExchangeMap emp;string sql = "select name,type,durable,auto_delete,args from exchange_table;";bool ret = _sql_helper.exec(sql,selectCallback,&emp);if(ret == false){ERR_LOG("恢复历史数据失败!");abort();return ExchangeMap();}return emp;}private://注意回调必须设置为静态否则会多个this指针参数,无法匹配static int selectCallback(void* arg,int numcol,char** row,char** fields){ExchangeMap* emptr = (ExchangeMap*)arg;//1.构造Exchange对象Exchange::ptr exchange = make_shared<Exchange>();exchange->_name = row[0];exchange->_type = (zmq::ExchangeType)stoi(row[1]);exchange->_durable = (bool)stoi(row[2]);exchange->_auto_delete = (bool)stoi(row[3]);if(row[4]) //注意这里有参数才设置exchange->setArgs(row[4]);//2.插入到ExchangeMap中emptr->insert(make_pair(exchange->_name,exchange));//3.一定返回0return 0;}private:SqLiteHelper _sql_helper; };
交换机数据内存管理类
我们交换机类是有一个是否持久化标志的,因此内存管理类,势必需要一个交换机数据持久化句柄,同时内存也要管理,我们可以采用hashmap容器进行管理。
需要注意的是:
1. 在构造内存管理类时,就可以使用持久化句柄恢复历史数据。
2. 这些操作都是需要加锁的,不然可能被多个线程操作存在线程安全问题。
3. 为了尽可能减少数据库的I/O操作提高效率,在对交换机的增删操作,我们可以现在内存中查找是否已经存在过了。
4. 判断某个交换机是否存在这个操作我们是单独提供的,但是不能在其他接口复用这个操作,否则可能产生死锁或未定义行为,因此我们需要自己单独判断,同时这部分操作需要放在临界区中,因为有可能你刚判断完也没有该交换机,后面就有线程先拿到锁进行新增,导致重复添加
//3.交换机数据内存管理类class ExchangeManager{public:using ptr = shared_ptr<ExchangeManager>;ExchangeManager(const string& dbfile):_mapper(dbfile){_exchanges = _mapper.recovery();}//声明交换机void declareExchange(const string& name,ExchangeType type,bool durable,bool auto_delete,const google::protobuf::Map<std::string, std::string>& args){unique_lock<std::mutex> lock(_mtx);//1.构造交换机对象//判断是否已经有了auto it = _exchanges.find(name);if(it != _exchanges.end()) return;Exchange::ptr exchange = make_shared<Exchange>();exchange->_name = name;exchange->_type = type;exchange->_durable = durable;exchange->_auto_delete = auto_delete;exchange->_args = args; //2.插入map进行管理_exchanges.insert(make_pair(name,exchange));//3.判断是否需要持久化if(durable) _mapper.insert(exchange);}//删除交换机void deleteExchange(const string&name){unique_lock<std::mutex> lock(_mtx);//先查是否已经存在该交换机,没有就不删除auto it = _exchanges.find(name);if(it == _exchanges.end()) return;//判断是否设置持久化 设置则移除数据库内该交换机数据if(it->second->_durable) _mapper.remove(name);//删除哈希表中交换机_exchanges.erase(name);} //判断交换机是否存在bool exists(const string&name){unique_lock<std::mutex> lock(_mtx);auto it = _exchanges.find(name);if(it == _exchanges.end()) return false;return true;}//清理所有交换机数据void clear(){unique_lock<std::mutex> lock(_mtx);_exchanges.clear();_mapper.removeTable();//???不持久化吗}//获取指定交换机对象Exchange::ptr selectExchange(const string&name){unique_lock<std::mutex> lock(_mtx);//先查是否已经存在该交换机,没有就不删除auto it = _exchanges.find(name);if(it == _exchanges.end()) return nullptr;return it->second;}size_t size(){unique_lock<std::mutex> lock(_mtx);for(auto& ex : _exchanges){cout << ex.first << endl;}return _exchanges.size();}private:std::mutex _mtx;ExchangeMapper _mapper;ExchangeMap _exchanges;};
队列数据管理
当前队列数据的管理 , 本质上是队列描述信息的管理 , 描述当前服务器上有哪些队列。
队列数据类
a. 队列名称
b. 是否持久化标志
c. 是否独占标志
d. 是否自动删除标志
e. 其他参数
struct MsgQueue{using ptr = shared_ptr<MsgQueue>; //队列名称 string _name;//是否持久化标志bool _durable; //是否独占标志bool _exclusive;//是否自动删除标志bool _auto_delete;//其他参数google::protobuf::Map<std::string, std::string> _args;MsgQueue(){}MsgQueue(const string&name,bool durable,bool exclusive,bool auto_delete,const google::protobuf::Map<std::string, std::string>& args):_name(name),_durable(durable),_exclusive(exclusive),_auto_delete(auto_delete),_args(args){}//args存储键值对,在存储数据库的时候会组织一个格式字符串进行存储 key=val&key=val...void setArgs(const string& str_args)//内部解析str_args字符串将内容存储到成员中{//1.按照&分割每一组参数kvvector<string> result;StringHelper::SplitStr(str_args,"&",result);//2.将每一组按照=分割for(auto& str : result){ //key=valsize_t pos = str.find("=");string key = str.substr(0,pos);string val = str.substr(pos+1);_args[key] = val; }}string getArgs() //将args中的内容进行序列化后,返回一个字符串{stringstream ss;for(auto& pr : _args)ss << pr.first << "=" << pr.second << "&"; return ss.str(); }};
队列数据持久化类
a. 创建 / 删除队列数据表
b. 新增队列数据
c. 移除队列数据
d. 查询所有队列数据
e. 查询指定队列数据(根据名称)
using QueueMap = std::unordered_map<string,MsgQueue::ptr>;class MsgQueueMapper{public:using ptr = shared_ptr<MsgQueueMapper>;MsgQueueMapper(const string& dbfile):_sql_helper(dbfile){//1.目录string parentDir = FileHelper::parentDirectory(dbfile);FileHelper::createDirectory(parentDir);//2.文件assert(_sql_helper.open());//3.创建表createTable();}//创建队列数据表void createTable(){//1.编写创建表语句#define CREATE_TABLE "create table if not exists queue_table(\name varchar(32) primary key,\durable int,\exclusive int,\auto_delete int,\args varchar(128));"bool ret = _sql_helper.exec(CREATE_TABLE,nullptr,nullptr);if(ret == false) //创建表失败没必要再继续看{ERR_LOG("创建交换机数据表失败!");abort();}return;}//删除数据表void removeTable(){#define REMOVE_TABLE "drop table if exists queue_table"bool ret = _sql_helper.exec(REMOVE_TABLE,nullptr,nullptr);if(ret == false) //创建表失败没必要再继续看{ERR_LOG("删除交换机数据表失败!");abort();}return;}//新增队列数据bool insert(MsgQueue::ptr& queue){stringstream ss;ss << "insert into queue_table values (";ss << "'" << queue->_name << "',";ss << queue->_durable << ",";ss << queue->_exclusive << ",";ss << queue->_auto_delete << ",";ss << "'" << queue->getArgs() << "');";if(!_sql_helper.exec(ss.str(),nullptr,nullptr))ERR_LOG("插入交换数据表失败!");return true; //bug:没返回true}//删除指定队列数据void remove(const string& name){string sql = "delete from queue_table where name='" ;sql += name;sql += "';";if(!_sql_helper.exec(sql,nullptr,nullptr))ERR_LOG("删除交换机%s失败!",name.c_str());}//恢复历史数据QueueMap recovery(){QueueMap qmp;string sql = "select name,durable,exclusive,auto_delete,args from queue_table;";bool ret = _sql_helper.exec(sql,selectCallback,&qmp);if(ret == false){ERR_LOG("恢复历史数据失败!");abort();return QueueMap();}return qmp;}private://注意回调必须设置为静态否则会多个this指针参数,无法匹配static int selectCallback(void* arg,int numcol,char** row,char** fields){QueueMap* qmptr = (QueueMap*)arg;//1.构造Exchange对象MsgQueue::ptr queue = make_shared<MsgQueue>();queue->_name = row[0];queue->_durable = (bool)stoi(row[1]);queue->_exclusive = (bool)stoi(row[2]);queue->_auto_delete = (bool)stoi(row[3]);if(row[4]) //注意这里有参数才设置queue->setArgs(row[4]);//2.插入到ExchangeMap中qmptr->insert(make_pair(queue->_name,queue));//3.一定返回0return 0;}private:SqLiteHelper _sql_helper;};
队列数据内存管理类
a. 创建队列并添加管理(存在则OK,不存在则创建)
b. 删除队列
c. 获取指定队列
d. 获取所有队列
e. 销毁所有队列数据
同样的,如果队列数据设置了持久化标志需要使用持久化句柄进行持久化;同时删除的时候,如果之前设置了持久化标志,也是要从数据库删除。
class MsgQueueManager{public:using ptr = shared_ptr<MsgQueueManager>;MsgQueueManager(const string& dbfile):_mapper(dbfile){_msg_queues = _mapper.recovery();}//1.声明队列bool declareQueue(const string& qname,bool qdurable,bool qexclusive,bool qauto_delete, const google::protobuf::Map<std::string, std::string>& args){unique_lock<std::mutex> lock(_mtx);//1.构造交换机对象//判断是否已经有了auto it = _msg_queues.find(qname);if(it != _msg_queues.end()) return true;MsgQueue::ptr queue = make_shared<MsgQueue>(qname,qdurable,qexclusive,qauto_delete,args);//2.插入map进行管理_msg_queues.insert(make_pair(qname,queue));//3.判断是否需要持久化if(qdurable){bool ret = _mapper.insert(queue);if(!ret){ERR_LOG("持久化失败")return false;}}return true;}//2.删除队列void deleteQueue(const string& name){unique_lock<std::mutex> lock(_mtx);//先查是否已经存在该交换机,没有就不删除auto it = _msg_queues.find(name);if(it == _msg_queues.end()) return;//判断是否设置持久化 设置则移除数据库内该交换机数据if(it->second->_durable) _mapper.remove(name);//删除哈希表中交换机_msg_queues.erase(name);}//3.获取指定队列MsgQueue::ptr selectQueue(const string& name){unique_lock<std::mutex> lock(_mtx);//先查是否已经存在该交换机,没有就不删除auto it = _msg_queues.find(name);if(it == _msg_queues.end()) return nullptr;return it->second;}//4.获取所有队列QueueMap allQueues(){unique_lock<std::mutex> lock(_mtx);return _msg_queues;}//指定队列是否存在bool exists(const string& name){unique_lock<std::mutex> lock(_mtx);auto it = _msg_queues.find(name);if(it == _msg_queues.end()) return false;return true;}//队列个数size_t size(){unique_lock<std::mutex> lock(_mtx);return _msg_queues.size();}//清空void clear(){unique_lock<std::mutex> lock(_mtx);_msg_queues.clear();_mapper.removeTable();}private:std::mutex _mtx;MsgQueueMapper _mapper;QueueMap _msg_queues; };
绑定数据管理
绑定信息本质上就是一个交换机关联了哪些队列的模块。
绑定信息类
a. 交换机名称
b. 队列名称
c. binding_key(分发匹配规则 --- 决定了哪些数据能被交换机放入队列)
//绑定信息类struct Binding{using ptr = shared_ptr<Binding>;string _exchange_name;//交换机名称string _msgqueue_name;//队列名称string _binding_key; //绑定信息Binding();Binding(const string& ename,const string& qname, const string& key):_exchange_name(ename),_msgqueue_name(qname),_binding_key(key){}};
绑定信息持久化类
a. 创建 / 删除绑定信息数据表
b. 新增绑定信息数据
c. 移除指定绑定信息数据
d. 移除指定交换机相关绑定信息数据 : 移除交换机的时候会被调用
e. 移除指定队列相关绑定信息数据:移除队列的时候会被调用
f. 查询所有绑定信息数据:用于重启服务器时进行历史数据恢复
g. 查询指定绑定信息数据 (根据交换机-队列名称)
using MsgQueueBindMap = std::unordered_map<string,Binding::ptr>; //队列名称,绑定信息using BindingMap = std::unordered_map<string,MsgQueueBindMap>;class BindingMapper{public:BindingMapper(const string& dbfile):_sql_helper(dbfile){//看数据文件的目录是否创建string parentDir = FileHelper::parentDirectory(dbfile);FileHelper::createDirectory(parentDir);//打开数据库文件assert(_sql_helper.open());//创建表createTable();}void createTable(){stringstream sql;sql << "create table if not exists binding_table(";sql << "exchange_name varchar(32)," ;sql << "queue_name varchar(32),";sql << "binding_key varchar(128));";assert(_sql_helper.exec(sql.str(),nullptr,nullptr));}void removeTable(){string sql = "drop table if exists binding_table; ";assert(_sql_helper.exec(sql,nullptr,nullptr));}//新增绑定信息bool insert(Binding::ptr& binding){//1.根据绑定信息对象编写sql语句stringstream sql;sql << "insert into binding_table values(";sql << "'" <<binding->_exchange_name << "',";sql << "'" << binding->_msgqueue_name << "',";sql << "'" << binding->_binding_key << "');";//2.执行语句return _sql_helper.exec(sql.str(),nullptr,nullptr);}//根据交换机-队列移除指定绑定信息void remove(const string& ename,const string& qname){stringstream sql;sql << "delete from binding_table where exchange_name='";sql << ename << "' and queue_name='";sql << qname << "';";_sql_helper.exec(sql.str(),nullptr,nullptr);}//移除指定交换机的所有绑定信息void removeExchangeBinding(const string& ename){stringstream sql;sql << "delete from binding_table where exchange_name='";sql << ename << "';";_sql_helper.exec(sql.str(),nullptr,nullptr);}//移除指定队列的绑定信息void removeMsgQueueBinding(const string& qname){stringstream sql;sql << "delete from binding_table where queue_name='";sql << qname <<"'" << ";";_sql_helper.exec(sql.str(),nullptr,nullptr);}//恢复历史数据BindingMap recovery(){string sql = "select exchange_name,queue_name,binding_key from binding_table;";BindingMap bmp;_sql_helper.exec(sql,selectCallback,&bmp);return bmp;}private:static int selectCallback(void* arg,int numcol,char** row,char** fields){//注意这里我们需要先使用引用获取MsgQueueMap否则根本没有插入数据,而且是[]BindingMap* bmptr = (BindingMap*)arg;MsgQueueBindMap& mqbp = (*bmptr)[row[0]]; //不存在则创建Binding::ptr bp = make_shared<Binding>(row[0],row[1],row[2]);mqbp[bp->_msgqueue_name] = bp;return 0;}private:SqLiteHelper _sql_helper; };
- 队列与绑定信息是一一对应的(因为是给某个交换机绑定队列),因此一个交换机可能会有多个队列的绑定信息
- 基于这个思想,我们先定义一个队列名,与绑定信息binding_key的映射关系MsgQueueBindMap,这样能方便通过队列名查找binding_key;然后定义一个交换机名称与队列绑定信息的映射关系BindingMap ,这个map中包含了所有绑定信息,并且以交换机为单元进行区分。
- 如果是建立两个map,即交换机与绑定信息的映射&&队列与绑定信息的映射,在删除交换机相关绑定信息的时候,不仅要删除交换机映射,还要删除对应队列中中的映射,否则对象得不到释放,这样是比较麻烦的。
绑定信息内存管理类
a. 创建绑定信息,并添加管理(存在则OK,不存在则创建)
b. 解除指定的绑定信息
c. 删除指定队列的所有绑定信息
d. 删除交换机相关的所有绑定信息
e. 获取交换机相关的 所有绑定信息 : 交换机收到消息后,需要分发给自己队列
f. 判断指定绑定信息是否存在
g. 获取当前绑定信息数量
h. 销毁所有绑定信息数据
class BindingManager{public:using ptr = shared_ptr<BindingManager>;BindingManager(const string& dbfile):_mapper(dbfile){_bindings = _mapper.recovery();}//新增一条绑定bool bind(const string& ename,const string& qname,const string& key,bool durable){unique_lock<std::mutex> lock(_mtx);//1.先查找是否已经新增绑定了auto it = _bindings.find(ename);if(it != _bindings.end() && it->second.find(qname) != it->second.end())return true;//2.构造绑定信息对象:绑定信息是否需要持久化 取决于交换机数据是否持久化Binding::ptr bp = make_shared<Binding>(ename,qname,key);if(durable) //交换机数据持久化那么绑定信息也要持久化{bool ret = _mapper.insert(bp);if(!ret) return false;}//3.将绑定信息添加到内存管理auto& qbmp = _bindings[ename];//注意这里用的是引用qbmp.insert(make_pair(qname,bp));return true;}//移除绑定void unBind(const string& ename,const string& qname){unique_lock<std::mutex> lock(_mtx);//1.查找交换机auto eit = _bindings.find(ename); if(eit == _bindings.end()) return;//没有交换机绑定信息//2.查找队列auto qit = eit->second.find(qname);if(qit == eit->second.end()) return;//3.查找到eit->second.erase(qname);//注意数据里的也要删除_mapper.remove(ename,qname);}//移除指定交换机的绑定信息void removeExchangeBindings(const string& ename){unique_lock<std::mutex> lock(_mtx);auto eit = _bindings.find(ename); if(eit == _bindings.end()) return;//没有交换机绑定信息_bindings.erase(ename);//注意数据库里的也要删除_mapper.removeExchangeBinding(ename);}//移除指定队列的绑定信息void removeMsgQueueBindings(const string& qname){unique_lock<std::mutex> lock(_mtx);auto start = _bindings.begin();for(;start != _bindings.end() ;start++)start->second.erase(qname);_mapper.removeMsgQueueBinding(qname);}//获取指定交换机的绑定信息MsgQueueBindMap getExchangeBindings(const string& ename){unique_lock<std::mutex> lock(_mtx);auto eit = _bindings.find(ename); if(eit == _bindings.end()) return MsgQueueBindMap();//没有交换机绑定信息return eit->second;}//获取指定队列的绑定信息Binding::ptr getBinding(const string& ename,const string& qname){unique_lock<std::mutex> lock(_mtx);//1.查找交换机auto eit = _bindings.find(ename); if(eit == _bindings.end()) return nullptr;//没有交换机绑定信息//2.查找队列auto qit = eit->second.find(qname);if(qit == eit->second.end()) return nullptr;//3.查找到return qit->second;}//bool exists(const string& ename,const string& qname){unique_lock<std::mutex> lock(_mtx);//1.查找交换机auto eit = _bindings.find(ename); if(eit == _bindings.end()) return false;//没有交换机绑定信息//2.查找队列auto qit = eit->second.find(qname);if(qit == eit->second.end()) return false;//3.查找到return true;}//绑定信息的个数size_t size(){unique_lock<std::mutex> lock(_mtx);size_t total = 0;auto start = _bindings.begin();for(;start != _bindings.end() ;start++)total += start->second.size();return total;}//void clear(){unique_lock<std::mutex> lock(_mtx);_bindings.clear();_mapper.removeTable();}private:std::mutex _mtx; BindingMapper _mapper;BindingMap _bindings;};
消息管理
我们的消息是有持久化的需求的,因为有可能你服务器运行期间暂时没有消费者来消费这条消息,等到下次启动时有人来消费时,就需要根据持久化的数据恢复历史消息。
消息的持久化管理,我们不使用数据库,因为有些消息庞大不适合用数据库存储,其次消息的持久化是为了备份,不是为了查询,因此直接使用普通文件进行存储即可。具体的方案如下:
1. 以队列为单元进行持久化管理:即每个队列都要有一个自己的数据文件。当对消息文件进行垃圾回收时,需要重新加载所有的有效消息重新生成新的数据文件,但是这会导致消息的存储位置改变,所有队列的消息位置都会变 , 需要更新内存中的数据,此时就需要将所有队列数据进行加锁("全局锁",阻塞所有队列,并发性能极差),然后进行更新,这无疑导致锁冲突频繁效率较低,因此如果每个队列都有自己的独立数据文件,每次只需要对操作的队列数据进行更新即可。
2. 消息长度(4字节)+ 消息主题(若干字节):数据存储在文件中就需要约定格式,通过描述消息实际存储长度来解决粘包问题。
向外提供的操作:
- 消息文件的创建与删除
- 消息的新增持久化 / 删除消息的持久化(并不是真正删除只是将标志位置为无效)
- 历史数据恢复 / 垃圾回收
垃圾回收的时机:因为每次删除数据都不是真正删除,因此文件中的数据会越来越多。但是也不是每次删除都需要回收,当文件中有效消息超过两千条,且其中有效消息比例低于50%就进行回收。
垃圾回收的思想:先删除原文件再将数据写入新的数据文件,这无疑是不安全的,因为可能写一半时发生故障(比如进程被kill掉、文件系统崩溃、机器断电等)都会让新文件不完整,而旧文件已经被删除,导致数据丢失。真正安全的做法是先提取出有效的消息链表,然后写入到一个临时文件中,写完再删除源文件,将临时文件名改为源文件名(重命名是原子的),最后返回有效消息链表。
消息持久化管理
需要管理的数据:
- 队列名
- 根据队列名生成数据文件名称:
队列名.mqd - 根据队列名生成临时文件名称:
队列名.mqd.tmp
#define DATAFILLE_SUBFIX ".mqd" //数据文件后缀#define TMPFILE_SUBFIX ".mqd.tmp" //临时文件后缀using MessagePtr = shared_ptr<zmq::Message>;class MessageMapper{ public:MessageMapper(string& basedir,const string&qname):_qname(qname){//1.是否最后是路径分割符if(basedir.back() != '/') basedir += '/';//2.生成数据文件名和临时文件名_datafile = basedir + qname + DATAFILLE_SUBFIX;_tmpfile = basedir + qname + TMPFILE_SUBFIX;//3.判断basedir目录是否创建if(FileHelper(basedir).exists() == false)assert(FileHelper::createDirectory(basedir));//4.创建数据文件 assert(createMsgFile());}//创建消息文件bool createMsgFile(){if(FileHelper(_datafile).exists() == true)return true;bool ret = FileHelper::createFile(_datafile);if(ret == false){ERR_LOG("创建数据文件%s失败!",_datafile.c_str());return false;} return true;}//删除消息文件:注意不删除basedir,因为可能有其他的队列数据文件void removeMsgFile(){//1.删除数据文件FileHelper::removeFile(_datafile);//2.删除临时文件FileHelper::removeFile(_tmpfile);}//消息的新增持久化bool insert(MessagePtr& msg){return insert(_datafile,msg);}//消息的删除持久化bool remove( MessagePtr& msg){//1.将文件中的有效标记位设置为'0'msg->mutable_payload()->set_valid("0");//2.对msg重新进行序列化string body = msg->payload().SerializeAsString();//注意:如果跟原来的不一样大小返回false 不能影响到其他消息if(body.size() != msg->length()){ERR_LOG("不能修改文件中的数据信息,因为新生成的数据与原数据长度不一致!");return false;}//3.覆盖到文件中FileHelper helper(_datafile);bool ret = helper.write(body.c_str(),msg->offset(),body.size());if(ret == false){ERR_LOG("向队列%s删除持久化消息失败!",_qname.c_str());return false;}return true;}//历史数据恢复/垃圾回收list<MessagePtr> gc(){list<MessagePtr> result;size_t msg_size = 0;//消息长度size_t offset = 0 ;//遍历文件提取有效消息的指针FileHelper helper(_datafile);std::size_t fsize = helper.size();//1.加载文件中的所有有效数据while(offset < fsize){//1.1先读取4字节长度数据bool ret = helper.read((char*)&msg_size,offset,sizeof(size_t));if(ret == false){ERR_LOG("队列%s读取4字节长度失败!!",_qname.c_str());return list<MessagePtr>();}offset += sizeof(std::size_t);//往前移动size_t的长度字节std::size_t msg_offset = offset;//1.2再读取消息有效载荷string body(msg_size,'\0');ret = helper.read(&body[0],offset,msg_size);if(ret == false){ERR_LOG("队列%s读取消息失败!!",_qname.c_str());return result;}offset += msg_size; //往前偏移消息长度//1.3对消息的有效载荷反序列化MessagePtr msg = make_shared<Message>();msg->mutable_payload()->ParseFromString(body);//这里其实不能设置offset,因为这个offset是不准确的,毕竟前面可能有valid为1的消息//而且这里也用不着设置,因为后面写入临时文件的时候调用insert设置了// msg->set_length(msg_size);// msg->set_offset(msg_offset);//1.4根据valid标志位决定是否丢弃if(msg->payload().valid() == "0") continue;result.push_back(msg);}//2.将有效数据序列化存储到临时文件//注意这里要先创建出临时文件FileHelper::createFile(_tmpfile);for(auto& messsge : result){bool n = insert(_tmpfile,messsge);;if(n == false){ERR_LOG("向临时文件%s写入数据失败",_tmpfile.c_str());return result;}}//3.删除源文件bool ret = FileHelper::removeFile(_datafile);if(ret == false){ERR_LOG("删除源文件%s失败!",_datafile.c_str());return result;}//4.修改临时文件名为u源文件名称ret = FileHelper(_tmpfile).rename(_datafile);if(ret == false){ERR_LOG("临时文件%s重命名失败!",_tmpfile.c_str());return result;}//5.返回新的有效数据return result;}private: bool insert(const string& filename,MessagePtr& msg){//1.先对消息的有效载荷进行序列化获取格式化后的消息string body = msg->payload().SerializeAsString();//2.获取文件长度和消息长度FileHelper helper(filename);size_t fsize = helper.size();size_t msg_size = body.size();//3.先写入4字节长度 再写入消息bool ret = helper.write((char*)&msg_size,fsize,sizeof(size_t));if(ret == false){ERR_LOG("队列%s新增消息写入消息长度失败!",_qname.c_str());return false;}ret = helper.write(body.c_str(),fsize+sizeof(std::size_t),msg_size);if(ret == false){ERR_LOG("队列%s新增消息写入消息失败!",_qname.c_str());return false;}//4.更新messgaePtr中的实际存储位置msg->set_offset(fsize+sizeof(size_t));msg->set_length(msg_size);return true;}private:string _qname;string _datafile;string _tmpfile; };
队列单元消息管理
如果内存中对所有的消息整体进行管理,则在进行垃圾回收以及恢复历史消息上就会变得麻烦。因此每个队列都有一个消息数据的管理结构,最终向外提供一个总体的消息管理类。
队列消息管理:
- 构造对象时:创建/打开队列数据文件,恢复队列历史消息数据
- 新增消息 / 确认消息(确认消息时如果消息是持久化过了的需要置标记位为1,同时确认是否进行垃圾回收)
- 垃圾回收:当持久化数据总量超过2000,且有效比例低于50%则进行垃圾回收
- 获取队首消息推送给客户端
- 删除队列所有消息
- 获取待推送消息数量
- 获取待确认消息数量
- 获取持久化消息数量
需要管理的数据:
- 持久化的管理句柄
- 待推送消息链表:以头插尾删的思想实现队列功能
- 持久化消息的哈希map:垃圾回收后需要更新消息数据(需要更新实际存储位置),用链表的话效率低,而且我们用的是智能指针,不管有几个链表几个哈希,它们保存的智能指针都指向同一个对象,一个地方修改其他地方也修改。
- 待确认消息的hashmap : 一条消息推送给客户端,并不会立即真正删除,而是等到被确认后才会删除。一条消息被推送到客户端后,取出待推送链表,加入到待确认结构中,等到确认后再删除。
- 持久化文件中有效消息数量
- 持久化文件中总体消息数量:和有效消息数量搭配可以计算出文件中有效消息比例(根据比例和总体数量决定是否进行垃圾回收)
- 队列名称
///队列单元消息管理class QueueMessage{public:using ptr = std::shared_ptr<QueueMessage>;QueueMessage( string& basedir,const string& qname):_qname(qname),_valid_count(0),_total_count(0),_mapper(basedir,qname){}bool recovery(){std::unique_lock<std::mutex> lock(_mtx);//1.调用gc()来进行垃圾回收恢复历史消息_msgs = _mapper.gc();//2.将恢复后的消息插入到持久化map中for(auto& message : _msgs)_durable_msgs.insert(make_pair(message->payload().properties().id(), message));//3.更新valid_count 和 _total_count其实都等于恢复的消息数量_valid_count = _total_count = _msgs.size(); }//向队列新增消息bool insert(const BasicProperties* bp,const string& body,bool is_queue_durable){//1.根据传入的参数来构造消息对象MessagePtr msg = make_shared<Message>();msg->mutable_payload()->set_body(body);//1.1判断属性是否设置 没有我们需要手动设置属性if(bp != nullptr){ //前提是队列允许持久化zmq::DeliveryMode mode = is_queue_durable ? bp->delivery_mode() : DeliveryMode::UNDURABLE;msg->mutable_payload()->mutable_properties()->set_id(bp->id());msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);msg->mutable_payload()->mutable_properties()->set_routing_key(bp->routing_key());}else //属性为空->队列不持久化消息也不持久化,没有意义 {zmq::DeliveryMode mode = is_queue_durable ? zmq::DeliveryMode::DURABLE : DeliveryMode::UNDURABLE;msg->mutable_payload()->mutable_properties()->set_id(uuidHelper::uuid());msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);msg->mutable_payload()->mutable_properties()->set_routing_key("");}//2.判断消息是否需要持久化std::unique_lock<std::mutex> lock(_mtx);if(msg->payload().properties().delivery_mode() == zmq::DeliveryMode::DURABLE){//2.1设置valid标志位为1msg->mutable_payload()->set_valid("1");//2.2使用持久化句柄进行持久化存储bool ret = _mapper.insert(msg);if(ret == false){ERR_LOG("队列%s持久化消息失败",_qname.c_str());return false;}//2.3更新持久化map和两个计数器_durable_msgs.insert(make_pair(msg->payload().properties().id(), msg));_valid_count++;_total_count++;}//3.将消息放到内存中进行管理_msgs.push_back(msg);// cout << "_msgs size:" << _msgs.size() << endl;return true;}//确认/删除消息:每次删除消息后,判断是否需要进行垃圾回收bool remove(const string& msg_id){std::unique_lock<std::mutex> lock(_mtx);//1.从待确认hashmap中查找消息auto it = _waitack_msgs.find(msg_id);if(it == _waitack_msgs.end()){DBG_LOG("未找到要删除的的消息:%s",msg_id.c_str());return true;} //2.判断该消息的模式是否为持久化if(it->second->payload().properties().delivery_mode() == zmq::DeliveryMode::DURABLE){//2.1调用持久化句柄将消息删除_mapper.remove(it->second);//2.2 从durable_msgs中删除 并且更新valid_count_durable_msgs.erase(msg_id);_valid_count--;//2.3判断是否要垃圾回收,需要则垃圾回收更新消息存储位置和计数器gc();}//3.从内存中移除该消息_waitack_msgs.erase(msg_id);}//获取队首消息推送给客户端MessagePtr front(){unique_lock<std::mutex> lock(_mtx);//1.先判断队内是否有消息if(_msgs.size() == 0) {DBG_LOG("队列%s目前没有消息可以消费",_qname.c_str());return MessagePtr();}//2.获取队首消息MessagePtr msg = _msgs.front();//注意这里不能使用引用来接收队首元素,因为这个智能指针被释放了pop_front,注意不是指向对象释放!!_msgs.pop_front();//3.向待确认hash表中添加一份,等待收到消息后再从hash表中删除_waitack_msgs.insert(make_pair(msg->payload().properties().id(), msg));return msg;}//删除队列所有消息void clear(){unique_lock<std::mutex> lock(_mtx);_mapper.removeMsgFile();_msgs.clear();_durable_msgs.clear();_waitack_msgs.clear();_valid_count = _total_count = 0;}//获取待推送消息数量size_t getable_count(){unique_lock<std::mutex> lock(_mtx);return _msgs.size();}//获取所有持久化消息数量size_t total_count(){unique_lock<std::mutex> lock(_mtx);return _total_count;}//获取持久化消息数量size_t durable_count(){unique_lock<std::mutex> lock(_mtx);return _durable_msgs.size();}//获取待确认消息数量size_t waitack_count(){unique_lock<std::mutex> lock(_mtx);return _waitack_msgs.size();}private:bool GCCheck() //判断是否需要进行垃圾回收{//持久化的消息总量大于2000 且其中的有效比例低于50%则需要持久化if(_total_count > 2000 && _valid_count*10/_total_count < 5)return true;return false;}void gc() //垃圾回收的逻辑{//1.先判断一下是否需要进行垃圾回收if(GCCheck() == false) return;//2.调用持久化句柄进行垃圾回收list<MessagePtr> result = _mapper.gc();//3.根据有效消息链表更新实际存储位置,没有在内存中添加管理的则重新添加管理for(auto& message : result){auto it = _durable_msgs.find(message->payload().properties().id());if(it == _durable_msgs.end()){ //没有添加管理的重新推送至待推送链表&&加入duarable_msgsDBG_LOG("垃圾回收后有一条消息没有持久化存储!");_durable_msgs.insert(make_pair(message->payload().properties().id(),message));_msgs.push_back(message);continue; }//更新实际存储位置it->second->set_offset(message->offset());it->second->set_length(message->length());}//4.更新计数器_valid_count = _total_count = result.size();}private:std::mutex _mtx; //互斥锁保证两个map和待推送链表的线程安全string _qname;//队列名称size_t _valid_count;//持久化文件中有效消息数量size_t _total_count; //持久化文件中总体消息数量MessageMapper _mapper; //持久化的管理句柄list<MessagePtr> _msgs;//待推送消息std::unordered_map<string,MessagePtr> _durable_msgs;//持久化消息hash(方便垃圾回收更新消息实际存储位置)unordered_map<string,MessagePtr> _waitack_msgs; //待确认消息hashmap(用于对消息确认,确认后删除消息)};
- 消息持久化的前提是队列也持久化,如果属性设置并且队列也持久化,此时设置消息投递模式为传入的参数;如果属性没设置,则队列没持久化,消息就不持久化。
- 持久化hash更新时机:1. 队列新增消息 2. 确认队列消息(如果该消息是持久化了的)
- 待确认hash更新时机:1. 获取队首消息推送给客户端 2. 确认队列消息
- 与durable_count不同的是,队列确定一条消息durable_count就会减少,而total_count不一定因为可能不会垃圾回收。
总体队列消息管理
管理成员:
- 每个队列的消息管理句柄:队列名称&队列消息管理句柄的hash表
- 互斥锁
- 所有消息文件存放目录
提供的操作:
- 创建队列的时候初始化队列的消息管理句柄:创建队列的时候调用
- 销毁队列的消息管理句柄:删除队列的时候调用
- 队列的各项消息操作:
(1)向队列新增消息
(2)获取队列队首消息
(3)对队列进行消息确认
(4)获取队列消息数量:可获取消息数量,持久化消息数量,待确认消息数量,总的持久化消息数量
(5)恢复队列历史消息
//队列单元消息总体管理(队列--消息单元管理句柄)class MessageManager{public:using ptr = shared_ptr<MessageManager>;MessageManager(const string& basedir):_basedir(basedir){}void clear(){unique_lock<std::mutex> lock(_mtx);for(auto& message : _queue_msgs)message.second->clear(); }//创建队列初始化队列消息管理句柄void initQueueMessage(const string& qname){//1.查找消息管理句柄,有则返回QueueMessage::ptr qmp;{unique_lock<std::mutex> lock(_mtx);auto it = _queue_msgs.find(qname);if(it != _queue_msgs.end()) return;qmp = make_shared<QueueMessage>(_basedir,qname);//2.没有则构造 然后添加管理_queue_msgs.insert(make_pair(qname, qmp));}//恢复队列历史数据qmp->recovery();}//销毁队列消息管理句柄void destroyQueueMessage(const string& qname){QueueMessage::ptr qmp;{unique_lock<std::mutex> lock(_mtx);//1.查找消息管理句柄auto it =_queue_msgs.find(qname);if(it == _queue_msgs.end()) return;//3.删除句柄qmp = it->second; //注意不能先删除迭代器再赋值_queue_msgs.erase(it);}//2.利用句柄删除数据qmp->clear();}//向指定队列新增消息bool insert(const string& qname,BasicProperties* bp,const string& body,bool is_queue_durable){QueueMessage::ptr qmp;{unique_lock<std::mutex> lock(_mtx);//1.查找消息管理句柄auto it =_queue_msgs.find(qname);if(it == _queue_msgs.end()){ERR_LOG("没有找到队列%s的消息管理句柄,新增消息失败!",qname.c_str());return false;}qmp = it->second;}//2.新增return qmp->insert(bp, body,is_queue_durable);}//向指定队列获取队首消息MessagePtr front(const string& qname){QueueMessage::ptr qmp;{unique_lock<std::mutex> lock(_mtx);//1.查找消息管理句柄auto it =_queue_msgs.find(qname);if(it == _queue_msgs.end()){ERR_LOG("没有找到队列%s的消息管理句柄,获取队首消息失败!",qname.c_str());return MessagePtr();}qmp = it->second;}//调用句柄的front;return qmp->front();}//对指定队列进行消息确认void ack(const string& qname,const string& msg_id){QueueMessage::ptr qmp;{unique_lock<std::mutex> lock(_mtx);//1.查找消息管理句柄auto it =_queue_msgs.find(qname);if(it == _queue_msgs.end()){ERR_LOG("没有找到队列%s的消息管理句柄,对消息%s确认失败!",qname.c_str(),msg_id.c_str());return;}qmp = it->second;}qmp->remove(msg_id);}//获得指定队列的待推送消息数量size_t getable_count(const string& qname){QueueMessage::ptr qmp;{unique_lock<std::mutex> lock(_mtx);//1.查找消息管理句柄auto it =_queue_msgs.find(qname);if(it == _queue_msgs.end()){ERR_LOG("没有找到队列%s的消息管理句柄,获取getable_count失败",qname.c_str());return 0;}qmp = it->second;}return qmp->getable_count();}//获得指定队列持久化文件中总的持久化消息数量//(与durable_count不同的是,队列确定一条消息durable_count就会减少,// 而total_count不一定因为可能不会垃圾回收)size_t total_count(const string& qname){QueueMessage::ptr qmp;{unique_lock<std::mutex> lock(_mtx);//1.查找消息管理句柄auto it =_queue_msgs.find(qname);if(it == _queue_msgs.end()){ERR_LOG("没有找到队列%s的消息管理句柄,获取total_count失败",qname.c_str());return 0;}qmp = it->second;}return qmp->total_count();}//获得指定队列的持久化消息数量size_t durable_count(const string& qname){QueueMessage::ptr qmp;{unique_lock<std::mutex> lock(_mtx);//1.查找消息管理句柄auto it =_queue_msgs.find(qname);if(it == _queue_msgs.end()){ERR_LOG("没有找到队列%s的消息管理句柄,获取durable_count失败",qname.c_str());return 0;}qmp = it->second;}return qmp->durable_count();} //获得指定队列的待推送消息数量size_t waitack_count(const string& qname){QueueMessage::ptr qmp;{unique_lock<std::mutex> lock(_mtx);//1.查找消息管理句柄auto it =_queue_msgs.find(qname);if(it == _queue_msgs.end()){ERR_LOG("没有找到队列%s的消息管理句柄,获取waitack__count失败",qname.c_str());return 0;}qmp = it->second;}return qmp->waitack_count();}private:std::mutex _mtx;string _basedir;std::unordered_map<string,QueueMessage::ptr> _queue_msgs;//队列名称-队列消息管理句柄 };
- 服务器启动的时候我们需要恢复历史消息,那我们就需要知道存在哪些队列,之前的MsgQueueManager类提供了从数据库获取所有队列信息的接口,能让我们知道所有的队列名称,但是这样做模块之间会有耦合,因此我们在整合虚拟机的时候再根据队列信息进行初始化队列。
initQueueMessage():
//创建队列初始化队列消息管理句柄void initQueueMessage(const string& qname){//1.查找消息管理句柄,有则返回{unique_lock<std::mutex> lock(_mtx);auto it = _queue_msgs.find(qname);if(it != _queue_msgs.end()) return;}//2.没有则构造 然后添加管理QueueMessage::ptr qmp = make_shared<QueueMessage>(_basedir,qname);{unique_lock<std::mutex> lock(_mtx);_queue_msgs.insert(make_pair(qname, qmp));}}
原本初始化队列消息管理,我们是先查找句柄再添加管理,但是这样做添加管理这两个操作不是原子性的(有可能查找的时候里面已经插入了),因为qmp构造的时候是需要恢复历史消息的,这个过程是比较耗时的,因此解决方案是将qmp构造函数里的历史消息恢复封装成一个单独的接口,等添加管理后再调用进行恢复历史消息:
class QueueMessage{public:using ptr = std::shared_ptr<QueueMessage>;QueueMessage( string& basedir,const string& qname):_qname(qname),_mapper(basedir,qname){}bool recovery(){std::unique_lock<std::mutex> lock(_mtx);//1.调用gc()来进行垃圾回收恢复历史消息_msgs = _mapper.gc();//2.将恢复后的消息插入到持久化map中for(auto& message : _msgs)_durable_msgs.insert(make_pair(message->payload().properties().id(), message));//3.更新valid_count 和 _total_count其实都等于恢复的消息数量_valid_count = _total_count = _msgs.size(); }
//创建队列初始化队列消息管理句柄void initQueueMessage(const string& qname){//1.查找消息管理句柄,有则返回QueueMessage::ptr qmp;{unique_lock<std::mutex> lock(_mtx);auto it = _queue_msgs.find(qname);if(it != _queue_msgs.end()) return;qmp = make_shared<QueueMessage>();//2.没有则构造 然后添加管理_queue_msgs.insert(make_pair(qname, qmp));}//恢复队列历史数据qmp->recovery();}
虚拟机管理
虚拟机模块是对上述三个数据管理模块的整合 , 并基于数据之间的关联关系进行联合操作。
1. 定义虚拟机类包含以下成员:
(1) 交换机数据管理模块句柄
(2) 队列数据管理模块句柄
(3) 绑定数据管理模块句柄
(4) 消息数据管理模块句柄
(5) 虚拟机名称
2. 虚拟机包含操作 :
(1) 提供声明交换机的功能 (存在则OK否则创建)
(2) 提供删除交换机的功能 (删除交换机的同时删除相关的绑定信息)
(3) 提供声明队列的功能(存在则OK不存在则创建,创建的同时创建队列关联消息管理对象)
(4) 提供删除队列的功能 (删除队列的同时删除关联绑定信息,删除关联消息管理对象以及队列所有消息)
(5) 提供交换机-队列绑定的功能
(6) 提供交换机-队列解绑的功能
(7) 提供获取交换机相关的所有绑定信息功能
(8) 提供新增消息的功能
(9) 提供获取指定队列队首消息的功能
(10) 提供消息确认删除删除的功能
3. 虚拟机管理操作 : 虚拟机的增删查
整个实现思想其实就是要实现什么服务,就利用相关句柄完成即可:
class VirtualHost{public:using ptr = shared_ptr<VirtualHost>;VirtualHost(const string& host_name,const string& basedir,const string& dbfile):_host_name(host_name),_emp(make_shared<ExchangeManager>(dbfile)),_mqmp(make_shared<MsgQueueManager>(dbfile)),_bmp(make_shared<BindingManager>(dbfile)),_mmp(make_shared<MessageManager>(basedir)){//获取所有队列信息QueueMap queues = _mqmp->allQueues();//恢复队列消息for(auto& queue : queues)_mmp->initQueueMessage(queue.first);}//声明交换机void declareExchange(const string& ename,ExchangeType etype,bool edurable,bool eauto_delete,const google::protobuf::Map<std::string, std::string>&eargs){return _emp->declareExchange(ename, etype, edurable, eauto_delete,eargs);}//删除交换机void deleteExchange(const string& ename){//删除交换机的时候删除相关的绑定信息_bmp->removeExchangeBindings(ename);return _emp->deleteExchange(ename);}//声明队列bool declareQueue(const string& qname,bool qdurable,bool qexclusive,bool qauto_delete,const google::protobuf::Map<std::string, std::string>& qargs){//1.初始化队列消息管理句柄_mmp->initQueueMessage(qname);//2.队列创建return _mqmp->declareQueue(qname,qdurable,qexclusive,qauto_delete,qargs);}//删除队列void deleteQueue(const string& qname){//1.删除队列消息_mmp->destroyQueueMessage(qname);//2.删除队列相关绑定信息_bmp->removeMsgQueueBindings(qname);//3.删除队列_mqmp->deleteQueue(qname);}//绑定队列-交换机bool bind(const string& ename,const string& qname,const string& key){//1.交换机是否存在Exchange::ptr exchange =_emp->selectExchange(ename);if(exchange == nullptr){ERR_LOG("交换机%s不存在无法绑定!",ename.c_str());return false;}//2.队列是否存在MsgQueue::ptr msg = _mqmp->selectQueue(qname);if(msg == nullptr){ERR_LOG("队列%s不存在无法绑定!",qname.c_str());return false;}//3.绑定return _bmp->bind(ename, qname, key, exchange->_durable & msg->_durable);}//解绑队列-交换机void unBind(const string& ename,const string& qname){return _bmp->unBind(ename,qname);}bool existExchange(const string& ename){return _emp->exists(ename);} bool existMsgQueue(const string& qname){return _mqmp->exists(qname);}bool existBinding(const string& ename,const string& qname){return _bmp->exists(ename,qname);}//获取指定交换机的绑定信息MsgQueueBindMap exchangeBindings(const string& ename){return _bmp->getExchangeBindings(ename);}//在指定队列发布一条消息bool publish(const string& qname,BasicProperties* bp,const string& body){//1.获取指定队列持久化信息MsgQueue::ptr queue = _mqmp->selectQueue(qname);if(queue.get() == nullptr){ERR_LOG("指定队列%s不存在,发布消息失败!",qname.c_str());return false;}//2.发布消息return _mmp->insert(qname, bp, body,queue->_durable);}//在指定队列队首取出一条消息MessagePtr consume(const string& qname){return _mmp->front(qname);}//在指定队列确实/删除一条消息void ack(const string& qname,const string& msg_id){return _mmp->ack(qname,msg_id);}void clear(){_emp->clear();_mqmp->clear();_bmp->clear();_mmp->clear();}Exchange::ptr selectExchange(const string& ename){return _emp->selectExchange(ename);}MsgQueue::ptr selectMsgQueue(const string& qname){return _mqmp->selectQueue(qname);}QueueMap allQueues(){return _mqmp->allQueues();}private:string _host_name;ExchangeManager::ptr _emp; //交换机数据管理句柄MsgQueueManager::ptr _mqmp; //队列数据管理句柄BindingManager::ptr _bmp; //绑定信息数据管理句柄MessageManager::ptr _mmp; //消息数据管理句柄 };
交换路由模块
客户端将消息发布到指定的交换机,交换机这时候要考虑这条数据该放入到哪些与自己绑定的队列中,而这个考量是通过交换机类型以及匹配规则来决定的:
- 广播交换:直接将消息交给所有绑定的队列,无需匹配。
- 直接交换:队列绑定信息中的
binding_key与消息中的routing_key一致则匹配成功,否则失败。 - 主题交换:只有匹配队列主题的消息才会被放入队列中。
binding_key:是由数字字母下划线构成的,并且使用.分成若干部分,并支持*和#通配符。例:news.music.#两种通配符,但是*#只能作为.切分出来的独立部分,不能和其他数字字母混用。
- 支持
*和#两种通配符,但是*#只能作为切分出来的独立部分,不能和其他数字字母混用
- 比如
a.*.b是合法的,a.*a.b是不合法的。 *可以匹配任意一个单词(注意是单词不是字母)#可以匹配任意零个或多个单词。(注意是单词不是字母)
注意事项:
(1)一个单词中不能既出现*又出现#,也就是,一个单词中只能有一个通配符,且必须独立存在
(2)*和#通配符不能连续出现,因为#可以匹配任意多个任意单词,因此连续出现是没有意义的(*和*可以连续出现)。
routing_key :是由数据、字母和下划线组成,并且可以使用.划分成若干部分。例如 news.music.pop,这用于表示当前发布的消息是一个流行音乐的新闻。

取决要素:
- 交换机类型
routing_key与binding_key的匹配
基于功能需求分析,路由交换模块只需要对传入的数据进行处理即可,因此这个模块要实现实际上是功能接口类,没有成员变量。因此提供的功能有:
- 判断
routing_key是否合法:a-z,A-Z,0-9,._组成 - 判断
binding_key是否合法:必须是a-z,A-Z,0-9,*,#,._组成 -
进行routing_key与binding_key路由匹配
class Router{public://判断传入的的routing_key是否合法 static bool isLegalRoutingKey(const string& routing_key){for(const auto& ch : routing_key){if(ch >= 'a' && ch <= 'z') continue;if(ch >= 'A' && ch <= 'Z') continue;if(ch >= '0' && ch <= '9') continue;if(ch == '.' || ch == '_') continue;return false;}return true;}//判断传入的binding_key是否合法static bool isLegalBindingKey(const string& binding_key){//1.判断是否含有非法字符for(const auto& ch : binding_key){if(ch >= 'a' && ch <= 'z') continue;if(ch >= 'A' && ch <= 'Z') continue;if(ch >= '0' && ch <= '9') continue;if(ch == '.' || ch == '_' || ch == '*' || ch == '#') continue;return false;}//2.判断两个通配符是否独立存在(即判断一个单词的长度是否大于1且含有*或#)vector<string> bind_strs;StringHelper::SplitStr(binding_key,".", bind_strs);for(auto& str : bind_strs){ //news.a#.bbif(str.size()>1 && (str.find('#')!=string::npos || str.find('*')!=string::npos))return false;}//3.判断两个通配符是否连续出现for(int i = 1 ; i < bind_strs.size() ; i++){if(bind_strs[i] == "#" && bind_strs[i-1] == "*") return false;else if(bind_strs[i] == "#" && bind_strs[i-1] == "#") return false;else if(bind_strs[i] == "*" && bind_strs[i-1] == "#") return false; }return true;}//路由:根据交换机类型和匹配规则进行路由,即消息应该放到哪队列static bool route(zmq::ExchangeType type,const string& routing_key,const string& binding_key){//1.DIRECT:直接返回binding_key和routing_key是否相同if(type == ExchangeType::DIRECT) return binding_key == routing_key;//2.FANOUT:返回true --- 广播if(type == ExchangeType::FANOUT) return true;//3.主题交换:动态规划vector<string> bind_strs;vector<string> route_strs;StringHelper::SplitStr(binding_key,".", bind_strs);StringHelper::SplitStr(routing_key, ".", route_strs);int n = bind_strs.size();int m = route_strs.size();//3.1状态表示:dp[i][j]表示str1的前i个单词能否和str2的前i个单词匹配成功vector<vector<bool>> dp(n+1,vector<bool>(m+1,false));//3.2初始化dp[0][0] = true;for(int i = 1 ; i <= n ; i++){if(bind_strs[i-1] != "#") break;dp[i][0] = true;}//3.3状态转移&&填表for(int i = 1 ; i <= n ; i++)for(int j = 1 ; j <= m ; j++){if(bind_strs[i-1] == route_strs[j-1] || bind_strs[i-1] == "*")dp[i][j] = dp[i-1][j-1];else if(bind_strs[i-1] == "#")dp[i][j] = dp[i-1][j] | dp[i-1][j-1] | dp[i][j-1];}return dp[n][m];}};
主题交换涉及的动态规划算法思路:

消费者管理
客户端这边每当发起一个订阅请求,意味着服务器这边就多了一个订阅者(处理消息的客户端描述),而这个消费者或者说订阅者它是和队列直接关联的,因为订阅请求中会描述当前用户想要订阅哪一个队列的消息。而一个信道关闭的时候,或者队列被删除的时候,那么这个信道或队列关联的消费者也就没有存在的意义了,因此也需要将相关的消费者信息删除掉。基于以上需求,因此需要对订阅者信息进行管理。
注意:我们是队列有了消息之后才需要找到消费者,消费者更多是队列收到消息的时候被查找。而不是信道被关闭的时候,因此消费者的管理是基于队列而不是信道。
消费者信息类
管理要素:
a. 消费者标识
b. 订阅的队列名称
c. 一个消息的处理回调函数(实现的是当发布一条消息到队列,则选择消费者进行消费,如何消费?对于服务端来说就是调用这个回调函数进行处理,其内部逻辑是找到消费者对应的连接,然后将数据发送给消费者对应的客户端)
- 回调函数类型:
void(const string&ctag,const BasicProperties&, const string&body)
d. 是否自动应答标志。(一个消息被消费者消费后,若自动应答,则直接移除待确认消息,否则等待客户端确认)
using ConsumerCallback = std::function<void(const string&,const BasicProperties*,const string&)>;//消费者类struct Consumer{using ptr = shared_ptr<Consumer>;string _tag; //消费者标识string _qname; //消费者订阅的队列名称bool _auto_ack;//自动确认标志ConsumerCallback _callback;Consumer(){}Consumer(const string& ctag,const string& qname,bool auto_ack,const ConsumerCallback& cb):_tag(ctag),_qname(qname),_auto_ack(auto_ack),_callback(cb){}};
注意:消费者是和连接信道相关的,信道关闭就不需要再给他推送消息,因此持久化消费者是没有意义的。
以队列为单元的消费者管理
管理的数据:
(1)消费者管理结构:vector(不用哈希表是为了数组方便RR轮转)
(2)轮转序号:一个队列可能会有多个消费者,但是一条消息只需要被一个消费者消费即可,因此采用RR轮转。
(3)互斥锁:保证线程安全。
(4)队列名称
提供的操作:
(1) 新增消费者 : 信道提供的服务是订阅队列消息的时候创建。
(2) 删除消费者:取消订阅 / 信道关闭 / 连接关闭 的时候删除。
(3) 获取消费者:从队列所有的消费者中按序取出一个消费者进行消息的推送。
(4) 判断队列消费者是否为空
(5) 判断指定消费者是否存在
(6) 清理队列所有消费者
//以队列为单元的消费者管理结构class QueueConsumer{public:using ptr = shared_ptr<QueueConsumer>;QueueConsumer(const string& qname):_qname(qname),_rr_seq(0){}//队列新增消费者Consumer::ptr create(const string& ctag,const string& qname,bool auto_ack,const ConsumerCallback& cb){unique_lock<std::mutex> lock(_mtx);//1.先从数组中查找是否存在for(auto& consumer : _consumers){if(consumer->_tag == ctag) //找到直接返回return consumer;} //2.没找到则构建对象auto consumer = make_shared<Consumer>(ctag,qname,auto_ack,cb);//3.添加管理_consumers.push_back(consumer);return consumer; }//队列移除消费者void remove(const string& ctag){unique_lock<std::mutex> lock(_mtx);//1.查找是否已经移除auto it = _consumers.begin();for( ; it != _consumers.end() ; it++){if((*it)->_tag == ctag){_consumers.erase(it);return;}} }//队列获取消费者:RR轮转获取Consumer::ptr chooseConsumer(){unique_lock<std::mutex> lock(_mtx);//1.注意可能没有消费者if(_consumers.size() == 0){ERR_LOG("当前队列%s没有消费者!",_qname.c_str());return Consumer::ptr();}//2.增加序号uint64_t num = (_rr_seq++) % _consumers.size();return _consumers[num];}//是否为空bool empty(){unique_lock<std::mutex> lock(_mtx);return (_consumers.size() == 0);}//判断指定消费者是否存在bool exists(const string& ctag){unique_lock<std::mutex> lock(_mtx);//1.查找是否已经移除auto it = _consumers.begin();for( ; it != _consumers.end() ; it++){if((*it)->_tag == ctag)return true;} return false; }//清理所有消费者///消费者跟连接通道相关,通道关闭,// 就不需要再给他推送消息,因此持久化消费者是没有意义的void clear(){unique_lock<std::mutex> lock(_mtx);_rr_seq = 0;_consumers.clear();}private:string _qname;std::mutex _mtx;uint64_t _rr_seq;//RR轮转序号vector<Consumer::ptr> _consumers;};
队列消费者总体管理
管理操作:
a. 初始化队列的消费者信息结构(创建队列的时候初始化)
b. 向指定队列新增消费者(客户端订阅指定队列消息的时候)
c. 从指定队列移除消费者(客户端取消订阅的时候)
d. 移除指定队列的所有消费者(队列被删除时销毁)
e. 从指定队列获取一个消费者(轮询获取-消费者轮换消费起到负载均衡的作用)
f. 判断队列中消费者是否为空
g. 判断队列中指定消费者是否存在
h. 清理所有消费者
class ConsumerManager{public:using ptr = shared_ptr<ConsumerManager>;ConsumerManager(){}//初始化队列单元消费者管理句柄void initQueueConsumer(const string& qname){unique_lock<std::mutex> lock(_mtx);//1.先查找指定句柄是否存在auto it = _qconsumers.find(qname);if(it != _qconsumers.end())return;//2.没找到则进行构造QueueConsumer::ptr qcp = make_shared<QueueConsumer>(qname);//3.添加管理_qconsumers[qname] = qcp;}//销毁指定队列的单元消费者管理句柄void destroyQueueConsumer(const string& qname){unique_lock<std::mutex> lock(_mtx);//1.先查找指定句柄是否存在auto it = _qconsumers.find(qname);if(it == _qconsumers.end())return;//2.移除管理,析构之后清理消费者_qconsumers.erase(qname);}//向指定队列添加消费者Consumer::ptr create(const string& ctag,const string& qname,bool auto_ack,const ConsumerCallback& cb){QueueConsumer::ptr qmp;{unique_lock<std::mutex> lock(_mtx);//1.先查找指定句柄是否存在auto it = _qconsumers.find(qname);if(it == _qconsumers.end()){ERR_LOG("队列%s不存在,添加消费者失败!",qname.c_str());return Consumer::ptr();}qmp = it->second;}//2.找到则调用句柄进行添加return qmp->create(ctag, qname,auto_ack,cb);}//移除指定队列的指定消费者void remove(const string& ctag,const string& qname){QueueConsumer::ptr qmp;{unique_lock<std::mutex> lock(_mtx);//1.先查找指定句柄是否存在auto it = _qconsumers.find(qname);if(it == _qconsumers.end()){ERR_LOG("队列%s不存在,删除消费者失败!",qname.c_str());return;}qmp = it->second;}//2.找到则调用句柄进行添加return qmp->remove(ctag);}//选择指定队列的消费者进行消费Consumer::ptr chooseConsumer(const string& qname){QueueConsumer::ptr qmp;{unique_lock<std::mutex> lock(_mtx);//1.先查找指定句柄是否存在auto it = _qconsumers.find(qname);if(it == _qconsumers.end()){ERR_LOG("队列%s不存在,选择消费者失败!",qname.c_str());return Consumer::ptr();}qmp = it->second;}//2.选择消费者return qmp->chooseConsumer();}//判断指定队列的消费者是否为空bool empty(const string& qname){QueueConsumer::ptr qmp;{unique_lock<std::mutex> lock(_mtx);//1.先查找指定句柄是否存在auto it = _qconsumers.find(qname);if(it == _qconsumers.end()){ERR_LOG("队列%s不存在,判断消费者是否为空失败!",qname.c_str());return false;}qmp = it->second;}//2.选择消费者return qmp->empty();}//判断指定队列的消费者是否存在bool exists(const string& ctag,const string& qname){QueueConsumer::ptr qmp;{unique_lock<std::mutex> lock(_mtx);//1.先查找指定句柄是否存在auto it = _qconsumers.find(qname);if(it == _qconsumers.end()){ERR_LOG("队列%s不存在判断指定队列的消费者是否存在失败!",qname.c_str());return false;}qmp = it->second;}//2.选择消费者return qmp->exists(ctag);}//清除所有消费者void clear(){unique_lock<std::mutex> lock(_mtx);_qconsumers.clear();}private:std::mutex _mtx;unordered_map<string,QueueConsumer::ptr> _qconsumers;};
信道管理模块
在AMQP模型中,除了通信连接Connection概念外,还有一个Channel的概念,Channel是针对Connection连接的一个更细粒度的通信通道,多个Channel可以使用同一个通信连接Connection进行通信,但是同一个Connection的Channel之间相互独立,而信道模块就是再次将上述模块进行整合提供服务的模块。
网络通信协议设计
生产者和消费者都是客户端,它们都需要通过网络和Broker Server进行通信。具体通信的过程我们使用Muduo库来实现,使用TCP作为为通信的底层协议,同时在这个基础上自定义应用层协议,完成客户端对服务器功能的的远端调用。我们要实现的远端调用接口包括:
- 创建channel
- 关闭channel
- 创建exchange
- 删除exchange
- 创建queue
- 删除queue
- 创建binding
- 删除binding
- 发送message
- 订阅message
- 发送ack
- 返回message(服务器->客户端)
syntax="proto3";
package zmq;import "mq_msg.proto";//信道的打开和关闭
message openChannelRequest {string rid = 1;//请求idstring cid = 2;//信道id
};message closeChannelRequest {string rid = 1;string cid = 2;
};//交换机的声明与删除
message declareExchangeRequest{string rid = 1;string cid = 2;string exchange_name = 3; //交换机名称ExchangeType exchange_type = 4;//交换机类型bool durable = 5; //持久化标志bool auto_delete = 6; //自动删除标志map<string,string> args = 7;
};message deleteExchangeRequest {string rid = 1;string cid = 2;string exchange_name = 3; //交换机名称
};
//队列声明与删除
message declareQueueRequest {string rid = 1;string cid = 2;string queue_name = 3;bool exclusive = 4;bool durable = 5;bool auto_delete = 6;map<string,string> args = 7;
};message deleteQueueRequest {string rid = 1;string cid = 2;string queue_name = 3;
};//队列的绑定与解绑
message queueBindRequest {string rid = 1;string cid = 2;string exchange_name = 3;string queue_name = 4;string binding_key = 5;
};message queueUnBindRequest {string rid = 1;string cid = 2;string exchange_name = 3;string queue_name = 4;
};//消息的发布
message basicPublishRequest {string rid = 1;string cid = 2;string exchange_name = 3; //发布到哪个交换机string body = 4;BasicProperties properties = 5;
};//消息的确认
message basicAckRequest {string rid = 1;string cid = 2;string queue_name = 3;string message_id = 4;
};//队列的订阅
message basicConsumRequest{string rid = 1;string cid = 2;string consumer_tag = 3;//消费者标识string queue_name = 4;bool auto_ack = 5;
};//订阅的取消
message basicCancelRequest{string rid = 1;string cid = 2;string consumer_tag = 3;//消费者标识string queue_name = 4;
};//消息的推送
message basicConsumeRequest {string cid = 1;string consumer_tag = 2;string body = 3;BasicProperties properties = 4;
};//通用响应
message basicCommonResponse {string rid = 1;string cid = 2;bool ok = 3;
};
信道模块
1. 管理信息:
(1)信道ID:信道的唯一标识。
(2)信道关联的消费者:用于消费者信道在关闭的时候取消订阅,删除订阅消息。
(3)信道关联的连接:用于向客户端发送数据(响应,推送的消息)
(4)protobuf协议处理句柄:网络通信前的协议处理
(5)消费者管理句柄 : 信道关闭/取消订阅的时候,通过句柄删除订阅者信息
(6)虚拟机句柄 : 交换机 / 队列 / 绑定 / 消息数据管理
(7)工作线程池句柄 (一条消息被发布到队列后 , 需要将消息推送给订阅了对应队列的消费者, 过程由线程池完成)
2. 管理操作
(1)提供声明&删除交换机操作(删除交换机的同时删除交换机关联的绑定信息)
(2)提供声明&删除队列操作(删除队列的同时,删除队列关联的绑定信息,消息,消费者信息)
(3)提供绑定&解绑队列操作
(4)提供订阅&取消订阅队列消息操作
(5)提供发布&确认消息操作
class Channel {public:using ptr = shared_ptr<Channel>;Channel(const string& id,const VirtualHost::ptr& host,const ConsumerManager::ptr& cmp,const ProtobufCodecPtr& codec,const muduo::net::TcpConnectionPtr& conn,const ThreadPool::ptr& pool):_cid(id),_conn(conn),_codec(codec),_cmp(cmp),_host(host),_pool(pool){} ~Channel(){if(_consumer.get() != NULL){//移除消费者信息取消订阅_cmp->remove(_consumer->_tag,_consumer->_qname); }}//交换机的声明与删除void declareExchange(const declareExchangeRequestPtr& req){//1.声明交换机_host->declareExchange(req->exchange_name(),req->exchange_type(),req->durable(),req->auto_delete(),req->args());//2.组织响应发送给客户端basicResponse(req->rid(),req->cid(), true);}void deleteExchange(const deleteExchangeRequestPtr& req){_host->deleteExchange(req->exchange_name()); return basicResponse(req->rid(), req->cid(), true); }//队列的声明与删除void declareQueue(const declareQueueRequestPtr& req){//1.声明队列bool ret = _host->declareQueue(req->queue_name(),req->durable(),req->exclusive(),req->auto_delete(),req->args());if(ret == false) {DBG_LOG("声明队列失败!");return basicResponse(req->rid(), req->cid(), false);}//2.初始化队列消费者_cmp->initQueueConsumer(req->queue_name()); return basicResponse(req->rid(), req->cid(), true);}void deleteQueue(const deleteQueueRequestPtr& req){//1.虚拟机句柄_host->deleteQueue(req->queue_name());//2.删除队列消费者结构_cmp->destroyQueueConsumer(req->queue_name());//3.组织响应return basicResponse(req->rid(), req->cid(), true);}//队列的绑定与解除绑定void bind(const queueBindRequestPtr& req){_host->bind(req->exchange_name(), req->queue_name(), req->binding_key());return basicResponse(req->rid(), req->cid(), true);}void unBind(const queueUnBindRequestPtr& req){_host->unBind(req->exchange_name(),req->queue_name());return basicResponse(req->rid(), req->cid(), true);}//消息的发布void publish(const basicPublishRequestPtr& req){//1.判断交换机是否存在auto ep = _host->selectExchange(req->exchange_name());if(ep.get() == nullptr) return basicResponse(req->rid(),req->cid(),false);//2.进行交换路由判断消息可以发布到交换机的哪个队列中MsgQueueBindMap mqbp = _host->exchangeBindings(req->exchange_name());zmq::BasicProperties* properties = nullptr;string routing_key;//2.1判断是否设置了属性if(req->has_properties()){properties = req->mutable_properties();routing_key = req->mutable_properties()->routing_key();}for(auto& binding : mqbp){if(Router::route(ep->_type, routing_key, binding.second->_binding_key)){//3.路由匹配则添加消息到队列中(添加消息的管理)_host->publish(binding.first , properties, req->body());//4.向线程池中添加有一个消费任务(向指定队列消费者推送消息)auto task = std::bind(&Channel::messageConsume,this,binding.first);_pool->push(task);}}return basicResponse(req->rid(), req->cid(), true);}//消息确认void ack(const basicAckRequestPtr& req){_host->ack(req->queue_name(), req->message_id());return basicResponse(req->rid(), req->cid(), true);}//订阅队列消息void consume(const basicConsumeRequestPtr & req){//1.判断队列是否存在bool ret = _host->existMsgQueue(req->queue_name());if(ret == false){ERR_LOG("订阅队列消息失败,队列%s不存在",req->queue_name().c_str());return;}//2.创建队列的消费者auto cb = std::bind(&Channel::callback,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3);_consumer = _cmp->create(req->consumer_tag(), req->queue_name(),req->auto_ack(),cb);return basicResponse(req->rid(),_cid, true); }//取消订阅void cancel(const basicCancelRequestPtr& req){//1.移除队列消费者结构_cmp->remove(req->consumer_tag(), req->queue_name());//2.组织响应return basicResponse(req->rid(), req->cid(), true);}private://让线程池中线程向指定队列消费者推送消息void messageConsume(const string& qname){//1.从队列中取出一条消息MessagePtr msg = _host->consume(qname);if(msg.get() == nullptr){ERR_LOG("向队列%s消费者推送消息失败,队列中没有消息!",qname.c_str());return;} //2.从队列订阅者中取出一个订阅者Consumer::ptr cp = _cmp->chooseConsumer(qname);if(cp.get() == nullptr){ERR_LOG("向队列%s消费者推送消息失败,队列中没有消费者!",qname.c_str());return;}//3.使用订阅者对应的消息处理函数实现消息的推送///参数:消费者标识 属性 正文cp->_callback(cp->_tag,msg->mutable_payload()->mutable_properties(),msg->payload().body());//4.判断如果订阅者是自动确认 则不需要等待确认直接删除消息,否则需要外部收到消息确认后再删除if(cp->_auto_ack) _host->ack(qname,msg->mutable_payload()->properties().id());}void basicResponse(const string& rid,const string&cid,bool ok){//1.构造basicCommonResponsezmq::basicCommonResponse resp;resp.set_rid(rid);resp.set_cid(cid);resp.set_ok(ok);//2.序列化后发送_codec->send(_conn,resp);}//消费者处理回调void callback(const string& ctag,const BasicProperties* bp,const string& body){//根据参数组织出推送消息请求 将消息推送给channel对应客户端 basicConsumeResponse resp;resp.set_consumer_tag(ctag);resp.set_cid(_cid);resp.set_body(body);//判断属性是否为空if(bp != nullptr){resp.mutable_properties()->set_id(bp->id());resp.mutable_properties()->set_delivery_mode(bp->delivery_mode());resp.mutable_properties()->set_routing_key(bp->routing_key());}_codec->send(_conn,resp);}private:string _cid; //信道idConsumer::ptr _consumer;//信关联的消费者muduo::net::TcpConnectionPtr _conn;ProtobufCodecPtr _codec;ConsumerManager::ptr _cmp;VirtualHost::ptr _host;ThreadPool:: ptr _pool; };
连接管理模块
该模块向用户提供一个用于实现网络通信的Connection对象,从其内部可创建出粒度更轻的Channel对象,用于与客户端进行网络通信。
连接设计
1. 成员信息:
(1)连接关联的信道管理句柄(实现信道的增删查)
(2)连接关联的实际用于通信的muduo::net::Connection
(3)protobuf协议处理的句柄(ProtobufCodeC对象)
(4)消费者管理句柄
(5)虚拟机句柄
(6)异步工作线程池
后面五个成员其实是提供给创建信道所需要的资源
2. 连接操作
(1)提供创建Channel信道的操作
(2)提供删除Channel信道的操作
class Connection{public:using ptr = shared_ptr<Connection>;Connection(const muduo::net::TcpConnectionPtr& conn,const ProtobufCodecPtr& codec,const ConsumerManager::ptr& cmp,const VirtualHost::ptr& host,const ThreadPool::ptr& pool):_conn(conn),_codec(codec),_cmp(cmp),_host(host),_pool(pool),_channels(make_shared<ChannelManager>()){}~Connection(){}void openChannel(const openChannelRequestPtr& req){//1.判断信道id是否重复,创建信道bool ret = _channels->openChannel(req->cid(),_host, _cmp, _codec, _conn,_pool);if(ret == false){ ERR_LOG("创建的信道%s已经存在,信道id重复!",req->cid().c_str());return basicResponse(req->rid(),req->cid(), false);} DBG_LOG("信道创建成功!");//2.给客户端进行回复return basicResponse(req->rid(),req->cid(), true);}void closeChannel(const closeChannelRequestPtr& req){_channels->closeChannel(req->cid());return basicResponse(req->rid(), req->cid(), true);}Channel::ptr getChannel(const string& channel_id){return _channels->getChannel(channel_id);}private:void basicResponse(const string& rid,const string& cid,bool ok){basicCommonResponse resp;resp.set_rid(rid);resp.set_cid(cid);resp.set_ok(ok);_codec->send(_conn,resp);} private:muduo::net::TcpConnectionPtr _conn;ProtobufCodecPtr _codec;ConsumerManager::ptr _cmp;VirtualHost::ptr _host;ThreadPool::ptr _pool;ChannelManager::ptr _channels;};
连接的管理
class ConnectionManager{public:using ptr = shared_ptr<ConnectionManager>;ConnectionManager(){}void newConnection(const muduo::net::TcpConnectionPtr& conn,const ProtobufCodecPtr& codec,const ConsumerManager::ptr& cmp,const VirtualHost::ptr& host,const ThreadPool::ptr& pool){unique_lock<std::mutex> lock(_mtx);auto it = _conns.find(conn);if(it != _conns.end()) return;Connection::ptr self_conn = make_shared<Connection>(conn,codec,cmp,host,pool);_conns.insert(make_pair(conn, self_conn));} void delConnection(const muduo::net::TcpConnectionPtr& conn){unique_lock<std::mutex> lock(_mtx);auto it = _conns.find(conn);if(it == _conns.end()) return;_conns.erase(it);}Connection::ptr getConnection(const muduo::net::TcpConnectionPtr& conn){unique_lock<std::mutex> lock(_mtx);auto it = _conns.find(conn);if(it == _conns.end()) return Connection::ptr();return it->second;}private:std::mutex _mtx;unordered_map<muduo::net::TcpConnectionPtr,Connection::ptr> _conns;};
服务器模块
构建Muduo服务器需要资源:
_server:Muduo库提供的一个通用TCP服务器,我们可以封装这个服服务器进行TCP通信。_baseloop:主事件循环器,用于响应IO事件和定时器事件,主loop主要是为了响应监听描述符的IO事件_codec:一个protobuf编解码器,我们在TCP服务器上设计了一层应用层协议,这个编码器主要是负责实现应用层协议的解析和封装。_dispatcher:一个消息分发器,当Socket接收到一个报文消息后,我们需要按照消息的类型,即上面提到的typeName进行消息分发,会将不同类型的消息分发到相对应的处理函数中。
服务器提供服务需要的资源:
_consumer_manager:服务器中的消费者信息管理句柄_threadpool:异步工作线程池,主要用于队列消息的推送工作_connection_manager:连接管理句柄,管理当前服务器上的所有已经建立的通信连接。_virtual_host:服务器持有的虚拟主机。队列、交换机、绑定、消息等数据都是通过虚拟主机管理。
服务器提供服务主要思想就是获取相应的连接对象,然后根据连接获取信道提供服务,在这之前我们需要给每个服务注册回调:
#define HOST_NAME "host1" //目前支持单个虚拟机#define DBFILE "./meta.db" //持久化使用的数据库文件class Server{public: typedef std::shared_ptr<google::protobuf::Message> MessagePtr; //这里不要忘了Server(int port,const string& basedir):_server(&_baseloop,InetAddress("0.0.0.0",port),"proServer",muduo::net::TcpServer::kReusePort),_dispatcher(std::bind(&Server::onUnknownMessage,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3)),_codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),_host(make_shared<zmq::VirtualHost>(HOST_NAME,basedir,basedir+DBFILE)),_consumer_manager(make_shared<ConsumerManager>()),_connection_manager(make_shared<ConnectionManager>()),_pool(make_shared<ThreadPool>()){//1.为提供的服务注册回调_dispatcher.registerMessageCallback<zmq::openChannelRequest>(std::bind(&Server::onOpenChannel,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));_dispatcher.registerMessageCallback<zmq::closeChannelRequest>(std::bind(&Server::onCloseChannel,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));_dispatcher.registerMessageCallback<zmq::declareExchangeRequest>(std::bind(&Server::onDeclareExchange,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));_dispatcher.registerMessageCallback<zmq::deleteExchangeRequest>(std::bind(&Server::onDeleteExchange,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));_dispatcher.registerMessageCallback<zmq::declareQueueRequest>(std::bind(&Server::onDeclareQueue,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));_dispatcher.registerMessageCallback<zmq::deleteQueueRequest>(std::bind(&Server::onDeleteQueue,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));_dispatcher.registerMessageCallback<zmq::queueBindRequest>(std::bind(&Server::onQueueBind,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));_dispatcher.registerMessageCallback<zmq::queueUnBindRequest>(std::bind(&Server::onQueueUnbind,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));_dispatcher.registerMessageCallback<zmq::basicPublishRequest>(std::bind(&Server::onBasicPublish,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));_dispatcher.registerMessageCallback<zmq::basicAckRequest>(std::bind(&Server::onBasicAck,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));_dispatcher.registerMessageCallback<zmq::basicConsumeRequest>(std::bind(&Server::onBasicConsume,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));_dispatcher.registerMessageCallback<zmq::basicCancelRequest>(std::bind(&Server::onBasicCancel,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));//2.设置连接建立和收到消息的回调_server.setConnectionCallback(std::bind(&Server::onConnection,this,std::placeholders::_1));_server.setMessageCallback(std::bind(&ProtobufCodec::onMessage,_codec.get(),std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));//3.初始化历史消息中的队列消费者结构QueueMap allQueues = _host->allQueues();for(auto& queue : allQueues)_consumer_manager->initQueueConsumer(queue.first);}public:void Start(){_server.start();_baseloop.loop();//开启事件监控}//打开信道void onOpenChannel(const muduo::net::TcpConnectionPtr& conn,const openChannelRequestPtr& message,muduo::Timestamp){//1.查找连接对象Connection::ptr connection = _connection_manager->getConnection(conn);if(connection.get() == nullptr){ERR_LOG("没有找到对应连接对象!");conn->shutdown();return;}//2.通过连接对象创建信道return connection->openChannel(message);}//关闭信道void onCloseChannel(const muduo::net::TcpConnectionPtr& conn,const closeChannelRequestPtr& message,muduo::Timestamp){//1.查找连接对象Connection::ptr connection = _connection_manager->getConnection(conn);if(connection.get() == nullptr){ERR_LOG("没有找到对应连接对象!");conn->shutdown();return;}//2.关闭信道connection->closeChannel(message);}//声明交换机void onDeclareExchange(const muduo::net::TcpConnectionPtr& conn,const declareExchangeRequestPtr& message,muduo::Timestamp){//1.获取连接对象Connection::ptr connection = _connection_manager->getConnection(conn);if(connection.get() == nullptr){ERR_LOG("没有找到对应连接对象!");conn->shutdown();return;}//2.通过连接对象来获取信道auto channel = connection->getChannel(message->cid()); if(channel.get() == nullptr){ERR_LOG("没有对应信道%s无法声明交换机",message->cid().c_str());return;}//3.使用信道提供的服务return channel->declareExchange(message);}//删除交换机void onDeleteExchange(const muduo::net::TcpConnectionPtr& conn,const deleteExchangeRequestPtr& message,muduo::Timestamp){//1.获取连接对象Connection::ptr connection = _connection_manager->getConnection(conn);if(connection.get() == nullptr){ERR_LOG("没有找到对应连接对象!");conn->shutdown();return;}//2.通过连接对象来获取信道auto channel = connection->getChannel(message->cid()); if(channel.get() == nullptr){ERR_LOG("没有对应信道%s无法删除交换机",message->cid().c_str());return;}//3.使用信道提供的服务return channel->deleteExchange(message);}//声明队列void onDeclareQueue(const muduo::net::TcpConnectionPtr& conn,const declareQueueRequestPtr& message,muduo::Timestamp){//1.获取连接对象Connection::ptr connection = _connection_manager->getConnection(conn);if(connection.get() == nullptr){ERR_LOG("没有找到对应连接对象!");conn->shutdown();return;}//2.通过连接对象来获取信道auto channel = connection->getChannel(message->cid()); if(channel.get() == nullptr){ERR_LOG("没有对应信道%s无法声明队列",message->cid().c_str());return;}//3.使用信道提供的服务return channel->declareQueue(message);}//删除队列void onDeleteQueue(const muduo::net::TcpConnectionPtr& conn,const deleteQueueRequestPtr& message,muduo::Timestamp){//1.获取连接对象Connection::ptr connection = _connection_manager->getConnection(conn);if(connection.get() == nullptr){ERR_LOG("没有找到对应连接对象!");conn->shutdown();return;}//2.通过连接对象来获取信道auto channel = connection->getChannel(message->cid()); if(channel.get() == nullptr){ERR_LOG("没有对应信道%s无法删除队列",message->cid().c_str());return;}//3.使用信道提供的服务return channel->deleteQueue(message);}//队列绑定void onQueueBind(const muduo::net::TcpConnectionPtr& conn,const queueBindRequestPtr& message,muduo::Timestamp){//1.获取连接对象Connection::ptr connection = _connection_manager->getConnection(conn);if(connection.get() == nullptr){ERR_LOG("没有找到对应连接对象!");conn->shutdown();return;}//2.通过连接对象来获取信道auto channel = connection->getChannel(message->cid()); if(channel.get() == nullptr){ERR_LOG("没有对应信道%s无法进行队列绑定",message->cid().c_str());return;}//3.使用信道提供的服务return channel->bind(message);}//队列解绑void onQueueUnbind(const muduo::net::TcpConnectionPtr& conn,const queueUnBindRequestPtr& message,muduo::Timestamp){//1.获取连接对象Connection::ptr connection = _connection_manager->getConnection(conn);if(connection.get() == nullptr){ERR_LOG("没有找到对应连接对象!");conn->shutdown();return;}//2.通过连接对象来获取信道auto channel = connection->getChannel(message->cid()); if(channel.get() == nullptr){ERR_LOG("没有对应信道%s无法队列解绑",message->cid().c_str());return;}//3.使用信道提供的服务return channel->unBind(message);}//消息发布void onBasicPublish(const muduo::net::TcpConnectionPtr& conn,const basicPublishRequestPtr& message,muduo::Timestamp){// DBG_LOG("有客户端发布消息...");//1.获取连接对象Connection::ptr connection = _connection_manager->getConnection(conn);if(connection.get() == nullptr){ERR_LOG("没有找到对应连接对象!");conn->shutdown();return;}//2.通过连接对象来获取信道auto channel = connection->getChannel(message->cid()); if(channel.get() == nullptr){ERR_LOG("没有对应信道%s无法消息发布",message->cid().c_str());return;}//3.使用信道提供的服务return channel->publish(message);}//消息确认void onBasicAck(const muduo::net::TcpConnectionPtr& conn,const basicAckRequestPtr& message,muduo::Timestamp){//1.获取连接对象Connection::ptr connection = _connection_manager->getConnection(conn);if(connection.get() == nullptr){ERR_LOG("没有找到对应连接对象!");conn->shutdown();return;}//2.通过连接对象来获取信道auto channel = connection->getChannel(message->cid()); if(channel.get() == nullptr){ERR_LOG("没有对应信道%s无法消息确认",message->cid().c_str());return;}//3.使用信道提供的服务return channel->ack(message);}//队列消息订阅void onBasicConsume(const muduo::net::TcpConnectionPtr& conn,const basicConsumeRequestPtr& message,muduo::Timestamp){//1.获取连接对象Connection::ptr connection = _connection_manager->getConnection(conn);if(connection.get() == nullptr){ERR_LOG("没有找到对应连接对象!");conn->shutdown();return;}//2.通过连接对象来获取信道auto channel = connection->getChannel(message->cid()); if(channel.get() == nullptr){ERR_LOG("没有对应信道%s无法订阅消息",message->cid().c_str());return;}//3.使用信道提供的服务return channel->consume(message);}//队列消息取消订阅void onBasicCancel(const muduo::net::TcpConnectionPtr& conn,const basicCancelRequestPtr& message,muduo::Timestamp){//1.获取连接对象Connection::ptr connection = _connection_manager->getConnection(conn);if(connection.get() == nullptr){ERR_LOG("没有找到对应连接对象!");conn->shutdown();return;}//2.通过连接对象来获取信道auto channel = connection->getChannel(message->cid()); if(channel.get() == nullptr){ERR_LOG("没有对应信道%s,无法取消订阅队列消息",message->cid().c_str());return;}//3.使用信道提供的服务return channel->cancel(message);}void onUnknownMessage(const muduo::net::TcpConnectionPtr& conn,const MessagePtr& message,muduo::Timestamp){LOG_INFO << "onUnknownMessage:" << message->GetTypeName();//关闭连接conn->shutdown();}void onConnection(const muduo::net::TcpConnectionPtr& conn){if(conn->connected()){LOG_INFO << "连接获取成功:" << conn->peerAddress().toIpPort();_connection_manager->newConnection(conn,_codec,_consumer_manager,_host,_pool);}else {LOG_INFO << "连接建立失败" ;_connection_manager->delConnection(conn);}}private:muduo::net::EventLoop _baseloop;muduo::net::TcpServer _server; //服务器对象ProtobufDispatcher _dispatcher; //请求分发器 --- 要向其中注册请求处理函数ProtobufCodecPtr _codec;//protobuf协议处理器 --- 针对收到的请求数据进行protobuf协议处理VirtualHost::ptr _host;ConsumerManager::ptr _consumer_manager;//消费者管理句柄ConnectionManager::ptr _connection_manager; //连接管理句柄ThreadPool::ptr _pool;};
