当前位置: 首页 > news >正文

宁夏建设工程招投标管理中心网站工程建设标准化期刊网站

宁夏建设工程招投标管理中心网站,工程建设标准化期刊网站,wordpress获取分类列表标题,电销客户数据怎么买目录 创建 MQBrokerServer 服务器模块我们借助 Muduo 网络库来实现。 _server:Muduo 库提供的一个通用 TCP 服务器, 我们可以封装这个服务器进行TCP 通信。_baseloop:主事件循环器, 用于响应 IO 事件和定时器事件&…

 

目录

创建 MQBrokerServer 


       服务器模块我们借助 Muduo 网络库来实现。        

  • _server:Muduo 库提供的一个通用 TCP 服务器, 我们可以封装这个服务器进行TCP 通信。
  • _baseloop:主事件循环器, 用于响应 IO 事件和定时器事件,主 loop 主要是为了响应监听描述符的 IO 事件。
  • _codec: 一个 protobuf 编解码器, 我们在 TCP 服务器上设计了一层应用层协议,这个编解码器主要就是负责实现应用层协议的解析和封装。
  • _dispatcher:一个消息分发器, 当 Socket 接收到一个报文消息后, 我们需要按照消息的类型, 即上面提到的 typeName 进行消息分发, 会不不同类型的消息分发相对应的的处理函数中。
  • _consumer: 服务器中的消费者信息管理句柄。
  • _threadpool: 异步工作线程池,主要用于队列消息的推送工作。
  • _connections: 连接管理句柄,管理当前服务器上的所有已经建立的通信连接。
  • _virtual_host:服务器持有的虚拟主机。 队列、交换机 、绑定、消息等数据都是通过虚拟主机管理。

创建 MQBrokerServer 

        BrokerServer 模块是对整体服务器所有模块的整合,接收客户端的请求,并提供服务。基于前边实现的简单的翻译服务器代码,进行改造,只需要实现服务器内部提供服务的各个业务接口即可。
        在各个业务处理函数中,也比较简单,创建信道后,每次请求过来后,找到请求对应的信道句柄,通过句柄调用前边封装好的处理接口进行请求处理,最终返回处理结果。

