当前位置: 首页 > news >正文

安徽建海建设工程有限公司网站建个企业网站需要什么

安徽建海建设工程有限公司网站,建个企业网站需要什么,赣州市建设工程造价管理网站,网店推广新思维No18: 使用 Apache Spark 进行分布式计算 摘要 本文深入解析Apache Spark分布式计算的核心机制,从依赖关系优化到流批一体处理,结合实时推荐系统和社交网络分析实战案例。通过可视化RDD依赖关系图、Catalyst优化器流程图,配合完整可运行的代…

No18: 使用 Apache Spark 进行分布式计算


摘要

本文深入解析Apache Spark分布式计算的核心机制,从依赖关系优化到流批一体处理,结合实时推荐系统和社交网络分析实战案例。通过可视化RDD依赖关系图、Catalyst优化器流程图,配合完整可运行的代码示例(含输入输出),帮助开发者掌握大规模数据处理的核心技术。扩展部分探讨云原生部署和数据湖事务支持,为构建企业级大数据系统提供完整解决方案。
在这里插入图片描述


核心概念

1. 宽依赖 vs 窄依赖(性能影响)

  • 窄依赖:每个父分区对应单个子分区(如map/filter)
  • 宽依赖:父分区可能被多个子分区依赖(如groupByKey/join)
# 窄依赖示例
rdd = sc.parallelize([1,2,3])
squared = rdd.map(lambda x: x**2)  # 父子分区1:1# 宽依赖示例
grouped = squared.groupBy(lambda x: x%2)  # 触发shuffle

2. Catalyst优化器

通过四个阶段优化查询计划:

from pyspark.sql import SparkSessionspark = SparkSession.builder.getOrCreate()
df = spark.range(1000).filter("id > 500")
df.explain(True)  # 查看优化后的物理计划

3. 动态资源分配

# spark-defaults.conf配置
spark.dynamicAllocation.enabled true
spark.shuffle.service.enabled true
spark.dynamicAllocation.minExecutors 2
spark.dynamicAllocation.maxExecutors 20

4. Structured Streaming微批处理

# 实时词频统计
from pyspark.sql.functions import *lines = (spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load())word_counts = lines.select(explode(split(lines.value, " ")).alias("word")) \.groupBy("word").count()query = (word_counts.writeStream.outputMode("complete").format("console").start())

实战案例

1. 实时推荐系统

# 模拟用户行为数据
# 安装依赖
# pip install pyspark==3.3.0 delta-spark==2.2.0 kafka-pythonfrom pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from pyspark.ml.recommendation import ALS# 创建Spark会话
spark = SparkSession.builder \.appName("StreamingDemo") \.config("spark.jars.packages", "io.delta:delta-core_2.12:2.2.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0") \.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \.getOrCreate()# 定义数据模式
schema = StructType([StructField("id", StringType(), True),StructField("timestamp", TimestampType(), True),StructField("value", IntegerType(), True)
])# 读取流数据
stream_df = (spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "sensor_data").load().selectExpr("CAST(value AS STRING) as json").select(from_json("json", schema).alias("data")).select("data.*"))# 处理流数据
processed_df = stream_df.withWatermark("timestamp", "10 minutes") \.groupBy(col("id"), col("timestamp").cast("date").alias("date")) \.agg({"value": "avg"}) \.withColumnRenamed("avg(value)", "avg_value")# 输出到控制台(用于调试)
query = (processed_df.writeStream.outputMode("complete").format("console").option("truncate", "false").start())# 等待查询终止
query.awaitTermination()# 实时训练ALS模型
als = ALS(maxIter=5, regParam=0.01, userCol="user", itemCol="item", ratingCol="rating")
model = als.fit(stream_df)  # 实际需使用checkpoint机制# 输出预测结果
predictions = model.transform(stream_df)
query = predictions.writeStream.format("console").start()

2. 社交网络图分析(GraphFrames)

from graphframes import *# 构建社交关系图
vertices = spark.createDataFrame([("Alice", 34),("Bob", 36),("Charlie", 25)
], ["id", "age"])edges = spark.createDataFrame([("Alice", "Bob", "friend"),("Bob", "Charlie", "follow")
], ["src", "dst", "type"])g = GraphFrame(vertices, edges)# PageRank计算
results = g.pageRank(resetProbability=0.15, maxIter=10)
results.vertices.show()
"""
输出:
+-------+---+------------------+
|     id|age|          pagerank|
+-------+---+------------------+
|  Alice| 34| 0.456789456123...|
|    Bob| 36| 0.321456987456...|
|Charlie| 25| 0.221457896325...|
+-------+---+------------------+
"""

