当前位置: 首页 > wzjs >正文

做的烂的网站句容市网站seo优化排名

做的烂的网站,句容市网站seo优化排名,做外贸网站卖什么好,学院网站建设进度情况说明在现代数据工程中,自动化和监控是确保数据管道高效运行的关键因素。Dagster作为一款强大的数据编排工具,提供了多种方式来实现这些目标。本文将深入探讨如何使用Dagster Pipes修改外部代码,以实现日志记录、结构化元数据报告以及资产检查等功…

在现代数据工程中,自动化和监控是确保数据管道高效运行的关键因素。Dagster作为一款强大的数据编排工具,提供了多种方式来实现这些目标。本文将深入探讨如何使用Dagster Pipes修改外部代码,以实现日志记录、结构化元数据报告以及资产检查等功能。

什么是Dagster Pipes?

Dagster Pipes是Dagster提供的一种机制,允许你在Dagster之外运行的代码与Dagster内部的工作流进行交互。通过Dagster Pipes,你可以将现有的脚本或应用程序集成到Dagster的数据管道中,并实现信息的双向流动。这不仅提高了代码的复用性,还增强了管道的可监控性和可维护性。

在这里插入图片描述

修改外部代码的步骤

假设我们有一个独立的Python脚本external_code.py,我们希望将其与Dagster集成,并实现日志记录和结构化元数据的报告。同时,我们还有一个Dagster定义文件dagster_code.py,其中包含了一个Dagster资产和其他相关定义。

步骤1:在外部代码中引入Dagster上下文

首先,我们需要在external_code.py中引入Dagster Pipes的相关模块,并初始化Dagster Pipes上下文。这可以通过调用open_dagster_pipes()函数来实现,该函数会返回一个上下文管理器,用于管理Dagster Pipes连接的生命周期。

from dagster_pipes import PipesContext, open_dagster_pipes
import pandas as pddef main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:print(f"processing total {total_orders} orders")

步骤2:发送日志消息到Dagster

接下来,我们可以使用context.log方法将日志消息发送回Dagster。这比直接打印到标准输出更加灵活,因为日志消息可以在Dagster UI中进行过滤和查看。

def main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")

在Dagster UI的Run details页面中,你可以通过选择日志级别来过滤出info级别的日志消息。
在这里插入图片描述

步骤3:发送结构化元数据到Dagster

除了日志消息,我们还可以发送结构化元数据到Dagster。这对于报告资产的状态、数据质量检查结果等信息非常有用。

报告资产物化

我们可以使用context.report_asset_materialization方法来报告资产物化的元数据。例如,我们可以报告处理的总订单数。

def main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})
报告资产检查

如果我们的资产有定义数据质量检查,我们还可以通过context.report_asset_check方法来报告检查的结果。

def main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})# 报告数据质量检查结果context.report_asset_check(passed=orders_df[["item_id"]].notnull().all().bool(),check_name="no_empty_order_check",)

在Dagster UI中,你可以在Asset Details页面的Events和Checks标签页中查看这些事件和检查结果。
在这里插入图片描述

完整代码示例

外部代码 external_code.py

import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipesdef main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})# 报告数据质量检查结果context.report_asset_check(passed=orders_df[["item_id"]].notnull().all().bool(),check_name="no_empty_order_check",)

Dagster代码 dagster_code.py

import shutil
import dagster as dg
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipesdef main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})# 报告数据质量检查结果context.report_asset_check(passed=orders_df[["item_id"]].notnull().all().bool(),check_name="no_empty_order_check",)@dg.asset(check_specs=[dg.AssetCheckSpec(name="no_empty_order_check", asset="subprocess_asset")],
)
def subprocess_asset(context: dg.AssetExecutionContext, pipes_subprocess_client: dg.PipesSubprocessClient
):cmd = [shutil.which("python"),dg.file_relative_path(__file__, "external_code.py"),]return pipes_subprocess_client.run(command=cmd, context=context).get_materialize_result()defs = dg.Definitions(assets=[subprocess_asset],resources={"pipes_subprocess_client": dg.PipesSubprocessClient()},
)

