当前位置: 首页 > news >正文

C++ - 仿 RabbitMQ 实现消息队列--服务端核心模块实现(六)

目录

队列消费者/订阅者管理

代码实现

信道管理模块

代码实现


队列消费者/订阅者管理

        客户端这边每当发起一个订阅请求,意味着服务器这边就多了一个订阅者(处理消息的客户端描述),而这个消费者或者说订阅者它是和队列直接关联的,因为订阅请求中会描述当前用户想要订阅哪一个队列的消息。
        而一个信道关闭的时候,或者队列被删除的时候,那么这个信道或队列关联的消费者也就没有存在的意义了,因此也需要将相关的消费者信息给删除掉。

基于以上需求,因此需要对订阅者信息进行管理。

  • 定义消费者信息结构
    • 消费者标识。
    • 订阅的队列名称。
    • 一个消息的处理回调函数(实现的是当发布一条消息到队列,则选择消费者进行消费,如何消费?对于服务端来说就是调用这个个回调函数进行处理,其内部逻辑就是找到消费者对应的连接,然后将数据发送给消费者对应的客户端)。
      • void(const std::string&, const BasicProperties&, const std::string&)
    • 是否自动应答标志。(一个消息被消费者消费后,若自动应答,则直接移除待确认消息,否则等待客户端确认)。
  • 消费者管理--以队列为单元进行管理-队列消费者管理结构
    • 操作
      • 新增消费者:信道提供的服务是订阅队列消息的时候创建。
      • 删除消费者:取消订阅 / 信道关闭 / 连接关闭 的时候删除。
      • 获取消费者:从队列所有的消费者中按序取出一个消费者进行消息的推送。
      • 判断队列消费者是否为空。
      • 判断指定消费者是否存在。
      • 清理队列所有消费者。
    • 元素
      • 消费者管理结构:vector。
      • 轮转序号:一个队列可能会有多个消费者,但是一条消息只需要被一个消费者消费即可,因此采用 RR 轮转。
      • 互斥锁:保证线程安全。
      • 队列名称。
  • 对消费者进行统一管理结构
    • 初始化/删除队列的消费者信息结构(创建/删除队列的时候初始化)。
    • 向指定队列新增消费者(客户端订阅指定队列消息的时候):新增完成的时候返回消费者对象。
    • 从指定队列移除消费者(客户端取消订阅的时候)。
    • 移除指定队列的所有消费者(队列被删除时销毁):删除消费者的队列管理单元对象。
    • 从指定队列获取一个消费者(轮询获取-消费者轮换消费起到负载均衡的作用)。
    • 判断队列中消费者是否为空。
    • 判断队列中指定消费者是否存在。
    • 清理所有消费者。

代码实现

