当前位置: 首页 > news >正文

TDengine 数据写入详细用户手册

在这里插入图片描述

TDengine 数据写入用户手册

概述

TDengine 提供了多种灵活的数据写入方式,以满足不同应用场景的需求。本手册将以智能电表场景为例,向初学者详细介绍各种数据写入方法的使用。

智能电表场景设定

假设我们需要为智能电表系统建立数据库:

-- 创建数据库
CREATE DATABASE test;
USE test;-- 创建超级表
CREATE STABLE meters (ts TIMESTAMP,           -- 时间戳current FLOAT,          -- 电流值voltage FLOAT,          -- 电压值phase FLOAT            -- 相位值
) TAGS (location VARCHAR(64),   -- 地理位置groupid INT            -- 设备组ID
);-- 创建子表
CREATE TABLE d1001 USING meters TAGS ('Beijing', 1);
CREATE TABLE d1002 USING meters TAGS ('Shanghai', 1);
CREATE TABLE d1003 USING meters TAGS ('Guangzhou', 2);

数据写入方式分类

1. SQL 语句写入

1.1 单条记录写入

最基本的写入方式,适合少量数据或测试场景。

-- 向子表写入单条记录
INSERT INTO d1001 VALUES ('2024-01-15 10:00:00', 12.5, 220.3, 0.95);-- 向多个子表写入数据
INSERT INTO d1001 VALUES ('2024-01-15 10:01:00', 12.8, 221.0, 0.96)d1002 VALUES ('2024-01-15 10:01:00', 15.2, 219.8, 0.94);
1.2 批量写入

一次性写入多条记录,提高写入效率。

-- 向单个表批量写入
INSERT INTO d1001 VALUES ('2024-01-15 10:00:00', 12.5, 220.3, 0.95),('2024-01-15 10:01:00', 12.8, 221.0, 0.96),('2024-01-15 10:02:00', 13.1, 220.8, 0.97);-- 向多个表批量写入
INSERT INTO d1001 VALUES ('2024-01-15 10:00:00', 12.5, 220.3, 0.95),('2024-01-15 10:01:00', 12.8, 221.0, 0.96)d1002 VALUES ('2024-01-15 10:00:00', 15.2, 219.8, 0.94),('2024-01-15 10:01:00', 15.5, 220.2, 0.95);
1.3 自动建表写入

写入数据时自动创建子表,简化开发流程。

-- 自动创建子表并写入数据
INSERT INTO d1004 USING meters TAGS ('Shenzhen', 2) VALUES ('2024-01-15 10:00:00', 14.2, 218.5, 0.93);-- 批量自动建表写入
INSERT INTO d1005 USING meters TAGS ('Hangzhou', 3) VALUES ('2024-01-15 10:00:00', 11.8, 222.1, 0.98),('2024-01-15 10:01:00', 12.1, 221.8, 0.97)d1006 USING meters TAGS ('Nanjing', 3) VALUES ('2024-01-15 10:00:00', 13.5, 219.6, 0.96);

2. 无模式写入(Schemaless)

无模式写入允许在写入数据时自动创建表结构,支持多种协议格式。

2.1 InfluxDB 行协议
# 使用 taos 客户端写入
taos> INSERT INTO test.meters,location=Beijing,groupid=1i current=12.5,voltage=220.3,phase=0.95 1642204800000000000
taos> INSERT INTO test.meters,location=Shanghai,groupid=1i current=15.2,voltage=219.8,phase=0.94 1642204860000000000# 批量写入多行
taos> INSERT INTO 
test.meters,location=Beijing,groupid=1i current=12.5,voltage=220.3,phase=0.95 1642204800000000000
test.meters,location=Shanghai,groupid=1i current=15.2,voltage=219.8,phase=0.94 1642204860000000000
test.meters,location=Guangzhou,groupid=2i current=14.1,voltage=221.2,phase=0.96 1642204920000000000
2.2 OpenTSDB JSON 格式
[{"metric": "meters.current","timestamp": 1642204800,"value": 12.5,"tags": {"location": "Beijing","groupid": "1"}},{"metric": "meters.voltage", "timestamp": 1642204800,"value": 220.3,"tags": {"location": "Beijing","groupid": "1"}}
]
2.3 OpenTSDB Telnet 格式
meters.current 1642204800 12.5 location=Beijing groupid=1
meters.voltage 1642204800 220.3 location=Beijing groupid=1
meters.phase 1642204800 0.95 location=Beijing groupid=1

