当前位置: 首页 > wzjs >正文

合肥网站开发外包网站建设教程给赚湖南岚鸿官 网

合肥网站开发外包,网站建设教程给赚湖南岚鸿官 网,如何建立网站平台的步骤,西安嵌入式培训分区是Dagster中的核心抽象概念,它允许我们管理大型数据集、处理增量更新并提高管道性能。本文将详细介绍如何创建和实现基于时间和类别的分区资产。 什么是分区? 分区是将数据集划分为更小、更易管理的部分的技术。在Dagster中,分区可以基于…

分区是Dagster中的核心抽象概念,它允许我们管理大型数据集、处理增量更新并提高管道性能。本文将详细介绍如何创建和实现基于时间和类别的分区资产。

在这里插入图片描述

什么是分区?

分区是将数据集划分为更小、更易管理的部分的技术。在Dagster中,分区可以基于时间、类别或其他自定义逻辑创建,从而优化数据处理流程。

创建时间分区资产

基于时间的月度分区

首先,我们将创建一个按月份分区的资产,用于计算每个销售代表的月度绩效:

monthly_partition = dg.MonthlyPartitionsDefinition(start_date="2023-01-01")@dg.asset(partitions_def=monthly_partition,compute_kind="duckdb",group_name="analysis",deps=[joined_data]
)
def monthly_sales_performance(context: dg.AssetExecutionContext, duckdb: DuckDBResource):partition_date_str = context.partition_keymonth_to_fetch = partition_date_str[:-3]  # 格式化为YYYY-MMwith duckdb.get_connection() as conn:# 创建表(如果不存在)conn.execute("""CREATE TABLE IF NOT EXISTS monthly_sales_performance (partition_date varchar,rep_name varchar,product varchar,total_dollar_amount double);""")# 删除该月已有数据conn.execute(f"""DELETE FROM monthly_sales_performance WHERE partition_date = '{month_to_fetch}';""")# 插入新数据conn.execute(f"""INSERT INTO monthly_sales_performanceSELECT '{month_to_fetch}' AS partition_date, rep_name, product_name AS product, SUM(dollar_amount) AS total_dollar_amountFROM joined_dataWHERE strftime(date, '%Y-%m') = '{month_to_fetch}'GROUP BY '{month_to_fetch}', rep_name, product_name;""")# 预览数据preview_query = f"SELECT * FROM monthly_sales_performance WHERE partition_date = '{month_to_fetch}';"preview_df = conn.execute(preview_query).fetchdf()row_count = conn.execute(f"""SELECT COUNT(*) FROM monthly_sales_performance WHERE partition_date = '{month_to_fetch}';""").fetchone()[0] if conn.execute(f"""SELECT COUNT(*) FROM monthly_sales_performance WHERE partition_date = '{month_to_fetch}';""").fetchone() else 0return dg.MaterializeResult(metadata={"row_count": dg.MetadataValue.int(row_count),"preview": dg.MetadataValue.md(preview_df.to_markdown(index=False))})

创建类别分区资产

基于产品类别的分区

接下来,我们创建一个基于预定义产品类别的静态分区资产:

