当前位置: 首页 > news >正文

时序数据库系列(六):物联网监控系统实战

1 项目背景与架构设计

在这里插入图片描述

图1-1:物联网监控系统整体架构与数据流

在这里插入图片描述

图1-2:技术选型对比分析与决策矩阵

假设你要为一个智能工厂搭建监控系统,需要实时采集温度、湿度、压力、振动等传感器数据,并提供实时监控和历史数据分析功能。这种场景下,InfluxDB是最佳选择。

我们的系统架构包含几个核心组件:传感器设备、数据采集网关、InfluxDB存储、Java后端服务、Web监控界面。整个数据流是这样的:传感器→网关→InfluxDB→后端API→前端展示。

1.1 系统需求分析

数据采集需求

  • 支持多种传感器类型(温湿度、压力、振动、电流等)
  • 数据采集频率:每秒到每分钟不等
  • 设备数量:100-1000个传感器节点
  • 数据保留:实时数据保留7天,聚合数据保留1年

监控功能需求

  • 实时数据展示和告警
  • 历史趋势分析
  • 设备状态监控
  • 数据导出和报表生成

1.2 技术架构选型

数据存储层

  • InfluxDB 2.x:时序数据存储
  • Redis:缓存和会话管理

应用服务层

  • Spring Boot:Java后端框架
  • InfluxDB Java Client:数据库连接
  • WebSocket:实时数据推送

前端展示层

  • Vue.js + ECharts:数据可视化
  • Element UI:界面组件

2 InfluxDB数据模型设计

在这里插入图片描述

图2-1:InfluxDB数据模型结构与查询策略

2.1 数据结构规划

根据物联网场景的特点,我们设计如下的数据模型:

传感器数据表(sensor_data)

measurement: sensor_data
tags: - device_id: 设备ID- sensor_type: 传感器类型(temperature, humidity, pressure等)- location: 设备位置- workshop: 车间编号
fields:- value: 传感器数值- status: 设备状态(0正常,1异常)- battery: 电池电量(可选)
timestamp: 数据采集时间

设备状态表(device_status)

measurement: device_status
tags:- device_id: 设备ID- device_type: 设备类型
fields:- online: 是否在线- last_heartbeat: 最后心跳时间- signal_strength: 信号强度
timestamp: 状态更新时间

2.2 数据写入策略

考虑到物联网设备的特点,我们采用批量写入策略来提高性能:

// 批量数据写入配置
WriteOptions writeOptions = WriteOptions.builder().batchSize(1000)           // 批量大小.flushInterval(5000)       // 5秒刷新一次.bufferLimit(10000)        // 缓冲区大小.retryInterval(1000)       // 重试间隔.build();

3 Java后端服务实现

在这里插入图片描述

图3-1:Java后端服务架构与组件关系

在这里插入图片描述

图3-2:物联网监控系统数据流处理过程

3.1 项目依赖配置

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency><dependency><groupId>com.influxdb</groupId><artifactId>influxdb-client-java</artifactId><version>6.7.0</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.25</version></dependency>
</dependencies>

3.2 InfluxDB配置类

import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApiBlocking;
import com.influxdb.client.domain.WritePrecision;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class InfluxDBConfig {@Value("${influxdb.url}")private String influxUrl;@Value("${influxdb.token}")private String token;@Value("${influxdb.org}")private String org;@Value("${influxdb.bucket}")private String bucket;@Beanpublic InfluxDBClient influxDBClient() {return InfluxDBClientFactory.create(influxUrl, token.toCharArray());}@Beanpublic WriteApiBlocking writeApi(InfluxDBClient client) {return client.getWriteApiBlocking();}
}

3.3 数据模型定义

