SQLMesh增量模型实战指南:时间范围分区
引言
在数据工程领域,处理大规模数据集和高频率数据更新是一项挑战。SQLMesh作为一款强大的数据编排工具,提供了增量模型功能,帮助数据工程师高效地管理和更新数据。本文将详细介绍如何使用SQLMesh创建和管理基于时间范围的增量模型,涵盖从开发到生产的完整工作流程。
需求背景
假设你是一名数据工程师,负责处理一家直接面向客户销售软件的公司的数据。你每天需要处理数百万笔销售交易数据,并且需要将这些数据与产品使用数据进行关联,以更好地理解销售趋势和产品使用情况。
你面临以下挑战:
- 如何处理延迟到达的数据?
- 如何处理UTC和PST时间戳的转换?
- 应该在什么时间运行这些任务?
- 如何测试这些数据?
- 如何高效地运行增量更新?
- 如何处理边缘情况下的历史数据错误?
- 如何编写单元测试?
- 如何确保生产环境的数据完整性?
本文将通过一个完整的示例,展示如何使用SQLMesh解决这些问题。
开发工作流程
在SQLMesh中,典型的开发工作流程如下:
sqlmesh plan dev
: 创建一个新的开发环境sqlmesh fetchdf
: 在开发环境中预览数据sqlmesh create_external_models
: 自动生成原始源表的列级血缘文档sqlmesh plan
: 将模型从开发环境推广到生产环境sqlmesh plan dev --forward-only
: 在开发环境中进行代码更改,并仅处理新数据sqlmesh fetchdf
: 在开发环境中预览更改后的数据sqlmesh create_test
: 自动生成单元测试sqlmesh test
: 运行单元测试sqlmesh plan
: 将更改推广到生产环境
环境设置
我们将从一个现有的SQLMesh项目开始,该项目已经包含一些生产模型。假设我们已经有以下原始数据表:
原始产品使用数据
product_id | customer_id | last_usage_date | usage_count | feature_utilization_score | user_segment |
---|---|---|---|---|---|
PROD-101 | CUST-001 | 2024-10-25 23:45:00+00 | 120 | 0.85 | enterprise |
PROD-103 | CUST-001 | 2024-10-27 12:30:00+00 | 95 | 0.75 | enterprise |
… | … | … | … | … | … |
原始销售数据
transaction_id | product_id | customer_id | transaction_amount | transaction_timestamp | payment_method | currency |
---|---|---|---|---|---|---|
TX-001 | PROD-101 | CUST-001 | 99.99 | 2024-10-25 08:30:00+00 | credit_card | USD |
TX-002 | PROD-102 | CUST-002 | 149.99 | 2024-10-25 09:45:00+00 | paypal | USD |
… | … | … | … | … | … | … |
模型配置
我们将创建一个增量模型demo.incrementals_demo
,该模型按天分区,并处理销售数据和产品使用数据的关联。
MODEL(name="demo.incrementals_demo",kind=INCREMENTAL_BY_TIME_RANGE(time_column="transaction_date",lookback=2, # 处理过去2天的延迟数据),start="2024-10-25", # 不回填此日期之前的数据cron="@daily", # 每天午夜UTC运行grain="transaction_id", # 主键audits=[UNIQUE_VALUES(columns=("transaction_id",)),NOT_NULL(columns=("transaction_id",)),]
)WITH sales_data AS (SELECTtransaction_id,product_id,customer_id,transaction_amount,transaction_timestamp,payment_method,currencyFROM sqlmesh-public-demo.tcloud_raw_data.salesWHERE transaction_timestamp BETWEEN @start_dt AND @end_dt
),product_usage AS (SELECTproduct_id,customer_id,last_usage_date,usage_count,feature_utilization_score,user_segmentFROM sqlmesh-public-demo.tcloud_raw_data.product_usageWHERE last_usage_date BETWEEN DATE_SUB(@start_dt, INTERVAL 30 DAY) AND @end_dt
)SELECTs.transaction_id,s.product_id,s.customer_id,s.transaction_amount,DATE(s.transaction_timestamp) as transaction_date,DATETIME(s.transaction_timestamp, 'America/Los_Angeles') as transaction_timestamp_pst,s.payment_method,s.currency,p.last_usage_date,p.usage_count,p.feature_utilization_score,p.user_segment,CASEWHEN p.usage_count > 100 AND p.feature_utilization_score > 0.8 THEN 'Power User'WHEN p.usage_count > 50 THEN 'Regular User'WHEN p.usage_count IS NULL THEN 'New User'ELSE 'Light User'END as user_type,DATE_DIFF(s.transaction_timestamp, p.last_usage_date, DAY) as days_since_last_usage
FROM sales_data s
LEFT JOIN product_usage pON s.product_id = p.product_idAND s.customer_id = p.customer_id
创建模型
首次创建模型时,我们需要将其添加到开发环境中:
sqlmesh plan dev
按照提示输入回填的起始和结束日期,SQLMesh将自动创建物理表并执行初始数据加载。
跟踪列级血缘
SQLMesh可以自动生成外部模型文档,记录原始表的列信息和数据类型:
sqlmesh create_external_models
通过SQLMesh UI,可以直观地查看列级血缘关系。
进行更改
假设我们需要调整“Power User”的定义,将阈值从100次使用调整为50次使用。我们可以使用--forward-only
标志,仅对新数据应用更改:
sqlmesh plan dev --forward-only
SQLMesh会生成一个预览表,允许我们在开发环境中测试更改,而不会影响历史数据。
添加单元测试
使用sqlmesh create_test
命令可以自动生成单元测试配置文件:
sqlmesh create_test demo.incrementals_demo \--query sqlmesh-public-demo.tcloud_raw_data.product_usage "select * from sqlmesh-public-demo.tcloud_raw_data.product_usage where customer_id='CUST-001'" \--query sqlmesh-public-demo.tcloud_raw_data.sales "select * from sqlmesh-public-demo.tcloud_raw_data.sales where customer_id='CUST-001'" \--var start_dt '2024-10-25' \--var end_dt '2024-10-27'
运行单元测试:
sqlmesh test
推广到生产环境
确认开发环境中的更改无误后,可以将其推广到生产环境:
sqlmesh plan
SQLMesh会自动处理模式演进和数据回填,确保生产环境的数据完整性。
总结
通过本文的示例,我们展示了如何使用SQLMesh创建和管理基于时间范围的增量模型。SQLMesh的优势在于:
- 自动处理数据分区,提高查询效率
- 支持增量更新,减少资源消耗
- 提供强大的测试和验证工具,确保数据质量
- 简化开发到生产的流程,减少人为错误
希望这篇指南能帮助你更好地理解和使用SQLMesh,提升数据工程的效率和准确性。
注意:本文基于SQLMesh官方文档和示例编写,实际操作中请参考最新版本的SQLMesh文档。