当前位置: 首页 > wzjs >正文

抚顺做网站盘锦网站建设公司

抚顺做网站,盘锦网站建设公司,新建的网站如何做seo,做excel的网站本文深入解析Dagster资产工厂模式,展示如何通过Python函数和YAML配置实现高效ETL流程。特别介绍Python嵌套函数在资产工厂中的应用,这种函数内部定义函数的方式能帮助我们更好地封装逻辑。通过具体案例讲解资产工厂的构建方法,并探讨如何结合…

本文深入解析Dagster资产工厂模式,展示如何通过Python函数和YAML配置实现高效ETL流程。特别介绍Python嵌套函数在资产工厂中的应用,这种函数内部定义函数的方式能帮助我们更好地封装逻辑。通过具体案例讲解资产工厂的构建方法,并探讨如何结合Pydantic和Jinja2提升配置的安全性和可用性。建议先阅读我关于Python嵌套函数的博客,了解其原理与用法,再结合本文实践Dagster资产工厂。

一、资产工厂的核心价值

在数据工程中,我们经常需要处理大量相似的数据资产。资产工厂模式通过配置驱动的方式,让我们可以:

  • 批量创建具有相同逻辑的数据资产
  • 支持非技术人员通过YAML等DSL配置资产
  • 减少重复代码,提高开发效率

典型应用场景:

  • 批量处理数据库表
  • 统一批量文件转换
  • 为不同业务线创建相似的数据管道

在这里插入图片描述

二、Python实现资产工厂

基础ETL资产工厂示例
import tempfile
import dagster_aws.s3 as s3
import duckdb
import dagster as dgdef build_etl_job(s3_resource, bucket, source_object, target_object, sql):asset_key = f"etl_{bucket}_{target_object}".replace(".", "_")@dg.asset(name=asset_key)def etl_asset(context):with tempfile.TemporaryDirectory() as root:# 下载文件source_path = f"{root}/{source_object}"target_path = f"{root}/{target_object}"context.resources.s3.download_file(bucket, source_object, source_path)# 执行SQL转换db = duckdb.connect(":memory:")db.execute(f"CREATE TABLE source AS SELECT * FROM read_csv('{source_path}')")db.execute(sql).to_csv(target_path)# 上传结果context.resources.s3.upload_file(bucket, target_object, target_path)return dg.Definitions(assets=[etl_asset],resources={"s3": s3_resource})
批量创建资产
s3_resource = s3.S3Resource(aws_access_key_id="...", aws_secret_access_key="...")defs = dg.Definitions.merge(build_etl_job(s3_resource=s3_resource,bucket="my_bucket",source_object="raw_transactions.csv",target_object="cleaned_transactions.csv",sql="SELECT * FROM source WHERE amount IS NOT NULL;"),build_etl_job(s3_resource=s3_resource,bucket="my_bucket",source_object="all_customers.csv",target_object="risky_customers.csv",sql="SELECT * FROM source WHERE risk_score > 0.8;")
)

三、YAML配置驱动的资产工厂

基础YAML配置实现
import yaml
import dagster as dgdef load_etl_jobs_from_yaml(yaml_path: str) -> dg.Definitions:config = yaml.safe_load(open(yaml_path))s3_resource = s3.S3Resource(aws_access_key_id=config["aws"]["access_key_id"],aws_secret_access_key=config["aws"]["secret_access_key"])defs = []for job_config in config["etl_jobs"]:defs.append(build_etl_job(s3_resource=s3_resource,bucket=job_config["bucket"],source_object=job_config["source"],target_object=job_config["target"],sql=job_config["sql"]))return dg.Definitions.merge(*defs)
YAML配置文件示例
aws:access_key_id: "YOUR_ACCESS_KEY_ID"secret_access_key: "YOUR_SECRET_ACCESS_KEY"etl_jobs:- bucket: "my_bucket"source: "raw_transactions.csv"target: "cleaned_transactions.csv"sql: "SELECT * FROM source WHERE amount IS NOT NULL"- bucket: "my_bucket"source: "all_customers.csv"target: "risky_customers.csv"sql: "SELECT * FROM source WHERE risk_score > 0.8"

四、进阶优化方案

1. 使用Pydantic增强类型安全
from pydantic import BaseModel, validatorclass JobConfig(BaseModel):bucket: strsource: strtarget: strsql: strdef to_etl_job(self, s3_resource):return build_etl_job(s3_resource=s3_resource,bucket=self.bucket,source_object=self.source,target_object=self.target,sql=self.sql)
2. 使用Jinja2实现环境变量注入
import jinja2def load_etl_jobs_from_yaml(yaml_path: str) -> dg.Definitions:yaml_template = jinja2.Environment().from_string(open(yaml_path).read())config = yaml.safe_load(yaml_template.render(env=os.environ))# 使用Pydantic验证配置config_model = EtlJobsConfig.model_validate(config)return config_model.to_definitions()
3. 安全YAML配置示例
aws:access_key_id: "{{ env.AWS_ACCESS_KEY_ID }}"secret_access_key: "{{ env.AWS_SECRET_ACCESS_KEY }}"etl_jobs:- bucket: "my_bucket"source: "raw_transactions.csv"target: "cleaned_transactions.csv"sql: "SELECT * FROM source WHERE amount IS NOT NULL"

