当前位置: 首页 > wzjs >正文

地产公司做网站维护写代码么长春网站建设制作

地产公司做网站维护写代码么,长春网站建设制作,购物网页版,网站建设 国外在现代数据工程中,自动化和监控是确保数据管道高效运行的关键因素。Dagster作为一款强大的数据编排工具,提供了多种方式来实现这些目标。本文将深入探讨如何使用Dagster Pipes修改外部代码,以实现日志记录、结构化元数据报告以及资产检查等功…

在现代数据工程中,自动化和监控是确保数据管道高效运行的关键因素。Dagster作为一款强大的数据编排工具,提供了多种方式来实现这些目标。本文将深入探讨如何使用Dagster Pipes修改外部代码,以实现日志记录、结构化元数据报告以及资产检查等功能。

什么是Dagster Pipes?

Dagster Pipes是Dagster提供的一种机制,允许你在Dagster之外运行的代码与Dagster内部的工作流进行交互。通过Dagster Pipes,你可以将现有的脚本或应用程序集成到Dagster的数据管道中,并实现信息的双向流动。这不仅提高了代码的复用性,还增强了管道的可监控性和可维护性。

在这里插入图片描述

修改外部代码的步骤

假设我们有一个独立的Python脚本external_code.py,我们希望将其与Dagster集成,并实现日志记录和结构化元数据的报告。同时,我们还有一个Dagster定义文件dagster_code.py,其中包含了一个Dagster资产和其他相关定义。

步骤1:在外部代码中引入Dagster上下文

首先,我们需要在external_code.py中引入Dagster Pipes的相关模块,并初始化Dagster Pipes上下文。这可以通过调用open_dagster_pipes()函数来实现,该函数会返回一个上下文管理器,用于管理Dagster Pipes连接的生命周期。

from dagster_pipes import PipesContext, open_dagster_pipes
import pandas as pddef main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:print(f"processing total {total_orders} orders")

步骤2:发送日志消息到Dagster

接下来,我们可以使用context.log方法将日志消息发送回Dagster。这比直接打印到标准输出更加灵活,因为日志消息可以在Dagster UI中进行过滤和查看。

def main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")

在Dagster UI的Run details页面中,你可以通过选择日志级别来过滤出info级别的日志消息。
在这里插入图片描述

步骤3:发送结构化元数据到Dagster

除了日志消息,我们还可以发送结构化元数据到Dagster。这对于报告资产的状态、数据质量检查结果等信息非常有用。

报告资产物化

我们可以使用context.report_asset_materialization方法来报告资产物化的元数据。例如,我们可以报告处理的总订单数。

def main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})
报告资产检查

如果我们的资产有定义数据质量检查,我们还可以通过context.report_asset_check方法来报告检查的结果。

def main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})# 报告数据质量检查结果context.report_asset_check(passed=orders_df[["item_id"]].notnull().all().bool(),check_name="no_empty_order_check",)

在Dagster UI中,你可以在Asset Details页面的Events和Checks标签页中查看这些事件和检查结果。
在这里插入图片描述

完整代码示例

外部代码 external_code.py

import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipesdef main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})# 报告数据质量检查结果context.report_asset_check(passed=orders_df[["item_id"]].notnull().all().bool(),check_name="no_empty_order_check",)

Dagster代码 dagster_code.py

import shutil
import dagster as dg
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipesdef main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})# 报告数据质量检查结果context.report_asset_check(passed=orders_df[["item_id"]].notnull().all().bool(),check_name="no_empty_order_check",)@dg.asset(check_specs=[dg.AssetCheckSpec(name="no_empty_order_check", asset="subprocess_asset")],
)
def subprocess_asset(context: dg.AssetExecutionContext, pipes_subprocess_client: dg.PipesSubprocessClient
):cmd = [shutil.which("python"),dg.file_relative_path(__file__, "external_code.py"),]return pipes_subprocess_client.run(command=cmd, context=context).get_materialize_result()defs = dg.Definitions(assets=[subprocess_asset],resources={"pipes_subprocess_client": dg.PipesSubprocessClient()},
)

总结

通过上述步骤,我们成功地将一个独立的Python脚本与Dagster集成,并实现了日志记录和结构化元数据的报告。这不仅提高了代码的可维护性,还增强了数据管道的监控能力。你可以进一步探索Dagster Pipes的其他功能,如自定义协议和与其他系统的集成,以满足更复杂的需求。

http://www.dtcms.com/wzjs/235978.html

相关文章:

  • 无锡公司网站制作网站的优化
  • 山东德州网站建设哪家便宜流量宝
  • 广州专业建网站百度旗下所有app列表
  • asp技术做网站新手如何涨1000粉
  • 怎样做自己的vip解析网站长沙做网站的公司有哪些
  • 长沙做营销型网站公司seo对网络推广的作用是什么?
  • 中企动力做的网站后台怎么登录速推网
  • 成都哪家做网站好app推广员怎么做
  • 做视频网站资金多少网络营销收获与体会
  • 蚌埠网站建设公司cztv网络营销成功的品牌
  • 网站开发php js国产十大erp软件
  • 华为手表网站如何提高搜索引擎优化
  • 武汉做网站的有哪些免费发广告的平台有哪些
  • 在360网站上怎么做推广海淀区seo搜索引擎优化企业
  • 大连响应式网站建设全球搜索引擎入口
  • 网站怎么换空间商上海培训机构
  • 运城做网站哪家好竞价推广账户托管费用
  • b2b网站推广快速排名点击工具
  • 微信手机网站app制作怎么让百度搜出自己
  • 营销4pseo如何快速排名百度首页
  • 知名网站制作公网络推广是做什么工作的
  • 随州网站建设外包公司视频号怎么推广流量
  • 想学习做网站青岛seo建站
  • asp.net mvc网站开发之美云南seo
  • 合肥做网站的公司讯登优化网站排名公司
  • 做问卷的网站生成二维码免费建站哪个比较好
  • 网站建设与网站制作哪些网站有友情链接
  • 网站注册搜索引擎的目的搜索引擎推广方案
  • 做网站的标性英文seo实战派
  • 用易语言做攻击网站软件下载百度手机怎么刷排名多少钱