当前位置: 首页 > news >正文

使用ZMQ和protobuf实现C++程序与Python程序的通信

文章目录

      • 背景
      • 一 应用场景与需求
      • 二 Protobuf: 跨语言数据交换的基石
      • 三 通信方案 ZMQ (ZeroMQ) —— 高性能消息中间件
      • 四 进阶: 安全性与性能优化
      • 五 实践例子: 工厂温度监控系统
        • 5.1 场景描述
        • 5.2 Protobuf数据结构定义
        • 5.3 C++数据采集与发布
        • 5.4 Python数据接收与可视化
        • 5.5 关键实现细节
        • 5.6 部署与运行
        • 5.7 扩展场景

背景

C++与Python的混合编程能充分发挥两者的优势: C++处理高性能计算, Python快速实现业务逻辑. 而实现跨语言协作的核心在于通信协议数据结构兼容性. 本文将介绍基于Protobuf的序列化方案与ZMQ通信框架的整合应用, 实现C++程序和python程序协同运行.


一 应用场景与需求

  1. 高性能计算+业务逻辑分离

    • 场景: C++负责实时图像处理/数值计算, Python处理结果可视化或业务决策.
    • 需求: 低延迟, 高吞吐量的跨进程通信.
  2. 分布式系统协作

    • 场景: C++程序作为服务端运行, Python作为客户端动态调整参数; 或两者独立运行, 通过消息中间件交互.
    • 需求: 松耦合, 支持异步通信.
  3. 跨语言数据一致性

    • 核心问题: C++的struct与Python的dict无法直接兼容.
    • 解决方案: 使用Protobuf定义统一数据结构, 确保序列化一致性.

二 Protobuf: 跨语言数据交换的基石

  1. 定义数据结构
    创建.proto文件 (如message.proto) , 定义字段类型与结构:
   syntax = "proto3";
   message DataPacket {
     int32 id = 1;
     string content = 2;
     repeated float values = 3;
   }
  1. 生成语言绑定代码

    • C++: 通过protoc生成message.pb.cc message.pb.h, 集成至项目.

    • Python: 安装protobuf包后, 直接导入生成的message_pb2.py .

  2. 序列化与反序列化

   # Python端
   data = DataPacket(id=1, content="test", values=[1.0, 2.0])
   serialized_data = data.SerializeToString()
   // C++端
   DataPacket data;
   data.set_id(1); 
   std::string serialized_data = data.SerializeAsString();

三 通信方案 ZMQ (ZeroMQ) —— 高性能消息中间件

  • 优势: 支持多种通信模式 (PUB/SUB, REQ/REP, PUSH/PULL) , 无需依赖中心节点.
  • 实战步骤:
    1. C++发布端 (PUB):
     zmq::context_t context(1);
     zmq::socket_t publisher(context, ZMQ_PUB);
     publisher.bind("tcp://*:5555");
     DataPacket data;
     // ...填充数据...
     zmq::message_t msg(data.ByteSizeLong());
     memcpy(msg.data(), data.SerializeAsString().c_str(), data.ByteSizeLong());
     publisher.send(msg);
  1. Python订阅端 (SUB):
     import zmq
     context = zmq.Context()
     subscriber = context.socket(zmq.SUB)
     subscriber.connect("tcp://localhost:5555")
     subscriber.setsockopt(zmq.SUBSCRIBE, b'')
     while True:
         msg = subscriber.recv()
         data = DataPacket()
         data.ParseFromString(msg)
  • 注意点:
    • 使用send_multipart处理多帧消息 (如消息头+数据体) .
    • 避免阻塞: 设置ZMQ_NOBLOCK标志或使用异步IO.

四 进阶: 安全性与性能优化

  1. ZMQ安全机制

    • 使用Curve25519加密通信.
    • 设置IP白名单防止未授权访问.
  2. 性能优化

    • 批处理: 合并多个Protobuf消息一次性发送.
    • Zero-Copy: 在C++中通过zmq::message_t直接引用内存, 避免数据复制.

五 实践例子: 工厂温度监控系统

5.1 场景描述
  • C++程序: 运行在嵌入式设备上, 实时采集传感器温度数据 (每秒10次) , 进行滤波计算.
  • Python程序: 运行在控制中心, 实时显示温度曲线, 触发高温报警.
  • 通信需求: C++将结构化数据 (时间戳, 温度值, 设备ID) 发送给Python, 延迟需小于50ms.

5.2 Protobuf数据结构定义

创建 temperature.proto:

syntax = "proto3";

message TemperatureData {
  string device_id = 1;     // 设备编号
  double timestamp = 2;     // Unix时间戳 (毫秒) 
  float temperature_c = 3;  // 摄氏度
  uint32 status = 4;        // 状态码 (0=正常, 1=异常) 
}

生成代码:

# C++代码生成
protoc --cpp_out=. temperature.proto

# Python代码生成
protoc --python_out=. temperature.proto

5.3 C++数据采集与发布
#include <zmq.hpp>
#include "temperature.pb.h"
#include <chrono>

