当前位置: 首页 > wzjs >正文

网站开发工作前景河南郑州网站推广优化外包

网站开发工作前景,河南郑州网站推广优化外包,工业和信息化部网站备案,梓潼网站建设在Azure Databricks中使用PySpark实现缓慢变化维度(SCD)的三种核心类型,需结合Spark SQL和DataFrame API的特性,并利用Delta Lake的事务支持。以下是具体设计与实现步骤,以及测试用例: 通过以下步骤&#…

在Azure Databricks中使用PySpark实现缓慢变化维度(SCD)的三种核心类型,需结合Spark SQL和DataFrame API的特性,并利用Delta Lake的事务支持。以下是具体设计与实现步骤,以及测试用例:

通过以下步骤,可在Azure Databricks中高效实现SCD逻辑,确保数据历史可追溯且符合业务需求。


类型1:覆盖旧值(Overwrite Old Value)

设计要点
  • 直接更新目标表中变化的字段,不保留历史记录。
  • 使用MERGE INTO操作(Delta Lake特性)实现高效更新。
实现步骤
  1. 读取数据:加载源数据(增量数据)和维度表(目标表)。
  2. 匹配键:根据业务键(如customer_id)匹配源数据和目标表。
  3. 更新记录:若匹配成功且属性变化,更新目标表字段。
  4. 写入结果:使用MERGE操作完成更新。
代码示例
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()# 加载源数据和目标表
source_df = spark.read.format("delta").load("source_path")
target_df = spark.read.format("delta").load("target_path")# 创建临时视图
source_df.createOrReplaceTempView("source")
target_df.createOrReplaceTempView("target")# 执行MERGE操作
merge_sql = """
MERGE INTO target AS t
USING source AS s
ON t.customer_id = s.customer_id
WHEN MATCHED AND t.address <> s.address THENUPDATE SET t.address = s.address
WHEN NOT MATCHED THENINSERT (customer_id, name, address) VALUES (s.customer_id, s.name, s.address)
"""spark.sql(merge_sql)
测试用例
  • 场景1:客户地址变更,验证目标表中旧地址被覆盖。
  • 场景2:新增客户插入,验证记录正确添加。
  • 场景3:无变化数据,确认无操作执行。

类型2:创建新记录(SCD Type 2)

设计要点
  • 为每个变化生成新记录,维护start_dateend_dateis_current标志。
  • 使用窗口函数检测变化,MERGE操作插入新记录并更新旧记录状态。
实现步骤
  1. 添加版本标识:在目标表中增加start_dateend_dateis_current字段。
  2. 检测变化:对比源数据与目标表,标记变化的记录。
  3. 关闭旧记录:将变化记录的end_date设为当前日期,is_current设为False
  4. 插入新记录:为新记录设置start_date为当前时间,end_dateNULLis_currentTrue
代码示例
from pyspark.sql import functions as F
from pyspark.sql.window import Window# 加载数据
source = spark.read.format("delta").load("source_path")
target = spark.read.format("delta").load("target_path")# 检测变化:假设通过last_updated_time判断
changes = source.join(target, "product_id", "left_outer") \.filter((source.price != target.price) | target.product_id.isNull())# 关闭旧记录
closed_records = target.join(changes, "product_id", "inner") \.withColumn("end_date", F.current_date()) \.withColumn("is_current", F.lit(False)) \.select(target["*"], "end_date", "is_current")# 生成新记录(新增或变化)
new_records = changes.withColumn("start_date", F.current_date()) \.withColumn("end_date", F.lit(None).cast("date")) \.withColumn("is_current", F.lit(True)) \.select("product_id", "price", "start_date", "end_date", "is_current")# 合并并写入Delta表
final_df = closed_records.unionByName(new_records)
final_df.write.format("delta") \.mode("overwrite") \.saveAsTable("product_dimension")
测试用例
  • 场景1:产品价格首次插入,验证is_current=Trueend_date=NULL
  • 场景2:价格变更后,旧记录is_current=False,新记录时间连续。
  • 场景3:多次变更,检查历史版本数量及时间连续性。

类型3:添加有效日期(SCD Type 3)

设计要点
  • 为每个记录维护valid_fromvalid_to日期,仅保留有限历史(如最新一次变更)。
  • 更新时修改旧记录的valid_to,插入新记录。
实现步骤
  1. 初始化字段:在目标表中添加valid_fromvalid_to
  2. 检测变化:对比新旧数据,找出属性变化的记录。
  3. 关闭旧记录:将旧记录valid_to设为当前日期。
  4. 插入新记录:设置新记录的valid_from为当前日期,valid_toNULL
