当前位置: 首页 > wzjs >正文

网站建设中的端口广安做网站公司

网站建设中的端口,广安做网站公司,买网站平台名字吗,网站开发需要证书吗No18: 使用 Apache Spark 进行分布式计算 摘要 本文深入解析Apache Spark分布式计算的核心机制,从依赖关系优化到流批一体处理,结合实时推荐系统和社交网络分析实战案例。通过可视化RDD依赖关系图、Catalyst优化器流程图,配合完整可运行的代…

No18: 使用 Apache Spark 进行分布式计算


摘要

本文深入解析Apache Spark分布式计算的核心机制,从依赖关系优化到流批一体处理,结合实时推荐系统和社交网络分析实战案例。通过可视化RDD依赖关系图、Catalyst优化器流程图,配合完整可运行的代码示例(含输入输出),帮助开发者掌握大规模数据处理的核心技术。扩展部分探讨云原生部署和数据湖事务支持,为构建企业级大数据系统提供完整解决方案。
在这里插入图片描述


核心概念

1. 宽依赖 vs 窄依赖(性能影响)

  • 窄依赖:每个父分区对应单个子分区(如map/filter)
  • 宽依赖:父分区可能被多个子分区依赖(如groupByKey/join)
# 窄依赖示例
rdd = sc.parallelize([1,2,3])
squared = rdd.map(lambda x: x**2)  # 父子分区1:1# 宽依赖示例
grouped = squared.groupBy(lambda x: x%2)  # 触发shuffle

2. Catalyst优化器

通过四个阶段优化查询计划:

from pyspark.sql import SparkSessionspark = SparkSession.builder.getOrCreate()
df = spark.range(1000).filter("id > 500")
df.explain(True)  # 查看优化后的物理计划

3. 动态资源分配

# spark-defaults.conf配置
spark.dynamicAllocation.enabled true
spark.shuffle.service.enabled true
spark.dynamicAllocation.minExecutors 2
spark.dynamicAllocation.maxExecutors 20

4. Structured Streaming微批处理

# 实时词频统计
from pyspark.sql.functions import *lines = (spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load())word_counts = lines.select(explode(split(lines.value, " ")).alias("word")) \.groupBy("word").count()query = (word_counts.writeStream.outputMode("complete").format("console").start())

实战案例

1. 实时推荐系统

# 模拟用户行为数据
# 安装依赖
# pip install pyspark==3.3.0 delta-spark==2.2.0 kafka-pythonfrom pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from pyspark.ml.recommendation import ALS# 创建Spark会话
spark = SparkSession.builder \.appName("StreamingDemo") \.config("spark.jars.packages", "io.delta:delta-core_2.12:2.2.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0") \.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \.getOrCreate()# 定义数据模式
schema = StructType([StructField("id", StringType(), True),StructField("timestamp", TimestampType(), True),StructField("value", IntegerType(), True)
])# 读取流数据
stream_df = (spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "sensor_data").load().selectExpr("CAST(value AS STRING) as json").select(from_json("json", schema).alias("data")).select("data.*"))# 处理流数据
processed_df = stream_df.withWatermark("timestamp", "10 minutes") \.groupBy(col("id"), col("timestamp").cast("date").alias("date")) \.agg({"value": "avg"}) \.withColumnRenamed("avg(value)", "avg_value")# 输出到控制台(用于调试)
query = (processed_df.writeStream.outputMode("complete").format("console").option("truncate", "false").start())# 等待查询终止
query.awaitTermination()# 实时训练ALS模型
als = ALS(maxIter=5, regParam=0.01, userCol="user", itemCol="item", ratingCol="rating")
model = als.fit(stream_df)  # 实际需使用checkpoint机制# 输出预测结果
predictions = model.transform(stream_df)
query = predictions.writeStream.format("console").start()

2. 社交网络图分析(GraphFrames)

from graphframes import *# 构建社交关系图
vertices = spark.createDataFrame([("Alice", 34),("Bob", 36),("Charlie", 25)
], ["id", "age"])edges = spark.createDataFrame([("Alice", "Bob", "friend"),("Bob", "Charlie", "follow")
], ["src", "dst", "type"])g = GraphFrame(vertices, edges)# PageRank计算
results = g.pageRank(resetProbability=0.15, maxIter=10)
results.vertices.show()
"""
输出:
+-------+---+------------------+
|     id|age|          pagerank|
+-------+---+------------------+
|  Alice| 34| 0.456789456123...|
|    Bob| 36| 0.321456987456...|
|Charlie| 25| 0.221457896325...|
+-------+---+------------------+
"""