扩展思考

1. Spark on Kubernetes

# spark-submit配置示例
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:name: spark-pi
spec:type: Scalamode: clusterimage: gcr.io/spark-operator/spark:v3.1.1mainClass: org.apache.spark.examples.SparkPimainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jardriver:cores: 1memory: "512m"executor:cores: 1instances: 2memory: "512m"

2. Delta Lake事务支持

# 启用Delta Lake
from delta import DeltaTable# 写入事务日志
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta_table")# 版本回滚
deltaTable = DeltaTable.forPath(spark, "/tmp/delta_table")
deltaTable.restoreToVersion(0)# Schema演化
data.withColumn("new_col", lit("abc")) \.write.format("delta") \.mode("append") \.option("mergeSchema", "true") \.save("/tmp/delta_table")

完整代码集成

# 安装依赖
# pip install pyspark==3.3.0 delta-spark==2.2.0 kafka-pythonfrom pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from delta.tables import DeltaTable
from pyspark.sql.types import StructType, StructField, StringType, IntegerType# 创建Spark会话
spark = SparkSession.builder \.appName("DeltaLakeDemo") \.config("spark.jars.packages", "io.delta:delta-core_2.12:2.2.0") \.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \.getOrCreate()# 定义数据模式
schema = StructType([StructField("id", IntegerType(), True),StructField("name", StringType(), True),StructField("category", StringType(), True),StructField("price", IntegerType(), True)
])# 流式数据写入Delta Lake
stream_df = (spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "raw_data").load().selectExpr("CAST(value AS STRING) as json").select(from_json("json", schema).alias("data")).select("data.*"))# 使用事务写入
query = (stream_df.writeStream.format("delta").outputMode("append").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta_lake"))# 批量分析
delta_table = DeltaTable.forPath(spark, "/tmp/delta_lake")
delta_table.toDF().groupBy("category").count().show()# 输出示例:
"""
+----------+-----+
|  category|count|
+----------+-----+
|electronics|   45|
|     books|   30|
+----------+-----+
"""

通过本文的学习,您应该能够:

  1. 优化Spark作业的依赖关系和资源分配
  2. 实现流批一体的数据处理管道
  3. 构建实时推荐系统和图分析应用
  4. 掌握云原生部署和数据湖事务管理
http://www.dtcms.com/a/584193.html

相关文章:

  • 深圳网站建设 工作室cms软件有什么功能
  • 温州做网站公司女生去住建局好不好
  • 浙江高端网站杭州公司注销网站备案
  • 南宁最高端网站建设重庆网站网页设计培训机构
  • 山西 网站制作国内快速建站
  • 网站导航的重要性做淘宝网站报告
  • 职业教育网站平台建设互联网行业分为哪几类
  • 风景旅游网站建设的设计思路wordpress 不能发布文章
  • 网站代理备案步骤专业企业展厅设计公司
  • 食品网站策划进入百度知道首页
  • 网站培训公司自建网站编程
  • 建设管理部门网站查询玩具外贸网站模板
  • 2016年网站推广方法巨鹿网站建设网络公司
  • 深圳专业网站建设免费维护送域名空间wordpress顶部菜单怎么删
  • 怎样去各大网站做淘宝推广新品牌营销策划方案
  • 快速网站制作做汤的网站有哪些
  • .net做的网站app下载入口
  • 岳阳卖房网站一键免费搭建手机网站
  • 亚马逊如何做折扣网站的营销磁力狗在线搜索
  • 网站怎么做数据分析临沂恒商做网站
  • 江西建设职业技能教育咨询网站做拍福利爱福利视频网站
  • 摄影网站建站做网站的公司什么动力
  • 用c 做一个小网站怎么做网站建设开发合同模板
  • 国内网站备案流程图设计素材网址
  • 静安广州网站建设wordpress新建相册页面
  • ftp可以发布网站吗兰州市城乡建设局网站公布的信息
  • 做网站首页的表格的代码乌海市建设局网站
  • 公司做网站排名寻模板网站源码
  • 网站建设费用固定资产怎么入多媒体网站开发
  • 长春网站建设 信赖吉网传媒做网站用多大的画布