#pragma once
#include "muduo/proto/codec.h"
#include "muduo/proto/dispatcher.h"#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpServer.h"#include "connection.hpp"
#include "consumer.hpp"
#include "vhost.hpp"
#include "../mqcommon/log.hpp"
#include "../mqcommon/msg.pb.h"
#include "../mqcommon/proto.pb.h"
#include "../mqcommon/threadpool.hpp"#include <iostream>
#include <unistd.h>
#include <unordered_map>namespace jiuqi
{#define DBFILE "/meta.db"#define VHOST "vhost"class BrokerServer{public:using MessagePtr = std::shared_ptr<google::protobuf::Message>;BrokerServer(int port, const std::string &basicDir): _server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port),"server", muduo::net::TcpServer::kReusePort),_dispatcher(std::bind(&BrokerServer::onUnknownMessage, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),_codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),_vhost(std::make_shared<VirtualHost>(VHOST, basicDir, basicDir + DBFILE)),_consumer_manager(std::make_shared<ConsumerManager>()),_connection_manager(std::make_shared<ConnectionManager>()),_threadpool(std::make_shared<ThreadPool>()){QueueMap qm = _vhost->allqueue();for (auto &queue : qm){_consumer_manager->initQueueConsumer(queue.first);}// 注册请求处理函数_dispatcher.registerMessageCallback<jiuqi::openChannelRequest>(std::bind(&BrokerServer::onOpenChannel, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<jiuqi::closeChannelRequest>(std::bind(&BrokerServer::onCloseChannel, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<jiuqi::declareExchangeRequest>(std::bind(&BrokerServer::onDeclareExchange, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<jiuqi::deleteExchangeRequest>(std::bind(&BrokerServer::onDeleteExchange, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<jiuqi::declareQueueRequest>(std::bind(&BrokerServer::onDeclareQueue, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<jiuqi::deleteQueueRequest>(std::bind(&BrokerServer::onDeleteQueue, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<jiuqi::queueBindRequest>(std::bind(&BrokerServer::onQueueBind, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<jiuqi::queueUnbindRequest>(std::bind(&BrokerServer::onQueueUnbind, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<jiuqi::basicPublishRequest>(std::bind(&BrokerServer::onBasicPublish, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<jiuqi::basicAckRequest>(std::bind(&BrokerServer::onBasicAck, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<jiuqi::basicConsumeRequest>(std::bind(&BrokerServer::onBasicConsume, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<jiuqi::basicCancelRequest>(std::bind(&BrokerServer::onBasicCancel, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_server.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(),std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_server.setConnectionCallback(std::bind(&BrokerServer::onConnection, this, std::placeholders::_1));}void start(){_server.start();_baseloop.loop();}private:// 打开信道void onOpenChannel(const muduo::net::TcpConnectionPtr &conn,const openChannelRequestPtr &message,muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn == nullptr) {DEBUG("打开信道时,没有找到对应的连接对象");conn->shutdown();return;}mconn->openChannel(message);}// 关闭信道void onCloseChannel(const muduo::net::TcpConnectionPtr &conn,const closeChannelRequestPtr &message,muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn == nullptr) {DEBUG("关闭信道时,没有找到对应的连接对象");conn->shutdown();return;}mconn->closeChannel(message);}// 声明交换机void onDeclareExchange(const muduo::net::TcpConnectionPtr &conn,const declareExchangeRequestPtr &message,muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn == nullptr) {DEBUG("声明交换机时,没有找到对应的连接对象");conn->shutdown();return;}Channel::ptr channel = mconn->getChannel(message->cid());if (channel == nullptr){DEBUG("声明交换机时,没有找到对应的信道");return;}channel->declareExchange(message);}// 删除交换机void onDeleteExchange(const muduo::net::TcpConnectionPtr &conn,const deleteExchangeRequestPtr &message,muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn == nullptr) {DEBUG("删除交换机时,没有找到对应的连接对象");conn->shutdown();return;}Channel::ptr channel = mconn->getChannel(message->cid());if (channel == nullptr){DEBUG("删除交换机时,没有找到对应的信道");return;}channel->deleteExchange(message);           }// 声明队列void onDeclareQueue(const muduo::net::TcpConnectionPtr &conn,const declareQueueRequestPtr &message,muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn == nullptr) {DEBUG("声明队列时,没有找到对应的连接对象");conn->shutdown();return;}Channel::ptr channel = mconn->getChannel(message->cid());if (channel == nullptr){DEBUG("声明队列时,没有找到对应的信道");return;}channel->declareQueue(message);            }// 删除队列void onDeleteQueue(const muduo::net::TcpConnectionPtr &conn,const deleteQueueRequestPtr &message,muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn == nullptr) {DEBUG("删除队列时,没有找到对应的连接对象");conn->shutdown();return;}Channel::ptr channel = mconn->getChannel(message->cid());if (channel == nullptr){DEBUG("删除队列时,没有找到对应的信道");return;}channel->deleteQueue(message);                  }// 队列绑定void onQueueBind(const muduo::net::TcpConnectionPtr &conn,const queueBindRequestPtr &message,muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn == nullptr) {DEBUG("队列绑定时,没有找到对应的连接对象");conn->shutdown();return;}Channel::ptr channel = mconn->getChannel(message->cid());if (channel == nullptr){DEBUG("队列绑定时,没有找到对应的信道");return;}channel->queueBind(message);                 }// 队列解绑void onQueueUnbind(const muduo::net::TcpConnectionPtr &conn,const queueUnbindRequestPtr &message,muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn == nullptr) {DEBUG("队列解绑时,没有找到对应的连接对象");conn->shutdown();return;}Channel::ptr channel = mconn->getChannel(message->cid());if (channel == nullptr){DEBUG("队列解绑时,没有找到对应的信道");return;}channel->queueUnbind(message);              }// 消息发布void onBasicPublish(const muduo::net::TcpConnectionPtr &conn,const basicPublishRequestPtr &message,muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn == nullptr) {DEBUG("消息发布时,没有找到对应的连接对象");conn->shutdown();return;}Channel::ptr channel = mconn->getChannel(message->cid());if (channel == nullptr){DEBUG("消息发布时,没有找到对应的信道");return;}channel->basicPublish(message);               }// 消息确认void onBasicAck(const muduo::net::TcpConnectionPtr &conn,const basicAckRequestPtr &message,muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn == nullptr) {DEBUG("消息确认时,没有找到对应的连接对象");conn->shutdown();return;}Channel::ptr channel = mconn->getChannel(message->cid());if (channel == nullptr){DEBUG("消息确认时,没有找到对应的信道");return;}channel->basicAck(message);                   }// 队列消息订阅void onBasicConsume(const muduo::net::TcpConnectionPtr &conn,const basicConsumeRequestPtr &message,muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn == nullptr) {DEBUG("队列消息订阅时,没有找到对应的连接对象");conn->shutdown();return;}Channel::ptr channel = mconn->getChannel(message->cid());if (channel == nullptr){DEBUG("队列消息订阅时,没有找到对应的信道");return;}channel->basicConsumer(message);                  }// 队列消息取消订阅void onBasicCancel(const muduo::net::TcpConnectionPtr &conn,const basicCancelRequestPtr &message,muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn == nullptr) {DEBUG("队列消息取消订阅时,没有找到对应的连接对象");conn->shutdown();return;}Channel::ptr channel = mconn->getChannel(message->cid());if (channel == nullptr){DEBUG("队列消息取消订阅时,没有找到对应的信道");return;}channel->basicCancel(message);                 }void onUnknownMessage(const muduo::net::TcpConnectionPtr &conn,const MessagePtr &message,muduo::Timestamp){LOG_INFO << "onUnknownMessage: " << message->GetTypeName();conn->shutdown();}void onConnection(const muduo::net::TcpConnectionPtr &conn){if (conn->connected()){_connection_manager->newConnection(conn, _codec, _consumer_manager, _vhost, _threadpool);LOG_INFO << "连接建立成功";}else{_connection_manager->deleteConnection(conn);LOG_INFO << "连接关闭";}}private:muduo::net::EventLoop _baseloop;muduo::net::TcpServer _server;  // 服务器对象ProtobufDispatcher _dispatcher; // 请求分发器对象--要向其中注册请求处理函数ProtobufCodecPtr _codec;        // protobuf协议处理器--针对收到的请求数据进行prototo协议处理VirtualHost::ptr _vhost;ConsumerManager::ptr _consumer_manager;ConnectionManager::ptr _connection_manager;ThreadPool::ptr _threadpool;};
}

