当前位置: 首页 > wzjs >正文

什么网站做美食最好最专业做动漫短视频网站

什么网站做美食最好最专业,做动漫短视频网站,菠菜网站怎样做安全,集团企业网站建设方案策划书在物联网系统中,时序数据库(TSDB)和关系型数据库(RDBMS)的存储顺序设计需要根据数据特性、业务需求和系统架构综合考虑。以下是典型的设计方案和逻辑顺序:1. 常见存储顺序方案 方案一:先写时序数…

在物联网系统中,时序数据库(TSDB)和关系型数据库(RDBMS)的存储顺序设计需要根据数据特性、业务需求和系统架构综合考虑。以下是典型的设计方案和逻辑顺序:


1. 常见存储顺序方案

方案一:先写时序数据库,后异步同步到关系型数据库

适用场景:高频传感器数据为主,业务数据可容忍短暂延迟。
流程

  1. MQTT Broker 接收设备原始数据(如 devices/A/temperature)。
  2. 数据首先写入时序数据库(如InfluxDB):
    • 存储原始时间序列数据(高吞吐、低延迟)。
  3. 异步处理层(如Kafka/Flink)消费数据,处理后写入关系型数据库:
    • 提取关键状态(如最新温度值)写入MySQL的device_status表。
    • 关联设备元数据(如设备所属用户)。

优点

  • 确保传感器数据的写入性能最大化。
  • 避免高频写入拖累关系型数据库。

示例代码(伪代码):

# MQTT回调处理
def on_mqtt_message(topic, payload):# 1. 原始数据写入InfluxDBinflux.write({"measurement": "sensor_data","tags": {"device_id": topic.split('/')[1]},"fields": {"temperature": payload.temp},"time": payload.timestamp})# 2. 异步推送至Kafka,后续处理kafka.produce("device_updates", key=device_id, value=payload)# Kafka消费者处理业务逻辑
def kafka_consumer():for message in kafka.consume():# 3. 关联设备元数据并写入MySQLdevice = mysql.query("SELECT * FROM devices WHERE id = ?", message.device_id)mysql.execute("UPDATE device_status SET last_temp = ?, updated_at = NOW() WHERE device_id = ?",message.temp, message.device_id)

示例代码(以下是使用Java实现的等效代码,包含MQTT回调处理、InfluxDB写入和通过Kafka异步处理写入MySQL的逻辑):