#pragma once
#include "../mqcommon/helper.hpp"
#include "../mqcommon/log.hpp"
#include "../mqcommon/msg.pb.h"
#include <unordered_map>
#include <memory>
#include <functional>namespace jiuqi
{// 消费者表示, 属性, 消息using ConsumerCallback = std::function<void(const std::string&, const BasicProperties*, const std::string&)>;struct Consumer{using ptr = std::shared_ptr<Consumer>;std::string tag;           // 消费者标识std::string qname;         // 绑定的队列名称bool auto_ack;             // 是否自动应答ConsumerCallback callback; // 回调函数Consumer() { DEBUG("new Consumer %p", this); }Consumer(const std::string &ctag, const std::string &queue_name, bool ack, const ConsumerCallback &cb): tag(ctag), qname(queue_name), auto_ack(ack), callback(cb) { DEBUG("new Consumer %p", this); }~Consumer() { DEBUG("del Consumer %p", this); } };// 以队列为单元的消费者管理结构class QueueConsumer{public:using ptr = std::shared_ptr<QueueConsumer>;QueueConsumer(const std::string &qname) : _qname(qname), _rr_seq(0) {}// 新增消费者Consumer::ptr create(const std::string &ctag, bool ack, const ConsumerCallback &cb){std::unique_lock<std::mutex> lock(_mutex);// 判断是否重复for (auto &consumer : _consumers)if (consumer->tag == ctag)return nullptr;// 构建消费者Consumer::ptr consumer = std::make_shared<Consumer>(ctag, _qname, ack, cb);_consumers.push_back(consumer);return consumer;}// 移除消费者void remove(const std::string &ctag){std::unique_lock<std::mutex> lock(_mutex);for (auto it = _consumers.begin(); it != _consumers.end(); it++){if ((*it)->tag == ctag){_consumers.erase(it);return;}}}// 获取消费者Consumer::ptr choose(){std::unique_lock<std::mutex> lock(_mutex);if (_consumers.empty())return nullptr;int index = _rr_seq % _consumers.size();++_rr_seq;return _consumers[index];}// 是否为空bool empty(){std::unique_lock<std::mutex> lock(_mutex);return _consumers.empty();}// 某个消费者是否存在bool exists(const std::string &ctag){std::unique_lock<std::mutex> lock(_mutex);for (auto it = _consumers.begin(); it != _consumers.end(); it++)if ((*it)->tag == ctag)return true;return false;}// 清空void clear(){std::unique_lock<std::mutex> lock(_mutex);_consumers.clear();_rr_seq = 0;}private:std::string _qname;std::mutex _mutex;uint64_t _rr_seq; // 轮转序号std::vector<Consumer::ptr> _consumers;};class ConsumerManager{public:using ptr = std::shared_ptr<ConsumerManager>;ConsumerManager() {}void initQueueConsumer(const std::string &qname){std::unique_lock<std::mutex> lock_mutex;auto it = _qconsumers.find(qname);if (it != _qconsumers.end())return;QueueConsumer::ptr qc = std::make_shared<QueueConsumer>(qname);_qconsumers.insert(std::make_pair(qname, qc));}void destoryQueueConsumer(const std::string &qname){std::unique_lock<std::mutex> lock_mutex;for (auto it = _qconsumers.begin(); it != _qconsumers.end(); it++){if (it->first == qname){_qconsumers.erase(it);return;}}}Consumer::ptr create(const std::string &ctag, const std::string &qname, bool ack, const ConsumerCallback &cb){QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock_mutex;auto it = _qconsumers.find(qname);if (it == _qconsumers.end()){DEBUG("没有找到 %s 的消费者管理句柄", qname.c_str());return nullptr;}   qcp = it->second; }return qcp->create(ctag, ack, cb);}void remove(const std::string &ctag, const std::string &qname){QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock_mutex;auto it = _qconsumers.find(qname);if (it == _qconsumers.end()){DEBUG("没有找到 %s 的消费者管理句柄", qname.c_str());return;}   qcp = it->second; }return qcp->remove(ctag);}Consumer::ptr choose(const std::string &qname){QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock_mutex;auto it = _qconsumers.find(qname);if (it == _qconsumers.end()){DEBUG("没有找到 %s 的消费者管理句柄", qname.c_str());return nullptr;}   qcp = it->second; }return qcp->choose();}bool empty(const std::string &qname){QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock_mutex;auto it = _qconsumers.find(qname);if (it == _qconsumers.end()){DEBUG("没有找到 %s 的消费者管理句柄", qname.c_str());return true;}   qcp = it->second; }return qcp->empty();}bool exists(const std::string &ctag, const std::string &qname){QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock_mutex;auto it = _qconsumers.find(qname);if (it == _qconsumers.end()){DEBUG("没有找到 %s 的消费者管理句柄", qname.c_str());return false;}   qcp = it->second; }return qcp->exists(ctag);}void clear(){std::unique_lock<std::mutex> lock_mutex;_qconsumers.clear();}private:std::mutex _mutex;std::unordered_map<std::string, QueueConsumer::ptr> _qconsumers;};
}

信道管理模块

        在 AMQP 模型中,除了通信连接 Connection 概念外,还有一个 Channel 的概念,Channel 是针对 Connection 连接的一个更细粒度的通信信道,多个 Channel 可以使用同一个通信连接 Connection 进行通信,但是同一个 Connection 的 Channel 之间相互独立。而信道模块就是再次将上述模块进行整合提供服务的模块。

  • 管理信息:
    • 信道 ID:信道的唯一标识。
    • 信道关联的消费者:用于消费者信道在关闭的时候取消订阅,删除订阅者信息。
    • 信道关联的连接:用于向客户端发送数据(响应,推送的消息)。
    • protobuf 协议处理句柄:网络通信前的协议处理。
    • 消费者管理句柄:信道关闭/取消订阅的时候,通过句柄删除订阅者信息。
    • 虚拟机句柄:交换机/队列/绑定/消息数据管理。
    • 工作线程池句柄(一条消息被发布到队列后,需要将消息推送给订阅了对应队列的消费者,过程由线程池完成)。
  • 管理操作:
    • 提供声明&删除交换机操作(删除交换机的同时删除交换机关联的绑定信息)b. 提供声明&删除队列操作(删除队列的同时,删除队列关联的绑定信息,消息,消费者信息)。
    • 提供绑定&解绑队列操作。
    • 提供订阅&取消订阅队列消息操作。
    • 提供发布&确认消息操作。
  • 信道管理
    • 信道的增删查。

代码实现