import com.influxdb.annotations.Column;
import com.influxdb.annotations.Measurement;
import java.time.Instant;@Measurement(name = "sensor_data")
public class SensorData {@Column(tag = true)private String deviceId;@Column(tag = true)private String sensorType;@Column(tag = true)private String location;@Column(tag = true)private String workshop;@Columnprivate Double value;@Columnprivate Integer status;@Columnprivate Double battery;@Column(timestamp = true)private Instant timestamp;// 构造函数public SensorData() {}public SensorData(String deviceId, String sensorType, String location, String workshop, Double value, Integer status) {this.deviceId = deviceId;this.sensorType = sensorType;this.location = location;this.workshop = workshop;this.value = value;this.status = status;this.timestamp = Instant.now();}// getters and setters...public String getDeviceId() { return deviceId; }public void setDeviceId(String deviceId) { this.deviceId = deviceId; }public String getSensorType() { return sensorType; }public void setSensorType(String sensorType) { this.sensorType = sensorType; }public String getLocation() { return location; }public void setLocation(String location) { this.location = location; }public String getWorkshop() { return workshop; }public void setWorkshop(String workshop) { this.workshop = workshop; }public Double getValue() { return value; }public void setValue(Double value) { this.value = value; }public Integer getStatus() { return status; }public void setStatus(Integer status) { this.status = status; }public Double getBattery() { return battery; }public void setBattery(Double battery) { this.battery = battery; }public Instant getTimestamp() { return timestamp; }public void setTimestamp(Instant timestamp) { this.timestamp = timestamp; }
}@Measurement(name = "device_status")
public class DeviceStatus {@Column(tag = true)private String deviceId;@Column(tag = true)private String deviceType;@Columnprivate Boolean online;@Columnprivate Long lastHeartbeat;@Columnprivate Integer signalStrength;@Column(timestamp = true)private Instant timestamp;// 构造函数和getter/setter方法...public DeviceStatus() {}public DeviceStatus(String deviceId, String deviceType, Boolean online, Integer signalStrength) {this.deviceId = deviceId;this.deviceType = deviceType;this.online = online;this.signalStrength = signalStrength;this.lastHeartbeat = System.currentTimeMillis();this.timestamp = Instant.now();}// getters and setters...
}

3.4 数据访问层实现

