当前位置: 首页 > news >正文

Rust + 时序数据库 TDengine:打造高性能时序数据处理利器

引言:为什么选择 TDengine 与 Rust?

TDengine 是一款专为物联网、车联网、工业互联网等时序数据场景优化设计的开源时序数据库,支持高并发写入、高效查询及流式计算,通过“一个数据采集点一张表”与“超级表”的概念显著提升性能。

Rust 作为一门系统级编程语言,近年来在数据库、嵌入式系统、分布式服务等领域迅速崛起,以其内存安全、高性能著称,与 TDengine 的高效特性天然契合,适合构建高可靠、高性能的数据处理系统。

TDengine Rust 连接器的设计与架构

TDengine Rust 连接器的主要目标是提供一个高效、安全且易于使用的接口,让开发者能够通过 Rust 语言与 TDengine 数据库进行高效的交互。连接器的设计充分考虑了 Rust 语言的优势,如内存安全、并发处理和高性能,同时确保与 TDengine 数据库之间的通信可靠且高效。

主要模块间的架构如下图:

解释:

  1. taos-query 模块作为核心,负责定义公共接口和数据结构。

  2. taos-optin 模块和 taos-ws 模块分别实现了这些公共接口,负责原生连接和 WebSocket 连接的业务逻辑实现。

  3. taos 模块封装了 taos-optin 和 taos-ws 模块中的实现,同时将 taos-query 模块所定义的公共接口和数据结构予以暴露。

  4. 最终,用户只需依赖 taos 模块,即可轻松使用整个系统,而无需了解内部复杂的实现细节。

快速入门:从安装到代码开发

1.在 Cargo.toml 中添加以下内容:

[dependencies]
anyhow = "1.0.96"
chrono = "0.4.39"
serde = "1.0.217"
taos = "0.12.3"
tokio = "1.43.0"

2.代码演示,数据写入与查询:

use chrono::{DateTime, Local};
use taos::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Establish native connection
    let dsn = "taos://localhost:6030";
    let taos = TaosBuilder::from_dsn(dsn)?.build().await?;

    let db = "power";

    // Prepare the database
    taos.exec_many([
        format!("DROP DATABASE IF EXISTS {db}"),
        format!("CREATE DATABASE {db}"),
        format!("USE {db}"),
    ])
    .await?;

    let inserted = taos.exec_many([
        // Create a super table
        "CREATE STABLE meters(ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) \
         TAGS (group_id INT, location varchar(20))",
        // Create a subtable
        "CREATE TABLE d0 USING meters TAGS(0, 'Beijing Chaoyang')",
        // Write one record at a time
        "INSERT INTO d0 VALUES(now, 10.15, 217, 0.33)",
        // Write Null values
        "INSERT INTO d0 VALUES(now, NULL, NULL, NULL)",
        // Automatically create table on insert
        "INSERT INTO d1 USING meters TAGS(1, 'Beijing Haidian') VALUES(now, 10.1, 119, 0.32)",
        // Write multiple records at once
        "INSERT INTO d1 VALUES(now+1s, 10.22, 120, 0.33) (now+2s, 11.23, 121, 0.34) (now+3s, 12.23, 118, 0.32)",
    ]).await?;
    assert_eq!(inserted, 6);

    let mut result = taos.query("SELECT * FROM meters").await?;

    for field in result.fields() {
        println!("Get the field: {}", field.name());
    }
    println!();

    // Query option 1, using row stream.
    let mut rows = result.rows();
    while let Some(row) = rows.try_next().await? {
        for (name, value) in row {
            println!("Get the value of {}: {}", name, value);
        }
        println!()
    }

    // Query option 2, deserialize using serde.
    #[derive(Debug, serde::Deserialize)]
    #[allow(dead_code)]
    struct Record {
        // Deserialize timestamp to chrono::DateTime<Local>
        ts: DateTime<Local>,
        // Deserialize float to f32
        current: Option<f32>,
        // Deserialize int to i32
        voltage: Option<i32>,
        // Deserialize float to f32
        phase: Option<f32>,
        // Deserialize int to i32
        group_id: i32,
        // Deserialize varchar to String
        location: String,
    }

    let records: Vec<Record> = taos
        .query("SELECT * FROM meters")
        .await?
        .deserialize()
        .try_collect()
        .await?;

    println!("Get records: {:?}", records);

    Ok(())
}

高级功能与最佳实践

连接池优化:提升高并发性能

频繁创建和销毁数据库连接会带来显著的开销,尤其是在高并发场景(如物联网设备高频上报数据)中。通过连接池复用连接,可减少 TCP 握手、认证等重复操作,提升整体吞吐量。

代码示例:

