当前位置: 首页 > news >正文

服务端实现

服务端实现

代码如下:

#ifndef __M_BROKER_H__
#define __M_BROKER_H__
#include "muduo/proto/codec.h"
#include "muduo/proto/dispatcher.h"
#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpServer.h"
#include "../mqcommon/mq_threadpool.hpp"
#include "../mqcommon/mq_msg.pb.h"
#include "../mqcommon/mq_proto.pb.h"
#include "../mqcommon/mq_logger.hpp"
#include "mq_connection.hpp"
#include "mq_consumer.hpp"
#include "mq_host.hpp"namespace xypmq
{#define DBFILE "/meta.db"#define HOSTNAME "MyVirtualHost"class Server{public:typedef std::shared_ptr<google::protobuf::Message> MessagePtr;Server(int port,const std::string &basedir):_server(&_baseloop,muduo::net::InetAddress("0.0.0.0",port),"Server",muduo::net::TcpServer::kReusePort),_dispatcher(std::bind(&Server::onUnknownMessage,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3)),_codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),_virtual_host(std::make_shared<VirtualHost>(HOSTNAME, basedir, basedir + DBFILE)),_consumer_manager(std::make_shared<ConsumerManager>()),_connection_manager(std::make_shared<ConnectionManager>()),_threadpool(std::make_shared<threadpool>()){//针对历史消息中的所有队列,别忘了,初始化队列的消费者管理结构QueueMap qm=_virtual_host->allQueues();for(auto &q:qm){_consumer_manager->initQueueConsumer(q.first);}//注册业务请求函数_dispatcher.registerMessageCallback<xypmq::openChannelRequest>(std::bind(&Server::onOpenChannel, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<xypmq::closeChannelRequest>(std::bind(&Server::onCloseChannel, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<xypmq::declareExchangeRequest>(std::bind(&Server::onDeclareExchange, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<xypmq::deleteExchangeRequest>(std::bind(&Server::onDeleteExchange, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<xypmq::declareQueueRequest>(std::bind(&Server::onDeclareQueue, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<xypmq::deleteQueueRequest>(std::bind(&Server::onDeleteQueue, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<xypmq::queueBindRequest>(std::bind(&Server::onQueueBind, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<xypmq::queueUnBindRequest>(std::bind(&Server::onQueueUnBind, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<xypmq::basicPublishRequest>(std::bind(&Server::onBasicPublish, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<xypmq::basicAckRequest>(std::bind(&Server::onBasicAck, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<xypmq::basicConsumeRequest>(std::bind(&Server::onBasicConsume, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<xypmq::basicCancelRequest>(std::bind(&Server::onBasicCancel, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_server.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_server.setConnectionCallback(std::bind(&Server::onConnection,this,std::placeholders::_1));}void start(){_server.start();_baseloop.loop();}private://打开信道void onOpenChannel(const muduo::net::TcpConnectionPtr& conn, const openChannelRequestPtr& message, muduo::Timestamp){Connection::ptr mconn=_connection_manager->getConnection(conn);if(mconn.get()==nullptr){DLOG("打开信道时,没有找到连接对应的Connection对象!");conn->shutdown();return;}return mconn->openChannel(message);}//关闭信道void onCloseChannel(const muduo::net::TcpConnectionPtr& conn, const closeChannelRequestPtr& message, muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("关闭信道时,没有找到连接对应的Connection对象!");conn->shutdown();return;}return mconn->closeChannel(message);}//声明交换机void onDeclareExchange(const muduo::net::TcpConnectionPtr& conn, const declareExchangeRequestPtr& message, muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("声明交换机时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp=mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("声明交换机时,没有找到信道!");return;}return cp->declareExchange(message);}//删除交换机void onDeleteExchange(const muduo::net::TcpConnectionPtr& conn, const deleteExchangeRequestPtr& message, muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("删除交换机时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("删除交换机时,没有找到信道!");return;}return cp->deleteExchange(message);}//声明队列void onDeclareQueue(const muduo::net::TcpConnectionPtr& conn, const declareQueueRequestPtr& message, muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("声明队列时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("声明队列时,没有找到信道!");return;}return cp->declareQueue(message);}//删除队列void onDeleteQueue(const muduo::net::TcpConnectionPtr& conn, const deleteQueueRequestPtr& message, muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("删除队列时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("删除队列时,没有找到信道!");return;}return cp->deleteQueue(message);}//队列绑定void onQueueBind(const muduo::net::TcpConnectionPtr& conn, const queueBindRequestPtr& message, muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("队列绑定时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("队列绑定时,没有找到信道!");return;}return cp->queueBind(message);}//队列解绑void onQueueUnBind(const muduo::net::TcpConnectionPtr& conn, const queueUnBindRequestPtr& message, muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("队列解除绑定时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("队列解除绑定时,没有找到信道!");return;}return cp->queueUnBind(message);}//消息发布void onBasicPublish(const muduo::net::TcpConnectionPtr& conn, const basicPublishRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("发布消息时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("发布消息时,没有找到信道!");return;}return cp->basicPublish(message);}//消息确认void onBasicAck(const muduo::net::TcpConnectionPtr& conn, const basicAckRequestPtr& message, muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("确认消息时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("确认消息时,没有找到信道!");return;}return cp->basicAck(message);}//队列消息订阅void onBasicConsume(const muduo::net::TcpConnectionPtr& conn, const basicConsumeRequestPtr& message, muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("队列消息订阅时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("队列消息订阅时,没有找到信道!");return;}return cp->basicConsume(message);}//队列消息取消订阅void onBasicCancel(const muduo::net::TcpConnectionPtr& conn, const basicCancelRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("队列消息取消订阅时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("队列消息取消订阅时,没有找到信道!");return;}return cp->basicCancel(message);}void onUnknownMessage(const muduo::net::TcpConnectionPtr& conn, const MessagePtr& message,muduo::Timestamp){LOG_INFO << "onUnknownMessage: " << message->GetTypeName();conn->shutdown();}   void onConnection(const muduo::net::TcpConnectionPtr&conn){if(conn->connected()){_connection_manager->newConnection(_virtual_host,_consumer_manager,_codec,conn,_threadpool);}else{_connection_manager->delConnection(conn);}}private:muduo::net::EventLoop _baseloop;muduo::net::TcpServer _server;//服务器对象ProtobufDispatcher _dispatcher;//请求分发器对象,要想其中注册请求函数ProtobufCodecPtr _codec;//protobuf协议处理器,针对收到请求的数据进行protobuf协议处理VirtualHost::ptr _virtual_host;ConsumerManager::ptr _consumer_manager;ConnectionManager::ptr _connection_manager;threadpool::ptr _threadpool;};
}
#endif
http://www.dtcms.com/a/393089.html

