当前位置: 首页 > news >正文

交换机数据管理

交换机数据管理

代码如下:

#ifndef __M_EXCHANGE_H__
#define __M_EXCHANGE_H__
#include <string>
#include <cassert>
#include <google/protobuf/map.h>
#include <iostream>
#include <unordered_map>
#include <mutex>
#include <memory>
#include "../mqcommon/mq_helper.hpp"
#include "../mqcommon/mq_logger.hpp"
#include "../mqcommon/mq_msg.pb.h"namespace xypmq
{//1. 定义交换机类struct Exchange{using ptr = std::shared_ptr<Exchange>;//1. 交换机名称std::string name;//2. 交换机类型ExchangeType type;//3. 交换机持久化标志bool durable;//4. 是否自动删除标志bool auto_delete;//5. 其他参数google::protobuf::Map<std::string, std::string> args;Exchange(){}Exchange(const std::string &ename,ExchangeType etype,bool edurable,bool eauto_delete,const google::protobuf::Map<std::string, std::string> &eargs): name(ename), type(etype), durable(edurable), auto_delete(eauto_delete), args(eargs) {}void setArgs(const std::string& str_args){std::vector<std::string> sub_args;StrHelper::split(str_args,"&",sub_args);for(auto&str:sub_args){size_t pos=str.find("=");std::string key=str.substr(0,pos);std::string val=str.substr(pos+1);args[key] = val;}}std::string getArgs(){std::string result;for(auto start=args.begin();start!=args.end();++start){result+=start->first+"="+start->second+"&";}return result;}};using ExchangeMap = std::unordered_map<std::string, Exchange::ptr>;//2. 定义交换机数据持久化管理类--数据存储在sqlite数据库中class ExchangeMapper{public:ExchangeMapper(const std::string &dbfile):_sql_helper(dbfile){std::string path=FileHelper::parentDirectory(dbfile);FileHelper::createDirectory(path);assert(_sql_helper.open());createTable();}void createTable(){#define CREATE_TABLE "create table if not exists exchange_table(\name varchar(32) primary key,\type int,\durable int,\auto_delete int,\args varchar(128));"bool ret=_sql_helper.exec(CREATE_TABLE,nullptr,nullptr);if(ret==false){DLOG("创建交换机数据库失败!");abort();//直接异常退出程序}}void removeTable(){#define DROP_TABLE "drop table if exists exchange_table;"bool ret=_sql_helper.exec(DROP_TABLE,nullptr,nullptr);if(ret==false){DLOG("创建交换机数据库失败!");abort();//直接异常退出程序}}bool insert(Exchange::ptr &exp){std::stringstream ss;ss << "insert into exchange_table values(";ss << "'" << exp->name << "', ";ss << exp->type << ", ";ss << exp->durable << ", ";ss << exp->auto_delete << ", ";ss << "'" << exp->getArgs() << "');";return _sql_helper.exec(ss.str(), nullptr, nullptr);}void remove(const std::string &name){std::stringstream ss;ss << "delete from exchange_table where name=";ss << "'" << name << "';";_sql_helper.exec(ss.str(), nullptr, nullptr);}ExchangeMap recovery() {ExchangeMap result;std::string sql = "select name, type, durable, auto_delete, args from exchange_table;";_sql_helper.exec(sql, selectCallback, &result);return result;}private:static int selectCallback(void* arg,int numcol,char** row,char** fields) {ExchangeMap *result = (ExchangeMap*)arg;auto exp = std::make_shared<Exchange>();exp->name = row[0];exp->type = (xypmq::ExchangeType)std::stoi(row[1]);exp->durable = (bool)std::stoi(row[2]);exp->auto_delete = (bool)std::stoi(row[3]);if (row[4]) exp->setArgs(row[4]);result->insert(std::make_pair(exp->name, exp));return 0;}private:SqliteHelper _sql_helper;};class ExchangeManager{   public:using ptr = std::shared_ptr<ExchangeManager>;ExchangeManager(const std::string &dbfile):_mapper(dbfile){_exchanges=_mapper.recovery();}bool declareExchange(const std::string& name,ExchangeType type,bool durable,bool auto_delete,const google::protobuf::Map<std::string, std::string>& args){std::unique_lock<std::mutex> _lock(_mutex);auto it=_exchanges.find(name);if(it!=_exchanges.end()){return true;}auto exp=std::make_shared<Exchange>(name,type,durable,auto_delete,args);if(durable==true){bool ret = _mapper.insert(exp);if (ret == false) return false;}_exchanges.insert(std::make_pair(name,exp));return true;}void deleteExchange(const std::string &name){std::unique_lock<std::mutex> lock(_mutex) ;auto it=_exchanges.find(name);if(it==_exchanges.end()){return;}if(it->second->durable==true) _mapper.remove(name);_exchanges.erase(name);}Exchange::ptr selectExchange(const std::string &name){std::unique_lock<std::mutex> lock(_mutex) ;auto it=_exchanges.find(name);if(it==_exchanges.end()){return Exchange::ptr();}return it->second;}bool exists(const std::string &name){std::unique_lock<std::mutex> lock(_mutex) ;auto it=_exchanges.find(name);if(it==_exchanges.end()){return false;}return true;}size_t size(){std::unique_lock<std::mutex> lock(_mutex) ;return _exchanges.size();}void clear(){std::unique_lock<std::mutex> lock(_mutex) ;_mapper.removeTable();_exchanges.clear();}private:std::mutex _mutex;ExchangeMapper _mapper;ExchangeMap _exchanges;};
} 
#endif