代码示例
# 加载数据
employee_source = spark.read.format("delta").load("source_path")
employee_target = spark.read.format("delta").load("target_path")# 检测职位变化
changes = employee_source.join(employee_target, "employee_id", "left") \.filter(employee_source.position != employee_target.position)# 关闭旧记录(设置valid_to)
closed = changes.select(employee_target["*"]) \.withColumn("valid_to", F.current_date())# 插入新记录
new_records = changes.select(employee_source["*"]) \.withColumn("valid_from", F.current_date()) \.withColumn("valid_to", F.lit(None).cast("date"))# 合并数据并写入
closed.unionByName(new_records) \.write.format("delta") \.mode("overwrite") \.saveAsTable("employee_dimension")
测试用例
  • 场景1:员工职位首次记录,valid_to为空。
  • 场景2:职位变更后,旧记录valid_to更新,新记录valid_from正确。
  • 场景3:查询特定时间点的职位状态,验证时间段准确性。

混合方法实现示例

结合类型1和类型2,对关键属性使用类型2,非关键属性使用类型1:

# 关键属性(如地址)使用类型2
address_changes = detect_address_changes(source, target)
close_old_address_records(address_changes)
insert_new_address_records(address_changes)# 非关键属性(如电话)使用类型1
phone_updates = detect_phone_changes(source, target)
update_phone_records(phone_updates)

关键优化点

  1. Delta Lake特性:利用MERGE INTO、ACID事务、Z-Order优化查询性能。
  2. 数据版本管理:使用DESCRIBE HISTORY查看SCD操作历史。
  3. 增量处理:仅处理变化的数据分区,减少计算量。

文章转载自:

http://LO79bVmf.pnmtk.cn
http://UOxqPxbW.pnmtk.cn
http://NnxZAIro.pnmtk.cn
http://UQVXzjZY.pnmtk.cn
http://XkIPrN0M.pnmtk.cn
http://ljBlslpn.pnmtk.cn
http://a2JKNI4H.pnmtk.cn
http://9nzlOBHw.pnmtk.cn
http://oVvLvbSc.pnmtk.cn
http://FsaWa2xZ.pnmtk.cn
http://6iB0dIHi.pnmtk.cn
http://CCM2A01v.pnmtk.cn
http://YMmN8lg7.pnmtk.cn
http://VTqDZn8G.pnmtk.cn
http://A35m5x6b.pnmtk.cn
http://wNC23Snv.pnmtk.cn
http://fOCCh6Ri.pnmtk.cn
http://aOUere2r.pnmtk.cn
http://9ankvwg0.pnmtk.cn
http://PoSDFuKB.pnmtk.cn
http://GaK5LqGn.pnmtk.cn
http://u8Im5FCO.pnmtk.cn
http://NWhVTeNj.pnmtk.cn
http://Y8mjuoMB.pnmtk.cn
http://Mh9mrkeQ.pnmtk.cn
http://kHCBhsQi.pnmtk.cn
http://CwYn8ZZy.pnmtk.cn
http://1X9UnkBe.pnmtk.cn
http://bMN7CaTK.pnmtk.cn
http://PAfcpxPc.pnmtk.cn
http://www.dtcms.com/wzjs/609517.html

相关文章:

  • 网网站建设设计公司店铺推广
  • 图案设计网站有哪些基于推荐算法的网站开发
  • 玉树州网站建设公司wordpress是开源
  • 淘宝做网站推广怎么样阳澄湖大闸蟹网站建设
  • 哪个网站可以免费学编程深圳电器公司官网
  • 不错的网站开发公司无为网站建设
  • 如何解析到凡科建设的网站吾享crm客户管理系统
  • 佛山建设小学网站网站建设运营岗位职责
  • 公司网站建设维护合同范本登封市建设局网站
  • 国外网站做盗版wordpress主题kratos
  • 怎样建淘宝客网站wordpress图片标签
  • 建设银行企业信息门户网站做网站分辨率设置多少
  • 网站建设网络门户温州网站建设哪家好
  • 做网站域名的好处是什么如何登录到wordpress
  • 网站推广怎么做 知乎重庆网站建设快速建站
  • 西安专业的网站设计费用wordpress建不了网站
  • 建网站有域名和主机经典网站源码
  • 有没有什么推荐的网站网站制作网站开发ple id充值
  • 建设网站的相关技术旅游网站设计分析
  • 自做网站好做吗注册公司登录什么网站
  • wordpress多站点demo如何创建网页模板
  • 有了域名如何建设网站做网站分析
  • windows10PHP 网站建设做网站内容都有哪些
  • 邯郸网站建设制作广州网络公司政策
  • wordpress建站门户建网站英语怎么说
  • 网站做推广需要多少钱网站建设及安全规范
  • 向雅虎提交网站epanel wordpress
  • 做网站要用写接口6珲春住房和城乡建设局网站
  • 网站制作教程网页开发界面设计
  • 河北建设厅网站查询网站认证