import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.QueryApi;
import com.influxdb.client.WriteApiBlocking;
import com.influxdb.query.FluxTable;
import com.influxdb.query.FluxRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Repository;import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.*;@Repository
public class SensorDataRepository {@Autowiredprivate InfluxDBClient influxDBClient;@Autowiredprivate WriteApiBlocking writeApi;@Value("${influxdb.bucket}")private String bucket;@Value("${influxdb.org}")private String org;// 写入单条传感器数据public void writeSensorData(SensorData data) {writeApi.writeMeasurement(WritePrecision.NS, data);}// 批量写入传感器数据public void writeSensorDataBatch(List<SensorData> dataList) {writeApi.writeMeasurements(WritePrecision.NS, dataList);}// 查询最新的传感器数据public List<SensorData> getLatestSensorData(String deviceId, int limit) {String flux = String.format("""from(bucket: "%s")|> range(start: -1h)|> filter(fn: (r) => r["_measurement"] == "sensor_data")|> filter(fn: (r) => r["device_id"] == "%s")|> sort(columns: ["_time"], desc: true)|> limit(n: %d)""", bucket, deviceId, limit);return executeQueryForSensorData(flux);}// 查询指定时间范围的传感器数据public List<SensorData> getSensorDataByTimeRange(String deviceId, String sensorType, Instant start, Instant end) {String flux = String.format("""from(bucket: "%s")|> range(start: %s, stop: %s)|> filter(fn: (r) => r["_measurement"] == "sensor_data")|> filter(fn: (r) => r["device_id"] == "%s")|> filter(fn: (r) => r["sensor_type"] == "%s")|> sort(columns: ["_time"])""", bucket, start, end, deviceId, sensorType);return executeQueryForSensorData(flux);}// 获取聚合数据(按时间窗口)public List<Map<String, Object>> getAggregatedData(String deviceId, String sensorType, String timeWindow, int hours) {String flux = String.format("""from(bucket: "%s")|> range(start: -%dh)|> filter(fn: (r) => r["_measurement"] == "sensor_data")|> filter(fn: (r) => r["device_id"] == "%s")|> filter(fn: (r) => r["sensor_type"] == "%s")|> filter(fn: (r) => r["_field"] == "value")|> aggregateWindow(every: %s, fn: mean, createEmpty: false)|> yield(name: "mean")""", bucket, hours, deviceId, sensorType, timeWindow);return executeQueryForAggregatedData(flux);}// 获取设备状态统计public Map<String, Object> getDeviceStatusStats() {String flux = String.format("""from(bucket: "%s")|> range(start: -5m)|> filter(fn: (r) => r["_measurement"] == "device_status")|> filter(fn: (r) => r["_field"] == "online")|> group(columns: ["device_id"])|> last()|> group()|> sum(column: "_value")""", bucket);QueryApi queryApi = influxDBClient.getQueryApi();List<FluxTable> tables = queryApi.query(flux, org);Map<String, Object> stats = new HashMap<>();int onlineCount = 0;int totalCount = 0;for (FluxTable table : tables) {for (FluxRecord record : table.getRecords()) {if (record.getValue() != null) {onlineCount = ((Number) record.getValue()).intValue();}}}// 获取总设备数String totalFlux = String.format("""from(bucket: "%s")|> range(start: -1h)|> filter(fn: (r) => r["_measurement"] == "device_status")|> group(columns: ["device_id"])|> last()|> group()|> count()""", bucket);List<FluxTable> totalTables = queryApi.query(totalFlux, org);for (FluxTable table : totalTables) {for (FluxRecord record : table.getRecords()) {if (record.getValue() != null) {totalCount = ((Number) record.getValue()).intValue();}}}stats.put("onlineCount", onlineCount);stats.put("totalCount", totalCount);stats.put("offlineCount", totalCount - onlineCount);stats.put("onlineRate", totalCount > 0 ? (double) onlineCount / totalCount : 0.0);return stats;}private List<SensorData> executeQueryForSensorData(String flux) {QueryApi queryApi = influxDBClient.getQueryApi();List<FluxTable> tables = queryApi.query(flux, org);List<SensorData> result = new ArrayList<>();for (FluxTable table : tables) {for (FluxRecord record : table.getRecords()) {SensorData data = new SensorData();data.setTimestamp(record.getTime());data.setDeviceId((String) record.getValueByKey("device_id"));data.setSensorType((String) record.getValueByKey("sensor_type"));data.setLocation((String) record.getValueByKey("location"));data.setWorkshop((String) record.getValueByKey("workshop"));if ("value".equals(record.getField())) {data.setValue((Double) record.getValue());} else if ("status".equals(record.getField())) {data.setStatus(((Number) record.getValue()).intValue());} else if ("battery".equals(record.getField())) {data.setBattery((Double) record.getValue());}result.add(data);}}return result;}private List<Map<String, Object>> executeQueryForAggregatedData(String flux) {QueryApi queryApi = influxDBClient.getQueryApi();List<FluxTable> tables = queryApi.query(flux, org);List<Map<String, Object>> result = new ArrayList<>();for (FluxTable table : tables) {for (FluxRecord record : table.getRecords()) {Map<String, Object> dataPoint = new HashMap<>();dataPoint.put("timestamp", record.getTime());dataPoint.put("value", record.getValue());result.add(dataPoint);}}return result;}
}

4 实时数据采集服务

在这里插入图片描述

图4-1:实时数据采集服务架构与处理流程

