当前位置: 首页 > wzjs >正文

闲鱼网站是哪家公司做的深圳贸易网站建设

闲鱼网站是哪家公司做的,深圳贸易网站建设,现在哪个公司家庭网络好用,公司域名怎么起在现代数据工程中,自动化和监控是确保数据管道高效运行的关键因素。Dagster作为一款强大的数据编排工具,提供了多种方式来实现这些目标。本文将深入探讨如何使用Dagster Pipes修改外部代码,以实现日志记录、结构化元数据报告以及资产检查等功…

在现代数据工程中,自动化和监控是确保数据管道高效运行的关键因素。Dagster作为一款强大的数据编排工具,提供了多种方式来实现这些目标。本文将深入探讨如何使用Dagster Pipes修改外部代码,以实现日志记录、结构化元数据报告以及资产检查等功能。

什么是Dagster Pipes?

Dagster Pipes是Dagster提供的一种机制,允许你在Dagster之外运行的代码与Dagster内部的工作流进行交互。通过Dagster Pipes,你可以将现有的脚本或应用程序集成到Dagster的数据管道中,并实现信息的双向流动。这不仅提高了代码的复用性,还增强了管道的可监控性和可维护性。

在这里插入图片描述

修改外部代码的步骤

假设我们有一个独立的Python脚本external_code.py,我们希望将其与Dagster集成,并实现日志记录和结构化元数据的报告。同时,我们还有一个Dagster定义文件dagster_code.py,其中包含了一个Dagster资产和其他相关定义。

步骤1:在外部代码中引入Dagster上下文

首先,我们需要在external_code.py中引入Dagster Pipes的相关模块,并初始化Dagster Pipes上下文。这可以通过调用open_dagster_pipes()函数来实现,该函数会返回一个上下文管理器,用于管理Dagster Pipes连接的生命周期。

from dagster_pipes import PipesContext, open_dagster_pipes
import pandas as pddef main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:print(f"processing total {total_orders} orders")

步骤2:发送日志消息到Dagster

接下来,我们可以使用context.log方法将日志消息发送回Dagster。这比直接打印到标准输出更加灵活,因为日志消息可以在Dagster UI中进行过滤和查看。

def main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")

在Dagster UI的Run details页面中,你可以通过选择日志级别来过滤出info级别的日志消息。
在这里插入图片描述

步骤3:发送结构化元数据到Dagster

除了日志消息,我们还可以发送结构化元数据到Dagster。这对于报告资产的状态、数据质量检查结果等信息非常有用。

报告资产物化

我们可以使用context.report_asset_materialization方法来报告资产物化的元数据。例如,我们可以报告处理的总订单数。

def main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})
报告资产检查

如果我们的资产有定义数据质量检查,我们还可以通过context.report_asset_check方法来报告检查的结果。

def main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})# 报告数据质量检查结果context.report_asset_check(passed=orders_df[["item_id"]].notnull().all().bool(),check_name="no_empty_order_check",)

在Dagster UI中,你可以在Asset Details页面的Events和Checks标签页中查看这些事件和检查结果。
在这里插入图片描述

完整代码示例

外部代码 external_code.py

import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipesdef main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})# 报告数据质量检查结果context.report_asset_check(passed=orders_df[["item_id"]].notnull().all().bool(),check_name="no_empty_order_check",)

Dagster代码 dagster_code.py

import shutil
import dagster as dg
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipesdef main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})# 报告数据质量检查结果context.report_asset_check(passed=orders_df[["item_id"]].notnull().all().bool(),check_name="no_empty_order_check",)@dg.asset(check_specs=[dg.AssetCheckSpec(name="no_empty_order_check", asset="subprocess_asset")],
)
def subprocess_asset(context: dg.AssetExecutionContext, pipes_subprocess_client: dg.PipesSubprocessClient
):cmd = [shutil.which("python"),dg.file_relative_path(__file__, "external_code.py"),]return pipes_subprocess_client.run(command=cmd, context=context).get_materialize_result()defs = dg.Definitions(assets=[subprocess_asset],resources={"pipes_subprocess_client": dg.PipesSubprocessClient()},
)

总结

通过上述步骤,我们成功地将一个独立的Python脚本与Dagster集成,并实现了日志记录和结构化元数据的报告。这不仅提高了代码的可维护性,还增强了数据管道的监控能力。你可以进一步探索Dagster Pipes的其他功能,如自定义协议和与其他系统的集成,以满足更复杂的需求。


文章转载自:

http://s7U228z1.trfrL.cn
http://ET3YSkq2.trfrL.cn
http://O9FyfmBK.trfrL.cn
http://pkEY44cI.trfrL.cn
http://ciX3xZec.trfrL.cn
http://sGs0bRIT.trfrL.cn
http://6e1va2UP.trfrL.cn
http://3bpotkCL.trfrL.cn
http://SyAHex20.trfrL.cn
http://anKZ8vUV.trfrL.cn
http://Ac2NgZZF.trfrL.cn
http://wWZOQLqg.trfrL.cn
http://EGysFDdB.trfrL.cn
http://p3jIDJXz.trfrL.cn
http://ckNsYRXt.trfrL.cn
http://HbIi5MO5.trfrL.cn
http://awrTzyzX.trfrL.cn
http://rSvRcGIP.trfrL.cn
http://GYwjyUlT.trfrL.cn
http://eqCAzAJT.trfrL.cn
http://a7jtZ4bX.trfrL.cn
http://ec5plZcF.trfrL.cn
http://sJo98EIk.trfrL.cn
http://JJDVeWRB.trfrL.cn
http://7d94XSza.trfrL.cn
http://NUMN9JsV.trfrL.cn
http://npa1bUN1.trfrL.cn
http://sotp3woy.trfrL.cn
http://uSTap3W3.trfrL.cn
http://rEv5c5O0.trfrL.cn
http://www.dtcms.com/wzjs/715269.html

相关文章:

  • 怎么做家具定制网站网站名称和网址
  • 网站建设技术支持蔡甸网站建设
  • 西安php网站开发培训班电脑优化是什么意思
  • 做网站公司的收费多少北辰天津网站建设
  • 网站平台多少钱山东省最新消息今天
  • 学校网站建设情况介绍会员系统免费版
  • 绵阳新农网的网站是哪个公司做的wordpress首页调用最新文章
  • 建设部网站在哪里看受理做名片上什么网站
  • 佛山顺德网站制作公司哪家好制作ppt的软件手机
  • 公司中英文网站锦绣大地seo
  • 江门专用网站建设建立选区的快捷键
  • 做爰全过程免费的视频网站有声音第四性 wordpress
  • 电商网站建设实训报告心得h5页面制作工具包括
  • 郑州网站建设包括哪些wordpress 开发 表单
  • 如何让公司网站网站建设公司固定ip
  • 钦州市建设网站php综合网站建设论文
  • 做网站卖产品网页制作和网站建设的区别
  • 电商主图设计网站温州在线课堂
  • 临沂做网站价格wordpress禁止ip访问
  • 安徽建设厅网站地址网站开发亿码酷负责
  • 武清网站建设公司怎么用wordpress修改网站源码
  • 盘锦企业网站建设做智能网站系统下载地址
  • 动画设计招聘seo优化官网
  • 高校门户网站建设问题海曙网站设计建设
  • 江西做网站莆田外贸专业建站
  • 手机网站开发公司哪家好顺义广州网站建设
  • 网站的设计页面景区网站建设的意义
  • 网站做多大尺寸网站设计 价格
  • 河北网站建设有限公司如何做seo优化
  • Delphi 网站开发框架四平做网站佳业首页