当前位置: 首页 > news >正文

告别定时任务!用Dagster监听器实现秒级数据响应自动化

在数据管道开发中,我们经常面临需要根据外部事件触发计算任务的场景。传统基于时间的调度方式存在资源浪费和时效性不足的问题。本文将通过Dagster的**传感器(Sensor)**功能,演示如何构建事件驱动的数据处理流程。

场景模拟:动态销售报表生成系统

假设业务部门需要实时获取特定产品在指定时间段的销售分析报表。传统方案需要人工手动触发任务,而我们希望通过以下方式实现自动化:

  • 当新的销售请求文件到达时自动触发计算
  • 根据请求参数动态生成报表
  • 仅在检测到有效请求时运行作业

在这里插入图片描述

实现步骤详解

1. 定义事件驱动型资产

首先创建一个接收动态参数的资产,该资产将根据请求参数查询数据仓库生成报表:

from dagster import asset, MaterializeResult, Config
import duckdbclass AdhocRequestConfig(Config):"""请求参数配置"""department: strproduct: strstart_date: strend_date: str@asset(deps=["joined_data"], compute_kind="Python")
def adhoc_request(config: AdhocRequestConfig,duckdb: duckdb.DuckDBResource
) -> MaterializeResult:"""动态销售报表生成"""query = f"""SELECT department, rep_name, product_name, SUM(dollar_amount) AS total_salesFROM joined_dataWHERE date >= '{config.start_date}' AND date < '{config.end_date}' ANDdepartment = '{config.department}' ANDproduct_name = '{config.product}'GROUP BY department, rep_name, product_name"""with duckdb.get_connection() as conn:preview_df = conn.execute(query).fetchdf()return MaterializeResult(metadata={"preview": MaterializeResult.MetadataValue.md(preview_df.to_markdown(index=False))})

2. 构建事件监听传感器

使用@sensor装饰器创建传感器,持续监控指定目录下的请求文件:

import os
import json
from dagster import sensor, SensorEvaluationContext, RunRequest@sensor(job=adhoc_request_job)
def adhoc_request_sensor(context: SensorEvaluationContext):"""请求文件监听传感器"""requests_dir = os.path.join(os.path.dirname(__file__), "../data/requests")current_state = {}for filename in os.listdir(requests_dir):if filename.endswith(".json"):file_path = os.path.join(requests_dir, filename)file_mtime = os.path.getmtime(file_path)# 检测新文件或修改过的文件if filename not in current_state or current_state[filename] != file_mtime:with open(file_path) as f:request_config = json.load(f)# 生成唯一运行标识run_key = f"adhoc_request_{filename}_{file_mtime}"yield RunRequest(run_key=run_key,run_config={"ops": {"adhoc_request": {"config": request_config}}})current_state[filename] = file_mtime

3. 部署与测试

更新Dagster定义文件并启动服务:

from dagster import Definitions, AssetGroupdefs = Definitions(assets=[adhoc_request],sensors=[adhoc_request_sensor],resources={"duckdb": duckdb.DuckDBResource(database="data/mydb.duckdb")}
)

操作流程:

  1. 将请求文件放入data/requests目录
  2. 在Dagster UI中启用传感器
  3. 观察自动化触发记录
  4. 查看生成的Markdown格式报表预览

在这里插入图片描述

核心优势

  1. 精准触发:仅在检测到有效事件时运行,避免空跑
  2. 动态配置:通过JSON文件传递参数,支持复杂查询条件
  3. 审计追踪:自动记录每次触发的配置和结果元数据
  4. 幂等性保障:通过run_key防止重复执行

扩展建议

  • 添加文件格式验证(如JSON Schema)
  • 实现请求去重机制
  • 集成Slack通知功能
  • 增加请求优先级队列

通过这种架构,我们可以轻松将传统批处理流程升级为实时事件驱动系统,显著提升数据分析的响应速度和资源利用率。传感器机制使得Dagster在复杂ETL场景中展现出独特的灵活性和扩展能力。

相关文章:

  • [ComfyUI]重磅升级,FLUX.1-dev-ControlNet-Union-Pro-2.0发布,更好用了
  • Java对接Dify API接口完整指南
  • 吴恩达深度学习复盘(19)XGBoost简介|神经网络与决策树
  • openai发布今天发布了o3和o4-mini。
  • Selenium 实现自动化分页处理与信息提取
  • 【JavaEE】Maven配置
  • (leetcode算法题)309. 买卖股票的最佳时机含冷冻期
  • 【音视频】音视频FLV合成实战
  • 界面开发框架DevExpress XAF实践:如何在Blazor项目中集成.NET Aspire?(一)
  • 拖拉拽效果加点击事件
  • 智慧交通内容及发展趋势概述
  • 第五章 SQLite数据库:6、SQLite 常用语法1
  • 【数据结构】AVL树
  • 主数据管理:企业数字化转型的 “数据基石“ 如何为 AI 筑基?
  • Google Mock(GMock):C++单元测试的高效模拟框架详解
  • D4707同步整流器:提升Flyback转换器效率的关键元件
  • 本地Ubuntu轻松部署高效性能监控平台SigNoz与远程使用教程
  • Django 实现物联网管理系统的详细方案
  • Unity3D 测试驱动开发(TDD)框架设计
  • Immich图库本地部署与远程管理:打造你的专属照片云服务
  • 特朗普与普京就俄乌问题通话
  • 交通运输局男子与两名女子办婚礼?官方通报:未登记结婚,开除该男子
  • 国家统计局:4月全国规模以上工业增加值同比增长6.1%
  • “复旦源”一源六馆焕新启幕,设立文化发展基金首期1亿元
  • 广西桂林、百色、河池等地表态:全力配合中央对蓝天立的审查调查
  • 世界高血压日|专家:高血压患者控制血压同时应注重心率管理