4.1 数据接收控制器

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import org.springframework.messaging.simp.SimpMessagingTemplate;import java.util.List;
import java.util.Map;@RestController
@RequestMapping("/api/sensor")
@CrossOrigin(origins = "*")
public class SensorDataController {@Autowiredprivate SensorDataRepository sensorDataRepository;@Autowiredprivate SimpMessagingTemplate messagingTemplate;@Autowiredprivate AlertService alertService;// 接收传感器数据@PostMapping("/data")public ResponseResult receiveSensorData(@RequestBody SensorDataRequest request) {try {// 数据验证if (request.getDeviceId() == null || request.getValue() == null) {return ResponseResult.error("设备ID和数值不能为空");}// 创建传感器数据对象SensorData sensorData = new SensorData(request.getDeviceId(),request.getSensorType(),request.getLocation(),request.getWorkshop(),request.getValue(),request.getStatus());if (request.getBattery() != null) {sensorData.setBattery(request.getBattery());}// 写入数据库sensorDataRepository.writeSensorData(sensorData);// 实时推送到前端messagingTemplate.convertAndSend("/topic/sensor-data", sensorData);// 检查告警条件alertService.checkAlerts(sensorData);return ResponseResult.success("数据接收成功");} catch (Exception e) {return ResponseResult.error("数据处理失败: " + e.getMessage());}}// 批量接收传感器数据@PostMapping("/data/batch")public ResponseResult receiveBatchSensorData(@RequestBody List<SensorDataRequest> requests) {try {List<SensorData> sensorDataList = requests.stream().map(request -> new SensorData(request.getDeviceId(),request.getSensorType(),request.getLocation(),request.getWorkshop(),request.getValue(),request.getStatus())).toList();// 批量写入sensorDataRepository.writeSensorDataBatch(sensorDataList);// 批量推送messagingTemplate.convertAndSend("/topic/sensor-data-batch", sensorDataList);return ResponseResult.success("批量数据接收成功,共处理 " + requests.size() + " 条数据");} catch (Exception e) {return ResponseResult.error("批量数据处理失败: " + e.getMessage());}}// 获取最新传感器数据@GetMapping("/latest/{deviceId}")public ResponseResult getLatestData(@PathVariable String deviceId,@RequestParam(defaultValue = "10") int limit) {try {List<SensorData> data = sensorDataRepository.getLatestSensorData(deviceId, limit);return ResponseResult.success(data);} catch (Exception e) {return ResponseResult.error("查询失败: " + e.getMessage());}}// 获取历史数据@GetMapping("/history")public ResponseResult getHistoryData(@RequestParam String deviceId,@RequestParam String sensorType,@RequestParam String startTime,@RequestParam String endTime) {try {Instant start = Instant.parse(startTime);Instant end = Instant.parse(endTime);List<SensorData> data = sensorDataRepository.getSensorDataByTimeRange(deviceId, sensorType, start, end);return ResponseResult.success(data);} catch (Exception e) {return ResponseResult.error("查询历史数据失败: " + e.getMessage());}}// 获取聚合统计数据@GetMapping("/stats")public ResponseResult getAggregatedStats(@RequestParam String deviceId,@RequestParam String sensorType,@RequestParam(defaultValue = "5m") String timeWindow,@RequestParam(defaultValue = "24") int hours) {try {List<Map<String, Object>> data = sensorDataRepository.getAggregatedData(deviceId, sensorType, timeWindow, hours);return ResponseResult.success(data);} catch (Exception e) {return ResponseResult.error("查询统计数据失败: " + e.getMessage());}}
}// 请求数据模型
class SensorDataRequest {private String deviceId;private String sensorType;private String location;private String workshop;private Double value;private Integer status;private Double battery;// getters and setters...
}// 响应结果模型
class ResponseResult {private boolean success;private String message;private Object data;public static ResponseResult success(Object data) {ResponseResult result = new ResponseResult();result.success = true;result.data = data;return result;}public static ResponseResult error(String message) {ResponseResult result = new ResponseResult();result.success = false;result.message = message;return result;}// getters and setters...
}

4.2 告警服务实现

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;import java.time.Instant;
import java.util.HashMap;
import java.util.Map;@Service
public class AlertService {@Autowiredprivate SimpMessagingTemplate messagingTemplate;// 告警规则配置private final Map<String, AlertRule> alertRules = new HashMap<>();public AlertService() {// 初始化告警规则alertRules.put("temperature", new AlertRule("温度", -10.0, 60.0));alertRules.put("humidity", new AlertRule("湿度", 20.0, 80.0));alertRules.put("pressure", new AlertRule("压力", 0.8, 1.2));alertRules.put("vibration", new AlertRule("振动", 0.0, 5.0));}public void checkAlerts(SensorData sensorData) {AlertRule rule = alertRules.get(sensorData.getSensorType());if (rule == null) return;Double value = sensorData.getValue();if (value == null) return;// 检查是否超出正常范围if (value < rule.getMinValue() || value > rule.getMaxValue()) {Alert alert = new Alert(sensorData.getDeviceId(),sensorData.getSensorType(),rule.getName(),value,rule.getMinValue(),rule.getMaxValue(),"数值超出正常范围",Instant.now());// 发送告警sendAlert(alert);}// 检查设备状态if (sensorData.getStatus() != null && sensorData.getStatus() != 0) {Alert alert = new Alert(sensorData.getDeviceId(),sensorData.getSensorType(),rule.getName(),value,null,null,"设备状态异常",Instant.now());sendAlert(alert);}// 检查电池电量if (sensorData.getBattery() != null && sensorData.getBattery() < 20.0) {Alert alert = new Alert(sensorData.getDeviceId(),"battery","电池电量",sensorData.getBattery(),20.0,100.0,"电池电量过低",Instant.now());sendAlert(alert);}}private void sendAlert(Alert alert) {// 推送到前端messagingTemplate.convertAndSend("/topic/alerts", alert);// 这里可以添加其他告警方式,如邮件、短信等System.out.println("告警: " + alert.getMessage() + " - 设备: " + alert.getDeviceId() + " - 当前值: " + alert.getCurrentValue());}
}// 告警规则类
class AlertRule {private String name;private Double minValue;private Double maxValue;public AlertRule(String name, Double minValue, Double maxValue) {this.name = name;this.minValue = minValue;this.maxValue = maxValue;}// getters and setters...
}// 告警信息类
class Alert {private String deviceId;private String sensorType;private String sensorName;private Double currentValue;private Double minValue;private Double maxValue;private String message;private Instant timestamp;public Alert(String deviceId, String sensorType, String sensorName, Double currentValue, Double minValue, Double maxValue, String message, Instant timestamp) {this.deviceId = deviceId;this.sensorType = sensorType;this.sensorName = sensorName;this.currentValue = currentValue;this.minValue = minValue;this.maxValue = maxValue;this.message = message;this.timestamp = timestamp;}// getters and setters...
}

