当前位置: 首页 > news >正文

InfluxDB 与 MQTT 协议集成实践(二)

集成实现步骤

配置 MQTT 客户端

在 Python 中,我们使用paho - mqtt库来配置 MQTT 客户端。paho - mqtt库是一个 Python 的 MQTT 客户端库,它提供了简单易用的接口,方便我们与 MQTT Broker 进行通信。

首先,确保已经安装了paho - mqtt库。如果没有安装,可以使用以下命令进行安装 :

pip install paho - mqtt

下面是一个简单的 MQTT 客户端配置示例,展示了如何连接到 MQTT Broker、订阅主题以及接收消息 :

import paho.mqtt.client as mqtt

# 连接成功回调函数

def on_connect(client, userdata, flags, rc):

if rc == 0:

print("Connected to MQTT Broker!")

# 订阅主题

client.subscribe("sensor/data")

else:

print(f"Failed to connect, return code {rc}")

# 接收消息回调函数

def on_message(client, userdata, msg):

print(f"Received message on topic '{msg.topic}': {msg.payload.decode()}")

# 创建MQTT客户端实例

client = mqtt.Client()

# 设置连接成功回调函数

client.on_connect = on_connect

# 设置接收消息回调函数

client.on_message = on_message

# 设置MQTT Broker地址和端口

broker_address = "localhost"

port = 1883

# 连接到MQTT Broker

client.connect(broker_address, port, 60)

# 启动循环,保持连接并接收消息

client.loop_start()

try:

while True:

pass

except KeyboardInterrupt:

print("Exiting...")

# 停止循环

client.loop_stop()

# 断开连接

client.disconnect()

在上述代码中:

  • on_connect函数是连接成功后的回调函数,当客户端成功连接到 MQTT Broker 时,会调用这个函数。在这个函数中,首先检查连接结果rc,如果rc为 0,表示连接成功,然后订阅了名为sensor/data的主题。
  • on_message函数是接收消息后的回调函数,当客户端接收到订阅主题的消息时,会调用这个函数。它打印出接收到消息的主题和内容。
  • 创建了一个mqtt.Client()实例,并将on_connect和on_message函数分别设置为连接成功和接收消息的回调函数。
  • 使用client.connect方法连接到指定地址和端口的 MQTT Broker。
  • 通过client.loop_start启动一个循环,用于保持与 MQTT Broker 的连接,并接收来自 Broker 的消息。最后,通过KeyboardInterrupt捕获用户的中断操作(如按下 Ctrl+C),停止循环并断开与 MQTT Broker 的连接。

数据写入 InfluxDB

接下来,我们需要将接收到的 MQTT 消息写入 InfluxDB。这里使用influxdb - client库来与 InfluxDB 进行交互。首先,安装influxdb - client库 :

pip install influxdb - client

假设已经在 MQTT 客户端的on_message回调函数中接收到了消息,下面展示如何将这些消息写入 InfluxDB :

from influxdb_client import InfluxDBClient, Point

from influxdb_client.client.write_api import SYNCHRONOUS

# InfluxDB配置

token = "your_token"

org = "your_org"

bucket = "your_bucket"

url = "http://localhost:8086"

# 创建InfluxDB客户端实例

client = InfluxDBClient(url=url, token=token, org=org)

write_api = client.write_api(write_options=SYNCHRONOUS)

def on_message(client, userdata, msg):

# 解析MQTT消息内容

message = msg.payload.decode()

# 假设消息内容是JSON格式,解析JSON数据

try:

import json

data = json.loads(message)

# 构建InfluxDB的Point对象

point = Point("sensor_measurement")

for key, value in data.items():

point.field(key, value)

# 写入数据到InfluxDB

write_api.write(bucket, org, point)

print(f"Data written to InfluxDB: {data}")

except json.JSONDecodeError:

print("Received message is not in valid JSON format")

