InfluxDB 3 Core + Java 11 + Spring Boot:打造高效物联网数据平台
一、 引言:为什么选择InfluxDB 3?
- 项目背景:
- 在我们的隧道风机监控系统中,实时数据的采集、存储和高效查询是至关重要的核心需求。风机运行产生的振动、倾角、电流、温度等参数是典型的时序数据,具有高并发写入、数据量持续增长以及对近期数据查询性能要求高的特点。传统关系型数据库在应对这类场景时,往往面临性能瓶颈和复杂的查询优化问题。
- 时序数据库专门设计用于处理时间序列数据,这类数据通常以极高的频率生成,例如传感器数据、日志数据或金融交易数据。
- InfluxDB 3的核心特性吸引点:
InfluxDB 3 Core 在架构和功能上进行了多项改进,与 InfluxDB 1.x/2.x 相比,具有更高的性能、更好的扩展性和更广泛的数据处理能力。旧版本在处理大规模数据时可能会遇到性能瓶颈,而 3 Core 通过优化存储引擎和查询性能,显著提升了处理能力。此外,3 Core 对 SQL 的支持和 Arrow Flight 的集成,使得它在数据查询和系统集成方面更具优势。
InfluxDB 3的数据库与MySQL数据库的对比
是的,在概念层面,InfluxDB 3中的“数据库”与MySQL中的“数据库”是类似的。它们都是一个逻辑上的容器,用来组织和隔离数据。
相似之处:
- 容器:两者都充当最高级别的数据组织单元。在一个MySQL服务器上可以有多个数据库,同样,在一个InfluxDB 3实例中也可以创建多个数据库。
- 隔离:不同的数据库通常用于存放不同项目、应用或数据集的数据,实现数据隔离。
- 包含表(Tables):MySQL数据库包含多个表(Table),InfluxDB 3的数据库也包含多个表(Table)。
不同之处(关键区别):
- 数据模型和用途:
- MySQL:是关系型数据库(RDBMS),数据以结构化的行和列存储在表中,非常适合事务处理、复杂关系查询。
- InfluxDB 3:是时间序列数据库(TSDB),专门为处理带有时间戳的数据(如监控指标、传感器数据、事件日志)而设计和优化。它的核心是时间。
- 表(Table)的概念:
- MySQL:表有预定义的列和数据类型。
- InfluxDB 3:文档中提到 “A
table
is equivalent to ameasurement
”。measurement
是InfluxDB早期版本的概念。在InfluxDB 3中,一个表通常对应一种类型的时间序列数据(例如,cpu
表存放CPU指标,temperature
表存放温度读数)。
- 核心组件:
- InfluxDB 3的表:包含
tags
(标签,用于索引和快速过滤的元数据,通常是字符串键值对)、fields
(字段,实际的度量值,可以是整数、浮点数、布尔值、字符串) 和一个特殊的time
列 (纳秒级精度的时间戳)。 - MySQL的表:由列组成,每列有其数据类型,通过主键、外键等维护关系。
- InfluxDB 3的表:包含
- Schema(模式):
- InfluxDB 3:是 “schema-on-write”(写入时定义模式)。当你第一次写入数据到一个新表时,InfluxDB会根据写入的数据动态创建表的模式(特别是
tag
列的集合和顺序,这些一旦创建就不可更改)。之后可以动态添加新的field
列,但不能再为该表添加新的tag
列。 - MySQL:通常是 “schema-on-read” 或更准确地说是预定义模式,你需要在写入数据前明确定义表的结构(列名、数据类型、约束等)。
- InfluxDB 3:是 “schema-on-write”(写入时定义模式)。当你第一次写入数据到一个新表时,InfluxDB会根据写入的数据动态创建表的模式(特别是
总结:你可以把InfluxDB 3的数据库理解为一个专门存放时间序列数据的大容器。在这个容器里,你会按数据类型(比如CPU使用率、温度、订单量)创建不同的“表”(在InfluxDB的语境下也叫measurement)。
二、 环境准备与InfluxDB 3 Core的安装与启动 (Windows 11)
1.下载与解压:
- 在[InfluxDB 3 Core官方文档(https://docs.influxdb.org.cn/influxdb3/core/)中获取InfluxDB 3 Core 压缩包,并解压到目录。
- 我的解压目录位于
G:\浏览器下载\influxdb3-core-3.0.1-windows_amd64\
- 启动InfluxDB 3 Core服务:
使用 influxdb3 serve 命令来启动服务器,并可以通过参数指定对象存储类型和数据目录。
我的启动命令如下:
.\influxdb3.exe serve --object-store file --node-id mywindowsnode --data-dir "G:\浏览器下载\influxdb3-core-3.0.1-windows_amd64\data"
参数解释:
- object-store file
: 指定使用本地文件系统作为对象存储。这是默认选项,但明确指出总是个好习惯。
- node-id <你的节点ID>
: 为你的InfluxDB节点指定一个唯一的ID,例如 mynode 或 win11node。
- data-dir "<你的数据目录>"
: 指定数据存储的目录。
- 创建管理员令牌 (Admin Token):
在你启动InfluxDB 3服务器之后,你就可以创建管理员令牌了。管理员令牌拥有对InfluxDB 3实例所有操作的最高权限。
2. 如何创建管理员令牌 (Admin Token)
文档中提到,在你启动InfluxDB 3服务器之后,你就可以创建管理员令牌了。管理员令牌拥有对InfluxDB 3实例所有操作的最高权限。
步骤如下:
-
确保InfluxDB 3服务器正在运行:
你在上一步已经启动了服务器,它应该在第一个命令提示符/PowerShell窗口中运行。 -
打开一个新的命令提示符 (CMD) 或 PowerShell 窗口:
- 不要关闭正在运行服务器的那个窗口。你需要一个新的窗口来执行
influxdb3
的客户端命令。
- 不要关闭正在运行服务器的那个窗口。你需要一个新的窗口来执行
-
导航到InfluxDB 3的目录(如果
influxdb3.exe
不在系统PATH中):
在新打开的窗口中,输入:cd G:\浏览器下载\influxdb3-core-3.0.1-windows_amd64
-
执行创建管理员令牌的命令:
根据文档,使用influxdb3 create token --admin
子命令。你可能还需要指定InfluxDB服务器的地址(如果不是默认的http://localhost:8181
)。.\influxdb3.exe create token --admin --host http://localhost:8181
或者,如果
influxdb3.exe
已经在你的系统路径 (PATH) 中,或者你就在其目录下,可以简化为:influxdb3 create token --admin --host http://localhost:8181
--admin
:表示你正在创建一个管理员级别的令牌。--host http://localhost:8181
:指定了InfluxDB服务器正在监听的地址和端口。如果你的服务器运行在其他地址或端口,请相应修改。
-
保存令牌:
执行命令后,它会在窗口中直接输出一个很长的字符串,这就是你的管理员令牌。
非常重要:文档中强调 “InfluxDB lets you view the token string only when you create the token. Store your token in a secure location, as you cannot retrieve it from the database later.”
这意味着:- 立即复制这个令牌字符串。
- 将它保存在一个安全的地方 (比如密码管理器或一个受保护的文本文件中)。
- 一旦你关闭了这个窗口或者执行了其他命令,你就无法再次从InfluxDB中找回这个确切的令牌字符串了。InfluxDB内部只存储令牌的哈希值。
-
(可选但推荐)设置环境变量:
为了方便以后使用influxdb3
CLI而不需要每次都输入--token
参数,你可以将这个令牌设置到一个环境变量中。文档推荐的环境变量名是INFLUXDB3_AUTH_TOKEN
。
在新的命令提示符窗口中设置(仅对当前窗口有效):set INFLUXDB3_AUTH_TOKEN=你的令牌字符串粘贴在这里
或者在PowerShell中(仅对当前窗口有效):
$env:INFLUXDB3_AUTH_TOKEN="你的令牌字符串粘贴在这里"
如果你想让这个环境变量在系统重启后依然有效,你需要通过系统属性来设置它(搜索“编辑系统环境变量”)。
现在你就有了一个管理员令牌,可以用它来进行后续的数据库创建、数据写入和查询等操作了。
三、 Java项目集成:Spring Boot (RuoYi) 与 InfluxDB 3的连接
- 选择合适的Java客户端库:
对于InfluxDB 3.x,官方推荐使用influxdb3-java
这个新的、轻量级的、社区维护的客户端库。 - Maven依赖配置:
在pom.xml
中添加influxdb3-java
(最新稳定版) 的依赖。
<dependency><groupId>com.influxdb</groupId><artifactId>influxdb3-java</artifactId><version>1.0.0</version> <!-- 请使用最新的稳定版本 --></dependency>
- 关键JVM参数配置:
由于 influxdb3-java 底层使用了Apache Arrow Flight,它可能需要访问一些通常被模块系统封装的JDK内部API。你需要在启动你的Spring Boot应用程序时,添加JVM参数。
在 IntelliJ IDEA 中添加 JVM 参数:
--add-opens=java.base/java.nio=ALL-UNNAMED
在命令行中运行 JAR 包时:
java --add-opens=java.base/java.nio=ALL-UNNAMED -jar your-application.jar
- Spring Boot配置文件 (
application.yml
或.properties
):- 配置InfluxDB 3的连接信息 (
host
,token
,database
)。
- 配置InfluxDB 3的连接信息 (
influxdb:client3:host: http://localhost:8181token: 123456database: tunnel_fengji # 你的InfluxDB 3数据库名
- 创建InfluxDB服务层 (
IInfluxDBService
接口和InfluxDBService
实现类):InfluxDBService
实现:@PostConstruct
初始化InfluxDBClient.getInstance()
。@PreDestroy
关闭客户端client.close()
。- 数据库自动创建机制:解释InfluxDB 3在首次写入时会自动创建数据库和表。
package com.ruoyi.oil.service.impl;import com.influxdb.v3.client.InfluxDBClient;
import com.influxdb.v3.client.Point;
import com.influxdb.v3.client.PointValues;
import com.influxdb.v3.client.query.QueryOptions;
import com.ruoyi.oil.service.IInfluxDBService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import com.fasterxml.jackson.databind.ObjectMapper; // 用于JSON解析
import com.ruoyi.oil.domain.DeviceDataMessage; // 导入你创建的POJO@Service
public class InfluxDBService implements IInfluxDBService {private static final Logger logger = LoggerFactory.getLogger(InfluxDBService.class);private final ObjectMapper objectMapper = new ObjectMapper(); // 用于将JSON字符串转换为对象@Value("${influxdb.client3.host}")private String host;@Value("${influxdb.client3.token}")private String token;@Value("${influxdb.client3.database}")private String database;private InfluxDBClient client;@PostConstructpublic void init() {logger.info("Initializing InfluxDB 3 native client for host: {}, database: {}", host, database);try {this.client = InfluxDBClient.getInstance(host, token.toCharArray(), database);logger.info("InfluxDB 3 native client initialized successfully.");} catch (Exception e) {logger.error("Failed to initialize InfluxDB 3 native client", e);}}@PreDestroypublic void close() {if (this.client != null) {try {this.client.close();logger.info("InfluxDB 3 native client closed.");} catch (Exception e) {logger.error("Error closing InfluxDB 3 native client", e);}}}/*** 处理并写入从硬件设备接收到的JSON消息。** @param jsonMessage 收到的JSON字符串* @return 如果处理和写入至少一个点成功则返回true,否则false*/public boolean processAndWriteDeviceData(String jsonMessage) {if (client == null) {logger.error("InfluxDB 3 client is not initialized. Cannot process message.");return false;}try {DeviceDataMessage message = objectMapper.readValue(jsonMessage, DeviceDataMessage.class);logger.info("Parsed device data message: {}", message.getDeviceName());if (message.getItems() == null) {logger.warn("No items found in the message for device: {}", message.getDeviceName());return false;}List<Point> pointsToWrite = new ArrayList<>();String measurement = "fan_data"; // 你可以根据 deviceType 或其他逻辑动态设置// 通用标签,适用于该消息中的所有数据点Map<String, String> commonTags = new HashMap<>();commonTags.put("iotId", message.getIotId());commonTags.put("productKey", message.getProductKey());commonTags.put("deviceName", message.getDeviceName());if (message.getDeviceType() != null) {commonTags.put("deviceType", message.getDeviceType());}DeviceDataMessage.Items items = message.getItems();// 处理每个itemif (items.getLightCurrent() != null && items.getLightCurrent().getValue() != null) {pointsToWrite.add(createPointFromItemData(measurement, commonTags,"LightCurrent", items.getLightCurrent()));}if (items.getAx() != null && items.getAx().getValue() != null) {pointsToWrite.add(createPointFromItemData(measurement, commonTags,"ax", items.getAx()));}if (items.getRoll() != null && items.getRoll().getValue() != null) {pointsToWrite.add(createPointFromItemData(measurement, commonTags,"roll", items.getRoll()));}if (items.getAy() != null && items.getAy().getValue() != null) {pointsToWrite.add(createPointFromItemData(measurement, commonTags,"ay", items.getAy()));}if (items.getTemperature() != null && items.getTemperature().getValue() != null) {pointsToWrite.add(createPointFromItemData(measurement, commonTags,"temperature", items.getTemperature()));}if (items.getAz() != null && items.getAz().getValue() != null) {pointsToWrite.add(createPointFromItemData(measurement, commonTags,"az", items.getAz()));}if (items.getPitch() != null && items.getPitch().getValue() != null) {pointsToWrite.add(createPointFromItemData(measurement, commonTags,"pitch", items.getPitch()));}if (items.getYaw() != null && items.getYaw().getValue() != null) {pointsToWrite.add(createPointFromItemData(measurement, commonTags,"yaw", items.getYaw()));}if (pointsToWrite.isEmpty()) {logger.warn("No valid data points to write from message for device: {}", message.getDeviceName());return false;}return writePoints(pointsToWrite);} catch (Exception e) {logger.error("Error processing and writing device data: {}", e.getMessage(), e);return false;}}/*** 辅助方法,从ItemData创建InfluxDB Point。*/private Point createPointFromItemData(String measurement, Map<String, String> commonTags,String fieldName, DeviceDataMessage.ItemData itemData) {Point point = Point.measurement(measurement).setTimestamp(Instant.ofEpochMilli(itemData.getTime())); // 从毫秒时间戳创建InstantcommonTags.forEach(point::setTag);point.setField(fieldName, itemData.getValue()); // ItemData中的value是Doublereturn point;}@Overridepublic boolean writePoint(String measurement, Map<String, String> tags, Map<String, Object> fields, Instant timestamp) {if (client == null) {logger.error("InfluxDB 3 client is not initialized. Cannot write point.");return false;}try {Point point = Point.measurement(measurement);if (timestamp != null) {point.setTimestamp(timestamp);} else {point.setTimestamp(Instant.now());}if (tags != null) {tags.forEach(point::setTag);}if (fields != null) {fields.forEach((key, value) -> {if (value instanceof Long) point.setField(key, (Long) value);else if (value instanceof Double) point.setField(key, (Double) value);else if (value instanceof Boolean) point.setField(key, (Boolean) value);else if (value instanceof String) point.setField(key, (String) value);else if (value instanceof Integer) point.setField(key, ((Integer)value).longValue());else if (value instanceof Float) point.setField(key, ((Float)value).doubleValue());else {logger.warn("Unsupported field type for key '{}': {}. Converting to string.", key, value.getClass().getName());point.setField(key, value.toString());}});}client.writePoint(point); // 默认写入到客户端初始化时指定的databaselogger.debug("Successfully wrote point using influxdb3-java: {}", point.toLineProtocol());return true;} catch (Exception e) {logger.error("Error writing point with influxdb3-java: {}", e.getMessage(), e);return false;}}@Overridepublic boolean writePoints(List<Point> points) {if (client == null) {logger.error("InfluxDB 3 client is not initialized. Cannot write points.");return false;}if (points == null || points.isEmpty()) {logger.warn("Point list is empty or null. Nothing to write.");return true;}try {client.writePoints(points); // 默认写入到客户端初始化时指定的databaselogger.debug("Successfully wrote {} points using influxdb3-java.", points.size());return true;} catch (Exception e) {logger.error("Error writing points with influxdb3-java: {}", e.getMessage(), e);return false;}}@Overridepublic boolean writeRecord(String lineProtocol) {if (client == null) {logger.error("InfluxDB 3 client is not initialized. Cannot write record.");return false;}try {client.writeRecord(lineProtocol); // 默认写入到客户端初始化时指定的databaselogger.debug("Successfully wrote line protocol record using influxdb3-java.");return true;} catch (Exception e) {logger.error("Error writing line protocol record with influxdb3-java: {}", e.getMessage(), e);return false;}}@Overridepublic Stream<Object[]> queryRaw(String sqlQuery) {if (client == null) {logger.error("InfluxDB 3 client is not initialized. Cannot query.");return Stream.empty();}logger.debug("Executing SQL query (raw Object[]): {}", sqlQuery);try {return client.query(sqlQuery);} catch (Exception e) {logger.error("Error executing SQL query (raw Object[]): {}", e.getMessage(), e);return Stream.empty();}}@Overridepublic Stream<Object[]> queryRaw(String sqlQuery, Map<String, Object> params) {if (client == null) {logger.error("InfluxDB 3 client is not initialized. Cannot query.");return Stream.empty();}logger.debug("Executing parametrized SQL query (raw Object[]): {} with params: {}", sqlQuery, params);try {return client.query(sqlQuery, params);} catch (Exception e) {logger.error("Error executing parametrized SQL query (raw Object[]): {}", e.getMessage(), e);return Stream.empty();}}@Overridepublic Stream<Object[]> queryRawWithInfluxQL(String influxQLQuery) {if (client == null) {logger.error("InfluxDB 3 client is not initialized. Cannot query.");return Stream.empty();}logger.debug("Executing InfluxQL query (raw Object[]): {}", influxQLQuery);try {return client.query(influxQLQuery, QueryOptions.INFLUX_QL);} catch (Exception e) {logger.error("Error executing InfluxQL query (raw Object[]): {}", e.getMessage(), e);return Stream.empty();}}@Overridepublic Stream<PointValues> queryPoints(String sqlQuery) {if (client == null) {logger.error("InfluxDB 3 client is not initialized. Cannot query points.");return Stream.empty();}logger.debug("Executing SQL query for PointValues: {}", sqlQuery);try {return client.queryPoints(sqlQuery);} catch (Exception e) {logger.error("Error executing SQL query for PointValues: {}", e.getMessage(), e);return Stream.empty();}}@Overridepublic Stream<PointValues> queryPoints(String sqlQuery, Map<String, Object> params) {if (client == null) {logger.error("InfluxDB 3 client is not initialized. Cannot query points with params.");return Stream.empty();}logger.warn("Executing parametrized SQL query for PointValues. " +"The influxdb3-java client.queryPoints API in README (1.0.0) " +"does not show direct Map-based parameterization for PointValues stream. " +"This method might require constructing SQL with parameters manually if not supported by API. " +"Falling back to non-parametrized queryPoints for now if params are present and non-empty, or use queryRaw.");// 根据 README, client.query(sql, params) 返回 Stream<Object[]>// client.queryPoints(sql) 返回 Stream<PointValues> 但没有 params map// 如果确实需要参数化并得到 PointValues,需要检查库是否有其他方法,或手动处理if (params != null && !params.isEmpty()) {logger.error("Parameter-map based queryPoints is not directly supported by this example based on README. " +"Use queryRaw(sql, params) and process Object[] or construct SQL string with parameters manually for queryPoints.");// 或者可以尝试动态构建SQL字符串,但要注意SQL注入风险// String finalSql = replaceQueryParameters(sqlQuery, params); // 你需要实现这个方法// return client.queryPoints(finalSql);return Stream.empty(); // 或者抛出 UnsupportedOperationException}try {return client.queryPoints(sqlQuery);} catch (Exception e) {logger.error("Error executing SQL query for PointValues (fallback non-parametrized): {}", e.getMessage(), e);return Stream.empty();}}// 示例方法 (与之前一致,只是调用的方法现在是基于influxdb3-java的)public void writeFanMetricsExample() {String measurement = "fan_sensor_data";Map<String, String> tags = new HashMap<>();tags.put("tunnel_id", "T002");tags.put("fan_id", "Fan_D01");Map<String, Object> fields = new HashMap<>();fields.put("vibration_x", 0.33);fields.put("temperature_celsius", 31.5);fields.put("active_power", 2.5);writePoint(measurement, tags, fields, Instant.now());}public void queryFanMetricsExample() {String sql = "SELECT time, tunnel_id, fan_id, vibration_x, temperature_celsius, active_power " +"FROM fan_sensor_data WHERE fan_id = 'Fan_D01' AND time >= now() - interval '1 hour' " +"ORDER BY time DESC LIMIT 3";logger.info("Querying with SQL for PointValues stream (Recommended for typed access):");try (Stream<PointValues> stream = queryPoints(sql)) { // 使用实现了的 queryPointsstream.forEach((PointValues p) -> {// 根据你的SELECT语句,你知道这些字段和标签是存在的System.out.printf("| Time: %-30s | Tunnel: %-8s | Fan: %-8s | VibX: %-8.3f | Temp: %-8.2f | Power: %-8.2f |%n",p.getTimestamp(), // 主时间戳p.getTag("tunnel_id"),p.getTag("fan_id"),p.getField("vibration_x", Double.class),p.getField("temperature_celsius", Double.class),p.getField("active_power", Double.class));});} catch (Exception e) {logger.error("Error in queryFanMetricsExample (queryPoints): ", e);}System.out.println("----------------------------------------------------------------------------------------------------------");logger.info("Querying with SQL for raw Object[] stream (manual handling based on SELECT order):");// 列顺序: time, tunnel_id, fan_id, vibration_x, temperature_celsius, active_powertry (Stream<Object[]> stream = queryRaw(sql)) { // 使用实现了的 queryRawstream.forEach(row -> {if (row != null && row.length == 6) {System.out.printf("| Time: %-30s | Tunnel: %-8s | Fan: %-8s | VibX: %-8s | Temp: %-8s | Power: %-8s |%n",row[0], row[1], row[2], row[3], row[4], row[5]);} else {logger.warn("Unexpected row format in raw query: {}", (Object)row);}});} catch (Exception e) {logger.error("Error in queryFanMetricsExample (queryRaw): ", e);}System.out.println("----------------------------------------------------------------------------------------------------------");}
}
IInfluxDBService实现:
package com.ruoyi.oil.service;import com.influxdb.v3.client.Point;
import com.influxdb.v3.client.PointValues;import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;public interface IInfluxDBService {boolean writePoint(String measurement, Map<String, String> tags, Map<String, Object> fields, Instant timestamp);boolean writePoints(List<Point> points);boolean writeRecord(String lineProtocol);/*** 执行SQL查询并返回原始的Object数组流。* 调用者负责关闭Stream,并根据SQL查询的SELECT子句解析Object[]中的数据。** @param sqlQuery SQL查询语句* @return 代表结果行的Object数组流*/Stream<Object[]> queryRaw(String sqlQuery);/*** 执行参数化的SQL查询并返回原始的Object数组流。* 调用者负责关闭Stream,并根据SQL查询的SELECT子句解析Object[]中的数据。** @param sqlQuery 参数化的SQL查询语句 (例如 "SELECT * FROM table WHERE tag1 = $param1")* @param params 参数名和值的Map* @return 代表结果行的Object数组流*/Stream<Object[]> queryRaw(String sqlQuery, Map<String, Object> params);/*** 执行InfluxQL查询并返回原始的Object数组流。* 调用者负责关闭Stream,并根据InfluxQL查询的SELECT子句解析Object[]中的数据。** @param influxQLQuery InfluxQL查询语句* @return 代表结果行的Object数组流*/Stream<Object[]> queryRawWithInfluxQL(String influxQLQuery);boolean processAndWriteDeviceData(String jsonMessage);/*** 执行SQL查询并返回 PointValues 流,方便按类型获取字段和标签。* 调用者负责关闭Stream。** @param sqlQuery SQL查询语句* @return PointValues对象的流*/Stream<PointValues> queryPoints(String sqlQuery);/*** 执行参数化的SQL查询并返回 PointValues 流。* 注意:influxdb3-java 1.0.0 的 queryPoints API 可能不直接支持 Map 形式的参数化。* 此方法目前可能回退到非参数化版本或需要调用者自行构造含参数的SQL。** @param sqlQuery 参数化的SQL查询语句* @param params 参数名和值的Map (其在此方法中的支持取决于客户端库的实际能力)* @return PointValues对象的流*/Stream<PointValues> queryPoints(String sqlQuery, Map<String, Object> params);
}
四、 核心操作:数据写入与查询
数据模型设计 (针对隧道风机监控)
在InfluxDB 3中,数据组织的核心概念包括数据库(Database)、表(Table,在InfluxDB语境下也常称为Measurement)、标签(Tag)和字段(Field)。时间戳(Time)是每条记录固有的组成部分。
对于我们的隧道风机监控系统,我们设计了如下的数据模型:
-
数据库 (Database):我们创建了一个名为
tunnel_fan_monitoring
(或根据实际项目命名)的数据库,作为所有风机监控数据的统一存储容器。 -
表/Measurement (Table / Measurement):
- 考虑到风机产生的各类传感器数据(振动、倾角、电流、温度等)通常是描述同一设备在相近时间点的状态,并且我们可能需要将这些数据一起分析,我们决定采用一个统一的表来存储这些指标。
- 表名:
device_metrics
- 这个表将包含所有风机的各类传感器读数。如果未来有特定类型的传感器数据量极大或查询模式非常独立,也可以考虑拆分为更细粒度的表。
-
标签 (Tags):
标签用于存储元数据,是数据点的主要索引维度,常用于WHERE
子句的过滤和GROUP BY
子句的分组。在我们的风机监控场景中,关键的标签包括:iotId
(String): 硬件设备在物联网平台上的唯一标识符。productKey
(String): 设备所属的产品型号标识。deviceName
(String): 设备的自定义名称,例如 “tunnel-A-fan-01”,这是我们系统中标识具体风机的主要业务ID。deviceType
(String): 设备类型,例如 “FanDevice”, “SensorHub”,用于区分不同类型的硬件。(可选) location_zone
(String): 风机所在的隧道区域或更细分的地理位置标签,如果需要按区域进行聚合分析。
重要特性:标签集合与顺序的不可变性
InfluxDB 3的一个核心设计是,当数据首次写入一个新表时,该表中出现的标签键及其顺序(InfluxDB内部决定的顺序)就被固定下来了。之后,你不能再为这个表添加新的标签键。这意味着在设计初期,必须仔细规划好一个表需要哪些核心的、用于索引和分组的维度作为标签。如果后续确实需要新的索引维度,可能需要重新设计表结构或创建新表。 -
字段 (Fields):
字段用于存储实际的测量值或具体的属性信息。对于风机监控数据,字段将包括:ax
,ay
,az
(Double): X, Y, Z轴的振动值。roll
,pitch
,yaw
(Double): 翻滚角、俯仰角、偏航角。LightCurrent
(Double): 光照传感器电流(或根据实际意义命名,如operating_current
)。temperature
(Double): 温度读数。(可选) status_message
(String): 风机的详细状态描述或错误信息(如果不是主要用于过滤或聚合)。(可选) online_status
(Boolean/Integer): 表示设备在线状态的布尔值或整数值,如果设备上下线事件也作为时序数据记录。
-
时间戳 (Time):
- 每条数据点都必须有一个时间戳,表示数据采集或事件发生的时间。InfluxDB 3支持纳秒级精度,我们在Java客户端中统一使用
Instant
对象,并以纳秒精度写入。
- 每条数据点都必须有一个时间戳,表示数据采集或事件发生的时间。InfluxDB 3支持纳秒级精度,我们在Java客户端中统一使用
这个数据模型旨在平衡查询灵活性和InfluxDB的性能特点。通过合理的标签设计,我们可以高效地根据设备ID、类型或位置筛选数据,并通过字段获取具体的监控指标。
定义数据POJO类:
- 展示如何为接收到的不同JSON消息结构(设备状态消息、传感器数据消息)创建对应的Java POJO类 (DeviceStatusMessage, DeviceDataMessage 及其内部类)。
- 使用Jackson注解 (@JsonProperty) 处理JSON键名与Java变量名不一致的情况。
package com.ruoyi.oil.controller;import com.ruoyi.common.annotation.Anonymous;
import com.ruoyi.common.annotation.Log;
import com.ruoyi.common.core.controller.BaseController;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.common.core.page.TableDataInfo;
import com.ruoyi.common.enums.BusinessType;
import com.ruoyi.common.utils.poi.ExcelUtil;
import com.ruoyi.oil.domain.IotAlert;
import com.ruoyi.oil.domain.IotDevice;
import com.ruoyi.oil.domain.dto.DeptsDTO;
import com.ruoyi.oil.domain.dto.DeviceQuery;
import com.ruoyi.oil.domain.dto.DeviceTopic;
import com.ruoyi.oil.domain.dto.DeviceTsDTO;
import com.ruoyi.oil.domain.vo.DeviceInfoVO;
import com.ruoyi.oil.service.IInfluxDBService;
import com.ruoyi.oil.service.IIotDeviceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.http.ResponseEntity;
import com.influxdb.v3.client.PointValues; // 如果直接处理Stream<PointValues>
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import java.util.Map;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.LinkedHashMap;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletResponse;
import java.util.List;@Anonymous
@RestController
@RequestMapping("/iot/data")
public class IotDataController extends BaseController {@Autowiredprivate IInfluxDBService influxDBService;/*** 获取特定风机在指定时间范围内的所有指标 (使用 queryPoints)*/@Anonymous@GetMapping("/query")public ResponseEntity<List<Map<String, Object>>> getDataByFan(@RequestParam String deviceName,@RequestParam @Nullable @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) OffsetDateTime startTime,@RequestParam @Nullable @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) OffsetDateTime endTime,@RequestParam(defaultValue = "100") int limit) {// 基础SQL查询语句StringBuilder sqlBuilder = new StringBuilder();sqlBuilder.append("SELECT time, ").append("\"deviceName\", ") // <--- 修改这里.append("\"iotId\", ") // <--- 建议所有tag和field名都用双引号,特别是如果它们不是纯小写.append("\"productKey\", ").append("ax, ay, az, roll, pitch, yaw, temperature, \"LightCurrent\" "); // LightCurrent也用引号sqlBuilder.append("FROM ").append("fan_data").append(" ");sqlBuilder.append("WHERE \"deviceName\" = '").append(escapeSqlIdentifier(deviceName)).append("'"); // 添加时间过滤条件(如果提供了时间参数)if (startTime != null) {sqlBuilder.append(" AND time >= '").append(startTime.toInstant().toString()).append("'");}if (endTime != null) {sqlBuilder.append(" AND time < '").append(endTime.toInstant().toString()).append("'");}sqlBuilder.append(" ORDER BY time DESC LIMIT ").append(limit);String sqlQuery = sqlBuilder.toString();List<Map<String, Object>> results = new ArrayList<>();try (Stream<PointValues> stream = influxDBService.queryPoints(sqlQuery)) {results = stream.map(pv -> {Map<String, Object> row = new LinkedHashMap<>();if (pv.getTimestamp() != null) row.put("time", pv.getTimestamp());// 根据你的SELECT语句明确提取if (pv.getTag("deviceName") != null) row.put("deviceName", pv.getTag("deviceName"));if (pv.getTag("iotId") != null) row.put("iotId", pv.getTag("iotId"));if (pv.getTag("productKey") != null) row.put("productKey", pv.getTag("productKey"));putFieldIfPresent(pv, row, "ax", Double.class);putFieldIfPresent(pv, row, "ay", Double.class);putFieldIfPresent(pv, row, "az", Double.class);putFieldIfPresent(pv, row, "roll", Double.class);putFieldIfPresent(pv, row, "pitch", Double.class);putFieldIfPresent(pv, row, "yaw", Double.class);putFieldIfPresent(pv, row, "temperature", Double.class);putFieldIfPresent(pv, row, "LightCurrent", Double.class);return row;}).collect(Collectors.toList());} catch (Exception e) {// log errorreturn ResponseEntity.status(500).body(null);}return ResponseEntity.ok(results);}// 辅助方法,用于从 PointValues 安全地获取字段并放入 Mapprivate <T> void putFieldIfPresent(PointValues pv, Map<String, Object> map, String fieldName, Class<T> type) {try {T value = pv.getField(fieldName, type);if (value != null) {map.put(fieldName, value);}} catch (Exception e) {// 字段不存在或类型不匹配时,getField会抛异常// logger.trace("Field '{}' not found or type mismatch in PointValues", fieldName);}}// 非常基础的SQL标识符清理,防止简单注入。生产环境需要更健壮的方案或使用预编译语句。private String escapeSqlIdentifier(String identifier) {if (identifier == null) return null;return identifier.replace("'", "''");}
}
package com.ruoyi.oil.controller;import com.ruoyi.common.annotation.Anonymous;
import com.ruoyi.common.annotation.Log;
import com.ruoyi.common.core.controller.BaseController;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.common.core.page.TableDataInfo;
import com.ruoyi.common.enums.BusinessType;
import com.ruoyi.common.utils.poi.ExcelUtil;
import com.ruoyi.oil.domain.IotAlert;
import com.ruoyi.oil.domain.IotDevice;
import com.ruoyi.oil.domain.dto.DeptsDTO;
import com.ruoyi.oil.domain.dto.DeviceQuery;
import com.ruoyi.oil.domain.dto.DeviceTopic;
import com.ruoyi.oil.domain.dto.DeviceTsDTO;
import com.ruoyi.oil.domain.vo.DeviceInfoVO;
import com.ruoyi.oil.service.IIotDeviceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;import javax.servlet.http.HttpServletResponse;
import java.util.List;/*** 设备管理Controller* * @author wanglong* @date 2024-04-15*/@RestController
@RequestMapping("/iot/device")
public class IotDeviceController extends BaseController
{@Autowiredprivate IIotDeviceService iotDeviceService;/*** 查询设备管理列表*/@PreAuthorize("@ss.hasPermi('iot:device:list')")@GetMapping("/list")public TableDataInfo list(DeviceQuery deviceQuery){startPage();List<DeviceInfoVO> list = iotDeviceService.selectIotDeviceDataList(deviceQuery);return getDataTable(list);}@PreAuthorize("@ss.hasPermi('iot:device:list')")@GetMapping("/listF")public TableDataInfo listF(IotDevice iotDevice){startPage();List<IotDevice> list = iotDeviceService.selectIotDeviceDataListF(iotDevice);return getDataTable(list);}/*** 导出设备管理列表*/@PreAuthorize("@ss.hasPermi('iot:device:export')")@PostMapping("/export")public void export(HttpServletResponse response, IotDevice iotDevice){List<IotDevice> list = iotDeviceService.selectIotDeviceList(iotDevice);ExcelUtil<IotDevice> util = new ExcelUtil<IotDevice>(IotDevice.class);util.exportExcel(response, list, "设备管理数据");}/*** 获取设备管理详细信息*/@PreAuthorize("@ss.hasPermi('iot:device:query')")@GetMapping(value = "/{deviceId}")public AjaxResult getInfo(@PathVariable("deviceId") Long deviceId){return success(iotDeviceService.selectIotDeviceByDeviceId(deviceId));}/*** 新增设备管理*/@PreAuthorize("@ss.hasPermi('iot:device:add')")@Log(title = "设备管理", businessType = BusinessType.INSERT)@PostMappingpublic AjaxResult add(@RequestBody IotDevice iotDevice){return toAjax(iotDeviceService.insertIotDevice(iotDevice));}/*** 修改设备管理*/@PreAuthorize("@ss.hasPermi('iot:device:edit')")@Log(title = "设备管理", businessType = BusinessType.UPDATE)@PutMapping@Anonymouspublic AjaxResult edit(@RequestBody IotDevice iotDevice){return toAjax(iotDeviceService.updateIotDevice(iotDevice));}/*** 删除设备管理*/@PreAuthorize("@ss.hasPermi('iot:device:remove')")@Log(title = "设备管理", businessType = BusinessType.DELETE)@DeleteMapping("/{deviceIds}")public AjaxResult remove(@PathVariable Long[] deviceIds){return toAjax(iotDeviceService.deleteIotDeviceByDeviceIds(deviceIds));}/*** 获取设备警报事件记录* @param iotAlert* @return*/@GetMapping("/getEvents")public TableDataInfo getEvent(IotAlert iotAlert) {startPage();List<IotAlert> iotAlerts = iotDeviceService.queryEvent(iotAlert);return getDataTable(iotAlerts);}/*** 修改设备运转周期及电机开关* @param deviceTopic* @return*/@Log(title = "设备管理", businessType = BusinessType.UPDATE)@PreAuthorize("@ss.hasPermi('iot:device:setZhouqi')")@PostMapping("cycle")public AjaxResult cycle(@RequestBody DeviceTopic deviceTopic) {iotDeviceService.updateCycleStatus(deviceTopic);return success();}/*** 激活设备* @param deviceTopic* @return*/@PostMapping("setActive")public AjaxResult activeDevice(@RequestBody DeviceTopic deviceTopic) {iotDeviceService.setActiveCode(deviceTopic);return success();}/*** 导入excel*/@Log(title = "设备管理", businessType = BusinessType.IMPORT)@PreAuthorize("@ss.hasPermi('iot:device:import')")@PostMapping("/importData")public AjaxResult importData(MultipartFile file) throws Exception{ExcelUtil<IotDevice> util = new ExcelUtil(IotDevice.class);List<IotDevice> IotDeviceList = util.importExcel(file.getInputStream());String operName = getUsername();String message = iotDeviceService.importData(IotDeviceList, operName);return success(message);}/*** 导出数据模板* @param response*/@PostMapping("/importTemplate")public void importTemplate(HttpServletResponse response){ExcelUtil<IotDevice> util = new ExcelUtil<IotDevice>(IotDevice.class);util.importTemplateExcel(response, "用户数据");}@PostMapping("/tempAcc")public TableDataInfo getTempInfo(@RequestBody DeviceTsDTO deviceTsDTO) {return getDataTable(iotDeviceService.selectTempAcc(deviceTsDTO));}/*** 获取每天全部设备的油量使用量* @param iotDevice* @return*/@GetMapping("/getDayFuel")public AjaxResult getDayFuel(IotDevice iotDevice){return success(iotDeviceService.getDayFue(iotDevice));}/*** 更新部门* @param deptsDTO* @return*/@Log(title = "设备管理", businessType = BusinessType.UPDATE)@PreAuthorize("@ss.hasPermi('iot:device:updateDepts')")@PostMapping("/updateDepts")public AjaxResult updateDepts(@RequestBody DeptsDTO deptsDTO){return toAjax(iotDeviceService.updateDepts(deptsDTO));}/*** 上传图片* @param file* @return* @throws Exception*/@PostMapping("/uploadImage")public AjaxResult uploadImage(@RequestParam("file") MultipartFile file) throws Exception{return iotDeviceService.uploadImage(file);}/*** 根据路径删除照片* @param path* @return*/@PostMapping("/deletePhoto")public AjaxResult deletePhoto(@RequestParam("path") String path){return toAjax(iotDeviceService.deletePhoto(path));}@GetMapping("/getDeviceInfo")public AjaxResult getDeviceInfo(@RequestParam Long deviceId) {return success(iotDeviceService.getDeviceInfo(deviceId));}
}
Controller层数据读取示例:
- 构建动态SQL(根据tag筛选、时间范围筛选)。
/*** 获取特定风机在指定时间范围内的所有指标 (使用 queryPoints)*/@Anonymous@GetMapping("/query")public ResponseEntity<List<Map<String, Object>>> getDataByFan(@RequestParam String deviceName,@RequestParam @Nullable @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) OffsetDateTime startTime,@RequestParam @Nullable @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) OffsetDateTime endTime,@RequestParam(defaultValue = "100") int limit) {// 基础SQL查询语句StringBuilder sqlBuilder = new StringBuilder();sqlBuilder.append("SELECT time, ").append("\"deviceName\", ") // <--- 修改这里.append("\"iotId\", ") // <--- 建议所有tag和field名都用双引号,特别是如果它们不是纯小写.append("\"productKey\", ").append("ax, ay, az, roll, pitch, yaw, temperature, \"LightCurrent\" "); // LightCurrent也用引号sqlBuilder.append("FROM ").append("fan_data").append(" ");sqlBuilder.append("WHERE \"deviceName\" = '").append(escapeSqlIdentifier(deviceName)).append("'"); // 添加时间过滤条件(如果提供了时间参数)if (startTime != null) {sqlBuilder.append(" AND time >= '").append(startTime.toInstant().toString()).append("'");}if (endTime != null) {sqlBuilder.append(" AND time < '").append(endTime.toInstant().toString()).append("'");}sqlBuilder.append(" ORDER BY time DESC LIMIT ").append(limit);String sqlQuery = sqlBuilder.toString();List<Map<String, Object>> results = new ArrayList<>();try (Stream<PointValues> stream = influxDBService.queryPoints(sqlQuery)) {results = stream.map(pv -> {Map<String, Object> row = new LinkedHashMap<>();if (pv.getTimestamp() != null) row.put("time", pv.getTimestamp());// 根据你的SELECT语句明确提取if (pv.getTag("deviceName") != null) row.put("deviceName", pv.getTag("deviceName"));if (pv.getTag("iotId") != null) row.put("iotId", pv.getTag("iotId"));if (pv.getTag("productKey") != null) row.put("productKey", pv.getTag("productKey"));putFieldIfPresent(pv, row, "ax", Double.class);putFieldIfPresent(pv, row, "ay", Double.class);putFieldIfPresent(pv, row, "az", Double.class);putFieldIfPresent(pv, row, "roll", Double.class);putFieldIfPresent(pv, row, "pitch", Double.class);putFieldIfPresent(pv, row, "yaw", Double.class);putFieldIfPresent(pv, row, "temperature", Double.class);putFieldIfPresent(pv, row, "LightCurrent", Double.class);return row;}).collect(Collectors.toList());} catch (Exception e) {// log errorreturn ResponseEntity.status(500).body(null);}return ResponseEntity.ok(results);}
五、 遇到的问题与解决方案(踩坑实录)
ClassNotFoundException: com.influxdb.v3.client.PointValues
:- 原因分析:Maven依赖问题、IDE缓存。
- 解决方法:强制刷新Maven依赖、清理IDE缓存、检查
pom.xml
。
java.net.ConnectException: Connection refused
:- 原因分析:InfluxDB服务未运行、配置的URL/端口错误、防火墙。
- 排查步骤。
- SQL查询报错
Schema error: No field named ...
:- 原因分析:SQL中列名大小写与实际存储不一致,未用双引号包裹驼峰式或特殊标识符。
- 解决方法:在SQL中对这类标识符使用双引号并保持大小写。
- 处理不同结构的JSON消息:
- 问题:设备上线消息和数据消息格式不同。
- 解决方案:使用
ObjectMapper.readTree()
预解析判断关键字段,然后反序列化到不同的POJO。
PointValues
API的理解:- 初期可能误以为有
getTagKeys()
等方法,实际需要按名获取。 - 如何有效地从
PointValues
提取数据到自定义结构。
- 初期可能误以为有
- JVM参数
--add-opens
的重要性:不加此参数可能会导致运行时与Arrow Flight相关的底层错误。
本文的仓库地址:https://github.com/dream-one/infulxDB3-JAVA