5 监控仪表板实现

在这里插入图片描述

图5-1:监控仪表板前端架构与组件设计

5.1 设备监控控制器

@RestController
@RequestMapping("/api/dashboard")
@CrossOrigin(origins = "*")
public class DashboardController {@Autowiredprivate SensorDataRepository sensorDataRepository;// 获取仪表板概览数据@GetMapping("/overview")public ResponseResult getDashboardOverview() {try {Map<String, Object> overview = new HashMap<>();// 设备状态统计Map<String, Object> deviceStats = sensorDataRepository.getDeviceStatusStats();overview.put("deviceStats", deviceStats);// 最新数据统计overview.put("latestDataCount", getLatestDataCount());// 告警统计overview.put("alertStats", getAlertStats());return ResponseResult.success(overview);} catch (Exception e) {return ResponseResult.error("获取概览数据失败: " + e.getMessage());}}// 获取实时数据流@GetMapping("/realtime/{deviceId}")public ResponseResult getRealtimeData(@PathVariable String deviceId) {try {List<SensorData> data = sensorDataRepository.getLatestSensorData(deviceId, 50);return ResponseResult.success(data);} catch (Exception e) {return ResponseResult.error("获取实时数据失败: " + e.getMessage());}}// 获取趋势分析数据@GetMapping("/trend")public ResponseResult getTrendData(@RequestParam String deviceId,@RequestParam String sensorType,@RequestParam(defaultValue = "1h") String timeWindow,@RequestParam(defaultValue = "24") int hours) {try {List<Map<String, Object>> trendData = sensorDataRepository.getAggregatedData(deviceId, sensorType, timeWindow, hours);// 计算趋势指标Map<String, Object> result = new HashMap<>();result.put("data", trendData);result.put("trend", calculateTrend(trendData));return ResponseResult.success(result);} catch (Exception e) {return ResponseResult.error("获取趋势数据失败: " + e.getMessage());}}private int getLatestDataCount() {// 这里可以实现获取最近5分钟的数据量统计return 1250; // 示例数据}private Map<String, Object> getAlertStats() {Map<String, Object> alertStats = new HashMap<>();alertStats.put("totalAlerts", 15);alertStats.put("criticalAlerts", 3);alertStats.put("warningAlerts", 12);return alertStats;}private Map<String, Object> calculateTrend(List<Map<String, Object>> data) {if (data.size() < 2) {return Map.of("direction", "stable", "change", 0.0);}Double firstValue = (Double) data.get(0).get("value");Double lastValue = (Double) data.get(data.size() - 1).get("value");if (firstValue == null || lastValue == null) {return Map.of("direction", "stable", "change", 0.0);}double change = ((lastValue - firstValue) / firstValue) * 100;String direction = change > 5 ? "up" : change < -5 ? "down" : "stable";return Map.of("direction", direction, "change", Math.round(change * 100.0) / 100.0);}
}