在上述代码中:

  • 首先配置了 InfluxDB 的访问令牌token、组织名称org、存储桶名称bucket以及 URL。
  • 创建了一个InfluxDBClient实例,并通过write_api获取写入 API,设置写入选项为同步模式SYNCHRONOUS,这样可以确保数据写入操作是同步进行的,便于调试和保证数据写入的顺序性。
  • 在on_message函数中,首先将接收到的 MQTT 消息负载解码为字符串。然后假设消息内容是 JSON 格式,尝试解析 JSON 数据。如果解析成功,遍历解析后的数据,为每个键值对添加一个字段到Point对象中,Point对象的名称为sensor_measurement,表示测量值(类似于 InfluxDB 中的表名)。最后,使用write_api.write方法将Point对象写入到指定的存储桶和组织中。如果消息不是有效的 JSON 格式,则打印错误信息。

数据格式转换与处理

在将 MQTT 消息写入 InfluxDB 之前,通常需要进行数据格式转换和处理。MQTT 消息的格式多种多样,而 InfluxDB 要求数据以特定的 Line Protocol 格式写入。例如,如果 MQTT 消息是 JSON 格式,我们需要将其转换为 InfluxDB 的 Line Protocol 格式。

以下是一个将 JSON 格式的 MQTT 消息转换为 Line Protocol 格式的示例 :

import json

def json_to_line_protocol(json_data, measurement):

point = f"{measurement}"

tags = ""

fields = ""

timestamp = ""

for key, value in json_data.items():

if isinstance(value, (int, float)):

if fields:

fields += ","

fields += f"{key}={value}"

elif isinstance(value, str):

if tags:

tags += ","

tags += f"{key}=\"{value}\""

elif key == "time":

timestamp = f" {int(value)}" if isinstance(value, int) else f" {int(pd.Timestamp(value).timestamp() * 1000 * 1000 * 1000)}"

if tags:

point += f",{tags}"

if fields:

point += f" {fields}"

if timestamp:

point += timestamp

return point

# 示例JSON数据

json_data = {

"sensor_id": "sensor1",

"temperature": 25.5,

"humidity": 60,

"time": "2023-10-05T12:00:00Z"

}

measurement_name = "environment_data"

line_protocol = json_to_line_protocol(json_data, measurement_name)

print(line_protocol)

在上述代码中:

  • json_to_line_protocol函数接收 JSON 数据和测量值名称作为参数。
  • 遍历 JSON 数据中的每个键值对,根据值的类型进行处理。如果值是数字类型(int或float),将其作为字段添加到fields字符串中;如果值是字符串类型,将其作为标签添加到tags字符串中;如果键是time,则提取时间戳并进行格式化处理,存储到timestamp字符串中。
  • 最后,将测量值名称、标签、字段和时间戳按照 Line Protocol 的格式拼接成一个完整的字符串并返回。

此外,在实际应用中,还可能需要对数据进行清洗和预处理,例如去除异常值、填补缺失值等。以下是一个简单的数据清洗示例,假设我们要去除温度数据中的异常值(假设温度值应该在 - 20 到 50 之间) :

def clean_data(data):

if "temperature" in data:

temperature = data["temperature"]

if temperature < -20 or temperature > 50:

del data["temperature"]

return data

# 示例数据

raw_data = {

"sensor_id": "sensor1",

"temperature": 100,

"humidity": 60

}

cleaned_data = clean_data(raw_data)

print(cleaned_data)

在这个示例中,clean_data函数检查数据中的温度值,如果温度值超出了合理范围(-20 到 50),则删除该温度字段,从而实现对数据的清洗。通过这些数据格式转换和处理步骤,可以确保 MQTT 消息能够正确、有效地写入 InfluxDB 中。

案例实践

场景描述

为了更直观地展示 InfluxDB 与 MQTT 协议集成的实际应用效果,我们以智能工厂中的设备监控场景为例进行实践。在这个智能工厂中,分布着大量的生产设备,每台设备上都安装了多种传感器,如温度传感器、压力传感器、转速传感器等 。这些传感器以一定的时间间隔(例如每 5 秒)采集设备的运行数据,并通过 MQTT 协议将数据发送到 Mosquitto 消息代理。