3. 参数化查询(Prepared Statement)

使用参数化查询可以提高写入性能,特别适合大批量数据写入。

3.1 基本用法示例(C 语言)
#include <taos.h>int main() {TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);if (taos == NULL) {printf("连接失败\n");return -1;}// 使用数据库taos_query(taos, "USE test");// 准备 SQL 语句TAOS_STMT *stmt = taos_stmt_init(taos);const char *sql = "INSERT INTO ? USING meters TAGS (?, ?) VALUES (?, ?, ?, ?)";taos_stmt_prepare(stmt, sql, 0);// 绑定表名和标签TAOS_BIND tags[2];char tbname[] = "d1007";char location[] = "Tianjin";int groupid = 1;// 设置表名taos_stmt_set_tbname(stmt, tbname);// 绑定标签tags[0].buffer_type = TSDB_DATA_TYPE_VARCHAR;tags[0].buffer = location;tags[0].buffer_length = strlen(location);tags[0].length = &tags[0].buffer_length;tags[0].is_null = NULL;tags[1].buffer_type = TSDB_DATA_TYPE_INT;tags[1].buffer = &groupid;tags[1].buffer_length = sizeof(int);tags[1].length = NULL;tags[1].is_null = NULL;taos_stmt_set_tags(stmt, tags);// 绑定数据值TAOS_BIND params[4];long long ts = 1642204800000;float current = 12.5;float voltage = 220.3;float phase = 0.95;params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;params[0].buffer = &ts;params[0].buffer_length = sizeof(ts);params[0].length = NULL;params[0].is_null = NULL;params[1].buffer_type = TSDB_DATA_TYPE_FLOAT;params[1].buffer = &current;params[1].buffer_length = sizeof(float);params[1].length = NULL;params[1].is_null = NULL;params[2].buffer_type = TSDB_DATA_TYPE_FLOAT;params[2].buffer = &voltage;params[2].buffer_length = sizeof(float);params[2].length = NULL;params[2].is_null = NULL;params[3].buffer_type = TSDB_DATA_TYPE_FLOAT;params[3].buffer = &phase;params[3].buffer_length = sizeof(float);params[3].length = NULL;params[3].is_null = NULL;taos_stmt_bind_param(stmt, params);taos_stmt_add_batch(stmt);// 执行写入if (taos_stmt_execute(stmt) != 0) {printf("执行失败: %s\n", taos_errstr(NULL));}// 清理资源taos_stmt_close(stmt);taos_close(taos);return 0;
}
3.2 Python 示例
import taos# 连接数据库
conn = taos.connect(host="localhost", user="root", password="taosdata", database="test")# 创建 Prepared Statement
stmt = conn.statement("INSERT INTO ? USING meters TAGS (?, ?) VALUES (?, ?, ?, ?)")# 设置表名
stmt.set_tbname("d1008")# 设置标签
stmt.set_tags([("Wuhan", taos.FIELD_TYPE.C_VARCHAR), (2, taos.FIELD_TYPE.C_INT)])# 批量绑定数据
timestamps = [1642204800000, 1642204860000, 1642204920000]
currents = [12.5, 12.8, 13.1]
voltages = [220.3, 221.0, 220.8]
phases = [0.95, 0.96, 0.97]for i in range(len(timestamps)):stmt.bind_param([(timestamps[i], taos.FIELD_TYPE.C_TIMESTAMP),(currents[i], taos.FIELD_TYPE.C_FLOAT),(voltages[i], taos.FIELD_TYPE.C_FLOAT),(phases[i], taos.FIELD_TYPE.C_FLOAT)])stmt.add_batch()# 执行写入
stmt.execute()
stmt.close()
conn.close()

4. 连接器写入