import org.eclipse.paho.client.mqttv3.*;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.WriteApi;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import java.sql.*;
import java.time.Instant;
import java.util.Properties;public class MqttDataProcessor {// InfluxDB 配置private final InfluxDBClient influxDBClient;// Kafka 生产者private final KafkaProducer<String, DeviceData> kafkaProducer;// MySQL 连接private final Connection mysqlConnection;public MqttDataProcessor(InfluxDBClient influxDBClient, KafkaProducer<String, DeviceData> kafkaProducer,Connection mysqlConnection) {this.influxDBClient = influxDBClient;this.kafkaProducer = kafkaProducer;this.mysqlConnection = mysqlConnection;}// MQTT回调处理public IMqttMessageListener createMqttListener() {return (topic, message) -> {try {// 解析payloadDeviceData data = parsePayload(topic, message.getPayload());// 1. 原始数据写入InfluxDBwriteToInfluxDB(data);// 2. 异步推送至KafkasendToKafka(data);} catch (Exception e) {e.printStackTrace();}};}private DeviceData parsePayload(String topic, byte[] payload) {// 这里应该是你的实际payload解析逻辑String deviceId = topic.split("/")[1];// 示例: 假设payload是JSON格式 {"temp": 25.5, "timestamp": 123456789}String json = new String(payload);// 实际项目中可以使用Gson/Jackson等库double temp = Double.parseDouble(json.split("\"temp\":")[1].split(",")[0]);long timestamp = Long.parseLong(json.split("\"timestamp\":")[1].split("}")[0]);return new DeviceData(deviceId, temp, Instant.ofEpochSecond(timestamp));}private void writeToInfluxDB(DeviceData data) {try (WriteApi writeApi = influxDBClient.getWriteApi()) {Point point = Point.measurement("sensor_data").addTag("device_id", data.getDeviceId()).addField("temperature", data.getTemp()).time(data.getTimestamp(), WritePrecision.S);writeApi.writePoint(point);}}private void sendToKafka(DeviceData data) {ProducerRecord<String, DeviceData> record = new ProducerRecord<>("device_updates", data.getDeviceId(), data);kafkaProducer.send(record, (metadata, exception) -> {if (exception != null) {exception.printStackTrace();}});}// Kafka消费者处理业务逻辑public void startKafkaConsumer() {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "device-data-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.your.package.DeviceDataDeserializer"); // 需要自定义try (KafkaConsumer<String, DeviceData> consumer = new KafkaConsumer<>(props)) {consumer.subscribe(List.of("device_updates"));while (true) {ConsumerRecords<String, DeviceData> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, DeviceData> record : records) {// 3. 关联设备元数据并写入MySQLupdateMySQL(record.value());}}}}private void updateMySQL(DeviceData data) {String query = "SELECT * FROM devices WHERE id = ?";String update = "UPDATE device_status SET last_temp = ?, updated_at = NOW() WHERE device_id = ?";try (PreparedStatement selectStmt = mysqlConnection.prepareStatement(query);PreparedStatement updateStmt = mysqlConnection.prepareStatement(update)) {// 查询设备元数据selectStmt.setString(1, data.getDeviceId());ResultSet rs = selectStmt.executeQuery();if (rs.next()) {// 更新设备状态updateStmt.setDouble(1, data.getTemp());updateStmt.setString(2, data.getDeviceId());updateStmt.executeUpdate();}} catch (SQLException e) {e.printStackTrace();}}// 设备数据DTOpublic static class DeviceData {private String deviceId;private double temp;private Instant timestamp;// 构造器、getter和setterpublic DeviceData(String deviceId, double temp, Instant timestamp) {this.deviceId = deviceId;this.temp = temp;this.timestamp = timestamp;}// 省略getter和setter...}
}// 使用示例
public class Main {public static void main(String[] args) throws Exception {// 初始化InfluxDB客户端InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://localhost:8086", "token".toCharArray(),"org", "bucket");// 初始化Kafka生产者Properties kafkaProps = new Properties();kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.your.package.DeviceDataSerializer"); // 需要自定义KafkaProducer<String, DeviceData> kafkaProducer = new KafkaProducer<>(kafkaProps);// 初始化MySQL连接Connection mysqlConn = DriverManager.getConnection("jdbc:mysql://localhost:3306/iot_db", "user", "password");// 创建处理器MqttDataProcessor processor = new MqttDataProcessor(influxDBClient, kafkaProducer, mysqlConn);// 启动Kafka消费者线程new Thread(processor::startKafkaConsumer).start();// 配置MQTT客户端MqttClient mqttClient = new MqttClient("tcp://broker.example.com:1883", "java-client");mqttClient.connect();// 订阅主题并设置回调mqttClient.subscribe("devices/+/data", 0, processor.createMqttListener());}
}

注意事项:
依赖库:需要添加以下依赖:

MQTT: org.eclipse.paho.client.mqttv3

InfluxDB: com.influxdb.influxdb-client-java

Kafka: org.apache.kafka.kafka-clients

MySQL: mysql.mysql-connector-java

序列化:需要为Kafka实现DeviceData的序列化器和反序列化器。

错误处理:实际项目中需要更完善的错误处理和重试机制。

资源管理:确保正确关闭所有连接和资源。

线程安全:如果高并发场景,需要考虑线程安全问题。


方案二:双写(时序库+关系库)

适用场景:数据一致性要求高,且写入压力可控。
流程

  1. MQTT消息同时写入时序数据库和关系型数据库(需事务或最终一致性保证)。
  2. 关系型数据库仅存储关键状态快照(如设备最新状态),而非全部原始数据。

优点

  • 数据实时一致,适合关键业务状态(如设备告警阈值)。

挑战

  • 需处理写入冲突(如使用分布式事务或补偿机制)。

方案三:关系型数据库为主,定期归档到时序库

适用场景:历史数据分析需求明确,但实时查询以业务数据为主。
流程

  1. 数据先写入MySQL的device_realtime表。
  2. 定时任务将过期数据批量迁移至InfluxDB,MySQL中仅保留近期数据。

优点