Mosquitto 消息代理负责接收来自各个传感器的消息,并根据消息的主题进行分发。我们的 Python 应用程序作为 MQTT 客户端,订阅了与设备数据相关的主题(如 “factory/device1/data”“factory/device2/data” 等),当接收到消息时,会对消息进行处理和解析,然后将解析后的数据写入 InfluxDB 中。

InfluxDB 作为时间序列数据库,负责存储这些设备运行数据。我们可以通过 InfluxDB 的查询功能,对存储的数据进行分析和查询,例如查询某台设备在过去一小时内的平均温度、某时间段内设备的运行状态变化等。通过对这些数据的分析,工厂管理人员可以及时了解设备的运行状况,预测设备故障,优化生产流程,提高生产效率和产品质量。

代码实现细节

下面展示完整的 Python 代码,该代码实现了 MQTT 订阅、数据处理和 InfluxDB 写入的功能 :

import paho.mqtt.client as mqtt

from influxdb_client import InfluxDBClient, Point

from influxdb_client.client.write_api import SYNCHRONOUS

import json

# InfluxDB配置

token = "your_token"

org = "your_org"

bucket = "your_bucket"

url = "http://localhost:8086"

# 创建InfluxDB客户端实例

influx_client = InfluxDBClient(url=url, token=token, org=org)

write_api = influx_client.write_api(write_options=SYNCHRONOUS)

# MQTT连接成功回调函数

def on_connect(client, userdata, flags, rc):

if rc == 0:

print("Connected to MQTT Broker!")

# 订阅主题

client.subscribe("factory/#")

else:

print(f"Failed to connect, return code {rc}")

# MQTT接收消息回调函数

def on_message(client, userdata, msg):

try:

# 解析MQTT消息内容,假设消息是JSON格式

message = msg.payload.decode()

data = json.loads(message)

# 构建InfluxDB的Point对象

point = Point("device_measurement")

for key, value in data.items():

point.field(key, value)

# 获取设备ID作为标签

device_id = data.get("device_id")

if device_id:

point.tag("device_id", device_id)

# 写入数据到InfluxDB

write_api.write(bucket, org, point)

print(f"Data written to InfluxDB: {data}")

except json.JSONDecodeError:

print("Received message is not in valid JSON format")

# 创建MQTT客户端实例

mqtt_client = mqtt.Client()

# 设置连接成功回调函数

mqtt_client.on_connect = on_connect

# 设置接收消息回调函数

mqtt_client.on_message = on_message

# 设置MQTT Broker地址和端口

broker_address = "localhost"

port = 1883

# 设置MQTT用户名和密码(如果需要)

# mqtt_client.username_pw_set("your_username", "your_password")

# 连接到MQTT Broker

mqtt_client.connect(broker_address, port, 60)

# 启动循环,保持连接并接收消息

mqtt_client.loop_start()

try:

while True:

pass

except KeyboardInterrupt:

print("Exiting...")

# 停止循环

mqtt_client.loop_stop()

# 断开连接

mqtt_client.disconnect()

# 关闭InfluxDB客户端

influx_client.close()

在上述代码中:

  • 首先配置了 InfluxDB 的访问令牌token、组织名称org、存储桶名称bucket以及 URL,并创建了InfluxDBClient实例和写入 API。
  • on_connect函数是 MQTT 连接成功后的回调函数,当客户端成功连接到 MQTT Broker 时,会订阅 “factory/#” 主题,该主题可以接收以 “factory/” 开头的所有子主题消息,确保能够获取到工厂中所有设备的数据。
  • on_message函数是 MQTT 接收消息后的回调函数。在这个函数中,首先尝试解析接收到的 JSON 格式的消息。然后,根据消息内容构建 InfluxDB 的Point对象,将消息中的每个键值对作为字段添加到Point中。同时,从消息中获取设备 ID,并将其作为标签添加到Point中,以便在 InfluxDB 中可以根据设备 ID 对数据进行分类和查询。最后,使用 InfluxDB 的写入 API 将Point对象写入到指定的存储桶和组织中。如果消息不是有效的 JSON 格式,则打印错误信息。
  • 创建了mqtt.Client()实例,并将on_connect和on_message函数分别设置为连接成功和接收消息的回调函数。然后连接到指定地址和端口的 MQTT Broker,并启动循环以保持连接并接收消息。在程序结束时,通过捕获KeyboardInterrupt异常,停止 MQTT 循环,断开与 MQTT Broker 的连接,并关闭 InfluxDB 客户端。

