Dagster Pipes系列-2:增强外部脚本与Dagster的交互能力
在现代数据工程中,自动化和监控是确保数据管道高效运行的关键因素。Dagster作为一款强大的数据编排工具,提供了多种方式来实现这些目标。本文将深入探讨如何使用Dagster Pipes修改外部代码,以实现日志记录、结构化元数据报告以及资产检查等功能。
什么是Dagster Pipes?
Dagster Pipes是Dagster提供的一种机制,允许你在Dagster之外运行的代码与Dagster内部的工作流进行交互。通过Dagster Pipes,你可以将现有的脚本或应用程序集成到Dagster的数据管道中,并实现信息的双向流动。这不仅提高了代码的复用性,还增强了管道的可监控性和可维护性。
修改外部代码的步骤
假设我们有一个独立的Python脚本external_code.py
,我们希望将其与Dagster集成,并实现日志记录和结构化元数据的报告。同时,我们还有一个Dagster定义文件dagster_code.py
,其中包含了一个Dagster资产和其他相关定义。
步骤1:在外部代码中引入Dagster上下文
首先,我们需要在external_code.py
中引入Dagster Pipes的相关模块,并初始化Dagster Pipes上下文。这可以通过调用open_dagster_pipes()
函数来实现,该函数会返回一个上下文管理器,用于管理Dagster Pipes连接的生命周期。
from dagster_pipes import PipesContext, open_dagster_pipes
import pandas as pddef main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:print(f"processing total {total_orders} orders")
步骤2:发送日志消息到Dagster
接下来,我们可以使用context.log
方法将日志消息发送回Dagster。这比直接打印到标准输出更加灵活,因为日志消息可以在Dagster UI中进行过滤和查看。
def main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")
在Dagster UI的Run details页面中,你可以通过选择日志级别来过滤出info级别的日志消息。
步骤3:发送结构化元数据到Dagster
除了日志消息,我们还可以发送结构化元数据到Dagster。这对于报告资产的状态、数据质量检查结果等信息非常有用。
报告资产物化
我们可以使用context.report_asset_materialization
方法来报告资产物化的元数据。例如,我们可以报告处理的总订单数。
def main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})
报告资产检查
如果我们的资产有定义数据质量检查,我们还可以通过context.report_asset_check
方法来报告检查的结果。
def main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})# 报告数据质量检查结果context.report_asset_check(passed=orders_df[["item_id"]].notnull().all().bool(),check_name="no_empty_order_check",)
在Dagster UI中,你可以在Asset Details页面的Events和Checks标签页中查看这些事件和检查结果。
完整代码示例
外部代码 external_code.py
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipesdef main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})# 报告数据质量检查结果context.report_asset_check(passed=orders_df[["item_id"]].notnull().all().bool(),check_name="no_empty_order_check",)
Dagster代码 dagster_code.py
import shutil
import dagster as dg
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipesdef main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})# 报告数据质量检查结果context.report_asset_check(passed=orders_df[["item_id"]].notnull().all().bool(),check_name="no_empty_order_check",)@dg.asset(check_specs=[dg.AssetCheckSpec(name="no_empty_order_check", asset="subprocess_asset")],
)
def subprocess_asset(context: dg.AssetExecutionContext, pipes_subprocess_client: dg.PipesSubprocessClient
):cmd = [shutil.which("python"),dg.file_relative_path(__file__, "external_code.py"),]return pipes_subprocess_client.run(command=cmd, context=context).get_materialize_result()defs = dg.Definitions(assets=[subprocess_asset],resources={"pipes_subprocess_client": dg.PipesSubprocessClient()},
)
总结
通过上述步骤,我们成功地将一个独立的Python脚本与Dagster集成,并实现了日志记录和结构化元数据的报告。这不仅提高了代码的可维护性,还增强了数据管道的监控能力。你可以进一步探索Dagster Pipes的其他功能,如自定义协议和与其他系统的集成,以满足更复杂的需求。