当前位置: 首页 > news >正文

客户端实现信道管理

客户端实现信道管理

代码如下:

#ifndef __M_CHANNEL_H__
#define __M_CHANNEL_H__
#include "muduo/net/TcpConnection.h"
#include "muduo/proto/codec.h"
#include "muduo/proto/dispatcher.h"
#include "../mqcommon/mq_logger.hpp"
#include "../mqcommon/mq_helper.hpp"
#include "../mqcommon/mq_msg.pb.h"
#include "../mqcommon/mq_proto.pb.h"
#include "mq_consumer.hpp"
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <unordered_map>
namespace xypmq
{typedef std::shared_ptr<google::protobuf::Message> MessagePtr;using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;using basicConsumeResponsePtr = std::shared_ptr<basicConsumeResponse>;using basicCommonResponsePtr = std::shared_ptr<basicCommonResponse>;class Channel{public:using ptr = std::shared_ptr<Channel>;Channel(const muduo::net::TcpConnectionPtr& conn, const ProtobufCodecPtr& codec):_cid(UUIDHelper::uuid()), _conn(conn), _codec(codec) {}~Channel() { basicCancel(); }std::string cid() {return _cid;}bool openChannel(){}void closeChannel(){}bool declareExchange(const std::string &name,ExchangeType type, bool durable, bool auto_delete,google::protobuf::Map<std::string, std::string> &args) {//构造一个声明虚拟机的请求对象,std::string rid = UUIDHelper::uuid();declareExchangeRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_exchange_name(name);req.set_exchange_type(type);req.set_durable(durable);req.set_auto_delete(auto_delete);req.mutable_args()->swap(args);//然后向服务器发送请求_codec->send(_conn, req);//等待服务器的响应basicCommonResponsePtr resp = waitResponse(rid);//返回return resp->ok();}void deleteExchange(const std::string &name){std::string rid = UUIDHelper::uuid();deleteExchangeRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_exchange_name(name);_codec->send(_conn, req);waitResponse(rid);return ;}bool declareQueue(const std::string &qname, bool qdurable, bool qexclusive,bool qauto_delete,google::protobuf::Map<std::string, std::string> &qargs){std::string rid = UUIDHelper::uuid();declareQueueRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_queue_name(qname);req.set_durable(qdurable);req.set_auto_delete(qauto_delete);req.set_exclusive(qexclusive);req.mutable_args()->swap(qargs);_codec->send(_conn, req);basicCommonResponsePtr resp = waitResponse(rid);return resp->ok();}void deleteQueue(const std::string &qname){std::string rid = UUIDHelper::uuid();deleteQueueRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_queue_name(qname);_codec->send(_conn, req);waitResponse(rid);return ;}bool queueBind(const std::string &ename, const std::string &qname, const std::string &key){std::string rid = UUIDHelper::uuid();queueBindRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_exchange_name(ename);req.set_queue_name(qname);req.set_binding_key(key);_codec->send(_conn, req);basicCommonResponsePtr resp = waitResponse(rid);return resp->ok();}void queueUnBind(const std::string &ename, const std::string &qname){std::string rid = UUIDHelper::uuid();queueUnBindRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_exchange_name(ename);req.set_queue_name(qname);_codec->send(_conn, req);waitResponse(rid);return ;}void basicPublish(const std::string &ename,const BasicProperties *bp,const std::string &body){std::string rid = UUIDHelper::uuid();basicPublishRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_body(body);req.set_exchange_name(ename);if (bp != nullptr) {req.mutable_properties()->set_id(bp->id());req.mutable_properties()->set_delivery_mode(bp->delivery_mode());req.mutable_properties()->set_routing_key(bp->routing_key());}_codec->send(_conn, req);waitResponse(rid);return ;}void basicAck(const std::string &msgid){if (_consumer.get() == nullptr) {DLOG("消息确认时,找不到消费者信息!");return ;}std::string rid = UUIDHelper::uuid();basicAckRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_queue_name(_consumer->qname);req.set_message_id(msgid);_codec->send(_conn, req);waitResponse(rid);return;}void basicCancel() {if (_consumer.get() == nullptr) {return ;}std::string rid = UUIDHelper::uuid();basicCancelRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_queue_name(_consumer->qname);req.set_consumer_tag(_consumer->tag);_codec->send(_conn, req);waitResponse(rid);_consumer.reset();return;}bool basicConsume(const std::string &consumer_tag,const std::string &queue_name,bool auto_ack,const ConsumerCallback &cb){if (_consumer.get() != nullptr) {DLOG("当前信道已订阅其他队列消息!");return false;}std::string rid = UUIDHelper::uuid();basicConsumeRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_queue_name(queue_name);req.set_consumer_tag(consumer_tag);req.set_auto_ack(auto_ack);_codec->send(_conn, req);basicCommonResponsePtr resp =  waitResponse(rid);if (resp->ok() == false) {DLOG("添加订阅失败!");return false;}_consumer = std::make_shared<Consumer>(consumer_tag, queue_name, auto_ack, cb);return true;}public:   //连接收到基础响应后,向hash_map中添加响应void putBasicResponse(const basicCommonResponsePtr& resp){std::unique_lock<std::mutex> lock(_mutex);_basic_resp.insert(std::make_pair(resp->rid(),resp));_cv.notify_all();}//连接收到消息推送后,需要通过信道找到对应的消费者对象,通过回调函数进行消息处理void consume(const basicConsumeResponsePtr& resp) {if (_consumer.get() == nullptr) \{DLOG("消息处理时,未找到订阅者信息!");return;}if (_consumer->tag != resp->consumer_tag()) {DLOG("收到的推送消息中的消费者标识,与当前信道消费者标识不一致!");return ;}_consumer->callback(resp->consumer_tag(), resp->mutable_properties(), resp->body());}private:basicCommonResponsePtr waitResponse(const std::string &rid) {std::unique_lock<std::mutex> lock(_mutex);_cv.wait(lock,[&rid,this](){return _basic_resp.find(rid)!=_basic_resp.end();});basicCommonResponsePtr basic_resp=_basic_resp[rid];_basic_resp.erase(rid);return basic_resp;}private:std::string _cid;muduo::net::TcpConnectionPtr _conn;ProtobufCodecPtr _codec;Consumer::ptr _consumer;std::mutex _mutex;std::condition_variable _cv;std::unordered_map<std::string, basicCommonResponsePtr> _basic_resp;};class ChannelManager {public:using ptr = std::shared_ptr<ChannelManager>;ChannelManager(){}Channel::ptr create(const muduo::net::TcpConnectionPtr &conn, const ProtobufCodecPtr &codec) {std::unique_lock<std::mutex> lock(_mutex);auto channel = std::make_shared<Channel>(conn, codec);_channels.insert(std::make_pair(channel->cid(), channel));return channel;}void remove(const std::string &cid) {std::unique_lock<std::mutex> lock(_mutex);_channels.erase(cid);}Channel::ptr get(const std::string &cid) {std::unique_lock<std::mutex> lock(_mutex);auto it = _channels.find(cid);if (it == _channels.end()) {return Channel::ptr();}return it->second;}private:std::mutex _mutex;std::unordered_map<std::string, Channel::ptr> _channels;};
}
#endif
http://www.dtcms.com/a/392925.html

相关文章:

  • 异常解决记录 | Yarn NodeManager 注册异常
  • 【C#】C# 调用 Python 脚本正确姿势:解决 WaitForExit 死锁与退出检测问题
  • Java25新特性
  • 卷积神经网络CNN-part9-DenseNet
  • 深入浅出密码学第一章课后题(持续更新)
  • Mysql 入门概览
  • 大模型中权重共享的作用?
  • 【精品资料鉴赏】55页可编辑PPT详解 数字化高校智慧后勤解决方案
  • LLM大模型 - 实战篇 - AI Agents的开发应用
  • 【分布式技术】RedisShake相关功能详细介绍
  • qsv:一款高性能的CSV数据处理工具
  • `html` 将视频作为背景
  • 口播提词器怎么选?手机提词器实测指南与参数推荐
  • 解剖线性表
  • 计算数学研究方向有哪些细分领域?
  • [xboard]08-Makefile逐行分析2
  • Clash 中 REJECT 的技术原理与解决方案 —— 以哔哩哔哩延迟问题为例
  • 庖丁解牛与专家思维:道家的“心手合一”训练法
  • matlab通过GUI实现点云的读取、自定义显示和保存
  • 工业现场实战:如何利用智能网关实现西门子PLC与库卡机器人的无缝连接
  • 【开题答辩全过程】以 Java程序设计课程作业数据分析为例,包含答辩的问题和答案
  • ubuntu配置cuda与torch
  • C语言:输出水仙花数
  • 进程的创建
  • 如何用Anaconda Navigator和命令行管理Python库?
  • 28 种 LLM 越狱攻击全景拆解(2025.9 版)从“AIM”到“Generation Exploitation”,一张防御地图看懂所有套路
  • 第14章 智能床位
  • 总结一下MySQL数据库服务器性能优化的几个维度
  • IP 打造财富新机遇
  • linux系统如何查看文件位置在数据盘还是系统盘