当前位置: 首页 > news >正文

SQLMesh增量模型实战指南:时间范围分区

引言

在数据工程领域,处理大规模数据集和高频率数据更新是一项挑战。SQLMesh作为一款强大的数据编排工具,提供了增量模型功能,帮助数据工程师高效地管理和更新数据。本文将详细介绍如何使用SQLMesh创建和管理基于时间范围的增量模型,涵盖从开发到生产的完整工作流程。
在这里插入图片描述

需求背景

假设你是一名数据工程师,负责处理一家直接面向客户销售软件的公司的数据。你每天需要处理数百万笔销售交易数据,并且需要将这些数据与产品使用数据进行关联,以更好地理解销售趋势和产品使用情况。

你面临以下挑战:

  • 如何处理延迟到达的数据?
  • 如何处理UTC和PST时间戳的转换?
  • 应该在什么时间运行这些任务?
  • 如何测试这些数据?
  • 如何高效地运行增量更新?
  • 如何处理边缘情况下的历史数据错误?
  • 如何编写单元测试?
  • 如何确保生产环境的数据完整性?

本文将通过一个完整的示例,展示如何使用SQLMesh解决这些问题。

开发工作流程

在SQLMesh中,典型的开发工作流程如下:

  1. sqlmesh plan dev: 创建一个新的开发环境
  2. sqlmesh fetchdf: 在开发环境中预览数据
  3. sqlmesh create_external_models: 自动生成原始源表的列级血缘文档
  4. sqlmesh plan: 将模型从开发环境推广到生产环境
  5. sqlmesh plan dev --forward-only: 在开发环境中进行代码更改,并仅处理新数据
  6. sqlmesh fetchdf: 在开发环境中预览更改后的数据
  7. sqlmesh create_test: 自动生成单元测试
  8. sqlmesh test: 运行单元测试
  9. sqlmesh plan: 将更改推广到生产环境

环境设置

我们将从一个现有的SQLMesh项目开始,该项目已经包含一些生产模型。假设我们已经有以下原始数据表:

原始产品使用数据

product_idcustomer_idlast_usage_dateusage_countfeature_utilization_scoreuser_segment
PROD-101CUST-0012024-10-25 23:45:00+001200.85enterprise
PROD-103CUST-0012024-10-27 12:30:00+00950.75enterprise

原始销售数据

transaction_idproduct_idcustomer_idtransaction_amounttransaction_timestamppayment_methodcurrency
TX-001PROD-101CUST-00199.992024-10-25 08:30:00+00credit_cardUSD
TX-002PROD-102CUST-002149.992024-10-25 09:45:00+00paypalUSD

模型配置

我们将创建一个增量模型demo.incrementals_demo,该模型按天分区,并处理销售数据和产品使用数据的关联。