5.2 WebSocket配置

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {@Overridepublic void configureMessageBroker(MessageBrokerRegistry config) {// 启用简单消息代理config.enableSimpleBroker("/topic");// 设置应用程序目标前缀config.setApplicationDestinationPrefixes("/app");}@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {// 注册STOMP端点registry.addEndpoint("/ws").setAllowedOriginPatterns("*").withSockJS();}
}

6 数据模拟器

在这里插入图片描述

图6-1:数据模拟器生成流程与策略设计

为了测试系统,我们创建一个数据模拟器来生成传感器数据:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import java.util.ArrayList;
import java.util.List;
import java.util.Random;@Component
public class SensorDataSimulator {@Autowiredprivate SensorDataRepository sensorDataRepository;private final Random random = new Random();private final String[] deviceIds = {"TEMP_001", "TEMP_002", "HUM_001", "HUM_002", "PRESS_001"};private final String[] locations = {"车间A", "车间B", "车间C", "仓库", "办公区"};private final String[] workshops = {"WS001", "WS002", "WS003", "WS004", "WS005"};// 每30秒生成一批模拟数据@Scheduled(fixedRate = 30000)public void generateSimulatedData() {List<SensorData> dataList = new ArrayList<>();for (String deviceId : deviceIds) {String sensorType = getSensorType(deviceId);SensorData data = new SensorData(deviceId,sensorType,locations[random.nextInt(locations.length)],workshops[random.nextInt(workshops.length)],generateSensorValue(sensorType),random.nextInt(100) < 95 ? 0 : 1 // 95%概率正常状态);// 随机设置电池电量if (random.nextBoolean()) {data.setBattery(80.0 + random.nextDouble() * 20.0);}dataList.add(data);}// 批量写入sensorDataRepository.writeSensorDataBatch(dataList);System.out.println("生成了 " + dataList.size() + " 条模拟传感器数据");}private String getSensorType(String deviceId) {if (deviceId.startsWith("TEMP")) return "temperature";if (deviceId.startsWith("HUM")) return "humidity";if (deviceId.startsWith("PRESS")) return "pressure";return "unknown";}private Double generateSensorValue(String sensorType) {switch (sensorType) {case "temperature":return 20.0 + random.nextGaussian() * 5.0; // 平均20度,标准差5度case "humidity":return 50.0 + random.nextGaussian() * 10.0; // 平均50%,标准差10%case "pressure":return 1.0 + random.nextGaussian() * 0.1; // 平均1.0,标准差0.1default:return random.nextDouble() * 100;}}
}

7 性能优化与最佳实践

在这里插入图片描述

图7-1:性能优化策略对比分析与效果评估

7.1 批量写入优化

@Service
public class OptimizedDataService {@Autowiredprivate InfluxDBClient influxDBClient;@Value("${influxdb.bucket}")private String bucket;@Value("${influxdb.org}")private String org;// 使用异步写入API提高性能public void writeDataAsync(List<SensorData> dataList) {WriteApi writeApi = influxDBClient.getWriteApi(WriteOptions.builder().batchSize(1000).flushInterval(5000).bufferLimit(10000).build());writeApi.writeMeasurements(WritePrecision.NS, dataList);writeApi.close(); // 确保数据被刷新}// 数据压缩和预处理public List<SensorData> preprocessData(List<SensorData> rawData) {return rawData.stream().filter(data -> data.getValue() != null) // 过滤空值.filter(data -> isValidRange(data)) // 过滤异常值.map(this::normalizeData) // 数据标准化.collect(Collectors.toList());}private boolean isValidRange(SensorData data) {Double value = data.getValue();String type = data.getSensorType();switch (type) {case "temperature":return value >= -50 && value <= 100;case "humidity":return value >= 0 && value <= 100;case "pressure":return value >= 0 && value <= 10;default:return true;}}private SensorData normalizeData(SensorData data) {// 数据精度控制if (data.getValue() != null) {data.setValue(Math.round(data.getValue() * 100.0) / 100.0);}return data;}
}

7.2 查询性能优化

@Service
public class OptimizedQueryService {@Autowiredprivate InfluxDBClient influxDBClient;@Value("${influxdb.bucket}")private String bucket;@Value("${influxdb.org}")private String org;// 使用连接池和缓存优化查询@Cacheable(value = "sensorData", key = "#deviceId + '_' + #hours")public List<Map<String, Object>> getCachedAggregatedData(String deviceId, int hours) {String flux = String.format("""from(bucket: "%s")|> range(start: -%dh)|> filter(fn: (r) => r["_measurement"] == "sensor_data")|> filter(fn: (r) => r["device_id"] == "%s")|> aggregateWindow(every: 5m, fn: mean, createEmpty: false)""", bucket, hours, deviceId);return executeOptimizedQuery(flux);}// 分页查询大数据集public List<SensorData> getPagedSensorData(String deviceId, int page, int size) {int offset = page * size;String flux = String.format("""from(bucket: "%s")|> range(start: -24h)|> filter(fn: (r) => r["_measurement"] == "sensor_data")|> filter(fn: (r) => r["device_id"] == "%s")|> sort(columns: ["_time"], desc: true)|> limit(n: %d, offset: %d)""", bucket, deviceId, size, offset);return executeQueryForSensorData(flux);}private List<Map<String, Object>> executeOptimizedQuery(String flux) {QueryApi queryApi = influxDBClient.getQueryApi();// 使用流式查询减少内存占用List<Map<String, Object>> result = new ArrayList<>();queryApi.query(flux, org, (cancellable, record) -> {Map<String, Object> dataPoint = new HashMap<>();dataPoint.put("timestamp", record.getTime());dataPoint.put("value", record.getValue());result.add(dataPoint);});return result;}
}

这个物联网监控系统展示了InfluxDB在实际项目中的应用。通过合理的数据模型设计、高效的批量写入、实时数据推送和智能告警机制,我们构建了一个完整的时序数据处理平台。

系统的核心优势包括:高并发数据写入能力、灵活的查询和聚合功能、实时监控和告警、可扩展的架构设计。这些特性让它能够很好地适应各种物联网和监控场景的需求。

http://www.dtcms.com/a/582045.html

相关文章:

  • 迁移学习基础知识——迁移学习的问题形式化
  • java基础-ArrayList集合
  • 做网站运营优质做网站价格
  • 【双机位A卷】华为OD笔试之【哈希表】双机位A-采购订单【Py/Java/C++/C/JS/Go六种语言】【欧弟算法】全网注释最详细分类最全的华子OD真题题解
  • 第十章、GPT1:Improving Language Understanding by Generative Pre-Training(代码部分)
  • 2025全球生成式人工智能AIGC产业全景与行业应用研究报告|附900+份报告PDF、数据、可视化模板汇总下载
  • 网站广告销售怎们做网站开发确认书
  • 常见的模型性能评估图表案例解读
  • 网站推广服务网站连锁金融网站怎么做
  • 从协议中成长
  • ⚡️2025-11-07GitHub日榜Top5|AI舆情分析系统
  • 云建站淘宝客网页设计教程 表单
  • 石河子农八师建设兵团社保网站餐饮营销方案
  • P1012 [NOIP 1998 提高组] 拼数
  • 第四阶段C#通讯开发-9:网络协议Modbus下的TCP与UDP
  • 《计算机操作系统》_并发 bug 和应对 (死锁/数据竞争/原子性违反;防御性编程和动态分析)20251106
  • 【算法】递归的艺术:从本质思想到递归树,深入剖析算法的性能权衡
  • 网上怎么做网站赚钱seo初级入门教程
  • MySQL GROUP BY 和 GROUP_CONCAT 使用方法总结,group by后将其他的字段整合到一个字段中 并通过逗号链接或指定其他链接符号
  • 数字人|数字人企业技术派选择
  • 简单实现文字两端对齐
  • Flink Rebalance触发乱序的问题
  • 联合建设官方网站公司邮箱怎么在手机上登录
  • 代理龙华网站建设深圳英迈思做网站好么
  • UE网络复制中的可靠函数是什么意思 什么时候要勾选什么时候不勾?
  • 沈阳做网站建设微信公众号小程序开发教程
  • slice在Python和Go中的异同
  • 科技公司网站设计公司天津定制开发网站
  • 3 个近期 yyds 的 AI 开源项目, 有点绝。
  • 智启未来 共筑开放新生态——2025进博会人工智能亮点纷呈