文章转载自:

http://hc3jThu6.tpyrn.cn
http://z91MT6Iv.tpyrn.cn
http://hz7OuW9H.tpyrn.cn
http://Q5Ar33Hl.tpyrn.cn
http://6W83ylJd.tpyrn.cn
http://gBt2Mu1n.tpyrn.cn
http://ZUVeKZCo.tpyrn.cn
http://Mt6mAPsL.tpyrn.cn
http://QXWWfbpG.tpyrn.cn
http://bbaCj0CK.tpyrn.cn
http://zN3G8i7v.tpyrn.cn
http://L0APFo4a.tpyrn.cn
http://H1yyxDX2.tpyrn.cn
http://9MoXjrJL.tpyrn.cn
http://6PxQOV9I.tpyrn.cn
http://KJWPLfzZ.tpyrn.cn
http://Wvx6TvVo.tpyrn.cn
http://OglqPDBa.tpyrn.cn
http://E3D4WR4Z.tpyrn.cn
http://HVGwAXnS.tpyrn.cn
http://qDWColw7.tpyrn.cn
http://4hcfhD1r.tpyrn.cn
http://2c9iBaVg.tpyrn.cn
http://hgBrSTBm.tpyrn.cn
http://B0pjWQ9d.tpyrn.cn
http://excN7duj.tpyrn.cn
http://5iwoZrBQ.tpyrn.cn
http://dlYlU9ph.tpyrn.cn
http://zKcURhvx.tpyrn.cn
http://mO2DDeb6.tpyrn.cn
http://www.dtcms.com/a/382007.html

相关文章:

  • 【Redis#11】Redis 在 C++ 客户端下的安装使用流程(一条龙服务)
  • leetcode 315 计算右侧小于当前元素的个数
  • MYSQL端口号3306被占用
  • Python核心技术开发指南(062)——静态方法
  • [Windows] 整容脸比对系统
  • C语言:指针从入门到精通(上)
  • 【MySQL】--- 表的约束
  • SpringBoot 轻量级一站式日志可视化与JVM监控
  • Java零基础学习Day10——面向对象高级
  • JavaScript中ES模块语法详解与示例
  • 系统核心解析:深入操作系统内部机制——进程管理与控制指南(三)【进程优先级/切换/调度】
  • Roo Code:用自然语言编程的VS Code扩展
  • 第8.4节:awk的内置时间处理函数
  • leetcode算法刷题的第三十四天
  • 【技术博客分享】LLM推理过程中的不确定问题
  • Vue3基础知识-setup()、ref()和reactive()
  • 规则系统架构风格
  • 宋红康 JVM 笔记 Day17|垃圾回收器
  • vue表单弹窗最大化无法渲染复杂组件内容
  • 加餐加餐!烧烤斗破苍穹
  • SCSS 中的Mixins 和 Includes,%是什么意思
  • RFID基础了解 --- RC522
  • 第九篇 永磁同步电机控制-弱磁控制
  • 搭建langchain4j+SpringBoot的Ai项目
  • 一次 Linux 高负载 (Load) 异常问题排查实录
  • 扩散模型进化史
  • 学习Python是一个循序渐进的过程,结合系统学习、持续实践和项目驱动,
  • EKSPod 资源利用率配置修复:从占位符到完整资源分析系统
  • MySql基础:数据类型
  • 鸿蒙中的智能设备数据分析实战:从采集到建模的完整实现指南