扩展思考

1. Spark on Kubernetes

# spark-submit配置示例
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:name: spark-pi
spec:type: Scalamode: clusterimage: gcr.io/spark-operator/spark:v3.1.1mainClass: org.apache.spark.examples.SparkPimainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jardriver:cores: 1memory: "512m"executor:cores: 1instances: 2memory: "512m"

2. Delta Lake事务支持

# 启用Delta Lake
from delta import DeltaTable# 写入事务日志
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta_table")# 版本回滚
deltaTable = DeltaTable.forPath(spark, "/tmp/delta_table")
deltaTable.restoreToVersion(0)# Schema演化
data.withColumn("new_col", lit("abc")) \.write.format("delta") \.mode("append") \.option("mergeSchema", "true") \.save("/tmp/delta_table")

完整代码集成

# 安装依赖
# pip install pyspark==3.3.0 delta-spark==2.2.0 kafka-pythonfrom pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from delta.tables import DeltaTable
from pyspark.sql.types import StructType, StructField, StringType, IntegerType# 创建Spark会话
spark = SparkSession.builder \.appName("DeltaLakeDemo") \.config("spark.jars.packages", "io.delta:delta-core_2.12:2.2.0") \.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \.getOrCreate()# 定义数据模式
schema = StructType([StructField("id", IntegerType(), True),StructField("name", StringType(), True),StructField("category", StringType(), True),StructField("price", IntegerType(), True)
])# 流式数据写入Delta Lake
stream_df = (spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "raw_data").load().selectExpr("CAST(value AS STRING) as json").select(from_json("json", schema).alias("data")).select("data.*"))# 使用事务写入
query = (stream_df.writeStream.format("delta").outputMode("append").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta_lake"))# 批量分析
delta_table = DeltaTable.forPath(spark, "/tmp/delta_lake")
delta_table.toDF().groupBy("category").count().show()# 输出示例:
"""
+----------+-----+
|  category|count|
+----------+-----+
|electronics|   45|
|     books|   30|
+----------+-----+
"""

通过本文的学习,您应该能够:

  1. 优化Spark作业的依赖关系和资源分配
  2. 实现流批一体的数据处理管道
  3. 构建实时推荐系统和图分析应用
  4. 掌握云原生部署和数据湖事务管理
http://www.dtcms.com/wzjs/551599.html

相关文章:

  • 信誉比较好的网上做任务的网站手机网站网页设计
  • 长沙网站 建设推广世云网络怎样才能创建网站
  • 网站建设自查合肥做网站加盟
  • 重庆网站建公司大全网站建设 网络科技公司
  • 网站管理与维护做网站与做网页的区别
  • 企业电子商务网站有哪些做网站需要 的文档
  • 济宁网站建设优化亿峰wordpress企业主题免费
  • 苏ICP备网站建设中企动力无锡公众号涨粉
  • 网站主页图片怎么换北京seo关键词优化收费
  • 白狐网站建设网站开发实训内容
  • 怎么优化网站源码关键词网站建设与网页设计从入门到精通 pdf
  • 做国外lead应该做什么网站陕西省交通集团建设网站
  • 北京好用的h5建站百度查重入口免费版
  • 做商务网站要多少钱手机版网站怎么上传
  • 网站建设终端是什么C语言做网站需要创建窗口吗
  • 网站开发培训实训公司网站与营销网站在栏目上的不同
  • 兵团建设环保局网站网上医疗和医院网站建设制作
  • 福州企业网站建设专业服务上海高端品牌网站建设
  • 精品课程网站建设毕业设计论文怎么做正规网站
  • t恤在线设计网站app免费下载
  • 做最优秀的自己演讲视频网站网络工程属于计算机类吗
  • 做影视网站难吗专门做微信小程序的公司
  • 阿里巴巴网站建设缺点白银市城县建设局网站
  • 个人备案网站会影响吗网页设计和网站建设实战大全
  • 购买一个网站需要多少钱高端网站设计上海网站建设上海
  • 学了lamp做网站就足够了吗南昌网站seo外包服务
  • 网站开发文档撰写模板旅游网站建设网
  • 湖南平台网站建设哪家好帝国网站管理系统安装连接不上数据库
  • 福州网红外包网站怎么做seo
  • 论坛型网站开发山东机关建设网站