当前位置: 首页 > news >正文

【仿RabbitMQ的发布订阅式消息队列】--- 客户端模块

 Welcome to 9ilk's Code World

       

(๑•́ ₃ •̀๑) 个人主页:       9ilk

(๑•́ ₃ •̀๑) 文章专栏:    项目


本篇主要是对客户端模块提供服务,客户端弱化了Client的概念,不会向用户展示网络通信的概念,而是以一种提供服务的形式来体现。我们要实现的客户端其实就是两个,一个是发布客户端(生产者),一个是订阅客户端(消费者)。

主要实现思想:一个接口实现一个功能,接口内部完成向服务端请求的过程,但是对外并不需要体现客户端与服务端通信的概念,用户需要什么服务就调用什么接口就行。

基于上面的思想,客户端可以分为四大模块:

1. 订阅者模块

  • 一个并不直接对用户展示的模块,其在客户端体现的作用就是对于角色的描述,表示这是一个消费者

2. 信道模块:

  • 一个直接面向用户的模块,内部包含多个向外提供的服务接口,用户需要什么服务,调用对应接口即可
  • 该模块包含交换机声明/删除,队列声明/删除,绑定/解绑,消息发布/确认,订阅/解除订阅等服务。

3. 连接模块:

  • 这是唯一能体现出网络通信概念的一个模块,它向用户提供的功能是用于打开/关闭信道。

4. 异步线程模块:

  • 需求1:网络通信需要包含一个网络通信IO事件监控线程模块用于进行客户端连接的IO事件监控,以便于事件触发后进行IO操作
  • 需求2:对于客户端来说,当一个信道作为消费者时,服务端会向该信道推送消息,此时用户需要对收到的消息进行不同的业务处理,而这个消息的处理需要一个异步工作线程池来完成。
  • 概括来说,该模块包含两个部分:a. 客户端连接的IO事件监控线程 b. 推送过来的消息的异步处理线程。

经过上面的分析,实现一个客户端的流程就比较容易了:

1. 实例化异步线程对象

2. 实例化连接对象

3. 通过连接对象创建信道

4. 根据信道获取自己所需的服务

5. 关闭信道

6. 关闭连接

订阅者模块

        该模块其实和服务端的消费者模块类似,当进行队列消息订阅的时候,就会创建一个订阅者对象,这个订阅者对象有如下作用:

  • 描述当前信道订阅了哪个队列的信息。
  • 描述收到消息后应该如何对这条消息进行处理。
  • 描述收到消息后是否需要进行确认回复。

订阅者信息:

(1)订阅者标识

(2)订阅队列名

(3)是否自动确认标志

(4)回调处理函数(收到消息后该如何处理的回调函数对象)

 using ConsumerCallback = std::function<void(const string&,const BasicProperties*,const string&)>;//消费者类struct Consumer{using ptr = shared_ptr<Consumer>;string _tag; //消费者标识string _qname; //消费者订阅的队列名称bool _auto_ack;//自动确认标志ConsumerCallback _callback;Consumer(){DBG_LOG("new Consumer:%p",this);}Consumer(const string& ctag,const string& qname,bool auto_ack,const ConsumerCallback& cb):_tag(ctag),_qname(qname),_auto_ack(auto_ack),_callback(cb){DBG_LOG("new Consumer:%p",this);}~Consumer(){DBG_LOG("delete Consumer:%p",this);}};

信道管理模块

        服务端的信道是为客户端的队友请求提供服务的,而客户端的接口服务是为了用户具体需要而服务的,即用户通过客户端channel的接口调用来向服务端发送对应请求,获取请求的服务。

1. 信道管理数据:

(1)信道ID

(2)信道关联的网络通信连接对象

(3)protobuf协议处理对象

(4)信道关联的消费者