product_category_partition = dg.StaticPartitionsDefinition(["Electronics", "Books", "Home and Garden", "Clothing"
])@dg.asset(deps=[joined_data],partitions_def=product_category_partition,group_name="analysis",compute_kind="duckdb"
)
def product_performance(context: dg.AssetExecutionContext, duckdb: DuckDBResource):product_category_str = context.partition_keywith duckdb.get_connection() as conn:# 创建表(如果不存在)conn.execute("""CREATE TABLE IF NOT EXISTS product_performance (product_category varchar,product_name varchar,total_dollar_amount double,total_units_sold double);""")# 删除该类别已有数据conn.execute(f"""DELETE FROM product_performance WHERE product_category = '{product_category_str}';""")# 插入新数据conn.execute(f"""INSERT INTO product_performanceSELECT '{product_category_str}' AS product_category, product_name, SUM(dollar_amount) AS total_dollar_amount, SUM(quantity) AS total_units_soldFROM joined_dataWHERE category = '{product_category_str}'GROUP BY '{product_category_str}', product_name;""")# 预览数据preview_query = f"SELECT * FROM product_performance WHERE product_category = '{product_category_str}';"preview_df = conn.execute(preview_query).fetchdf()row_count = conn.execute(f"""SELECT COUNT(*) FROM product_performance WHERE product_category = '{product_category_str}';""").fetchone()[0] if conn.execute(f"""SELECT COUNT(*) FROM product_performance WHERE product_category = '{product_category_str}';""").fetchone() else 0return dg.MaterializeResult(metadata={"row_count": dg.MetadataValue.int(row_count),"preview": dg.MetadataValue.md(preview_df.to_markdown(index=False))})

将分区资产添加到Definitions

完成资产定义后,需要将它们添加到Dagster的Definitions对象中:

defs = dg.Definitions(assets=[products, sales_reps, sales_data, joined_data, monthly_sales_performance, product_performance,], asset_checks=[missing_dimension_check],resources={"duckdb": DuckDBResource(database="data/mydb.duckdb")}
)

物化分区资产

在Dagster UI中操作这些分区资产的步骤:

  1. 导航到"Assets"页面
  2. 点击"Reload definitions"重新加载定义
  3. 选择"monthly_sales_performance"资产,然后点击"Materialize selected"
    • 确保选择所有分区
    • 启动回填(backfill)作业
  4. 选择"product_performance"资产,然后点击"Materialize selected"
    • 确保选择所有分区
    • 启动回填作业

下一步计划

现在我们已经建立了ETL管道的主要资产,下一步可以考虑:

  1. 添加自动化调度
  2. 实现数据质量监控
  3. 添加异常处理机制
  4. 优化查询性能
  5. 扩展更多维度的分析

通过合理使用分区技术,我们可以显著提高Dagster管道的性能和可维护性,特别是在处理大规模数据集时。

http://www.dtcms.com/wzjs/804259.html

相关文章:

  • 高端手机网站设计用.net做网站中含有论坛
  • 网站建设门店牌子一周热点新闻
  • 网站开发学什么语音哈尔滨建站模板源码
  • 网站架构企业收费标准正规挣钱最快的app
  • 土木工程毕业设计网站wordpress 4.0 安装
  • 江西九江永修网站建设景观设计公司理念
  • asp网站下用php栏目上海企业名称查询系统
  • 网站页面打不开西安做网站的在哪
  • 佛山合展商务网站建设wordpress 新闻发布
  • 网站标题怎么设置硬件开发是做什么工作
  • 怎么劝客户做网站中国建筑网官网手机版
  • 重庆免费网站推广软件定制网站设计高端网站建设
  • 企业网站规划书网络营销4c策略是什么
  • 青岛公司网站制作esc怎么做网站
  • 网站建设的创新之处成都定制网站建设地址
  • 英语培训网站模板网络建站工作室
  • 网站用什么开发软件做北海住房和城乡建设部网站
  • 阿里云可以做电商网站吗揭阳城乡建设局网站
  • 国外开源网站系统右安门网站建设
  • 重庆网站建设有佳网络网站推广要点
  • vip网站怎么做阳江市招聘最新招聘
  • 商城网站系统建设方案设计平台市场分析
  • 免费电商网站建设表白网页在线生成网站
  • wordpress学校网站模板wordpress 定时显示
  • 网站反链暴增怎么回事网页设计公司有哪些在包头的
  • 做通风工程上哪个网站发布网站建设开发ppt
  • wordpress 网站提速wordpress 滑动门效果
  • 用html5做的网站素材wordpress 模版修改
  • 网站建设外包发展情况购买的网站平台建设服务计入
  • 天津响应式网站设计企业网站有那些