当前位置: 首页 > wzjs >正文

网站头部图片如何做郑州网站推广技术

网站头部图片如何做,郑州网站推广技术,怎么创作一个软件,淘宝客搜索网站怎么做本文深入解析Dagster资产工厂模式,展示如何通过Python函数和YAML配置实现高效ETL流程。特别介绍Python嵌套函数在资产工厂中的应用,这种函数内部定义函数的方式能帮助我们更好地封装逻辑。通过具体案例讲解资产工厂的构建方法,并探讨如何结合…

本文深入解析Dagster资产工厂模式,展示如何通过Python函数和YAML配置实现高效ETL流程。特别介绍Python嵌套函数在资产工厂中的应用,这种函数内部定义函数的方式能帮助我们更好地封装逻辑。通过具体案例讲解资产工厂的构建方法,并探讨如何结合Pydantic和Jinja2提升配置的安全性和可用性。建议先阅读我关于Python嵌套函数的博客,了解其原理与用法,再结合本文实践Dagster资产工厂。

一、资产工厂的核心价值

在数据工程中,我们经常需要处理大量相似的数据资产。资产工厂模式通过配置驱动的方式,让我们可以:

  • 批量创建具有相同逻辑的数据资产
  • 支持非技术人员通过YAML等DSL配置资产
  • 减少重复代码,提高开发效率

典型应用场景:

  • 批量处理数据库表
  • 统一批量文件转换
  • 为不同业务线创建相似的数据管道

在这里插入图片描述

二、Python实现资产工厂

基础ETL资产工厂示例
import tempfile
import dagster_aws.s3 as s3
import duckdb
import dagster as dgdef build_etl_job(s3_resource, bucket, source_object, target_object, sql):asset_key = f"etl_{bucket}_{target_object}".replace(".", "_")@dg.asset(name=asset_key)def etl_asset(context):with tempfile.TemporaryDirectory() as root:# 下载文件source_path = f"{root}/{source_object}"target_path = f"{root}/{target_object}"context.resources.s3.download_file(bucket, source_object, source_path)# 执行SQL转换db = duckdb.connect(":memory:")db.execute(f"CREATE TABLE source AS SELECT * FROM read_csv('{source_path}')")db.execute(sql).to_csv(target_path)# 上传结果context.resources.s3.upload_file(bucket, target_object, target_path)return dg.Definitions(assets=[etl_asset],resources={"s3": s3_resource})
批量创建资产
s3_resource = s3.S3Resource(aws_access_key_id="...", aws_secret_access_key="...")defs = dg.Definitions.merge(build_etl_job(s3_resource=s3_resource,bucket="my_bucket",source_object="raw_transactions.csv",target_object="cleaned_transactions.csv",sql="SELECT * FROM source WHERE amount IS NOT NULL;"),build_etl_job(s3_resource=s3_resource,bucket="my_bucket",source_object="all_customers.csv",target_object="risky_customers.csv",sql="SELECT * FROM source WHERE risk_score > 0.8;")
)

三、YAML配置驱动的资产工厂

基础YAML配置实现
import yaml
import dagster as dgdef load_etl_jobs_from_yaml(yaml_path: str) -> dg.Definitions:config = yaml.safe_load(open(yaml_path))s3_resource = s3.S3Resource(aws_access_key_id=config["aws"]["access_key_id"],aws_secret_access_key=config["aws"]["secret_access_key"])defs = []for job_config in config["etl_jobs"]:defs.append(build_etl_job(s3_resource=s3_resource,bucket=job_config["bucket"],source_object=job_config["source"],target_object=job_config["target"],sql=job_config["sql"]))return dg.Definitions.merge(*defs)
YAML配置文件示例
aws:access_key_id: "YOUR_ACCESS_KEY_ID"secret_access_key: "YOUR_SECRET_ACCESS_KEY"etl_jobs:- bucket: "my_bucket"source: "raw_transactions.csv"target: "cleaned_transactions.csv"sql: "SELECT * FROM source WHERE amount IS NOT NULL"- bucket: "my_bucket"source: "all_customers.csv"target: "risky_customers.csv"sql: "SELECT * FROM source WHERE risk_score > 0.8"