int main() {
    // ZMQ初始化
    zmq::context_t ctx(1);
    zmq::socket_t publisher(ctx, ZMQ_PUB);
    publisher.bind("tcp://*:5555");

    // 模拟传感器数据
    while (true) {
        TemperatureData data;
        data.set_device_id("sensor_001");
        data.set_timestamp(
            std::chrono::duration_cast<std::chrono::milliseconds>(
                std::chrono::system_clock::now().time_since_epoch()
            ).count()
        );
        data.set_temperature_c(25.0 + (rand() % 100) * 0.1); // 模拟温度波动
        data.set_status(0);

        // Protobuf序列化
        std::string serialized_str;
        data.SerializeToString(&serialized_str);

        // ZMQ发送 (带多帧消息头) 
        zmq::message_t header("temperature", 11);  // 消息类型标识
        zmq::message_t payload(serialized_str.data(), serialized_str.size());
        publisher.send(header, ZMQ_SNDMORE);        // 发送多帧消息
        publisher.send(payload);
        
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    return 0;
}

5.4 Python数据接收与可视化
import zmq
import matplotlib.pyplot as plt
from temperature_pb2 import TemperatureData
from collections import deque

# 初始化ZMQ
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5555")
subscriber.setsockopt(zmq.SUBSCRIBE, b'temperature')  # 过滤指定消息头

# 数据缓存
max_len = 50
timestamps = deque(maxlen=max_len)
temps = deque(maxlen=max_len)

# 实时绘图
plt.ion()
fig, ax = plt.subplots()
line, = ax.plot([], [], 'r-')
ax.set_ylim(20, 35)

while True:
    # 接收两帧消息 (header + payload) 
    header = subscriber.recv()
    if header != b'temperature':
        continue
    
    msg = subscriber.recv()
    data = TemperatureData()
    data.ParseFromString(msg)
    
    # 更新数据
    timestamps.append(data.timestamp)
    temps.append(data.temperature_c)
    
    # 刷新图表
    line.set_xdata(range(len(timestamps)))
    line.set_ydata(temps)
    ax.relim()
    ax.autoscale_view()
    fig.canvas.draw()
    fig.canvas.flush_events()
    
    # 报警检测
    if data.temperature_c > 30:
        print(f"高温警报!设备 {data.device_id} 当前温度: {data.temperature_c}C")

5.5 关键实现细节
  1. ZMQ多帧消息

    • 使用ZMQ_SNDMORE标识多帧消息, 第一帧为消息头 (temperature) , 第二帧为实际数据.
    • Python端通过setsockopt(zmq.SUBSCRIBE, b'temperature')精准订阅, 避免接收无关数据.
  2. 时间戳同步

    • C++使用std::chrono获取精确到毫秒的Unix时间戳, 保证跨系统时间一致性.
  3. 可视化性能优化

    • 使用deque限制数据队列长度, 防止内存无限增长.
    • 通过plt.ion()启用交互模式, 避免反复创建绘图对象.

5.6 部署与运行
  1. 安装依赖

    # C++依赖
    sudo apt-get install libzmq3-dev protobuf-compiler
    
    # Python依赖
    pip install pyzmq protobuf matplotlib
    
  2. 编译C++程序

    g++ sensor_publisher.cpp temperature.pb.cc -o sensor \
      -lprotobuf -lzmq -std=c++11
    
  3. 启动程序

    # C++数据发布端
    ./sensor
    
    # Python可视化端
    python monitor.py
    

5.7 扩展场景
  • 多设备支持: 在.proto中增加repeated TemperatureData字段, 实现批量传输.
  • 双向通信: 改用ZMQ的REQ/REP模式, Python可向C++发送控制指令 (如调整采样频率) .
  • 数据持久化: 在Python端添加关联数据库存储, 用于历史数据分析.

相关文章:

  • 多行为推荐综述
  • 混境之地1
  • 批量删除 PDF 中的所有图片、所有二维码图片以及指定的某张图片
  • CCF CSP 第33次(2024.03)(2_相似度计算_C++)(字符串中字母大小写转换+哈希集合)
  • Mysql的单表查询和多表查询
  • Cookie、sessionStorage、localStorage
  • vue3(笔记)5.0--pinia工具的知识扩展
  • 系统工程-信息系统的分类
  • How to use pgbench to test performance for PostgreSQL?
  • 【C++】String类的模拟实现
  • [Qt5] QMetaObject::invokeMethod使用
  • Netty源码—7.ByteBuf原理三
  • 蓝桥云客-染色时间
  • 1424.对角线遍历
  • 322 零钱兑换
  • 【大模型基础_毛玉仁】4.4 低秩适配方法
  • RocketMQ 4.x、5.x 性能对比
  • 绩效考核如何从形式化任务升级为公司战略工具?
  • 2025.3.25
  • 基于CondLaneNet论文和全卷积分割头的车道线head设计
  • 云南多地突查公职人员违规饮酒:公安局门口开展酒精吹气测试
  • 碧桂园:砸锅卖铁保交房、持续推进保主体,尽快让公司恢复正常经营
  • 体坛联播|郑钦文收获红土赛季首胜,国际乒联公布财报
  • 铲屎官花5万带猫狗旅行,宠旅生意有多赚?
  • 马上评|颜宁“简历打假”的启示
  • 黄仁勋:中国AI市场将达500亿美元,美国企业若无法参与是巨大损失