总结

Dagster资产工厂模式为数据工程师提供了强大的工具:

  1. Python实现适合快速原型开发和复杂逻辑处理
  2. YAML配置让非技术人员也能参与资产定义
  3. Pydantic+Jinja2组合提升了配置的安全性和灵活性

最佳实践建议:

  • 对于简单场景,直接使用Python函数定义
  • 对于需要团队协作的场景,采用YAML配置
  • 敏感信息务必通过环境变量管理
  • 复杂配置使用Pydantic进行类型验证

通过合理应用资产工厂模式,您可以显著提高数据管道的开发效率和维护性,同时降低人为错误的风险。

特别提醒:在实现嵌套函数时要注意Python的作用域规则和闭包特性。建议先阅读我关于Python嵌套函数的博客,了解其原理与用法,再结合本文实践Dagster资产工厂。

通过合理应用资产工厂模式和嵌套函数技术,您可以显著提高数据管道的开发效率和维护性,同时降低人为错误的风险。


文章转载自:

http://fXEpvh7A.zknjy.cn
http://LCZpV5uG.zknjy.cn
http://JzMi3hNZ.zknjy.cn
http://P7ukjPKj.zknjy.cn
http://KcmK4ORn.zknjy.cn
http://eTZLqBp3.zknjy.cn
http://loIArc9A.zknjy.cn
http://TVI93oQI.zknjy.cn
http://4tSqRmxC.zknjy.cn
http://YtmWbGa6.zknjy.cn
http://Kfpd8i4r.zknjy.cn
http://BI2kskyx.zknjy.cn
http://rIZubOZz.zknjy.cn
http://Hi8yzHnj.zknjy.cn
http://ADxBQIOx.zknjy.cn
http://ijKz0Rwj.zknjy.cn
http://mFTYHvb9.zknjy.cn
http://gwFrdc3s.zknjy.cn
http://elKqK6Qw.zknjy.cn
http://BzDRq0rK.zknjy.cn
http://bJSuqHDj.zknjy.cn
http://A2ZPlbPo.zknjy.cn
http://QzNDrd73.zknjy.cn
http://Ez3XkoEA.zknjy.cn
http://wcBHBdkg.zknjy.cn
http://wfe9eIhl.zknjy.cn
http://tokyXG7l.zknjy.cn
http://QW4I9voo.zknjy.cn
http://QLXARjGq.zknjy.cn
http://0HxPVGR8.zknjy.cn
http://www.dtcms.com/wzjs/677742.html

相关文章:

  • 番禺定制型网站建设做logo宣传语的网站
  • 淘宝网网站建设的需求分析建立什么样的网站好
  • 网站维护的方式包括最新网页游戏传奇
  • 网站开发 技术指标免费上线个人网站
  • 园区网站建设需求调研报告网站建设与管理代码样式
  • 企业内部网站制作模板上海稼禾建设装饰集团网站
  • 有域名了建立免费网站东莞建设工程交易网
  • 怎么建个废品网站wordpress 添加广告窗口
  • 外包网站开发合同文字logo设计生成器
  • 张家港网站优化wordpress 做下载站
  • 公司做网站要有服务器深圳排名网站
  • 四川建设局网站首页上传下载文件网站开发的php源码
  • 大数据与网站开发技术策划公司排名前十名
  • 长春专业做网站的公司排名网站建设与管理培训方案
  • 手机怎么建设网站深圳网上推广怎么做
  • 哪个网站可以做会计分录网页微信版本在哪里下载
  • 网站平台建设服务合同金属东莞网站建设技术支持
  • iis 子网站建设官方网站查询
  • 网站小游戏怎么做的建设工程施工合同范本哪个网站
  • 网站首页页脚设计php创建网页
  • 网站开通会员怎么开发pageadmin做的网站的域名必须要备案吗
  • 深圳官方网站新闻网站域名属于哪里管
  • 北京网站建设公司资讯成都房地产官网
  • 安阳网站建设哪家好开发网站定制
  • 如何在工商局网站做企业年报视觉传达设计挣钱吗
  • 北京知名网站建设公司各大网络平台的推广内容和方法
  • 让人做网站需要注意哪些问题网络工程师需要什么证书
  • 怎么找到网站后台wordpress插件盗版
  • 四川建设厅网站查询公司企业黄页
  • 梧州市网站建设17网站一起做网店潮汕