结果展示与分析

为了展示 InfluxDB 中存储的数据,我们可以使用 Grafana 等数据可视化工具。首先,在 Grafana 中添加 InfluxDB 作为数据源,配置好 InfluxDB 的 URL、访问令牌、组织和存储桶等信息。然后,创建一个新的仪表板,添加图表组件,通过编写 InfluxQL 查询语句来获取我们需要展示的数据。

例如,我们想要展示设备 1 在过去 24 小时内的温度变化趋势,可以编写如下 InfluxQL 查询语句 :

SELECT mean("temperature") FROM "device_measurement" WHERE "device_id" = 'device1' AND time >= now() - 24h GROUP BY time(10m)

上述查询语句的含义是:从 “device_measurement” 测量值(类似于表)中选择 “temperature” 字段的平均值,条件是 “device_id” 标签为 “device1” 且时间在过去 24 小时内,最后按照每 10 分钟的时间间隔进行分组。

通过 Grafana 的图表展示,我们可以清晰地看到设备 1 的温度随时间的变化趋势,如下图所示:

[此处插入设备 1 温度变化趋势图]

从图表中可以分析出,设备 1 在某些时间段内温度较高,可能需要关注设备的散热情况;而在其他时间段内温度较为稳定。通过对这些数据的分析,我们可以及时发现设备运行过程中可能存在的问题,提前采取措施进行维护和优化,避免设备故障对生产造成影响。同时,也可以根据这些数据对设备的性能进行评估,为设备的升级和改造提供依据。此外,还可以进一步分析不同设备之间的数据差异,找出生产效率较高的设备,总结经验并推广到其他设备上,从而提高整个工厂的生产效率和产品质量。

http://www.dtcms.com/a/300471.html

相关文章:

  • Element表格单元格类名动态设置
  • Linux网络
  • libomxil-bellagio移植到OpenHarmony
  • Ubuntu简述及部署系统
  • MybatisPlus-19.插件功能-通用分页实体
  • JDK 11.0.16.1 Windows 安装教程 - 详细步骤+环境变量配置
  • Day44 Java数组08 冒泡排序
  • AI与区块链Web3技术融合:重塑数字经济的未来格局
  • SpringSecurity实战:核心配置技巧
  • 【前端】【vscode】【.vscode/settings.json】为单个项目配置自动格式化和开发环境
  • 【C++基础】类型转换:static_cast/dynamic_cast 面试高频考点与真题解析
  • Spring Retry 异常重试机制:从入门到生产实践
  • ESP32学习-FreeRTOS队列使用指南与实战
  • 【多模态】天池AFAC赛道四-智能体赋能的金融多模态报告自动化生成part2-报告输出
  • Java面试实战:企业级性能优化与JVM调优全解析
  • 小白成长之路-Ansible自动化(一)
  • 将远程 main 分支同步到 develop 分支的完整指南
  • 【硬件】嵌入式软件开发(2)
  • STM32-USART串口实现接收数据三种方法(1.根据\r\n标志符、2.空闲帧中断、3.根据定时器辅助接收)
  • Pytest 参数化进阶:掌握 parametrize 的多种用法
  • HCIP---MGRE实验
  • 嵌入式硬件篇---ESP32稳压板
  • OpenLayers 综合案例-轨迹回放
  • LeetCode|Day27|70. 爬楼梯|Python刷题笔记
  • catkin_make与catkin build的关系与区别(使用catkin build的好处)
  • MGRE实验
  • 深入解析 Vue 3 中 v-model 与表单元素的绑定机制
  • 多租户Kubernetes集群架构设计实践——隔离、安全与弹性扩缩容
  • Spring Boot自动配置原理深度解析
  • 昇思学习营-模型推理和性能优化