ZeroMQ基础
bind和connect
数据流的终点(或中心节点)用 bind()
数据流的起点或中间节点用 connect()
bind()
是“我在等你联系我”
connect()
是“我去联系你”服务端(Server/Listener)用
bind()
客户端(Client/Caller)用
connect()
1.一对一REP-REQ
服务端
#include <zmq.hpp> // ZeroMQ C++ 绑定头文件(cppzmq)
#include <iostream> // 标准输入输出流(用于打印日志)
#include <chrono> // 时间库(用于 sleep)
#include <thread> // 线程库(配合 chrono 实现延时)
#include <cstring> // 必须包含!为了使用 memcpy 函数/*** 主函数:ZeroMQ REP 服务器* 功能:等待客户端连接,接收请求,延时1秒后返回响应*/
int main() {// =================== 1. 创建 ZeroMQ 上下文 ===================// 上下文是 ZeroMQ 的核心对象,管理所有套接字和 I/O 线程// 参数 1 表示使用 1 个 I/O 线程(对于简单应用足够)zmq::context_t context(1);// =================== 2. 创建 REP 类型套接字 ===================// ZMQ_REP = "应答模式",必须严格遵循 "recv → send → recv → send" 顺序// 客户端是 ZMQ_REQ,必须 "send → recv → send → recv"zmq::socket_t socket(context, ZMQ_REP);// =================== 3. 绑定监听地址 ===================// 绑定到所有网卡的 5555 端口,允许任何客户端连接// 格式:"tcp://IP:端口"// "0.0.0.0" 表示监听所有网络接口(等价于旧版的 "*")socket.bind("tcp://0.0.0.0:5555");// 启动提示std::cout << "✅ 服务器启动成功,等待客户端请求...(按 Ctrl+C 退出)" << std::endl;// =================== 4. 主循环:持续接收并响应客户端 ===================while (1) {// 创建一个空消息对象,用于接收客户端发来的数据zmq::message_t request;// 尝试接收客户端消息(阻塞式调用,直到收到消息)// zmq::recv_flags::none 表示无特殊标志(默认行为)zmq::recv_result_t result = socket.recv(request, zmq::recv_flags::none);// 检查接收是否成功(新版 cppzmq 支持直接 if(!result) 判断)if (!result) {// 如果接收失败(如连接断开、超时等),打印错误并跳过本次循环std::cerr << "❌ 接收消息失败,跳过本次请求..." << std::endl;continue; // 跳过,继续等待下一个请求}// =================== 5. 解析接收到的消息 ===================// 将二进制消息转换为 std::string// static_cast<char*>:将 void* 数据转为 char*// request.size():获取消息长度(字节数)std::string message_str(static_cast<char*>(request.data()), request.size());//std::string message_str=request.to_string();//相当于这一句// 打印收到的请求内容std::cout << "📩 收到请求: " << message_str << std::endl;// =================== 6. 模拟业务处理耗时 ===================// 暂停 1 秒,模拟服务器处理请求的时间(如数据库查询、计算等)std::this_thread::sleep_for(std::chrono::seconds(1));// =================== 7. 构造响应消息 ===================// 构造回复内容:在原始消息前后添加文字// 注意:你原代码是 "对 #"+message_str+"响应",建议加空格更清晰std::string response = "对 #" + message_str + " 的响应";// 创建消息对象,指定缓冲区大小(字节数)zmq::message_t reply(response.size());// 将 std::string 的内容复制到 ZeroMQ 消息缓冲区中// memcpy(目标地址, 源地址, 字节数)memcpy(reply.data(), response.c_str(), response.size());// =================== 8. 发送响应给客户端 ===================// 发送 reply 消息,无特殊标志// (可选)你也可以检查发送是否成功:// auto send_result = socket.send(reply, zmq::send_flags::none);socket.send(reply, zmq::send_flags::none);// 提示响应已发送(可选)std::cout << "📤 已发送响应: " << response << std::endl;}// 理论上不会执行到这里(因为 while(1) 死循环)return 0;
}
客户端
#include <zmq.hpp> // ZeroMQ C++ 绑定头文件(cppzmq)
#include <iostream> // 标准输入输出(用于打印日志)
#include <cstring> // 必须包含!为了使用 memcpy 函数/*** 主函数:ZeroMQ REQ 客户端* 功能:连接到服务器,发送5次请求,每次等待响应*/
int main() {// =================== 1. 创建 ZeroMQ 上下文 ===================// 上下文是 ZeroMQ 的核心对象,管理所有套接字和 I/O 线程// 参数 1 表示使用 1 个 I/O 线程(对于简单客户端足够)zmq::context_t context(1);// =================== 2. 创建 REQ 类型套接字 ===================// ZMQ_REQ = "请求模式",必须严格遵循 "send → recv → send → recv" 顺序// 不能连续发送两次,必须先收到响应才能发下一次zmq::socket_t socket(context, ZMQ_REQ);// =================== 3. 连接到服务器 ===================// 客户端使用 connect() 连接到服务器的地址// 服务器必须已经在 "tcp://localhost:5555" 上 bind() 并等待连接socket.connect("tcp://localhost:5555");// 启动提示std::cout << "✅ 客户端已连接,正在发送请求..." << std::endl;// =================== 4. 循环发送 5 次请求 ===================for (int i = 0; i < 5; i++) {// ---------------- 构造请求消息 ----------------std::string request = "请求" + std::to_string(i); // 拼接字符串:"请求0", "请求1", ...std::cout << "📤 发送请求: " << request << std::endl;// ---------------- 发送请求 ----------------// 创建一个 ZeroMQ 消息对象,指定缓冲区大小(字节数)zmq::message_t req_msg(request.size());// 将 std::string 的内容复制到 ZeroMQ 消息缓冲区中// memcpy(目标地址, 源地址, 字节数)// request.c_str() 返回 const char*,指向字符串内容memcpy(req_msg.data(), request.c_str(), request.size());// 发送消息,无特殊标志// ZMQ_REQ 模式下,必须先 send,然后才能 recvsocket.send(req_msg, zmq::send_flags::none);// ---------------- 接收服务器响应 ----------------zmq::message_t reply; // 用于接收响应消息// 阻塞等待服务器返回响应// zmq::recv_result_t 是一个可选类型(类似 std::optional),表示是否成功接收zmq::recv_result_t result = socket.recv(reply, zmq::recv_flags::none);// 检查接收是否成功if (!result) {// 如果接收失败(如超时、断开连接等),打印错误并跳过本次std::cerr << "❌ 接收响应失败!跳过本次..." << std::endl;continue; // 继续下一次循环(如果可能)}// ---------------- 解析响应消息 ----------------// 将 ZeroMQ 消息中的二进制数据转换为 std::string// reply.data() 返回 void*,需转换为 char* 才能构造字符串std::string reply_str(static_cast<char*>(reply.data()), reply.size());// 打印收到的响应内容std::cout << "📩 收到响应: " << reply_str << std::endl;// 可选:添加小延迟,避免太快(非必须)// std::this_thread::sleep_for(std::chrono::milliseconds(100));}// 所有请求处理完成std::cout << "✅ 客户端已完成所有请求,程序退出。" << std::endl;return 0;
}
2.发布订阅PUB-SUB
ZeroMQ的订阅发布模式是一种单向的数据发布,当客户端向服务端订阅消息之后,服务端便会将产生的消息源源不断的推送给订阅者。
特点:一个发布者,多个订阅者,即1:n;
当发布者数据变化时发布数据,所有订阅者均能够接收到数据并处理。这就是发布/订阅模式。
注意:使用SUB设置一个订阅时,必须使用zmq_setsockopt( )对消息进行过滤;
发布者使用PUB套接字将消息发送到队列中,订阅者使用SUB套接字从队列中源源不断的iesho9u消息。新的订阅者可以随时加入,但之前的消息是无法接收到的;已有的订阅者可以随时退出;订阅者还可以添加“过滤器”用来有选择性的接收信息。
PUB
#include<zmq.hpp>
#include<iostream>
#include<vector>
#include<random>
#include<cstring>
#include<unistd.h>int main(){//上下文和套接字zmq::context_t context(1);zmq::socket_t publisher(context,ZMQ_PUB);//绑定多个节点publisher.bind("tcp://127.0.0.1:5556");publisher.bind("ipc://weather.ipc");std::cout<<"发布者启动"<<std::endl;// 生成随机数据std::random_device rd;//随机种子std::mt19937 gen(rd());std::uniform_int_distribution<> dist_temp(0,40);//随机温度std::vector<std::string> weather_conditions={"晴天", "阴天", "雨天", "多云"};std::uniform_int_distribution<> dist_weather(0,3);while(1){int tem=dist_temp(gen);std::string weather =weather_conditions[dist_weather(gen)];std::string topic="1001";std::string separator=" ";std::string message_str=topic+separator+weather+separator+std::to_string(tem);std::cout<<"发布消息:"<<message_str<<std::endl;//发送消息zmq::message_t message(message_str.size());std::memcpy(message.data(),message_str.data(),message_str.size());publisher.send(message,zmq::send_flags::none);sleep(1);}return 0;
}
SUB
#include<zmq.hpp>
#include<iostream>int main(){// 创建上下文和订阅者套接字zmq::context_t context(1);zmq::socket_t subscriber(context, ZMQ_SUB);// 连接到发布者// subscriber.connect("tcp://localhost:5556");subscriber.connect("ipc://weather.ipc");//订阅主题subscriber.set(zmq::sockopt::subscribe,"1001");// 4表示字符串长度std::cout<<"订阅启动"<<std::endl;while(1){// 接收消息zmq::message_t message;(void)subscriber.recv(message,zmq::recv_flags::none);// 处理消息,转为字符std::string msg_str(static_cast<char*>(message.data()),message.size());std::cout<<"接收消息:"<<msg_str<<std::endl;}return 0;
}
3.推拉模式PUSH-PULL
推拉模式,PUSH发送,send。PULL方接收,recv。PUSH可以和多个PULL建立连接,PUSH发送的数据被顺序发送给PULL方。比如你PUSH和三个PULL建立连接,分别是A,B,C。PUSH发送的第一数据会给A,第二数据会给B,第三个数据给C,第四个数据给A。一直这么循环。
- 最上面是产生任务的 分发者 ventilator
- 中间是执行者 worker
- 下面是收集结果的接收者 sink
PUSH