#pragma once
#include "muduo/net/TcpConnection.h"
#include "muduo/proto/codec.h"
#include "muduo/proto/dispatcher.h"
#include "../mqcommon/helper.hpp"
#include "../mqcommon/log.hpp"
#include "../mqcommon/msg.pb.h"
#include "../mqcommon/proto.pb.h"
#include "../mqcommon/threadpool.hpp"
#include "route.hpp"
#include "consumer.hpp"
#include "vhost.hpp"
#include <unordered_map>
#include <memory>namespace jiuqi
{using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;using openChannelRequestPtr = std::shared_ptr<openChannelRequest>;using closeChannelRequestPtr = std::shared_ptr<closeChannelRequest>;using declareExchangeRequestPtr = std::shared_ptr<declareExchangeRequest>;using deleteExchangeRequestPtr = std::shared_ptr<deleteExchangeRequest>;using declareQueueRequestPtr = std::shared_ptr<declareQueueRequest>;using deleteQueueRequestPtr = std::shared_ptr<deleteQueueRequest>;using queueBindRequestPtr = std::shared_ptr<queueBindRequest>;using queueUnbindRequestPtr = std::shared_ptr<queueUnbindRequest>;using basicPublishRequestPtr = std::shared_ptr<basicPublishRequest>;using basicAckRequestPtr = std::shared_ptr<basicAckRequest>;using basicConsumeRequestPtr = std::shared_ptr<basicConsumeRequest>;using basicCancelRequestPtr = std::shared_ptr<basicCancelRequest>;class Channel{public:using ptr = std::shared_ptr<Channel>;Channel(const std::string &cid,const muduo::net::TcpConnectionPtr &conn,const ProtobufCodecPtr &codec,const ConsumerManager::ptr &cmp,const VirtualHost::ptr &vhost,const ThreadPool::ptr &pool): _cid(cid),_conn(conn),_codec(codec),_cmp(cmp),_vhost(vhost),_pool(pool){DEBUG("new Channel %p", this);}~Channel(){if (_consumer != nullptr)_cmp->remove(_consumer->tag, _consumer->qname);DEBUG("del Channel %p", this);}// 交换机的声明与删除void declareExchange(const declareExchangeRequestPtr &req){bool ret = _vhost->declareExchange(req->ename(), req->etype(), req->durable(), req->auto_delete(), req->args());basicResponse(ret, req->rid(), req->cid());}void deleteExchange(const deleteExchangeRequestPtr &req){_vhost->deleteExchange(req->ename());basicResponse(true, req->rid(), req->cid());}// 队列的声明与删除void declareQueue(const declareQueueRequestPtr &req){bool ret = _vhost->declareQueue(req->qname(), req->durable(), req->exclusive(), req->auto_delete(), req->args());if (ret)_cmp->initQueueConsumer(req->qname());basicResponse(ret, req->rid(), req->cid());}void deleteQueue(const deleteQueueRequestPtr &req){_cmp->destoryQueueConsumer(req->qname());_vhost->deleteQueue(req->qname());basicResponse(true, req->rid(), req->cid());}// 队列的绑定与解绑void queueBind(const queueBindRequestPtr &req){bool ret = _vhost->bind(req->ename(), req->qname(), req->bindingkey());basicResponse(ret, req->rid(), req->cid());}void queueUnbind(const queueUnbindRequestPtr &req){_vhost->unBind(req->ename(), req->qname());basicResponse(true, req->rid(), req->cid());}// 消息的发布void basicPublish(const basicPublishRequestPtr &req){// 判断交换机是否存在auto ep = _vhost->selectExchange(req->ename());if (ep == nullptr)basicResponse(false, req->rid(), req->cid());// 进行交换路由QueueBindingMap qbm = _vhost->exchangeBinding(req->ename());BasicProperties *bp = nullptr;std::string routekey;if (req->has_properties()){routekey = req->properties().routing_key();bp = req->mutable_properties();}for (auto &binding : qbm){if (Router::route(ep->type, routekey, binding.second->bindingKey)){// 将消息添加到队列中_vhost->basicPublish(binding.first, bp, req->body());// 向线程池中添加一个消息消费任务(向指定队列的订阅者推送消息)auto task = std::bind(&Channel::consume, this, binding.first);_pool->push(task);}}basicResponse(true, req->rid(), req->cid());}// 消息的确认void basicAck(const basicAckRequestPtr &req){_vhost->basicAck(req->qname(), req->mid());basicResponse(true, req->rid(), req->cid());}// 订阅队列消息void basicConsumer(const basicConsumeRequestPtr &req){// 判断队列是否存在bool ret = _vhost->existsQueue(req->qname());if (!ret)return basicResponse(false, req->rid(), req->cid());// 创建队列消费者auto cb = std::bind(&Channel::callback, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);// 创建了消费者之后, 当前的channel角色就是个消费者_consumer = _cmp->create(req->ctag(), req->qname(), req->auto_ack(), cb);basicResponse(true, req->rid(), req->cid());}// 取消订阅void basicCancel(const basicCancelRequestPtr &req){_cmp->remove(req->ctag(), req->qname());basicResponse(true, req->rid(), req->cid());}private:void basicResponse(bool ok, const std::string &rid, const std::string &cid){basicCommonResponse resp;resp.set_rid(rid);resp.set_cid(cid);resp.set_ok(ok);_codec->send(_conn, resp);}void consume(const std::string &qname){MessagePtr mp = _vhost->basicConsume(qname);if (mp == nullptr){DEBUG("%s 队列无消息", qname.c_str());return;}Consumer::ptr cp = _cmp->choose(qname);if (cp == nullptr){DEBUG("%s 队列无消费者", qname.c_str())return;}cp->callback(cp->tag, mp->mutable_payload()->mutable_properties(), mp->payload().body());DEBUG("消费者: %s", cp->tag.c_str());if (cp->auto_ack)_vhost->basicAck(qname, mp->payload().properties().id());}void callback(const std::string &tag, const BasicProperties *bp, const std::string &body){basicConsumeResponse resp;resp.set_cid(_cid);resp.set_ctag(tag);resp.set_body(body);if (bp){resp.mutable_properties()->set_id(bp->id());resp.mutable_properties()->set_deliver_mode(bp->deliver_mode());resp.mutable_properties()->set_routing_key(bp->routing_key());}_codec->send(_conn, resp);}private:std::string _cid;Consumer::ptr _consumer;muduo::net::TcpConnectionPtr _conn;ProtobufCodecPtr _codec;ConsumerManager::ptr _cmp;VirtualHost::ptr _vhost;ThreadPool::ptr _pool;};class ChannelManager{public:using ptr = std::shared_ptr<ChannelManager>;ChannelManager() {}bool openChannl(const std::string &cid,const muduo::net::TcpConnectionPtr &conn,const ProtobufCodecPtr &codec,const ConsumerManager::ptr &cmp,const VirtualHost::ptr &vhost,const ThreadPool::ptr &pool){std::unique_lock<std::mutex> lock(_mutex);auto it = _channels.find(cid);if (it != _channels.end()) return false;auto channel = std::make_shared<Channel>(cid, conn, codec, cmp, vhost, pool);_channels.insert(std::make_pair(cid, channel));return true;}void closeChannel(const std::string &cid){std::unique_lock<std::mutex> lock(_mutex);_channels.erase(cid);}Channel::ptr getChannel(const std::string &cid){std::unique_lock<std::mutex> lock(_mutex);auto it = _channels.find(cid);if (it == _channels.end()) return nullptr;return it->second;            }private:std::mutex _mutex;std::unordered_map<std::string, Channel::ptr> _channels;};
}

http://www.dtcms.com/a/315744.html

相关文章:

  • Go 单元测试:如何只运行某个测试函数(精确控制)
  • C++ 网络编程入门:TCP 协议下的简易计算器项目
  • 【STM32】HAL库中的实现(四):RTC (实时时钟)
  • 日语学习-日语知识点小记-构建基础-JLPT-N3阶段(14):文法:ていく+きた+单词
  • MQTT学习
  • Starrocks 关于 trace 命令的说明
  • C# --- 本地缓存失效形成缓存击穿触发限流
  • 【面向对象】面向对象七大原则
  • 【乐企板式文件生成工程】关于乐企板式文件(PDF/OFD/XML)生成工程介绍
  • [2401MT-B] 面积比较
  • 翻译的本质:人工翻译vs机器翻译的核心差异与互补性
  • Starrocks中的 Query Profile以及explain analyze及trace命令中的区别
  • MySQL 中 VARCHAR 和 TEXT 的区别
  • 智慧酒店:科技赋能下的未来住宿新体验
  • Spring-rabbit使用实战六
  • 国产三防平板电脑是什么?三防平板推荐
  • Spark内核调度
  • RTC实时时钟RX8900SA国产替代FRTC8900S
  • 使用maven-shade-plugin解决es跨版本冲突
  • 微信小程序功能实现:页面导航与跳转
  • jenkins插件Active Choices的使用通过参数动态控制多选参数的选项
  • LHA6958D是一款代替AD7606的芯片
  • 【前端】网站favicon图标制作
  • MyBatisPlus查询数据库中所有表的数据(AI)
  • 使标签垂直水平居中的多种方法
  • 自动驾驶控制算法——MPC控制算法
  • 数据结构 实现单链表
  • Vue3核心语法进阶(Props)
  • C语言:选择排序算法深度剖析!
  • nodejs 编码初体验