当前位置: 首页 > news >正文

信道管理模块实现

信道管理模块实现

代码如下:

#ifndef __M_CHANNEL_H__
#define __M_CHANNEL_H__
#include "muduo/net/TcpConnection.h"
#include "muduo/proto/codec.h"
#include "muduo/proto/dispatcher.h"
#include "../mqcommon/mq_logger.hpp"
#include "../mqcommon/mq_helper.hpp"
#include "../mqcommon/mq_msg.pb.h"
#include "../mqcommon/mq_proto.pb.h"
#include "../mqcommon/mq_threadpool.hpp"
#include "mq_consumer.hpp"
#include "mq_host.hpp"
#include "mq_route.hpp"
#include "mq_queue.hpp"namespace xypmq
{using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;using openChannelRequestPtr = std::shared_ptr<openChannelRequest>;using closeChannelRequestPtr = std::shared_ptr<closeChannelRequest>;using declareExchangeRequestPtr = std::shared_ptr<declareExchangeRequest>;using deleteExchangeRequestPtr = std::shared_ptr<deleteExchangeRequest>;using declareQueueRequestPtr = std::shared_ptr<declareQueueRequest>;using deleteQueueRequestPtr = std::shared_ptr<deleteQueueRequest>;using queueBindRequestPtr = std::shared_ptr<queueBindRequest>;using queueUnBindRequestPtr = std::shared_ptr<queueUnBindRequest>;using basicPublishRequestPtr = std::shared_ptr<basicPublishRequest>;using basicAckRequestPtr = std::shared_ptr<basicAckRequest>;using basicConsumeRequestPtr = std::shared_ptr<basicConsumeRequest>;using basicCancelRequestPtr = std::shared_ptr<basicCancelRequest>;class Channel {public:using ptr = std::shared_ptr<Channel>;Channel(const std::string &id, const VirtualHost::ptr &host, const ConsumerManager::ptr &cmp, const ProtobufCodecPtr &codec, const muduo::net::TcpConnectionPtr &conn,const threadpool::ptr &pool):_cid(id),_conn(conn),_codec(codec),_cmp(cmp),_host(host),_pool(pool){DLOG("new Channel: %p", this);}~Channel() {if (_consumer.get() != nullptr) {_cmp->remove(_consumer->tag, _consumer->qname);}DLOG("del Channel: %p", this);}//交换机的声明与删除void declareExchange(const declareExchangeRequestPtr &req) {bool ret = _host->declareExchange(req->exchange_name(), req->exchange_type(), req->durable(), req->auto_delete(), req->args());return basicResponse(ret, req->rid(), req->cid());}void deleteExchange(const deleteExchangeRequestPtr &req) {_host->deleteExchange(req->exchange_name());return basicResponse(true, req->rid(), req->cid());}//队列的声明与删除void declareQueue(const declareQueueRequestPtr &req) {bool ret = _host->declareQueue(req->queue_name(),req->durable(), req->exclusive(),req->auto_delete(), req->args());if (ret == false) {return basicResponse(false, req->rid(), req->cid());}_cmp->initQueueConsumer(req->queue_name());//初始化队列的消费者管理句柄return basicResponse(true, req->rid(), req->cid());}void deleteQueue(const deleteQueueRequestPtr &req) {_cmp->destroyQueueConsumer(req->queue_name());_host->deleteQueue(req->queue_name());return basicResponse(true, req->rid(), req->cid());}//队列的绑定与解除绑定void queueBind(const queueBindRequestPtr &req){bool ret = _host->bind(req->exchange_name(),req->queue_name(), req->binding_key());return basicResponse(ret, req->rid(), req->cid());    }void queueUnBind(const queueUnBindRequestPtr &req) {_host->unBind(req->exchange_name(),req->queue_name());return basicResponse(true, req->rid(), req->cid());}//消息的发布void basicPublish(const basicPublishRequestPtr &req) {//1. 判断交换机是否存在auto ep=_host->selectExchange(req->exchange_name());if(ep.get()==nullptr){return basicResponse(false, req->rid(), req->cid());}//2. 进行交换路由,判断消息可以发布到交换机绑定的哪个队列中MsgQueueBindingMap mqbm=_host->exchangeBindings(req->exchange_name());BasicProperties *properties=nullptr;std::string routing_key;if(req->has_properties()){properties=req->mutable_properties();routing_key=properties->routing_key();}for(auto &binding: mqbm){if(Router::route(ep->type,routing_key,binding.second->binding_key)){//3. 将消息添加到队列中(添加消息的管理)_host->basicPublish(binding.first,properties,req->body());//4. 向线程池中添加一个消息消费任务(向指定队列的订阅者去推送消息--线程池完成)auto task=std::bind(&Channel::consume,this,binding.first);_pool->push(task);}}return basicResponse(true, req->rid(), req->cid());}//消息的确认void basicAck(const basicAckRequestPtr &req) {_host->basicAck(req->queue_name(),req->message_id());return basicResponse(true, req->rid(), req->cid());}//订阅队列消息void basicConsume(const basicConsumeRequestPtr &req) {//1. 判断队列是否存在bool ret=_host->existsQueue(req->queue_name());if(ret==false){return basicResponse(false, req->rid(), req->cid());}//2. 创建队列的消费者auto cb=std::bind(&Channel::callback,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3);//创建了消费者之后,当前的channel角色就是个消费者_consumer=_cmp->create(req->consumer_tag(),req->queue_name(),req->auto_ack(),cb);return basicResponse(true, req->rid(), req->cid());}//取消订阅void basicCancel(const basicCancelRequestPtr &req) {_cmp->remove(req->consumer_tag(),req->queue_name());return basicResponse(true, req->rid(), req->cid());}private:void callback(const std::string tag, const BasicProperties *bp, const std::string &body){//针对参数组织出推送消息请求,将消息推送给channel对应的客户端basicConsumeResponse resp;resp.set_cid(_cid);resp.set_body(body);resp.set_consumer_tag(tag);if (bp) {resp.mutable_properties()->set_id(bp->id());resp.mutable_properties()->set_delivery_mode(bp->delivery_mode());resp.mutable_properties()->set_routing_key(bp->routing_key());}_codec->send(_conn, resp);}void consume(const std::string &qname){//指定队列消费消息//1. 从队列中取出一条消息MessagePtr mp=_host->basicConsume(qname);if(mp.get()==nullptr){DLOG("执行消费任务失败,%s 队列没有消息!", qname.c_str());return;}//2. 从队列订阅者中取出一个订阅者Consumer::ptr cp = _cmp->choose(qname);if (cp.get() == nullptr) {DLOG("执行消费任务失败,%s 队列没有消费者!", qname.c_str());return;}//3. 调用订阅者对应的消息处理函数,实现消息的推送cp->callback(cp->tag,mp->mutable_payload()->mutable_properties(),mp->payload().body());//4. 判断如果订阅者是自动确认---不需要等待确认,直接删除消息,否则需要外部收到消息确认后再删除if(cp->auto_ack) _host->basicAck(qname,mp->payload().properties().id());}void basicResponse(bool ok, const std::string &rid, const std::string &cid) {basicCommonResponse resp;resp.set_rid(rid);resp.set_cid(cid);resp.set_ok(ok);_codec->send(_conn, resp);}private:std::string _cid;Consumer::ptr _consumer;muduo::net::TcpConnectionPtr _conn;ProtobufCodecPtr _codec;ConsumerManager::ptr _cmp;VirtualHost::ptr _host;threadpool::ptr _pool;};class ChannelManager{public:using ptr = std::shared_ptr<ChannelManager>;ChannelManager(){}bool openChannel(const std::string &id,const VirtualHost::ptr &host,ConsumerManager::ptr &cmp,const ProtobufCodecPtr &codec,const muduo::net::TcpConnectionPtr &conn,const threadpool::ptr &pool){std::unique_lock<std::mutex> lock(_mutex);auto it=_channels.find(id);if(it!=_channels.end()){DLOG("信道:%s 已经存在!", id.c_str());return false;}auto channel=std::make_shared<Channel> (id,host,cmp,codec,conn,pool);_channels.insert(std::make_pair(id, channel));return true;}void closeChannel(const std::string &id){std::unique_lock<std::mutex> lock(_mutex);_channels.erase(id);}Channel::ptr getChannel(const std::string &id){std::unique_lock<std::mutex> lock(_mutex);auto it=_channels.find(id);if(it==_channels.end()){return Channel::ptr();}return it->second;}private:std::mutex _mutex;std::unordered_map<std::string,Channel::ptr> _channels;};
}
#endif
http://www.dtcms.com/a/390902.html

相关文章:

  • Java 网络原理(一)--- 自定义协议,UDP协议和TCP协议
  • 键盘失灵 键盘不好使问题解决(更新到 Windows 11后 )
  • 远程控制操作中,如何开启游戏键盘及3D鼠标?移动端设置教程分享
  • C 语言宏函数进阶:逗号表达式与 GNU 拓展的妙用
  • 币安加密货币API接口文档
  • Ubuntu20.04仿真 | iris无人机添加mid360激光雷达可直接使用文件
  • 17.ImGui-Hook消息循环
  • 《Skinned Mesh Renderer与LOD系统蒙皮变形异常全解析》
  • 免费插件分享 |Pro Scene Manager
  • Elasticsearch 的 ES|QL 编辑器体验 vs. OpenSearch 的 PPL 事件分析器
  • Unity核心概念⑪:光
  • C 语言运算符优先级(超详细)
  • Ingress使用示例
  • HarmonyOS开源项目分享:识笺——高效学习的卡片应用
  • 揭秘提示词攻击:AI时代的安全新战场
  • vscode安装go插件问题
  • 创作一个简单的编程语言3 加上VLLM后端
  • C语言入门指南:内存操作函数详解
  • React 列表渲染 列表排序 条件渲染 数据渲染 响应式处理
  • 从安卓手机切换到iPhone:好处、缺点及4种方法
  • C++ 篇 类和对象(1)万能工具怎么用?
  • Ansible-copy模块
  • SAPO去中心化训练:多节点协作让LLM训练效率提升94%
  • Stm32 IAP 升级
  • 5G标准学习笔记17------ MDT(Minimization of Drive Tests)路测最小化
  • [Dify] 构建“流程型表单问答”系统:逐步提问逻辑实现
  • 从RAW到JPG到BMP:工业视觉图像格式怎么选?
  • Linux系统Rsync+sersync 实现数据同步
  • 【13/20】缓存与性能优化:Redis 在 Express 中的整合,实现用户数据缓存
  • 如何防止电脑长时间运行过热?定时关机是第一道防线