(5)请求对应的响应信息map(注意用户可能收到通用响应也可能收到消息推送,这里map使用<请求id,响应>以便于查找指定的响应,而消息推送是单独处理的

(6)互斥锁&条件变量(大部分请求都是阻塞操作,发送请求后需要等到响应才能继续,但是muduo库的通信是异步的,比如send发送请求只是放到缓冲区,什么时候发送我们是不知道的,因此需要我们自己在收到响应后,通过判断是否等待到指定响应来进行同步

2. 信道操作:

(1)创建信道

(2)删除信道

(3)声明交换机

(4)删除交换机

(5)创建队列

(6)删除队列

(7)交换机-队列绑定

(8)交换机-队列解绑

(9)添加订阅

(10)取消订阅

(11)发布消息

(12)确认消息

    using ProtobufCodecPtr = shared_ptr<ProtobufCodec>;using basicConsumeResponsePtr = shared_ptr<basicConsumeResponse>;using basicCommonResponsePtr  = shared_ptr<basicCommonResponse>;class Channel{public:using ptr =  shared_ptr<Channel>;Channel(const muduo::net::TcpConnectionPtr& conn,const ProtobufCodecPtr& codec):_cid(uuidHelper::uuid()),_conn(conn),_codec(codec){}Channel(){}~Channel(){basicCancel();}  //声明交换机bool declareExchange(const string& ename,ExchangeType etype,bool edurable,bool eauto_delete, google::protobuf::Map<std::string, std::string>&eargs){//1.构造生命交换机的请求对象string rid = uuidHelper::uuid();declareExchangeRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_exchange_name(ename);req.set_exchange_type(etype);req.set_durable(edurable);req.set_auto_delete(eauto_delete);req.mutable_args()->swap(eargs);//2.向服务器发送请求_codec->send(_conn,req);//3.等待服务器的响应basicCommonResponsePtr resp = waitResponse(rid);//4.返回响应结果return resp->ok();}//删除交换机void deleteExchange(const string& ename){string rid = uuidHelper::uuid();//1.构造删除交换机请求对象deleteExchangeRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_exchange_name(ename);//2.发送给服务器_codec->send(_conn,req);//3.等待服务器响应结果basicCommonResponsePtr resp = waitResponse(rid);return;}//声明队列bool declareQueue(const string& qname,bool qdurable,bool qexclusive,bool qauto_delete,google::protobuf::Map<std::string, std::string>& qargs){string rid = uuidHelper::uuid();//1.创建声明队列请求对象declareQueueRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_queue_name(qname);req.set_exclusive(qexclusive);req.set_durable(qdurable);req.set_auto_delete(qauto_delete);req.mutable_args()->swap(qargs);//2.发送给服务器_codec->send(_conn,req);//3.等待服务器响应basicCommonResponsePtr resp =  waitResponse(rid);//4.返回响应结果return resp->ok();}//删除队列void deleteQueue(const string& qname){string rid = uuidHelper::uuid();deleteQueueRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_queue_name(qname);_codec->send(_conn,req);waitResponse(rid);return;}//队列-交换机绑定bool queueBind(const string& ename,const string& qname,const string& key){string rid = uuidHelper::uuid();queueBindRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_exchange_name(ename);req.set_queue_name(qname);req.set_binding_key(key);_codec->send(_conn,req);basicCommonResponsePtr resp = waitResponse(rid);return resp->ok();}//队列-交换机解绑void queueUnBind(const string& ename,const string& qname){string rid = uuidHelper::uuid();queueUnBindRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_exchange_name(ename);req.set_queue_name(qname);_codec->send(_conn,req);basicCommonResponsePtr resp = waitResponse(rid);}//发布消息bool basicPublish(const string& ename,BasicProperties* bp,const string& body){string rid = uuidHelper::uuid();basicPublishRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_exchange_name(ename);req.set_body(body);if(bp != nullptr){req.mutable_properties()->set_id(bp->id());req.mutable_properties()->set_delivery_mode(bp->delivery_mode());req.mutable_properties()->set_routing_key(bp->routing_key());}_codec->send(_conn,req);DBG_LOG("等待响应...")basicCommonResponsePtr resp = waitResponse(rid);DBG_LOG("获得响应...")return resp->ok();}//对指定队列的指定消息进行应答void basicAck(const string& msg_id){if(_consumer.get() == nullptr){DBG_LOG("消息确认时找不到消费者信息");return;}string rid = uuidHelper::uuid();basicAckRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_queue_name(_consumer->_qname);req.set_message_id(msg_id);_codec->send(_conn,req);basicCommonResponsePtr resp = waitResponse(rid);}//消息订阅bool basicConsume(const string& consumer_tag,const string& qname,bool auto_ack,const ConsumerCallback& cb){if(_consumer.get() != nullptr){DBG_LOG("当前信道已经订阅其他队列信息!");return false;}string rid = uuidHelper::uuid();//1.构造消息订阅对象basicConsumeRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_consumer_tag(consumer_tag);req.set_queue_name(qname);req.set_auto_ack(auto_ack);//2.发送给服务器_codec->send(_conn,req);//3.等待服务器响应basicCommonResponsePtr resp = waitResponse(rid);if(resp->ok() == false){ERR_LOG("订阅队列%s消息失败",qname.c_str());return false;}//4.成功则创建消费者_consumer = make_shared<Consumer>(consumer_tag,qname,auto_ack,cb);return true;}//消息取消订阅void basicCancel(){if(_consumer.get() == nullptr)return;string rid = uuidHelper::uuid();basicCancelRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_consumer_tag(_consumer->_tag);req.set_queue_name(_consumer->_qname);_codec->send(_conn,req);basicCommonResponsePtr resp = waitResponse(rid);_consumer.reset();return;}//创建信道bool openChannel() {string rid = uuidHelper::uuid();openChannelRequest req;req.set_rid(rid);req.set_cid(_cid); _codec->send(_conn,req);basicCommonResponsePtr resp = waitResponse(rid);return resp->ok();}//删除信道void closeChannel() {string rid = uuidHelper::uuid();closeChannelRequest req;req.set_rid(rid);req.set_cid(_cid); _codec->send(_conn,req);basicCommonResponsePtr resp = waitResponse(rid);}public:  //连接收到基础响应后 向hash_map中添加响应void putBasicReponse(const basicCommonResponsePtr& resp){unique_lock<std::mutex> lock(_mtx);_basic_resp.insert(make_pair(resp->rid(), resp));//唤醒在waitresponse()中的阻塞_cv.notify_all();}//连接收到消息推送后,需要通过信道找到对应的消费者对象,通过回调函数进行消息处理void consume(const basicConsumeResponsePtr& resp){//1.先判断该信道是否是消费者if(_consumer.get() == nullptr){ERR_LOG("该信道%s没有关联消费者!",_cid.c_str());return;}//2.判断该信道关联的消费者是不是响应中对应的消费者if(_consumer->_tag != resp->consumer_tag()){ERR_LOG("该信道关联的消费者%s不是目标消费者%s!",_consumer->_tag.c_str(),resp->consumer_tag().c_str());return;}//3.调用回调函数_consumer->_callback(resp->consumer_tag(),resp->mutable_properties(),resp->body());;}string Cid(){return _cid;}private:basicCommonResponsePtr waitResponse(const string& rid){unique_lock<std::mutex> lock(_mtx);//等到hash表中有对应请求id的回复_cv.wait(lock,[&rid,this](){return _basic_resp.find(rid) != _basic_resp.end();});//有回复了一定要先获取再从hash表中删除!basicCommonResponsePtr resp = _basic_resp[rid];_basic_resp.erase(rid);return resp;}private:std::mutex _mtx;std::condition_variable _cv;string _cid;//信道idmuduo::net::TcpConnectionPtr _conn;ProtobufCodecPtr _codec;Consumer::ptr _consumer; //不在构造的时候实例化需要等到订阅的时候才实例化对象std::unordered_map<string, basicCommonResponsePtr> _basic_resp; //请求对应的响应信息队列<请求id,响应信息>};//信道的管理class ChannelManager{public:using ptr = shared_ptr<ChannelManager>;ChannelManager(){}~ChannelManager(){}//创建信道Channel::ptr createChannel(const muduo::net::TcpConnectionPtr& conn,const ProtobufCodecPtr& codec){unique_lock<std::mutex> lock(_mtx);Channel::ptr channel = make_shared<Channel>(conn,codec);_channels.insert(make_pair(channel->Cid(), channel));return channel;}//删除信道void removeChannel(const string& channel_id){unique_lock<std::mutex> lock(_mtx);auto it = _channels.find(channel_id);if(it == _channels.end()) return;_channels.erase(it);}//获取信道Channel::ptr getChannel(const string& channel_id){unique_lock<std::mutex> lock(_mtx);auto it = _channels.find(channel_id);if(it == _channels.end()){DBG_LOG("没有对应信道%s!",channel_id.c_str());return Channel::ptr();}return it->second;}private:std::mutex _mtx;unordered_map<string,Channel::ptr> _channels;};

对于普通服务,主要的思路其实都是构造对应的请求,然后发送给服务端等待响应,由于muduo库的接口是异步的,因此需要互斥锁&&条件变量来保证同步等待响应,因此我们统一封装了等待响应的接口waitresponse(),同时还要提供接口putBasicReponse()方便到时响应到来时,上层(连接模块)唤醒阻塞线程获取响应。

          //连接收到基础响应后 向hash_map中添加响应void putBasicReponse(const basicCommonResponsePtr& resp){unique_lock<std::mutex> lock(_mtx);_basic_resp.insert(make_pair(resp->rid(), resp));//唤醒在waitresponse()中的阻塞_cv.notify_all();}basicCommonResponsePtr waitResponse(const string& rid){unique_lock<std::mutex> lock(_mtx);//等到hash表中有对应请求id的回复_cv.wait(lock,[&rid,this](){return _basic_resp.find(rid) != _basic_resp.end();});//有回复了一定要先获取再从hash表中删除!basicCommonResponsePtr resp = _basic_resp[rid];_basic_resp.erase(rid);return resp;}

而对于消息推送,如果连接收到消息推送的响应,此时上层(Connection模块)就需要传达给信道进行处理,因此信道还要提供个接口调用判断自身是不是消费者,然后调用回调处理(这个回调是需要用户自己设置的):

  //连接收到消息推送后,需要通过信道找到对应的消费者对象,通过回调函数进行消息处理void consume(const basicConsumeResponsePtr& resp){//1.先判断该信道是否是消费者if(_consumer.get() == nullptr){ERR_LOG("该信道%s没有关联消费者!",_cid.c_str());return;}//2.判断该信道关联的消费者是不是响应中对应的消费者if(_consumer->_tag != resp->consumer_tag()){ERR_LOG("该信道关联的消费者%s不是目标消费者%s!",_consumer->_tag.c_str(),resp->consumer_tag().c_str());return;}//3.调用回调函数_consumer->_callback(resp->consumer_tag(),resp->mutable_properties(),resp->body());;}

异步工作线程模块

        客户端这边存在两个异步工作线程,一个是muduo库中客户端连接的异步循环线程EventLoopThread,一个是当收到消息后进行异步处理的工作线程池。需要注意的是,这两项都不是以连接为单元进行创建的,而是创建后,可以用于多个连接中,因此单独进行封装。

  class AsyncWorker{public:using ptr = shared_ptr<AsyncWorker>;muduo::net::EventLoopThread _loopthread;ThreadPool _pool;};class ThreadPool
{
public:using ptr = shared_ptr<ThreadPool>;using Functor = std::function<void(void)>;ThreadPool(int thread_count = 1):_stop(false){for(int  i = 0 ; i < thread_count ; i++)_threads.emplace_back(&ThreadPool::ThreadEntry,this);}~ThreadPool(){stop();}//push传入的首先是一个用户要执行的函数,再是不定参,表示要处理的数据也就是要//传入函数的参数,push函数内部会将这个传入函数封装成一个异步任务(packaged_task)//返回future对象方便用户获取异步执行结果template<typename F,typename... Args>auto push(F&& func,Args&& ... args) ->std::future<decltype(func(args...))>{//1.将传入的函数封装成packaged_task任务using return_type = decltype(func(args...));auto task = std::bind(std::forward<F>(func),std::forward<Args>(args)...);auto ptask = std::make_shared<std::packaged_task<return_type()>>(task);std::future<return_type> fu = ptask->get_future();//2.构造一个lambda匿名函数,函数内部执行packaged_task任务{unique_lock<std::mutex> lock(_mtx);//3.将构造出的匿名函数对象抛入任务池中_tasks.push_back([ptask](){(*ptask)();});//唤醒工作线程执行任务_cv.notify_all();}return fu;}
private:void stop() //停止运行{_stop = true;//唤醒线程_cv.notify_all();//回收线程for(int i = 0 ; i < _threads.size() ; i++)_threads[i].join();}    void ThreadEntry() //内部不断从任务池中取出任务并执行{while(!_stop){vector<Functor> tmp_tasks;{unique_lock<std::mutex> lock(_mtx);//任务池有数据/stop设置为true_cv.wait(lock,[this](){return _stop || !_tasks.empty();});tmp_tasks.swap(_tasks);}//执行任务for(auto& task : tmp_tasks){task();}}}    private:std::mutex _mtx;std::condition_variable _cv;///  注意互斥锁和条件变量要在线程之前初始化好,否则线程先运行但用到他们就会崩溃std::atomic<bool> _stop;vector<thread> _threads;vector<Functor> _tasks;
};

连接管理模块

        RabbitMQ弱化了客户端的概念,因为用户所需服务都是通过信道提供的,因此主要操作流程是创建连接->创建信道->使用信道提供服务。该模块同样是针对muduo库客户端的二次封装(与服务端不同的是,客户端只需要向服务发起一个连接,服务端需要接受多个连接),该模块主要就是向用户提供创建channel信道的接口,创建信道之后,拿着信道对象使用服务。

成员如下:

  1. 实现同步的CountDownLatch对象
  2. 客户端对应的muduo库连接对象
  3. TcpClient对象
  4. 请求分发器dispatcher
  5. 协议处理器codec
  6. 异步线程模块asyncWorker
  7. 信道管理句柄_channel_manager

提供服务:

  1. 创建信道
  2. 关闭信道
  3. 收到服务器发送的响应进行处理的回调

  4. 收到服务端推送过来的消息的回调

    class Connection{public:using ptr = shared_ptr<Connection>;using basicCommonResponsePtr = shared_ptr<basicCommonResponse>;using basicConsumeResponsePtr = shared_ptr<basicConsumeResponse>;typedef std::shared_ptr<google::protobuf::Message> MessagePtr; //这里不要忘了Connection(const string& server_ip,int port,const AsyncWorker::ptr &worker):_latch(1),_client(worker->_loopthread.startLoop(),muduo::net::InetAddress(server_ip,port),"MsgQueue_client"),_dispatcher(std::bind(&Connection::onUnknownMessage,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3)),_codec(make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage,&_dispatcher,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3))),_worker(worker),_channel_manager(std::make_shared<ChannelManager>()) {//为请求设置回调_dispatcher.registerMessageCallback<zmq::basicCommonResponse>(std::bind(&Connection::onCommonResponse,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));_dispatcher.registerMessageCallback<zmq::basicConsumeResponse>(std::bind(&Connection::onConsumResponse,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));//设置连接建立和收到消息的回调_client.setConnectionCallback(std::bind(&Connection::onConnection,this,std::placeholders::_1));_client.setMessageCallback(std::bind(&ProtobufCodec::onMessage,_codec.get(),std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));//2.连接服务器 --- 需要阻塞等待连接建立成功之后返回_client.connect(); //异步直接返回_latch.wait(); //阻塞等待直到连接建立成功DBG_LOG("阻塞结束,连接建立成功%p",_conn.get());}    public:Channel::ptr openChannel(){Channel::ptr channel = _channel_manager->createChannel(_conn,_codec);bool ret = channel->openChannel();if(ret == false){ERR_LOG("打开信道失败!");return Channel::ptr();}DBG_LOG("打开信道%p成功!",channel.get());return channel;}void closeChannel(const Channel::ptr& channel){channel->closeChannel();_channel_manager->removeChannel(channel->Cid());}   private:bool send(const google::protobuf::Message* message) //序列化之后再发送 前提是连接状态正常{if(_conn->connected()) //连接状态正常再发送否则返回false{_codec->send(_conn,*message);return true;}return false;}void onUnknownMessage(const muduo::net::TcpConnectionPtr& conn,const MessagePtr& message,muduo::Timestamp){LOG_INFO << "onUnknownMessage:" << message->GetTypeName();//关闭连接conn->shutdown();}void onConnection(const muduo::net::TcpConnectionPtr& conn){if(conn->connected()){//应该放在唤醒之前赋值 因为muduo库的事件循环在一个线程中执行onConnection回调//构造函数在另一个线程等待,当countDown()被调用时等待线程会被立即唤醒并继续执行//但是此时onConnection函数可能还没执行完_conn = conn这一行_conn = conn;  _latch.countDown();//唤醒主线程中的线程 //_conn应该放前面还是?DBG_LOG("连接建立成功:%p",_conn.get());}else //关闭连接{_conn.reset();DBG_LOG("连接建立失败..");}}void onCommonResponse(const muduo::net::TcpConnectionPtr& conn,const basicCommonResponsePtr& message,muduo::Timestamp){//1.找到信道Channel::ptr channel = _channel_manager->getChannel(message->cid());if(channel.get() == nullptr){ERR_LOG("未找到信道信息");return;}//2.添加到hashmap中channel->putBasicReponse(message);}void onConsumResponse(const muduo::net::TcpConnectionPtr& conn,const basicConsumeResponsePtr& message,muduo::Timestamp){//1.找到信道Channel::ptr channel = _channel_manager->getChannel(message->cid());if(channel.get() == nullptr){ERR_LOG("未找到信道信息");return;}//2.封装消息处理任务 抛入线程池中_worker->_pool.push([channel,message](){channel->consume(message);}); }private: muduo::CountDownLatch _latch;  //实现同步的muduo::net::TcpConnectionPtr _conn;muduo::net::TcpClient _client;ProtobufDispatcher _dispatcher;//请求分发ProtobufCodecPtr _codec;//协议处理//codec要用到dispatcher因此分发器要先声明AsyncWorker::ptr _worker;ChannelManager::ptr _channel_manager; };

生产者客户端实现

    //1.实例化异步工作线程对现场zmq::AsyncWorker::ptr awp = make_shared<zmq::AsyncWorker>();//2.实例化连接对象zmq::Connection::ptr conn = make_shared<zmq::Connection>("127.0.0.1",8085,awp);//3.通过连接创建信道zmq::Channel::ptr channel = conn->openChannel();//4.通过信道提供的服务完成所需//4.1声明一个交换机 exchange1,交换机类型为广播模式google::protobuf::Map<string,string> tmp_map;channel->declareExchange("exchange1",zmq::ExchangeType::TOPIC,true,false,tmp_map);//4.2声明两个队列channel->declareQueue("queue1",true,false,false,tmp_map);channel->declareQueue("queue2",true,false,false,tmp_map);//4.3绑定 queue1-exchange1 且bing_key设置为queue1channel->queueBind("exchange1","queue1","queue1");channel->queueBind("exchange1","queue2","news.music.#");//5. 循环向交换机发布消息for(int i = 0 ; i < 10 ; i++){//广播// channel->basicPublish("exchange1",nullptr,"Hello Queue-"+to_string(i));//直接zmq::BasicProperties bp;bp.set_id(zmq::uuidHelper::uuid());bp.set_delivery_mode(zmq::DeliveryMode::DURABLE);bp.set_routing_key("queue1");channel->basicPublish("exchange1",&bp,"Hello Queue-"+to_string(i));}zmq::BasicProperties bp;bp.set_id(zmq::uuidHelper::uuid());bp.set_delivery_mode(zmq::DeliveryMode::DURABLE);bp.set_routing_key("news.music.pop");channel->basicPublish("exchange1",&bp,"Hello pop");bp.set_id(zmq::uuidHelper::uuid());bp.set_delivery_mode(zmq::DeliveryMode::DURABLE);bp.set_routing_key("news.sport");channel->basicPublish("exchange1",&bp,"Hello music");//6.关闭信道conn->closeChannel(channel);

消费者客户端

 //1.实例化异步工作线程对现场zmq::AsyncWorker::ptr awp = make_shared<zmq::AsyncWorker>();//2.实例化连接对象zmq::Connection::ptr conn = make_shared<zmq::Connection>("127.0.0.1",8085,awp);//3.通过连接创建信道zmq::Channel::ptr channel = conn->openChannel();//4.通过信道提供的服务完成所需//4.1声明一个交换机 exchange1,交换机类型为广播模式google::protobuf::Map<string,string> tmp_map;channel->declareExchange("exchange1",zmq::ExchangeType::TOPIC,true,false,tmp_map);//4.2声明两个队列channel->declareQueue("queue1",true,false,false,tmp_map);channel->declareQueue("queue2",true,false,false,tmp_map);//4.3绑定 queue1-exchange1 且bing_key设置为queue1channel->queueBind("exchange1","queue1","queue1");channel->queueBind("exchange1","queue2","news.music.#");//6.订阅队列消息auto functor = std::bind(callBack,channel,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3);channel->basicConsume("consumer1",argv[1],false,functor);//7.关闭信道while(1){std::this_thread::sleep_for(std::chrono::seconds(3));}conn->closeChannel(channel);

  

http://www.dtcms.com/a/561139.html

相关文章:

  • python 初学 3 --字符串编码
  • 企网站建设比价网站怎么做的
  • Linux磁盘性能优化:文件系统选择与挂载参数调整(附案例)
  • 如何建设网站首页网站备案照
  • “RAG简单介绍
  • Spring_cloud(1)
  • 终结Linux系统崩溃——Aptitude:以搜狗输入法与fcitx/ibus依赖冲突的终极解决方案为例
  • 关于 ComfyUI 的 Windows 本地部署系统环境教程(详细讲解Windows 10/11、NVIDIA GPU、Python、PyTorch环境等)
  • 网站开发包含什么百度手机
  • 部门网站建设管理典型经验材料广东住房和城乡建设厅官方网站
  • PHP 基金会宣布:Streams 现代化 将引入事件循环与异步新能力
  • 网站建设武清wordpress 朋友圈
  • 后端八股之消息队列
  • Segment Anything: SAM SAM2
  • Oracle Linux 9 的 MySQL 8.0 完整安装与远程连接配置
  • 剑三做月饼活动网站网站制作公司司
  • 网站建设推广公司排名钓鱼链接生成器
  • 十字链表和邻接多重表
  • 中国排建设银行悦生活网站企业网站制作 深圳
  • Vue过度与动画
  • 陕西省高速建设集团公司网站商业空间设计书籍
  • 【快速入门】JMeter
  • 建立网站的基本条件免费广州网站开发维护
  • 【每天一个AI小知识】:什么是Prompt?
  • pytest核心用法
  • Linux下的简单进度条程序
  • 【ComfyUI】Stable Zero123 单图生成3D视图
  • 今日策略:年化398%,回撤11%,夏普5.0 | 金融量化多智能体架构方案
  • 16-Redis 消息通知实战指南:任务队列与发布订阅模式全解析
  • 江苏网站建设效果个人微信小程序免费制作