当前位置: 首页 > wzjs >正文

地产公司做网站维护写代码么爱站网反链查询

地产公司做网站维护写代码么,爱站网反链查询,网站时间特效,app开发网上app开发在现代数据工程中,自动化和监控是确保数据管道高效运行的关键因素。Dagster作为一款强大的数据编排工具,提供了多种方式来实现这些目标。本文将深入探讨如何使用Dagster Pipes修改外部代码,以实现日志记录、结构化元数据报告以及资产检查等功…

在现代数据工程中,自动化和监控是确保数据管道高效运行的关键因素。Dagster作为一款强大的数据编排工具,提供了多种方式来实现这些目标。本文将深入探讨如何使用Dagster Pipes修改外部代码,以实现日志记录、结构化元数据报告以及资产检查等功能。

什么是Dagster Pipes?

Dagster Pipes是Dagster提供的一种机制,允许你在Dagster之外运行的代码与Dagster内部的工作流进行交互。通过Dagster Pipes,你可以将现有的脚本或应用程序集成到Dagster的数据管道中,并实现信息的双向流动。这不仅提高了代码的复用性,还增强了管道的可监控性和可维护性。

在这里插入图片描述

修改外部代码的步骤

假设我们有一个独立的Python脚本external_code.py,我们希望将其与Dagster集成,并实现日志记录和结构化元数据的报告。同时,我们还有一个Dagster定义文件dagster_code.py,其中包含了一个Dagster资产和其他相关定义。

步骤1:在外部代码中引入Dagster上下文

首先,我们需要在external_code.py中引入Dagster Pipes的相关模块,并初始化Dagster Pipes上下文。这可以通过调用open_dagster_pipes()函数来实现,该函数会返回一个上下文管理器,用于管理Dagster Pipes连接的生命周期。

from dagster_pipes import PipesContext, open_dagster_pipes
import pandas as pddef main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:print(f"processing total {total_orders} orders")

步骤2:发送日志消息到Dagster

接下来,我们可以使用context.log方法将日志消息发送回Dagster。这比直接打印到标准输出更加灵活,因为日志消息可以在Dagster UI中进行过滤和查看。

def main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")

在Dagster UI的Run details页面中,你可以通过选择日志级别来过滤出info级别的日志消息。
在这里插入图片描述

步骤3:发送结构化元数据到Dagster

除了日志消息,我们还可以发送结构化元数据到Dagster。这对于报告资产的状态、数据质量检查结果等信息非常有用。

报告资产物化

我们可以使用context.report_asset_materialization方法来报告资产物化的元数据。例如,我们可以报告处理的总订单数。

def main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})
报告资产检查

如果我们的资产有定义数据质量检查,我们还可以通过context.report_asset_check方法来报告检查的结果。

def main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})# 报告数据质量检查结果context.report_asset_check(passed=orders_df[["item_id"]].notnull().all().bool(),check_name="no_empty_order_check",)

在Dagster UI中,你可以在Asset Details页面的Events和Checks标签页中查看这些事件和检查结果。
在这里插入图片描述

完整代码示例

外部代码 external_code.py

import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipesdef main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})# 报告数据质量检查结果context.report_asset_check(passed=orders_df[["item_id"]].notnull().all().bool(),check_name="no_empty_order_check",)

Dagster代码 dagster_code.py

import shutil
import dagster as dg
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipesdef main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})# 报告数据质量检查结果context.report_asset_check(passed=orders_df[["item_id"]].notnull().all().bool(),check_name="no_empty_order_check",)@dg.asset(check_specs=[dg.AssetCheckSpec(name="no_empty_order_check", asset="subprocess_asset")],
)
def subprocess_asset(context: dg.AssetExecutionContext, pipes_subprocess_client: dg.PipesSubprocessClient
):cmd = [shutil.which("python"),dg.file_relative_path(__file__, "external_code.py"),]return pipes_subprocess_client.run(command=cmd, context=context).get_materialize_result()defs = dg.Definitions(assets=[subprocess_asset],resources={"pipes_subprocess_client": dg.PipesSubprocessClient()},
)

总结

通过上述步骤,我们成功地将一个独立的Python脚本与Dagster集成,并实现了日志记录和结构化元数据的报告。这不仅提高了代码的可维护性,还增强了数据管道的监控能力。你可以进一步探索Dagster Pipes的其他功能,如自定义协议和与其他系统的集成,以满足更复杂的需求。

http://www.dtcms.com/wzjs/48940.html

相关文章:

  • 网络营销外包公司靠谱吗上海排名优化推广工具
  • 网站主色调qq推广工具
  • 广东省建设厅投诉网站首页全网热搜关键词排行榜
  • 同一个域名可以做几个网站吗手机优化大师官方版
  • 建设在线教育网站seo专业课程
  • 网站项目策划书实例创建网站
  • wordpress做网站外贸网站建设推广
  • 浦城县规划建设旅游局网站百度推广官网
  • 强化乡镇政府网站建设和管理客服外包
  • 做网站 前途官网seo哪家公司好
  • 免费的logo设计网站公众号seo排名优化
  • 朝阳做网站公司站长之家站长工具综合查询
  • 建设企业网站作用公司网站如何建设
  • 怎样让百度收录自己的网站网络营销实施方案
  • 网站可信认证多少钱网络舆情监测
  • 网站聊天系统怎么做8大营销工具指的是哪些
  • 荆门做网站公众号的公司大连网站seo
  • 浙江建设网官网武汉seo关键词排名
  • 网站服务费做管理费用seo建站要求
  • wordpress为什么那么卡怎么优化整站
  • 网站建设的自查报告百度云在线登录
  • 设计制作数字电压表长春网站优化平台
  • 一个app下载网站今日关键词
  • 西城网站建设公司提升关键词
  • 购物网站建设要多少钱百度搜索指数查询
  • 嘉兴做网站的公司有哪些营销型企业网站建设步骤
  • seo蒙牛伊利企业网站专业性诊断网站seo优化方案设计
  • 做视频网站需要多大的带宽泉州网站建设优化
  • 公司备案证查询网站查询cps广告联盟平台
  • H5平台网站建设东莞seo建站推广费用