http://www.dtcms.com/a/520067.html

相关文章:

  • 网站建设模板ppt模板微信公众网站开发
  • ElasticSearch倒排索引、ES核心概念、JAVA集成ES操作
  • window安装Elasticsearch(es)
  • 【AI编程实战】零基础用ChatGPT+Cursor开发完整Web应用:30分钟从idea到上线
  • 亚马逊网站建设评述wordpress php环境
  • 网站收录是什么意思最新网站网址永久发布
  • MySQL的增删改查
  • 反无人机蜂群杀伤链动态构建策略研究
  • GCC /Clang __attribute__
  • 阮一峰《TypeScript 教程》学习笔记——Enum 类型
  • 人工只能综合项目开发8---手势识别data_processing
  • C primer plus (第六版)第十一章 编程练习第13题
  • 网站被k申述泉州专业网站建设公司
  • FLUMINER福禄T3 115T挖矿机深度评测:智能管理与高效性能如何平衡?
  • 怎么调网站兼容性公益网站怎么做
  • 压缩与缓存调优实战指南:从0到1根治性能瓶颈(四)
  • 嵌入式软件架构--显示界面架构(工厂流水线模型,HOME界面,命令界面)
  • Ubuntu20.04 + QT5.14.2 + Android23的开发平台搭建总结
  • 【思维链条CoT与React模式深度解析】AI智能体的核心推理框架
  • svchost第一个是rpcss第二个是termsvcs第三个是NetworkService第四个是LocalService第五个是netsvcs----备忘
  • 餐饮网站模板免费下载jetpack wordpress
  • Hadoop High Availability 简介
  • Tier 1 供应商EDI对接:Forvia EDI需求分析
  • 2025最新策略答案引擎优化(AEO):在AI搜索引擎中获得更多曝光
  • SpringAI Redis RAG 搜索
  • 服务器和域名都有了 怎么做网站网站seo诊断分析报告
  • SpringBoot的Web开发
  • 基于springboot的大创管理系统开发与设计
  • GitHub 热榜项目 - 日榜(2025-10-23)
  • RAG:让大模型“既懂又查”的智能系统