当前位置: 首页 > news >正文

【Azure 架构师学习笔记】- Azure Databricks (22) --Autoloader

本文属于【Azure 架构师学习笔记】系列。
本文属于【Azure Databricks】系列。
接上文 【Azure 架构师学习笔记】- Azure Databricks (21) --费用相关

前言

Databricks家里在Apache Spark之上,是企业级的应对大规模数据处理的通用平台, 可以运行在AWS, Azure和GCP 之上。
作为数据处理平台, ETL 必不可少,虽然在特定平台比如Azure上可以通过如ADF来实现数据抽取,但是这样对于云平台间迁移并没有什么帮助。
Databricks自带了一个Autoloader功能,本文将介绍一下这个工具。

ADB上的Autoloader

Autoloader是Databricks中的一个“机制”,用于从data lake中获取数据。它的强处在于不需要配置一些列的触发器来处理data lake中的新数据,而是如其名“auto”地把新文件推到流处理作业中。
Autoloader有几个优势:

  1. 简化过程:简化了ADB 从各种数据源抽取数据到Delta Table的过程。它自动检测特定目录中的新文件
  2. 时效性高:接近实时地,高效地加载到表中。
  3. 可以处理大数据量:同时它还能应对大体量的数据,不像某些数据集成工具,只适合短时、少量的数据处理(因为通常这些工具目标是处理逻辑而不是数据量)。
  4. 易用:不需要写复杂的代码来实现文件发现和数据加载。还能快速适应数据结构的变化。
  5. 支持数据源多:无缝对接Azure生态圈中的数据源, 如Event Hubs,Azure Blob Storage。
  6. 支持预处理:这个大部分的ETL 工具都支持,在这里只是说明它也支持。
  7. 可靠性和一致性:通过事务控制数据的增删改操作。
  8. 支持多种数据格式:包括JSON, CSV, PARQUET, AVRO,ORC,TEXT, BINARY等文件,也广泛支持3种云平台的存储服务。

组件

  • Cloud Files:在Databricks中提供大数据集的分布式文件存储,Autoloader通过CloudFiles管理和存储数据文件的进入。
  • CloudNotification:通过启用事件驱动工作流来监听云存储上的变更。当新文件出现在制定的目录时,cloudnotification触发Autoloader去初始化数据处理。

演示

接下来演示一下简单的使用,首先我们需要有一个ADB,还要有一个ADLS Gen2。 按照前文的配置 【Azure 架构师学习笔记】- Azure Databricks (14) – 搭建Medallion Architecture part 2的环境来配置。

新建一个文件夹raw_data, 然后 把测试数据放进去。

在这里插入图片描述
在notebook中执行下面的代码。

# 无需任何spark.conf配置,直接使用UC
spark.sql("USE CATALOG george_demo")
spark.sql("CREATE SCHEMA IF NOT EXISTS bronze")  # 如果Schema不存在
spark.sql("USE SCHEMA bronze")

# Auto Loader配置
adls_path = "abfss://bronze@medallionadls01.dfs.core.windows.net/raw_data/"
schema_path = "abfss://bronze@medallionadls01.dfs.core.windows.net/schemas/"
checkpoint_path = "abfss://bronze@medallionadls01.dfs.core.windows.net/system/checkpoints/"

# 流式读取(事件驱动模式)
bronze_stream = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", schema_path)
    .option("cloudFiles.useNotifications", "false")
    .load(adls_path)
)

# 写入UC托管表(三级命名空间)
target_table = "george_demo.bronze.transactions"  # Catalog.Schema.Table
(bronze_stream.writeStream
    .format("delta")
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .toTable(target_table)
    
)

执行前的截图:

在这里插入图片描述

执行代码:
在这里插入图片描述

执行后出现了新的表transactions
在这里插入图片描述
这是一个简单的演示,后续可以通过下图的schedule,或者借助event hub等事件触发来监控和加载新的文件。
在这里插入图片描述

相关文章:

  • LeetCode 热题 100_前 K 个高频元素(75_347_中等_C++)(堆)(哈希表+排序;哈希表+优先队列(小根堆))
  • 基于ssm的自习室预订座位管理(全套)
  • 蓝桥杯 阶乘的和
  • 登录Xshell主机及Linux基本指令
  • SpringBoot之一个注解完成所有类型的文件下载!
  • MySQL UPDATE 更新操作详解
  • 深入解析GORM的配置选项及示例
  • clickhouse清除system 表数据释放磁盘空间
  • 内容中台智能推荐服务创新路径
  • 网上怎么样可以挣钱,分享几种可以让你在家赚钱的兼职项目
  • 开发过程中的网络协议
  • SpringMVC(八)Knife4j 接口文档
  • XML 树结构
  • 爬虫逆向:逆向中用到汇编语言详细总结
  • 网络层协议
  • 教育直播培训系统源码解析:核心功能与实现方式
  • Android Room 框架公共模块源码深度剖析(四)
  • React Native 如何使用 Expo 快速开发?
  • 《C#上位机开发从门外到门内》3-2::Modbus数据采集系统
  • 缓存相关内容
  • 即日起,“应急使命·2025”演习公开征集新质救援能力
  • 购车补贴、“谷子”消费、特色产品,这些活动亮相五五购物节
  • 国家卫健委对近日肖某引发舆情问题开展调查
  • 中国海油总裁:低油价短期影响利润,但也催生资产并购机会
  • 2025年度中国青年五四奖章暨新时代青年先锋奖评选揭晓
  • 豆神教育:2024年净利润1.37亿元,同比增长334%