物联网中台搭建以及规则定义

物联网中台中 Node-RED 的标准化引擎作用与数据存储策略
一、物联网中台架构概览
在典型的物联网中台架构中,我们通常分为以下几层:
- 业务层:提供管理后台、监控大屏、移动APP、API对接等前端服务。
- 设备管理服务层:包括设备管理、告警规则、数据服务、业务处理等核心服务。
- 数据层:使用 MongoDB 存储业务数据,Redis 作为状态缓存。
- 物联网底层:核心是 EMQX(MQTT Broker)和 Node-RED 标准化引擎。
- 设备层:支持多种设备和协议接入,如 MQTT、HTTP、Modbus TCP 等。
本文将重点解析 Node-RED 在其中的作用,并说明不同类型数据的存储路径。
二、Node-RED:物联网数据流标准化引擎
1. Node-RED 是什么?
Node-RED 是一个基于流的可视化编程工具,最初由 IBM 开发,用于快速连接硬件设备、API和在线服务。在物联网中台中,它承担了协议转换、数据标准化、流程编排等重要职责。
2. Node-RED 在架构中的位置
位于“物联网底层”,紧邻 EMQX,负责接收来自设备层的原始数据,进行处理后转发至上层服务或存储。
3. Node-RED 数据处理流程
以下是 Node-RED 处理设备数据的典型流程:
设备数据 → EMQX(MQTT) → Node-RED →(原数据存储) 数据解析 → 标准化(业务规则) → 存储/转发(MQTT)
或者
设备数据→ Node-RED → (原数据存储) 数据解析 → 标准化 (业务规则)→ 存储/转发(MQTT)
步骤详解:
-
数据接入
- 设备通过 MQTT/HTTP/Modbus 等协议发布数据至 EMQX。
- Node-RED 通过 MQTT 节点订阅相关主题,接收原始数据。
-
数据解析与清洗
- 使用
function节点或自定义节点解析 payload(如 JSON、二进制数据)。 - 过滤无效数据、补全缺失字段、单位转换等。
- 使用
-
标准化处理
- 将不同设备、不同协议的数据转换为统一格式(如统一 JSON Schema)。
- 添加设备ID、时间戳、数据来源等元数据。
-
业务逻辑处理
- 判断数据是否触发告警(如阈值判断)。
- 调用外部服务(如设备管理服务)进行状态更新。
-
数据路由与存储
- 将处理后的数据分发至不同目的地:
- 业务数据 → 通过EMQX发送至“业务处理服务”,最终存入 MongoDB
- 告警数据 → 通过EMQX推送至“告警与规则服务”
- 状态数据 → 实时更新至 Redis 缓存
- 将处理后的数据分发至不同目的地:
-
设备反控
- 接收来自业务层的控制指令,通过 Node-RED 转换为设备能识别的协议格式,经 EMQX 下发至设备。
三、数据类型与存储策略
在物联网中台中,数据根据其用途和特性,会被存储到不同的数据库中:
| 数据类型 | 存储数据库 | 说明 |
|---|---|---|
| 业务数据(如设备上报的温度、湿度) | MongoDB | 适合非结构化、频繁写入和查询的场景 |
| 设备状态(在线/离线、最新数据) | Redis | 高速读写,适合缓存实时状态 |
| 告警记录与规则 | MongoDB / MySQL | 根据业务复杂度选择 |
举例说明:
-
设备上报一条温湿度数据:
- Node-RED 解析后,将其格式化为标准 JSON。
- 同时更新 Redis 中该设备的“最新状态”。
- 将完整数据发送至业务服务,同时存入 MongoDB。
-
业务层下发控制指令:
- 指令经由 Node-RED 转换为 MQTT 消息,通过 EMQX 下发至设备。
- 指令执行结果再次经 Node-RED 反馈至业务层。
好的,我们来详细制定物联网平台中关于EMQX消息、MongoDB和TDengine的设计规则。这将为系统开发提供清晰的规范。
物联网平台数据层设计规范:EMQX主题、MongoDB与TDengine库表设计
一、EMQX消息主题(Topic)设计规则
为了确保消息路由的清晰性和可扩展性,制定以下主题设计规范:
1. 主题结构标准
采用分层结构,清晰标识设备、数据类型和方向:
iot/{project}/{device_type}/{device_id}/{message_type}/{direction}
2. 各层级定义
| 层级 | 说明 | 示例 |
|---|---|---|
iot | 根目录,标识物联网项目 | iot |
{project} | 项目名称 | smart_farm, factory_energy |
{device_type} | 设备类型 | sensor, controller, gateway |
{device_id} | 设备唯一标识 | temp_sensor_001, pump_controller_002 |
{message_type} | 消息类型 | data, status, control, config |
{direction} | 数据方向 | up(设备上行), down(平台下行) |
3. 主题示例
设备上报数据:
iot/smart_farm/sensor/temp_sensor_001/data/up
平台控制设备:
iot/smart_farm/controller/pump_002/control/down
设备状态上报:
iot/smart_farm/gateway/gw_001/status/up
4. 通配符使用规则
+:单层通配符,用于订阅同一类型的所有设备#:多层通配符,用于订阅所有子主题
订阅示例:
# 订阅所有温度传感器数据
iot/smart_farm/sensor/+/data/up# 订阅整个项目所有上行消息
iot/smart_farm/+/+/+/up
二、MongoDB数据库设计规则
MongoDB用于存储非结构化业务数据,如设备信息、告警记录、配置信息等。
1. 数据库命名规范
iot_{project_name}
示例:iot_smart_farm, iot_factory_energy
2. 集合设计规范
2.1 设备信息集合:devices
{"_id": ObjectId("..."),"device_id": "temp_sensor_001", // 设备唯一标识"device_name": "温室温度传感器1号","device_type": "sensor","project": "smart_farm","specs": {"manufacturer": "某厂商","model": "TEMP-2023","communication": "MQTT"},"location": {"building": "1号温室","coordinate": {"x": 12.5, "y": 8.3}},"config": {"data_interval": 60, // 上报间隔(秒)"alarm_threshold": 35.0 // 告警阈值},"status": "online", // online, offline, fault"last_online": ISODate("2024-01-15T10:30:00Z"),"create_time": ISODate("2024-01-01T00:00:00Z"),"update_time": ISODate("2024-01-15T10:30:00Z")
}
2.2 告警记录集合:alarms
{"_id": ObjectId("..."),"alarm_id": "AL202401151030001","device_id": "temp_sensor_001","alarm_type": "over_temperature","alarm_level": "high", // low, medium, high, critical"description": "温度超过阈值35°C","trigger_value": 36.5,"trigger_time": ISODate("2024-01-15T10:30:00Z"),"recover_time": ISODate("2024-01-15T10:35:00Z"),"status": "recovered", // active, recovered, acknowledged"ack_user": "operator01","ack_time": ISODate("2024-01-15T10:31:00Z")
}
2.3 设备操作日志集合:operation_logs
{"_id": ObjectId("..."),"log_id": "LOG202401151030001","device_id": "pump_controller_002","operation_type": "control", // control, config, restart"operator": "admin01","operation": "start_pump","parameters": {"power": 75},"result": "success", // success, failed, timeout"operation_time": ISODate("2024-01-15T10:30:00Z"),"response_time": ISODate("2024-01-15T10:30:05Z")
}
3. 索引设计
// 设备集合索引
db.devices.createIndex({"device_id": 1}, {unique: true})
db.devices.createIndex({"project": 1, "device_type": 1})
db.devices.createIndex({"status": 1, "last_online": -1})// 告警集合索引
db.alarms.createIndex({"device_id": 1, "trigger_time": -1})
db.alarms.createIndex({"alarm_type": 1, "status": 1})
db.alarms.createIndex({"trigger_time": -1})// 操作日志索引
db.operation_logs.createIndex({"device_id": 1, "operation_time": -1})
db.operation_logs.createIndex({"operator": 1, "operation_time": -1})
三、TDengine数据库设计规则
TDengine用于存储时序数据,如传感器数据、设备状态变化等。
1. 数据库创建
CREATE DATABASE iot_data KEEP 365 DAYS 10 BLOCKS 6;
USE iot_data;
2. 超级表设计规范
2.1 传感器数据超级表:sensor_data
CREATE STABLE sensor_data (ts TIMESTAMP,temperature FLOAT,humidity FLOAT,pressure FLOAT,pm25 FLOAT,voltage FLOAT,current FLOAT,signal_strength INT
) TAGS (device_id NCHAR(64),device_type NCHAR(32),project NCHAR(32),location NCHAR(128)
);
2.2 设备状态超级表:device_status
CREATE STABLE device_status (ts TIMESTAMP,status NCHAR(16),online_duration BIGINT,cpu_usage FLOAT,memory_usage FLOAT,disk_usage FLOAT,network_rtt FLOAT
) TAGS (device_id NCHAR(64),device_type NCHAR(32),project NCHAR(32),version NCHAR(32)
);
3. 子表创建规则
子表在数据首次写入时自动创建,命名规则为:
{device_id}_{data_type}
示例:
-- 温度传感器数据子表(自动创建)
INSERT INTO temp_sensor_001_sensor USING sensor_data TAGS
('temp_sensor_001', 'sensor', 'smart_farm', '1号温室')
VALUES (NOW, 25.3, 65.2, NULL, NULL, 3.7, NULL, -45);-- 网关状态子表
INSERT INTO gw_001_status USING device_status TAGS
('gw_001', 'gateway', 'smart_farm', 'v2.1.5')
VALUES (NOW, 'online', 86400, 15.3, 42.1, 28.7, 12.5);
4. 数据保留与压缩策略
-- 设置数据保留策略
ALTER DATABASE iot_data KEEP 365;-- 创建数据自动压缩
ALTER DATABASE iot_data COMP 2;
5. 查询优化索引
-- 为常用查询字段创建索引
CREATE INDEX idx_device_time ON sensor_data (device_id, ts);
CREATE INDEX idx_project_device ON sensor_data (project, device_type);
五、总结
Node-RED 在物联网中台中扮演了“数据流中枢”的角色,它通过可视化流程将设备数据标准化、路由、存储,大大提升了系统的灵活性和可维护性。结合 EMQX 实现双向通信,配合 MongoDB 和 Redis 实现多类型数据的高效存储,构建了一个稳定、可扩展的物联网平台基础。
通过以上设计规范,我们实现了:
- EMQX主题标准化 - 清晰的消息路由和权限控制
- MongoDB业务数据存储 - 灵活的文档存储,适合设备管理、告警、日志等
- TDengine时序数据存储 - 高性能的传感器数据存储和查询
- Node-RED路由转发 - 根据数据类型自动选择存储路径
这套规范确保了物联网平台的数据一致性、查询效率和系统可扩展性。