  • 简化实时业务查询(所有数据在MySQL中)。
  • 降低MySQL存储压力。

2. 存储顺序设计原则

(1)根据数据特性分层
数据层级存储目标数据库选择示例
原始时序数据高频写入、长期存储时序数据库每秒温度读数
状态快照最新状态查询关系型数据库设备当前温度、在线状态
业务元数据关联查询、事务操作关系型数据库设备所属用户、地理位置
(2)写入路径优化
  • 高频数据路径:MQTT → 时序数据库 → (可选)异步聚合后写入关系库。
  • 低频元数据路径:业务系统直接CRUD操作关系型数据库。
(3)一致性保证
  • 最终一致性:通过消息队列(如Kafka)解耦,确保数据最终同步。
  • 强一致性:使用分布式事务(如XA协议),但性能较低。

3. 典型物联网架构示例

在这里插入图片描述

关键点

  1. 实时性要求高的数据(如传感器读数)直连时序数据库。
  2. 需要业务关联的数据(如“设备所属用户”)通过流处理关联后写入MySQL。
  3. 历史数据分析直接从时序数据库查询。

4. 选择建议

  • 优先时序数据库:若90%以上的查询是基于时间范围的聚合(如“过去24小时温度趋势”)。
  • 优先关系型数据库:若需频繁JOIN查询(如“查询设备A的所有者手机号”)。
  • 混合使用:大多数生产环境会同时使用两者,通过写入顺序设计平衡性能与功能需求。

通过合理设计存储顺序,可以同时满足物联网场景的高性能写入复杂业务查询需求。


文章转载自:

http://3bgEB5Pu.bkLkt.cn
http://OWniwhm0.bkLkt.cn
http://dHyHNj15.bkLkt.cn
http://NMC2iQDW.bkLkt.cn
http://30iFCXTo.bkLkt.cn
http://Z65IkrQM.bkLkt.cn
http://idT6s0Sz.bkLkt.cn
http://lhPsLSML.bkLkt.cn
http://bZOxFHZj.bkLkt.cn
http://W17X6hzQ.bkLkt.cn
http://nEYaCRUd.bkLkt.cn
http://8NMOyWW4.bkLkt.cn
http://VJiPC6vF.bkLkt.cn
http://QNZkffSQ.bkLkt.cn
http://5Ju3bmFB.bkLkt.cn
http://uRrhmKZz.bkLkt.cn
http://vIUS9Uyn.bkLkt.cn
http://C9TdpKdu.bkLkt.cn
http://xhj9wI0a.bkLkt.cn
http://I6gGX1ek.bkLkt.cn
http://dXkRYJy4.bkLkt.cn
http://2KAIAxY5.bkLkt.cn
http://fyvoate9.bkLkt.cn
http://0yMGnZjp.bkLkt.cn
http://uKF6dETI.bkLkt.cn
http://K4WwiCLG.bkLkt.cn
http://Sjsrx26w.bkLkt.cn
http://kEO5Phzs.bkLkt.cn
http://2gGRMWWR.bkLkt.cn
http://QlKb1OPv.bkLkt.cn
http://www.dtcms.com/wzjs/706347.html

相关文章:

  • 做微信公众号的网站吗怎样自己做企业网站
  • 衡阳网站开发培训洛可可设计公司现状
  • 上海企业微信网站制作网站怎么自己编辑模块
  • 响应式网站好还是自适应网站好成都网站工作室
  • 全屏网站 欣赏网店推广方法
  • 网站建设的总体目标企业建站的目的是什么
  • 安全狗网站白名单指什么网站交互方式
  • 网站开发企业购物网站功能模块图
  • 外贸网站 备案可以做引流网站的源码
  • 网站策划的内容有那些网站收录检测
  • 做dj网站用什么建站系统比较好网站模版asp
  • 矿区网站建设资源wordpress
  • 营销型企业网站建设的内容建筑类专业做教育的网站
  • 做网站的工作轻松吗工作室怎么开
  • 公司做哪个网站比较好wordpress启用表情
  • 个人网站推广费用网络维保
  • 网站怎么备案资中移动网站建设
  • 好公司网站建设价格低如何建立一个永久网站
  • 网站建设宣传册内容广告设计的工作内容
  • idea做网站登录网站建设开发软件有哪些方面
  • PS怎么布局网站结构物流平台运营
  • 佛山公司做网站中国建筑app下载
  • 专业建站公司费用做app需要什么技术
  • 网站集约化平台网站建设企业需要准备资料
  • 浙江省建设信息港网站自助建站营销招商
  • 重庆网站设计优化seo招聘
  • 佛山公司建网站昆明做网站优化的公司
  • 心雨在线高端网站建设网站建设质量体系审核指导
  • 深圳做手机的企业网站查看邮箱注册过的网站
  • 用手机怎么做免费网站公众号小程序怎么做