PULL
ventilator.cpp
#include <zmq.hpp>
#include <iostream>
#include <string>
#include <chrono>
#include <thread>int main(){// 上下文和套接字zmq::context_t context(1);zmq::socket_t ventilator(context, ZMQ_PUSH);// 绑定节点ventilator.bind("tcp://127.0.0.1:6666");std::cout<<"发送者启动,等待执行者连接..."<<std::endl;std::cin.get();std::cout<<"发送任务给执行者..."<<std::endl;// 等待接收者连接while(1){const std::string msg="Hello World";zmq::message_t message(msg.size());//创建消息对象,大小为msg.size()std::memcpy(message.data(), msg.data(), msg.size());//将msg的数据复制到message中,size()表示消息的大小ventilator.send(message,zmq::send_flags::none);//发送消息,send_flags::none表示没有特殊的发送选项std::cout<<"发送消息:"<<msg<<std::endl;std::this_thread::sleep_for(std::chrono::seconds(1));//每隔1秒发送一次消息}ventilator.close();context.close();return 0;
}
worker.cpp
#include <zmq.hpp>
#include <iostream>
#include <string>
#include <cstring>int main(){// 上下文和套接字zmq::context_t context(1);// 创建PULL套接字,从上游接受消息zmq::socket_t worker(context, ZMQ_PULL);worker.connect("tcp://127.0.0.1:6666");// 创建PUSH套接字,向下游发送消息zmq::socket_t sink(context, ZMQ_PUSH);sink.connect("tcp://127.0.0.1:6667");std::cout<<"执行者启动,等待发送者连接..."<<std::endl;while(1){// 接收消息zmq::message_t request;//创建消息对象(void)worker.recv(request, zmq::recv_flags::none);//接收消息std::string msg_str(static_cast<char*>(request.data()),request.size());//将消息数据转换为字符串std::cout<<"worker接收消息:"<<msg_str<<std::endl;const std::string reply_msg="Yeah!";// 处理消息并转发zmq::message_t reply(reply_msg.size());std::memcpy(reply.data(), reply_msg.data(), reply_msg.size());sink.send(reply,zmq::send_flags::none);std::cout<<"worker转发消息:"<<reply_msg<<std::endl;}return 0;
}
sink.cpp
#include <zmq.hpp>
#include <iostream>
#include <string>int main(){// 上下文和套接字zmq::context_t context(1);zmq::socket_t sinker(context, ZMQ_PULL);// 连接节点sinker.bind("tcp://127.0.0.1:6667");std::cout<<"接收者启动,等待执行者连接..."<<std::endl;while(1){//接受消息zmq::message_t request;(void)sinker.recv(request, zmq::recv_flags::none);std::string msg_str(static_cast<char*>(request.data()),request.size());std::cout<<"接收者接收消息:"<<msg_str<<std::endl;}return 0;
}