在 UNS 中如何使用 Avro + Schema Registry 管理 MQTT 数据模型
一、方案背景
在工业 4.0 和数字化工厂体系中,UNS(Unified Namespace) 是实现跨层级系统语义统一与数据实时共享的核心架构。UNS 是企业数据的“单一事实源”,其语义组织通常遵循 ISA-95 结构,用于统一表达企业、工厂、产线、设备等多层级信息。
然而,随着数据量与接口数量急剧增长,MQTT Topic 与 Payload 模型需要满足以下要求:
可扩展、版本可管理;
高效、紧凑的传输;
支持结构演化与验证;
支持跨系统一致解析。
为此,引入 Apache Avro 和 Schema Registry,构建基于 UNS 的“数据模型管理层”,实现 MQTT 数据的强结构化管理与语义自描述传输。
Apache Avro —— 现代工业数据架构(包括 UNS 和 IIoT 系统)中常用的数据序列化与 Schema 管理技术之一。
特点:
行式数据序列化格式(类似 JSON、Protobuf、Parquet 的兄弟);
属于 Apache Hadoop 生态;
主要特点是:
✅ 自描述(Self-describing)
✅ 二进制高效
✅ 支持 Schema 演化(Schema Evolution)
它通常用于:
分布式系统数据传输(Kafka、MQTT、REST);
数据湖 / 数据仓库的持久化;
事件驱动架构中的 Schema 统一。
二、总体架构设计
2.1 架构逻辑分层
| 层级 | 功能描述 | 技术选型 |
|---|---|---|
| 设备接入层 | 采集实时数据(PLC、传感器、机器人等) | OPC UA、Modbus、MQTT Edge |
| 网关层(Edge Gateway) | 数据预处理与 Avro 编码 | Node-RED / nifi + Avro SDK |
| 消息总线层 | MQTT 作为 UNS 数据分发核心 | EMQX / HiveMQ |
| Schema 管理层 | 统一管理 Avro Schema 版本与兼容性 | Confluent Schema Registry |
| 语义命名层(UNS) | 按 ISA-95 层级组织 Topic 语义 | Enterprise/Site/Area/Line/... |
| 消费层 | MES、ERP、BI、AI 等系统订阅数据 | Kafka Connect、Python SDK、Spark |
2.2 数据流逻辑
[设备/PLC]│ 数据采集(原始)▼
[Edge Gateway]├─ 转换为标准 JSON├─ 使用 Avro Schema 编码(二进制)├─ 附带 Schema ID(来自 Registry)▼
[MQTT Broker / UNS]├─ 按命名空间(Topic)发布▼
[消费者]├─ 通过 Schema ID 向 Registry 查询结构├─ 自动反序列化为 JSON 或对象└─ 持久化 / 分析 / 可视化
三、Avro Schema 与 UNS Topic 的对应设计
3.1 命名空间(Topic)设计原则
遵循 ISA-95 的层级语义结构:
Enterprise/Site/Area/Line/Machine/Function/Parameter
例如:
factoryA/plant1/assembly/line03/press01/status
factoryA/plant1/assembly/line03/press01/temperature
每个功能节点对应一个 Avro Schema,其定义存放在 Schema Registry 中。
3.2 示例 Schema 定义
{"type": "record","name": "PressStatus","namespace": "factoryA.plant1.line03.press01","fields": [{ "name": "timestamp", "type": "long" },{ "name": "machine_id", "type": "string" },{ "name": "status", "type": ["null", "string"], "default": null },{ "name": "temperature", "type": ["null", "float"], "default": null },{ "name": "pressure", "type": ["null", "float"], "default": null }]
}
注册到 Schema Registry 后返回:
Schema ID: 0127
Edge 端发布数据时:
Avro 二进制编码;
MQTT Payload =
{SchemaID + AvroData};Topic =
factoryA/plant1/assembly/line03/press01/status
四、Schema Registry 管理机制
4.1 核心功能
Schema 版本控制:每次更新 Schema 会生成新版本;
兼容性策略:
Backward Compatible(向后兼容);
Forward Compatible(向前兼容);
Full Compatible(双向兼容);
API 管理:
POST /subjects/{topic}/versions→ 注册;GET /schemas/ids/{id}→ 查询;GET /subjects/{topic}/versions/latest→ 获取最新版本。
4.2 与 MQTT 结合策略
虽然 MQTT 不直接支持 Schema ID,但可通过:
将 Schema ID 放入 payload 前缀;
或在 Topic metadata 中嵌入:
factoryA/plant1/line03/press01/status/schema:0127
消费者在订阅时自动解析 Schema ID 并从 Registry 获取结构定义。
五、部署与实现步骤
| 步骤 | 内容 | 工具/实现 |
|---|---|---|
| 1 | 建立 UNS 语义命名体系 | 参考 ISA-95 层级模型 |
| 2 | 设计数据点模型(Avro Schema) | JSON Schema 文件 |
| 3 | 部署 Schema Registry 服务 | Confluent / Redpanda |
| 4 | 部署 MQTT Broker | EMQX / HiveMQ |
| 5 | 开发 Edge 发布端 | Python Avro SDK / Node-RED |
| 6 | 消费端实现解码与订阅 | Kafka Connect / Python Consumer |
| 7 | 监控与版本审计 | Schema Registry UI / Grafana |
六、优势分析
| 维度 | Avro + Schema Registry 优势 |
|---|---|
| 可扩展性 | Schema 演化机制支持设备模型迭代 |
| 性能 | 二进制序列化高压缩比 |
| 一致性 | Schema Registry 保证 Topic 与结构一致 |
| 标准化 | 兼容 MQTT、Kafka、Sparkplug B |
| 可追溯性 | 每个 Schema 有版本号和变更记录 |
| 安全性 | 支持 Schema 审批与签名验证 |
七、典型应用场景
多产线设备数据统一建模
各生产线使用不同品牌 PLC;
通过 Avro + UNS 统一结构;
MES / BI 可跨线对比。
边缘智能监控
边缘节点根据 Schema 动态生成解析逻辑;
降低运维部署复杂度。
AI 模型输入标准化
Avro Schema 定义数据输入字段;
保证数据集训练一致性。
八、总结与展望
通过在 UNS 架构中引入 Avro + Schema Registry:
MQTT Topic 拥有强语义与强结构;
数据格式可版本化、可演化;
消费者解耦于设备厂商与协议;
企业数据成为真正的“实时单一事实源”。