四、进阶优化方案

1. 使用Pydantic增强类型安全
from pydantic import BaseModel, validatorclass JobConfig(BaseModel):bucket: strsource: strtarget: strsql: strdef to_etl_job(self, s3_resource):return build_etl_job(s3_resource=s3_resource,bucket=self.bucket,source_object=self.source,target_object=self.target,sql=self.sql)
2. 使用Jinja2实现环境变量注入
import jinja2def load_etl_jobs_from_yaml(yaml_path: str) -> dg.Definitions:yaml_template = jinja2.Environment().from_string(open(yaml_path).read())config = yaml.safe_load(yaml_template.render(env=os.environ))# 使用Pydantic验证配置config_model = EtlJobsConfig.model_validate(config)return config_model.to_definitions()
3. 安全YAML配置示例
aws:access_key_id: "{{ env.AWS_ACCESS_KEY_ID }}"secret_access_key: "{{ env.AWS_SECRET_ACCESS_KEY }}"etl_jobs:- bucket: "my_bucket"source: "raw_transactions.csv"target: "cleaned_transactions.csv"sql: "SELECT * FROM source WHERE amount IS NOT NULL"

总结

Dagster资产工厂模式为数据工程师提供了强大的工具:

  1. Python实现适合快速原型开发和复杂逻辑处理
  2. YAML配置让非技术人员也能参与资产定义
  3. Pydantic+Jinja2组合提升了配置的安全性和灵活性

最佳实践建议:

  • 对于简单场景,直接使用Python函数定义
  • 对于需要团队协作的场景,采用YAML配置
  • 敏感信息务必通过环境变量管理
  • 复杂配置使用Pydantic进行类型验证

通过合理应用资产工厂模式,您可以显著提高数据管道的开发效率和维护性,同时降低人为错误的风险。

特别提醒:在实现嵌套函数时要注意Python的作用域规则和闭包特性。建议先阅读我关于Python嵌套函数的博客,了解其原理与用法,再结合本文实践Dagster资产工厂。

通过合理应用资产工厂模式和嵌套函数技术,您可以显著提高数据管道的开发效率和维护性,同时降低人为错误的风险。

http://www.dtcms.com/wzjs/424735.html

相关文章:

  • 网站建设计大型网站建设
  • 郑州直播网站建设sem专业培训公司
  • 如何做自己的网站赚钱网站批量查询工具
  • 网络营销的主要传播渠道成都公司网站seo
  • 手机建网站步骤软件关键词调词平台
  • 北京网站设计定制开发建设公司制作一个网站的费用是多少
  • 网站换服务器 备案教育培训机构招生方案
  • 自做网站2022年五月份热点事件
  • 南宁保洁网站建设百度一下电脑版首页
  • 怎样免费做网站视频讲解企业网站设计方案
  • 网站开发eq编辑器小学生简短小新闻
  • 如何建设公司网站最新热点新闻事件
  • 做网站例子图片描述武汉全网营销推广公司
  • 网站的购物车怎么做管理培训
  • 做网站 需要多少钱国际十大市场营销公司
  • 可以做视频推广的网站吗百度影响力排名顺序
  • 小说网站快速做排名百度搜索工具
  • wordpress 正在跳转中 请稍等seo每日工作
  • 甜水园网站建设谷歌关键词推广怎么做
  • 苹果钓鱼网站建设西安网站seo哪家公司好
  • 网站网页能自己做吗太原百度关键词优化
  • 怀化二医院网站免费b2b信息发布网站
  • 凡科网站模块广告公司推广平台
  • 东莞专业网如何做网站seo
  • 书画院网站建设模板北京seo邢云涛
  • 河北提供网站制作公司哪家专业网店代运营骗局流程
  • 做网站代管理三年河北seo诊断培训
  • 做那个男女的视频网站怎么制作公司网页
  • 河北做wap网站好的网站或网页
  • 网站开发项目技能比赛获奖报道镇江seo