C++ - 仿 RabbitMQ 实现消息队列--服务端核心模块实现(六)
目录
队列消费者/订阅者管理
代码实现
信道管理模块
代码实现
队列消费者/订阅者管理
客户端这边每当发起一个订阅请求,意味着服务器这边就多了一个订阅者(处理消息的客户端描述),而这个消费者或者说订阅者它是和队列直接关联的,因为订阅请求中会描述当前用户想要订阅哪一个队列的消息。
而一个信道关闭的时候,或者队列被删除的时候,那么这个信道或队列关联的消费者也就没有存在的意义了,因此也需要将相关的消费者信息给删除掉。
基于以上需求,因此需要对订阅者信息进行管理。
- 定义消费者信息结构
- 消费者标识。
- 订阅的队列名称。
- 一个消息的处理回调函数(实现的是当发布一条消息到队列,则选择消费者进行消费,如何消费?对于服务端来说就是调用这个个回调函数进行处理,其内部逻辑就是找到消费者对应的连接,然后将数据发送给消费者对应的客户端)。
- void(const std::string&, const BasicProperties&, const std::string&)
- 是否自动应答标志。(一个消息被消费者消费后,若自动应答,则直接移除待确认消息,否则等待客户端确认)。
- 消费者管理--以队列为单元进行管理-队列消费者管理结构
- 操作
- 新增消费者:信道提供的服务是订阅队列消息的时候创建。
- 删除消费者:取消订阅 / 信道关闭 / 连接关闭 的时候删除。
- 获取消费者:从队列所有的消费者中按序取出一个消费者进行消息的推送。
- 判断队列消费者是否为空。
- 判断指定消费者是否存在。
- 清理队列所有消费者。
- 元素
- 消费者管理结构:vector。
- 轮转序号:一个队列可能会有多个消费者,但是一条消息只需要被一个消费者消费即可,因此采用 RR 轮转。
- 互斥锁:保证线程安全。
- 队列名称。
- 操作
- 对消费者进行统一管理结构
- 初始化/删除队列的消费者信息结构(创建/删除队列的时候初始化)。
- 向指定队列新增消费者(客户端订阅指定队列消息的时候):新增完成的时候返回消费者对象。
- 从指定队列移除消费者(客户端取消订阅的时候)。
- 移除指定队列的所有消费者(队列被删除时销毁):删除消费者的队列管理单元对象。
- 从指定队列获取一个消费者(轮询获取-消费者轮换消费起到负载均衡的作用)。
- 判断队列中消费者是否为空。
- 判断队列中指定消费者是否存在。
- 清理所有消费者。
代码实现
#pragma once
#include "../mqcommon/helper.hpp"
#include "../mqcommon/log.hpp"
#include "../mqcommon/msg.pb.h"
#include <unordered_map>
#include <memory>
#include <functional>namespace jiuqi
{// 消费者表示, 属性, 消息using ConsumerCallback = std::function<void(const std::string&, const BasicProperties*, const std::string&)>;struct Consumer{using ptr = std::shared_ptr<Consumer>;std::string tag; // 消费者标识std::string qname; // 绑定的队列名称bool auto_ack; // 是否自动应答ConsumerCallback callback; // 回调函数Consumer() { DEBUG("new Consumer %p", this); }Consumer(const std::string &ctag, const std::string &queue_name, bool ack, const ConsumerCallback &cb): tag(ctag), qname(queue_name), auto_ack(ack), callback(cb) { DEBUG("new Consumer %p", this); }~Consumer() { DEBUG("del Consumer %p", this); } };// 以队列为单元的消费者管理结构class QueueConsumer{public:using ptr = std::shared_ptr<QueueConsumer>;QueueConsumer(const std::string &qname) : _qname(qname), _rr_seq(0) {}// 新增消费者Consumer::ptr create(const std::string &ctag, bool ack, const ConsumerCallback &cb){std::unique_lock<std::mutex> lock(_mutex);// 判断是否重复for (auto &consumer : _consumers)if (consumer->tag == ctag)return nullptr;// 构建消费者Consumer::ptr consumer = std::make_shared<Consumer>(ctag, _qname, ack, cb);_consumers.push_back(consumer);return consumer;}// 移除消费者void remove(const std::string &ctag){std::unique_lock<std::mutex> lock(_mutex);for (auto it = _consumers.begin(); it != _consumers.end(); it++){if ((*it)->tag == ctag){_consumers.erase(it);return;}}}// 获取消费者Consumer::ptr choose(){std::unique_lock<std::mutex> lock(_mutex);if (_consumers.empty())return nullptr;int index = _rr_seq % _consumers.size();++_rr_seq;return _consumers[index];}// 是否为空bool empty(){std::unique_lock<std::mutex> lock(_mutex);return _consumers.empty();}// 某个消费者是否存在bool exists(const std::string &ctag){std::unique_lock<std::mutex> lock(_mutex);for (auto it = _consumers.begin(); it != _consumers.end(); it++)if ((*it)->tag == ctag)return true;return false;}// 清空void clear(){std::unique_lock<std::mutex> lock(_mutex);_consumers.clear();_rr_seq = 0;}private:std::string _qname;std::mutex _mutex;uint64_t _rr_seq; // 轮转序号std::vector<Consumer::ptr> _consumers;};class ConsumerManager{public:using ptr = std::shared_ptr<ConsumerManager>;ConsumerManager() {}void initQueueConsumer(const std::string &qname){std::unique_lock<std::mutex> lock_mutex;auto it = _qconsumers.find(qname);if (it != _qconsumers.end())return;QueueConsumer::ptr qc = std::make_shared<QueueConsumer>(qname);_qconsumers.insert(std::make_pair(qname, qc));}void destoryQueueConsumer(const std::string &qname){std::unique_lock<std::mutex> lock_mutex;for (auto it = _qconsumers.begin(); it != _qconsumers.end(); it++){if (it->first == qname){_qconsumers.erase(it);return;}}}Consumer::ptr create(const std::string &ctag, const std::string &qname, bool ack, const ConsumerCallback &cb){QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock_mutex;auto it = _qconsumers.find(qname);if (it == _qconsumers.end()){DEBUG("没有找到 %s 的消费者管理句柄", qname.c_str());return nullptr;} qcp = it->second; }return qcp->create(ctag, ack, cb);}void remove(const std::string &ctag, const std::string &qname){QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock_mutex;auto it = _qconsumers.find(qname);if (it == _qconsumers.end()){DEBUG("没有找到 %s 的消费者管理句柄", qname.c_str());return;} qcp = it->second; }return qcp->remove(ctag);}Consumer::ptr choose(const std::string &qname){QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock_mutex;auto it = _qconsumers.find(qname);if (it == _qconsumers.end()){DEBUG("没有找到 %s 的消费者管理句柄", qname.c_str());return nullptr;} qcp = it->second; }return qcp->choose();}bool empty(const std::string &qname){QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock_mutex;auto it = _qconsumers.find(qname);if (it == _qconsumers.end()){DEBUG("没有找到 %s 的消费者管理句柄", qname.c_str());return true;} qcp = it->second; }return qcp->empty();}bool exists(const std::string &ctag, const std::string &qname){QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock_mutex;auto it = _qconsumers.find(qname);if (it == _qconsumers.end()){DEBUG("没有找到 %s 的消费者管理句柄", qname.c_str());return false;} qcp = it->second; }return qcp->exists(ctag);}void clear(){std::unique_lock<std::mutex> lock_mutex;_qconsumers.clear();}private:std::mutex _mutex;std::unordered_map<std::string, QueueConsumer::ptr> _qconsumers;};
}
信道管理模块
在 AMQP 模型中,除了通信连接 Connection 概念外,还有一个 Channel 的概念,Channel 是针对 Connection 连接的一个更细粒度的通信信道,多个 Channel 可以使用同一个通信连接 Connection 进行通信,但是同一个 Connection 的 Channel 之间相互独立。而信道模块就是再次将上述模块进行整合提供服务的模块。
- 管理信息:
- 信道 ID:信道的唯一标识。
- 信道关联的消费者:用于消费者信道在关闭的时候取消订阅,删除订阅者信息。
- 信道关联的连接:用于向客户端发送数据(响应,推送的消息)。
- protobuf 协议处理句柄:网络通信前的协议处理。
- 消费者管理句柄:信道关闭/取消订阅的时候,通过句柄删除订阅者信息。
- 虚拟机句柄:交换机/队列/绑定/消息数据管理。
- 工作线程池句柄(一条消息被发布到队列后,需要将消息推送给订阅了对应队列的消费者,过程由线程池完成)。
- 管理操作:
- 提供声明&删除交换机操作(删除交换机的同时删除交换机关联的绑定信息)b. 提供声明&删除队列操作(删除队列的同时,删除队列关联的绑定信息,消息,消费者信息)。
- 提供绑定&解绑队列操作。
- 提供订阅&取消订阅队列消息操作。
- 提供发布&确认消息操作。
- 信道管理
- 信道的增删查。
代码实现
#pragma once
#include "muduo/net/TcpConnection.h"
#include "muduo/proto/codec.h"
#include "muduo/proto/dispatcher.h"
#include "../mqcommon/helper.hpp"
#include "../mqcommon/log.hpp"
#include "../mqcommon/msg.pb.h"
#include "../mqcommon/proto.pb.h"
#include "../mqcommon/threadpool.hpp"
#include "route.hpp"
#include "consumer.hpp"
#include "vhost.hpp"
#include <unordered_map>
#include <memory>namespace jiuqi
{using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;using openChannelRequestPtr = std::shared_ptr<openChannelRequest>;using closeChannelRequestPtr = std::shared_ptr<closeChannelRequest>;using declareExchangeRequestPtr = std::shared_ptr<declareExchangeRequest>;using deleteExchangeRequestPtr = std::shared_ptr<deleteExchangeRequest>;using declareQueueRequestPtr = std::shared_ptr<declareQueueRequest>;using deleteQueueRequestPtr = std::shared_ptr<deleteQueueRequest>;using queueBindRequestPtr = std::shared_ptr<queueBindRequest>;using queueUnbindRequestPtr = std::shared_ptr<queueUnbindRequest>;using basicPublishRequestPtr = std::shared_ptr<basicPublishRequest>;using basicAckRequestPtr = std::shared_ptr<basicAckRequest>;using basicConsumeRequestPtr = std::shared_ptr<basicConsumeRequest>;using basicCancelRequestPtr = std::shared_ptr<basicCancelRequest>;class Channel{public:using ptr = std::shared_ptr<Channel>;Channel(const std::string &cid,const muduo::net::TcpConnectionPtr &conn,const ProtobufCodecPtr &codec,const ConsumerManager::ptr &cmp,const VirtualHost::ptr &vhost,const ThreadPool::ptr &pool): _cid(cid),_conn(conn),_codec(codec),_cmp(cmp),_vhost(vhost),_pool(pool){DEBUG("new Channel %p", this);}~Channel(){if (_consumer != nullptr)_cmp->remove(_consumer->tag, _consumer->qname);DEBUG("del Channel %p", this);}// 交换机的声明与删除void declareExchange(const declareExchangeRequestPtr &req){bool ret = _vhost->declareExchange(req->ename(), req->etype(), req->durable(), req->auto_delete(), req->args());basicResponse(ret, req->rid(), req->cid());}void deleteExchange(const deleteExchangeRequestPtr &req){_vhost->deleteExchange(req->ename());basicResponse(true, req->rid(), req->cid());}// 队列的声明与删除void declareQueue(const declareQueueRequestPtr &req){bool ret = _vhost->declareQueue(req->qname(), req->durable(), req->exclusive(), req->auto_delete(), req->args());if (ret)_cmp->initQueueConsumer(req->qname());basicResponse(ret, req->rid(), req->cid());}void deleteQueue(const deleteQueueRequestPtr &req){_cmp->destoryQueueConsumer(req->qname());_vhost->deleteQueue(req->qname());basicResponse(true, req->rid(), req->cid());}// 队列的绑定与解绑void queueBind(const queueBindRequestPtr &req){bool ret = _vhost->bind(req->ename(), req->qname(), req->bindingkey());basicResponse(ret, req->rid(), req->cid());}void queueUnbind(const queueUnbindRequestPtr &req){_vhost->unBind(req->ename(), req->qname());basicResponse(true, req->rid(), req->cid());}// 消息的发布void basicPublish(const basicPublishRequestPtr &req){// 判断交换机是否存在auto ep = _vhost->selectExchange(req->ename());if (ep == nullptr)basicResponse(false, req->rid(), req->cid());// 进行交换路由QueueBindingMap qbm = _vhost->exchangeBinding(req->ename());BasicProperties *bp = nullptr;std::string routekey;if (req->has_properties()){routekey = req->properties().routing_key();bp = req->mutable_properties();}for (auto &binding : qbm){if (Router::route(ep->type, routekey, binding.second->bindingKey)){// 将消息添加到队列中_vhost->basicPublish(binding.first, bp, req->body());// 向线程池中添加一个消息消费任务(向指定队列的订阅者推送消息)auto task = std::bind(&Channel::consume, this, binding.first);_pool->push(task);}}basicResponse(true, req->rid(), req->cid());}// 消息的确认void basicAck(const basicAckRequestPtr &req){_vhost->basicAck(req->qname(), req->mid());basicResponse(true, req->rid(), req->cid());}// 订阅队列消息void basicConsumer(const basicConsumeRequestPtr &req){// 判断队列是否存在bool ret = _vhost->existsQueue(req->qname());if (!ret)return basicResponse(false, req->rid(), req->cid());// 创建队列消费者auto cb = std::bind(&Channel::callback, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);// 创建了消费者之后, 当前的channel角色就是个消费者_consumer = _cmp->create(req->ctag(), req->qname(), req->auto_ack(), cb);basicResponse(true, req->rid(), req->cid());}// 取消订阅void basicCancel(const basicCancelRequestPtr &req){_cmp->remove(req->ctag(), req->qname());basicResponse(true, req->rid(), req->cid());}private:void basicResponse(bool ok, const std::string &rid, const std::string &cid){basicCommonResponse resp;resp.set_rid(rid);resp.set_cid(cid);resp.set_ok(ok);_codec->send(_conn, resp);}void consume(const std::string &qname){MessagePtr mp = _vhost->basicConsume(qname);if (mp == nullptr){DEBUG("%s 队列无消息", qname.c_str());return;}Consumer::ptr cp = _cmp->choose(qname);if (cp == nullptr){DEBUG("%s 队列无消费者", qname.c_str())return;}cp->callback(cp->tag, mp->mutable_payload()->mutable_properties(), mp->payload().body());DEBUG("消费者: %s", cp->tag.c_str());if (cp->auto_ack)_vhost->basicAck(qname, mp->payload().properties().id());}void callback(const std::string &tag, const BasicProperties *bp, const std::string &body){basicConsumeResponse resp;resp.set_cid(_cid);resp.set_ctag(tag);resp.set_body(body);if (bp){resp.mutable_properties()->set_id(bp->id());resp.mutable_properties()->set_deliver_mode(bp->deliver_mode());resp.mutable_properties()->set_routing_key(bp->routing_key());}_codec->send(_conn, resp);}private:std::string _cid;Consumer::ptr _consumer;muduo::net::TcpConnectionPtr _conn;ProtobufCodecPtr _codec;ConsumerManager::ptr _cmp;VirtualHost::ptr _vhost;ThreadPool::ptr _pool;};class ChannelManager{public:using ptr = std::shared_ptr<ChannelManager>;ChannelManager() {}bool openChannl(const std::string &cid,const muduo::net::TcpConnectionPtr &conn,const ProtobufCodecPtr &codec,const ConsumerManager::ptr &cmp,const VirtualHost::ptr &vhost,const ThreadPool::ptr &pool){std::unique_lock<std::mutex> lock(_mutex);auto it = _channels.find(cid);if (it != _channels.end()) return false;auto channel = std::make_shared<Channel>(cid, conn, codec, cmp, vhost, pool);_channels.insert(std::make_pair(cid, channel));return true;}void closeChannel(const std::string &cid){std::unique_lock<std::mutex> lock(_mutex);_channels.erase(cid);}Channel::ptr getChannel(const std::string &cid){std::unique_lock<std::mutex> lock(_mutex);auto it = _channels.find(cid);if (it == _channels.end()) return nullptr;return it->second; }private:std::mutex _mutex;std::unordered_map<std::string, Channel::ptr> _channels;};
}