相关文章:

  • 深入AQS源码:解密Condition的await与signal
  • ceph存储配置大全
  • 数据库造神计划第十六天---索引(1)
  • 【软件推荐】免费图片视频管理工具,让灵感库告别混乱
  • C语言入门教程 | 阶段二:循环语句详解(while、do...while、for)
  • GEO(Generative Engine Optimization)完全指南:从原理到实践
  • Msyql日期时间总结
  • IP地址入门基础
  • 【ROS2】Beginner: CLI tools
  • LeetCode刷题记录----279.完全平方数(Medium)
  • H7-TOOL的250M示波器模组采集CANFD标准波形效果,开口逻辑0,闭口逻辑1
  • 打工人日报#20250920
  • 详解C/C++内存管理
  • SSM(springboot部分)
  • C++ std:string和Qt的QString有哪些差异?
  • FunASR开源项目实战:解锁语音识别新姿势
  • (华为杯)数学建模比赛编程助手
  • 通义千问对postgresql wire协议的连接和执行SQL过程的解释
  • 钣金折弯机被远程锁机了怎么办
  • 基于陌讯AIGC检测算法的高性能部署实践:FastAPI与多进程并发设计详解
  • 群晖 NAS 远程访问痛点解决:神卓 N600 公网 IP 盒实战体验
  • JavaWeb之HttpServletRequest与HttpServletResponse详解及快递管理系统实践
  • Git详细介绍
  • 大话计算机网络(上)
  • JVM方法调用机制深度解析:从aload_1到invokevirtual的完整旅程
  • STM32CubeIDE学习——安装
  • 追觅宣布进军手机市场,已经白热化的手机赛道追觅优势何在?
  • AI智能体开发工作流的成功案例分享及思路
  • 【算法基础】String、Hash 与 Stack
  • 使用springboot开发一个宿舍管理系统练习项目