MODEL(name="demo.incrementals_demo",kind=INCREMENTAL_BY_TIME_RANGE(time_column="transaction_date",lookback=2,  # 处理过去2天的延迟数据),start="2024-10-25",  # 不回填此日期之前的数据cron="@daily",  # 每天午夜UTC运行grain="transaction_id",  # 主键audits=[UNIQUE_VALUES(columns=("transaction_id",)),NOT_NULL(columns=("transaction_id",)),]
)WITH sales_data AS (SELECTtransaction_id,product_id,customer_id,transaction_amount,transaction_timestamp,payment_method,currencyFROM sqlmesh-public-demo.tcloud_raw_data.salesWHERE transaction_timestamp BETWEEN @start_dt AND @end_dt
),product_usage AS (SELECTproduct_id,customer_id,last_usage_date,usage_count,feature_utilization_score,user_segmentFROM sqlmesh-public-demo.tcloud_raw_data.product_usageWHERE last_usage_date BETWEEN DATE_SUB(@start_dt, INTERVAL 30 DAY) AND @end_dt
)SELECTs.transaction_id,s.product_id,s.customer_id,s.transaction_amount,DATE(s.transaction_timestamp) as transaction_date,DATETIME(s.transaction_timestamp, 'America/Los_Angeles') as transaction_timestamp_pst,s.payment_method,s.currency,p.last_usage_date,p.usage_count,p.feature_utilization_score,p.user_segment,CASEWHEN p.usage_count > 100 AND p.feature_utilization_score > 0.8 THEN 'Power User'WHEN p.usage_count > 50 THEN 'Regular User'WHEN p.usage_count IS NULL THEN 'New User'ELSE 'Light User'END as user_type,DATE_DIFF(s.transaction_timestamp, p.last_usage_date, DAY) as days_since_last_usage
FROM sales_data s
LEFT JOIN product_usage pON s.product_id = p.product_idAND s.customer_id = p.customer_id

创建模型

首次创建模型时,我们需要将其添加到开发环境中:

sqlmesh plan dev

按照提示输入回填的起始和结束日期,SQLMesh将自动创建物理表并执行初始数据加载。

跟踪列级血缘

SQLMesh可以自动生成外部模型文档,记录原始表的列信息和数据类型:

sqlmesh create_external_models

通过SQLMesh UI,可以直观地查看列级血缘关系。

进行更改

假设我们需要调整“Power User”的定义,将阈值从100次使用调整为50次使用。我们可以使用--forward-only标志,仅对新数据应用更改:

sqlmesh plan dev --forward-only

SQLMesh会生成一个预览表,允许我们在开发环境中测试更改,而不会影响历史数据。

添加单元测试

使用sqlmesh create_test命令可以自动生成单元测试配置文件:

sqlmesh create_test demo.incrementals_demo \--query sqlmesh-public-demo.tcloud_raw_data.product_usage "select * from sqlmesh-public-demo.tcloud_raw_data.product_usage where customer_id='CUST-001'" \--query sqlmesh-public-demo.tcloud_raw_data.sales "select * from sqlmesh-public-demo.tcloud_raw_data.sales where customer_id='CUST-001'" \--var start_dt '2024-10-25' \--var end_dt '2024-10-27'

运行单元测试:

sqlmesh test

推广到生产环境

确认开发环境中的更改无误后,可以将其推广到生产环境:

sqlmesh plan

SQLMesh会自动处理模式演进和数据回填,确保生产环境的数据完整性。

总结

通过本文的示例,我们展示了如何使用SQLMesh创建和管理基于时间范围的增量模型。SQLMesh的优势在于:

  • 自动处理数据分区,提高查询效率
  • 支持增量更新,减少资源消耗
  • 提供强大的测试和验证工具,确保数据质量
  • 简化开发到生产的流程,减少人为错误

希望这篇指南能帮助你更好地理解和使用SQLMesh,提升数据工程的效率和准确性。

注意:本文基于SQLMesh官方文档和示例编写,实际操作中请参考最新版本的SQLMesh文档。

相关文章:

  • nginx 核心功能
  • 鸟笼效应——AI与思维模型【84】
  • 组件轮播与样式结构重用实验
  • Android开发——实现一个计算器
  • 利用3DMAX + Corona Renderer打造现代住宅逼真效果!
  • DotNet 入门:(一) 环境安装
  • MarkItDown:如何高效将各类文档转换为适合 LLM 处理的 Markdown 格式
  • 进程优先级以及切换调度
  • 得物 小程序 6宫格 分析
  • C++/SDL 进阶游戏开发 —— 双人塔防(代号:村庄保卫战 16)
  • opencv 直方图均衡化
  • AimRT 从零到一:官方示例精讲 —— 三、Executor示例.md
  • 【AI News | 20250429】每日AI进展
  • OpenCV 图形API(71)图像与通道拼接函数-----从图像(GMat)中裁剪出一个矩形区域的操作函数 crop()
  • gitee 如何修改提交代码的邮箱
  • 训练神经网络的批量标准化(使用 PyTorch)
  • 内核常见问题汇总
  • 计算机基础:二进制基础14,二进制加法
  • 某建筑石料用灰岩矿自动化监测
  • 海思vio模块学习
  • 解放日报:这是一场需要定力和实力的“科技长征”
  • 腾讯重构混元大模型研发体系:成立大语言和多模态模型部,提升AI长期技术作战能力
  • 郭继孚被撤销全国政协委员资格,此前为北京交通发展研究院长
  • 国务院安委会对辽宁辽阳一饭店重大火灾事故查处挂牌督办
  • 湖南华容县通报“大垱湖水质受污染”,爆料者:现场已在灌清水
  • 新质观察|重塑低空经济的系统安全观