4.1 JDBC 连接器(Java)
import java.sql.*;public class TDengineExample {public static void main(String[] args) {String jdbcUrl = "jdbc:TAOS://localhost:6030/test";String user = "root";String password = "taosdata";try {Class.forName("com.taosdata.jdbc.TSDBDriver");Connection conn = DriverManager.getConnection(jdbcUrl, user, password);// 批量写入示例String sql = "INSERT INTO d1009 USING meters TAGS ('Chengdu', 3) VALUES (?, ?, ?, ?)";PreparedStatement pstmt = conn.prepareStatement(sql);// 添加多个批次long baseTime = 1642204800000L;for (int i = 0; i < 1000; i++) {pstmt.setTimestamp(1, new Timestamp(baseTime + i * 60000));pstmt.setFloat(2, 12.0f + (float)Math.random() * 5);pstmt.setFloat(3, 220.0f + (float)Math.random() * 10);pstmt.setFloat(4, 0.9f + (float)Math.random() * 0.1f);pstmt.addBatch();}// 执行批量写入int[] results = pstmt.executeBatch();System.out.println("写入记录数: " + results.length);pstmt.close();conn.close();} catch (Exception e) {e.printStackTrace();}}
}
4.2 Python 连接器
import taos
import time
import random# 连接数据库
conn = taos.connect(host="localhost", user="root", password="taosdata", database="test")
cursor = conn.cursor()# 批量写入数据
def batch_insert_data():# 生成测试数据sql = "INSERT INTO d1010 USING meters TAGS ('Chongqing', 3) VALUES "values = []base_time = int(time.time() * 1000)for i in range(1000):timestamp = base_time + i * 60000  # 每分钟一条记录current = round(12.0 + random.random() * 5, 2)voltage = round(220.0 + random.random() * 10, 2)phase = round(0.9 + random.random() * 0.1, 2)values.append(f"({timestamp}, {current}, {voltage}, {phase})")sql += ",".join(values)try:cursor.execute(sql)print(f"成功写入 {len(values)} 条记录")except Exception as e:print(f"写入失败: {e}")# 执行批量写入
batch_insert_data()cursor.close()
conn.close()
4.3 Go 连接器
package mainimport ("database/sql""fmt""math/rand""time"_ "github.com/taosdata/driver-go/v3/taosRestful"
)func main() {// 连接数据库db, err := sql.Open("taosRestful", "root:taosdata@http(localhost:6041)/test")if err != nil {panic(err)}defer db.Close()// 批量写入数据stmt, err := db.Prepare("INSERT INTO d1011 USING meters TAGS ('Dalian', 4) VALUES (?, ?, ?, ?)")if err != nil {panic(err)}defer stmt.Close()// 开始事务tx, err := db.Begin()if err != nil {panic(err)}baseTime := time.Now().UnixMilli()for i := 0; i < 1000; i++ {timestamp := baseTime + int64(i*60000)current := 12.0 + rand.Float64()*5voltage := 220.0 + rand.Float64()*10phase := 0.9 + rand.Float64()*0.1_, err = stmt.Exec(timestamp, current, voltage, phase)if err != nil {tx.Rollback()panic(err)}}// 提交事务err = tx.Commit()if err != nil {panic(err)}fmt.Println("成功写入 1000 条记录")
}

5. RESTful API 写入

5.1 基本 REST 写入
# 单条记录写入
curl -X POST http://localhost:6041/rest/sql \-H "Authorization: Basic cm9vdDp0YW9zZGF0YQ==" \-H "Content-Type: application/json" \-d '{"sql": "INSERT INTO test.d1012 USING test.meters TAGS (\"Xiamen\", 4) VALUES (1642204800000, 13.2, 218.9, 0.94)"}'# 批量写入
curl -X POST http://localhost:6041/rest/sql \-H "Authorization: Basic cm9vdDp0YW9zZGF0YQ==" \-H "Content-Type: application/json" \-d '{"sql": "INSERT INTO test.d1013 USING test.meters TAGS (\"Qingdao\", 4) VALUES (1642204800000, 11.8, 222.1, 0.98), (1642204860000, 12.1, 221.8, 0.97), (1642204920000, 12.4, 221.5, 0.96)"}'
5.2 无模式 REST 写入
# InfluxDB 行协议写入
curl -X POST http://localhost:6041/influxdb/v1/write?db=test \-H "Authorization: Basic cm9vdDp0YW9zZGF0YQ==" \-d 'meters,location=Harbin,groupid=5 current=14.5,voltage=219.2,phase=0.93 1642204800000000000'# OpenTSDB JSON 写入
curl -X POST http://localhost:6041/opentsdb/v1/put/json/test \-H "Authorization: Basic cm9vdDp0YW9zZGF0YQ==" \-H "Content-Type: application/json" \-d '[{"metric": "meters.current","timestamp": 1642204800,"value": 13.8,"tags": {"location": "Changchun", "groupid": "5"}}]'

性能优化建议

1. 批量写入优化

-- 推荐:批量写入多条记录
INSERT INTO d1001 VALUES ('2024-01-15 10:00:00', 12.5, 220.3, 0.95),('2024-01-15 10:01:00', 12.8, 221.0, 0.96),('2024-01-15 10:02:00', 13.1, 220.8, 0.97);-- 不推荐:单条记录多次写入
INSERT INTO d1001 VALUES ('2024-01-15 10:00:00', 12.5, 220.3, 0.95);
INSERT INTO d1001 VALUES ('2024-01-15 10:01:00', 12.8, 221.0, 0.96);
INSERT INTO d1001 VALUES ('2024-01-15 10:02:00', 13.1, 220.8, 0.97);

2. 预先创建子表

-- 推荐:预先创建子表
CREATE TABLE d1001 USING meters TAGS ('Beijing', 1);
CREATE TABLE d1002 USING meters TAGS ('Shanghai', 1);
-- 然后写入数据
INSERT INTO d1001 VALUES ('2024-01-15 10:00:00', 12.5, 220.3, 0.95);-- 或者使用自动建表(适合动态场景)
INSERT INTO d1001 USING meters TAGS ('Beijing', 1) VALUES ('2024-01-15 10:00:00', 12.5, 220.3, 0.95);

3. 参数调优

-- 调整数据库参数以优化写入性能
ALTER DATABASE test WAL_LEVEL 1;          -- 设置 WAL 级别
ALTER DATABASE test WAL_FSYNC_PERIOD 3000; -- 设置 WAL 同步周期
ALTER DATABASE test BUFFER 256;            -- 设置写入缓冲区大小

错误处理和注意事项

1. 时间戳格式

-- 正确的时间戳格式
INSERT INTO d1001 VALUES ('2024-01-15 10:00:00.000', 12.5, 220.3, 0.95);
INSERT INTO d1001 VALUES (1642204800000, 12.5, 220.3, 0.95);-- 时间戳必须是递增的
INSERT INTO d1001 VALUES ('2024-01-15 10:00:00', 12.5, 220.3, 0.95),('2024-01-15 10:01:00', 12.8, 221.0, 0.96),  -- 正确:递增('2024-01-15 10:02:00', 13.1, 220.8, 0.97);

2. 数据类型匹配

-- 确保数据类型与表结构匹配
-- 错误示例:字符串不能写入数值列
-- INSERT INTO d1001 VALUES ('2024-01-15 10:00:00', 'abc', 220.3, 0.95);-- 正确示例:
INSERT INTO d1001 VALUES ('2024-01-15 10:00:00', 12.5, 220.3, 0.95);

3. NULL 值处理

-- TDengine 支持 NULL 值
INSERT INTO d1001 VALUES ('2024-01-15 10:00:00', NULL, 220.3, 0.95);
INSERT INTO d1001 VALUES ('2024-01-15 10:01:00', 12.5, NULL, 0.95);-- 使用 CASE 语句处理 NULL 值
SELECT ts,CASE WHEN current IS NULL THEN 0 ELSE current END as safe_current,CASE WHEN voltage IS NULL THEN 220 ELSE voltage END as safe_voltage
FROM d1001;

4. 错误码处理

常见的写入错误码:

  • TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE: 时间戳超出范围
  • TSDB_CODE_TDB_INVALID_TABLE_ID: 无效的表名
  • TSDB_CODE_OUT_OF_MEMORY: 内存不足
  • TSDB_CODE_PAR_INVALID_COLUMN: 无效的列名

最佳实践总结

  1. 批量写入:尽量使用批量写入,减少网络往返次数
  2. 时间有序:确保写入的时间戳是有序的
  3. 预先建表:对于已知的设备,预先创建子表
  4. 参数化查询:大批量数据写入时使用 Prepared Statement
  5. 错误处理:妥善处理各种错误情况
  6. 性能监控:监控写入性能,适时调整参数
  7. 数据压缩:利用 TDengine 的自动压缩特性,无需额外处理

通过合理选择写入方式和优化配置,可以充分发挥 TDengine 在时序数据写入方面的高性能优势。

关于 TDengine

TDengine 专为物联网IoT平台、工业大数据平台设计。其中,TDengine TSDB 是一款高性能、分布式的时序数据库(Time Series Database),同时它还带有内建的缓存、流式计算、数据订阅等系统功能;TDengine IDMP 是一款AI原生工业数据管理平台,它通过树状层次结构建立数据目录,对数据进行标准化、情景化,并通过 AI 提供实时分析、可视化、事件管理与报警等功能。


文章转载自:

http://ZZPrcb0R.bgpch.cn
http://GUR52fXT.bgpch.cn
http://8E5pL8na.bgpch.cn
http://y29UO703.bgpch.cn
http://mpPjnn1I.bgpch.cn
http://t9Y6PLzA.bgpch.cn
http://3lBIxgwL.bgpch.cn
http://ZJ3EOf7a.bgpch.cn
http://ctnGsWbN.bgpch.cn
http://0W0ZHTiX.bgpch.cn
http://T95sqEiD.bgpch.cn
http://rbuntIgI.bgpch.cn
http://CgEgfzNw.bgpch.cn
http://q41tahqK.bgpch.cn
http://nz8DfX21.bgpch.cn
http://cYbICTle.bgpch.cn
http://x3zniuiT.bgpch.cn
http://G47N81wT.bgpch.cn
http://xcIhPCFs.bgpch.cn
http://KpmX06Z6.bgpch.cn
http://ZFoPkMIE.bgpch.cn
http://sHbohMj0.bgpch.cn
http://IOnsdgf8.bgpch.cn
http://ATUYeUVM.bgpch.cn
http://DcrgGwlM.bgpch.cn
http://BEsTrxZy.bgpch.cn
http://HvhH1pPO.bgpch.cn
http://hsXrpHFQ.bgpch.cn
http://QA12aImV.bgpch.cn
http://2nIltfb9.bgpch.cn
http://www.dtcms.com/a/381529.html

相关文章:

  • 校园电动自行车管理系统的设计与实现(文末附源码)
  • HarmonyOS 应用开发深度解析:基于 ArkTS 的现代化状态管理实践
  • 【大语言模型 58】分布式文件系统:训练数据高效存储
  • [code-review] AI聊天接口 | 语言模型通信器
  • 力扣刷题笔记-删除链表的倒数第N个结点
  • 代码审计-PHP专题原生开发SQL注入1day分析构造正则搜索语句执行监控功能定位
  • dots.llm1:小红书开源的 MoE 架构大语言模型
  • --gpu-architecture <arch> (-arch)
  • uniapp动态修改tabbar
  • Spring Boot 集成 Flowable 7.1.0 完整教程
  • 教你使用服务器如何搭建数据库
  • Kafka如何配置生产者拦截器和消费者拦截器
  • uniapp:根据目的地经纬度,名称,唤起高德/百度地图来导航,兼容App,H5,小程序
  • 欧拉函数 | 定义 / 性质 / 应用
  • 【更新至2024年】1996-2024年各省农业总产值数据(无缺失)
  • 财报季观察|消费“分野”,燕之屋(1497.HK)们向上生长
  • 机械制造专属ERP:降本增效与数字转型的关键
  • 基于node.js+vue的医院陪诊系统的设计与实现(源码+论文+部署+安装)
  • 【大语言模型 59】监控与日志系统:训练过程全面监控
  • HIS架构智能化升级编程路径:从底层原理到临床实践的深度解析(下)
  • Node.js中package.json详解
  • 当AI遇上数据库:Text2Sql.Net如何让“说人话查数据“成为现实
  • 数据结构8——双向链表
  • 问卷系统自动化测试报告
  • Python 的函数柯里化(Currying)
  • 渗透测试信息收集详解
  • 【连载3】C# MVC 异常日志进阶:结构化日志与性能优化技巧
  • 冯诺依曼体系:现代计算机的基石与未来展望
  • 关于在阿里云DMS误操作后如何恢复数据的记录
  • 贪心算法应用:神经网络剪枝详解