总结

通过上述步骤,我们成功地将一个独立的Python脚本与Dagster集成,并实现了日志记录和结构化元数据的报告。这不仅提高了代码的可维护性,还增强了数据管道的监控能力。你可以进一步探索Dagster Pipes的其他功能,如自定义协议和与其他系统的集成,以满足更复杂的需求。


文章转载自:

http://iSCVoJC8.gkjnz.cn
http://63Rd7OiY.gkjnz.cn
http://H4bCutxE.gkjnz.cn
http://kqjjJb4T.gkjnz.cn
http://8mLMWmBM.gkjnz.cn
http://v6hG56kq.gkjnz.cn
http://dCgFWob7.gkjnz.cn
http://6tQdXFRC.gkjnz.cn
http://iiy7fkLH.gkjnz.cn
http://wjKteLlY.gkjnz.cn
http://mHYTxXMW.gkjnz.cn
http://rycusB1W.gkjnz.cn
http://6q6VOdkZ.gkjnz.cn
http://4oJmt2Ck.gkjnz.cn
http://dp17vNJN.gkjnz.cn
http://nJG3KDiE.gkjnz.cn
http://A62h5xRq.gkjnz.cn
http://3OuCE5Da.gkjnz.cn
http://0K34nlN0.gkjnz.cn
http://lUT2bLIE.gkjnz.cn
http://yWzUO0HR.gkjnz.cn
http://OyyxnOkp.gkjnz.cn
http://11ZSDNRu.gkjnz.cn
http://qM3pCu3k.gkjnz.cn
http://iJxqZdKA.gkjnz.cn
http://R8e7Kq8G.gkjnz.cn
http://4590q1nN.gkjnz.cn
http://fuH0bJv1.gkjnz.cn
http://IUrbDx2Z.gkjnz.cn
http://lxXLuFzu.gkjnz.cn
http://www.dtcms.com/wzjs/719076.html

相关文章:

  • 弹性盒子做自适应网站ftp如何上传网站
  • 优惠券推广网站怎么做株洲网站设计外包首选
  • 免费 网站 如何做网站开发服务费计入哪项费用
  • 汕头市营商环境建设监督局网站wordpress更新 ftp
  • 云存储做网站有免费的wordpress
  • 网站开发 参考文献网站选项卡
  • 站长统计黄页网站下载大全购物商城模板
  • 石狮网站建设联系电话靖江市建设行业协会网站
  • 中山市企业网站建设网站建设需要经历什么步骤
  • 微信分销网站建设价格舅舅建筑网
  • 网站外包制作网站开发是什么职位
  • 网站主题切换网站建设与管理和电子商务哪个好
  • 交友深圳网站建设做一个网站的流程
  • 网站模板使用wordpress 页面伪静态页面
  • 制作网站怎么用图片做背景电子商务的就业方向是什么
  • 厦门网站建设培训机构响应式网站排名如何
  • 那里做直播网站中小企业网络组网案例
  • 关于网站建设的英文歌什么是网络营销?
  • 怎么提高网站权重机械东莞网站建设0769
  • 设计素材网站破解网站字体颜色大小
  • 给公司做网站软件广州网站建设公司怎么选
  • p2p网站建设教程陇城科技网站建设
  • 推广网站的方法有搜索引擎wordpress列表页添加页码
  • 电子产品玩具东莞网站建设钢铁网站建设
  • 做自适应网站点击软件
  • 中国建设银行昆山支行网站长春招聘网智联
  • 怎样做后端数据传输前端的网站常德seo招聘
  • 淘宝客网站建设分类商标设计一般多少钱
  • 石家庄网站推广专家长沙免费旅游景点大全
  • php网站建设案例教程行政单位建设网站方案