use taos::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Create a connection pool
    let dsn = "taos://localhost:6030";
    let pool = TaosBuilder::from_dsn(dsn)?.pool()?;
    let taos = pool.get().await?;

    let db = "power";

    // Prepare the database
    taos.exec_many([
        format!("DROP DATABASE IF EXISTS {db}"),
        format!("CREATE DATABASE {db}"),
        format!("USE {db}"),
    ])
    .await?;

    let inserted = taos.exec_many([
        // Create a super table
        "CREATE STABLE meters(ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) \
         TAGS (group_id INT, location varchar(20))",
        // Create a subtable
        "CREATE TABLE d0 USING meters TAGS(0, 'Beijing Chaoyang')",
        // Write one record at a time
        "INSERT INTO d0 VALUES(now, 10.15, 217, 0.33)",
        // Write Null values
        "INSERT INTO d0 VALUES(now, NULL, NULL, NULL)",
        // Automatically create table on insert
        "INSERT INTO d1 USING meters TAGS(1, 'Beijing Haidian') VALUES(now, 10.1, 119, 0.32)",
        // Write multiple records at once
        "INSERT INTO d1 VALUES(now+1s, 10.22, 120, 0.33) (now+2s, 11.23, 121, 0.34) (now+3s, 12.23, 118, 0.32)",
    ]).await?;

    assert_eq!(inserted, 6);

    Ok(())
}

原生连接与 WebSocket 连接

对于原生连接和 WebSocket 连接这两种方式,除了建立连接所使用的 DSN 不同外,其余接口调用没有差异。

代码示例:

use taos::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // let dsn = "taosws://localhost:6041"; // WebSocket connection
    let dsn = "taos://localhost:6030"; // Native connection
    let _taos = TaosBuilder::from_dsn(dsn)?.build().await?;
    Ok(())
}

同步接口和异步接口

TDengine Rust 连接器的接口分为同步接口和异步接口。通常情况下,同步接口基于异步接口实现,二者的方法签名,除了异步接口多了 async 关键字外,基本一致。

异步接口的代码示例可查阅第 3 章节,同步接口的代码示例如下:

use taos::sync::*;

fn main() -> anyhow::Result<()> {
    // Establish native connection
    let dsn = "taos://localhost:6030";
    let taos = TaosBuilder::from_dsn(dsn)?.build()?;

    let db = "power";

    // Prepare the database
    taos.exec_many([
        format!("DROP DATABASE IF EXISTS {db}"),
        format!("CREATE DATABASE {db}"),
        format!("USE {db}"),
    ])?;

    let inserted = taos.exec_many([
        // Create a super table
        "CREATE STABLE meters(ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) \
         TAGS (group_id INT, location varchar(20))",
        // Create a subtable
        "CREATE TABLE d0 USING meters TAGS(0, 'Beijing Chaoyang')",
        // Write one record at a time
        "INSERT INTO d0 VALUES(now, 10.15, 217, 0.33)",
        // Write Null values
        "INSERT INTO d0 VALUES(now, NULL, NULL, NULL)",
        // Automatically create table on insert
        "INSERT INTO d1 USING meters TAGS(1, 'Beijing Haidian') VALUES(now, 10.1, 119, 0.32)",
        // Write multiple records at once
        "INSERT INTO d1 VALUES(now+1s, 10.22, 120, 0.33) (now+2s, 11.23, 121, 0.34) (now+3s, 12.23, 118, 0.32)",
    ])?;

    assert_eq!(inserted, 6);

    Ok(())
}

结语:开源社区的星辰大海

TDengine Rust 连接器的成长史,是一个典型的技术民主化故事——从社区萌芽到官方支持,从边缘工具到核心生态。它证明了两个事实:

  1. 开源协作的效率:在数据库领域,社区贡献者的“需求反哺”比闭门造车更能击中痛点。

  2. Rust 的生态潜力:作为系统级编程语言的 Rust,正在时序数据处理这样的垂直领域开辟新战场。

我们期待你的参与,欢迎提交 PR,一起推动 TDengine Rust 连接器的进步,携手开创更加美好的开源未来!

附录

  1. https://docs.taosdata.com/

  2. https://github.com/taosdata/taos-connector-rust

相关文章:

  • SpringBoot 第二课(Ⅰ) 整合springmvc(详解)
  • WorkManager 系列之一
  • 《AI赋能云原生区块链,引领供应链溯源革新》
  • 设备健康管理系统是什么,设备健康管理系统多少钱?
  • 开源模型应用落地-shieldgemma-2-4b-it模型小试-多模态内容安全检测(一)
  • 2610.转换二维数组
  • 蓝桥杯2023年第十四届省赛真题-阶乘的和
  • 【LangChain入门 1】安装
  • mac brew 安装的php@7.4 打开redis扩展
  • Elasticsearch8.17 集群重启操作
  • Linux 权限的概念
  • 数据库GreenDao的使用、升级、以及相关常用注释说明
  • 【Linux系统】Linux进程终止的N种方式
  • 河南电站实证,TOPCon较XBC组件发电早晚时段增益7.4%
  • Linux 一步部署DHCP服务
  • Spring WebFlux之流式输出
  • 专业学习|改进的多种群竞争粒子群优化算法
  • vue3 报错 Could not find a declaration file for module ‘/App.vue‘
  • 【免费】2000-2019年各省地方财政国内增值税数据
  • ubuntu20.04系统没有WiFi图标解决方案_安装Intel网卡驱动
  • 长江画派创始人之一、美术家鲁慕迅逝世,享年98岁
  • 上海:企业招用高校毕业生可享受1500元/人一次性扩岗补助
  • 江苏省人社厅党组书记、厅长王斌接受审查调查
  • 多人称华为手机忽现拍照模糊疑存缺陷,售后回应:主摄像头故障
  • 云南临沧一行贿案金额认定比受贿案多41万,重审时检方变更金额起诉
  • 光大华夏:近代中国私立大学遥不可及的梦想