当前位置: 首页 > news >正文

生产环境Spark Structured Streaming实时数据处理应用实践分享

封面图片

生产环境Spark Structured Streaming实时数据处理应用实践分享

一、业务场景描述

我们所在的电商平台需要实时监控用户行为数据(如点击、下单、支付等),基于事件级别的流式数据进行实时统计、会话聚合、漏斗分析,并将结果推送到Dashboard和报表存储。原有系统使用的Storm+Kafka方案在高并发时存在容错难、状态管理复杂、维护成本高的问题。

核心需求:

  • 低延迟:端到端处理延迟控制在2秒以内。
  • 可伸缩:能水平扩展,应对峰值10万条/秒消息吞吐。
  • 容错性:任务失败自动重启且保证端到端数据不丢失。
  • 状态管理:支持有状态聚合(窗口、会话)和超大状态存储。

二、技术选型过程

我们对主流实时计算框架进行了对比:

| 框架 | 延迟 | 状态管理 | 易用性 | 扩展性 | 社区成熟度 | | ---- | ---- | ---- | ---- | ---- | ---- | | Apache Storm | 500ms~1s | 需自行实现State Store | 开发复杂 | 高 | 高 | | Apache Flink | 200ms~500ms | 内置强大状态管理 | 编程模型复杂 | 高 | 高 | | Spark Structured Streaming | 1s~2s | 使用Checkpoint and WAL,可容错 | API友好,基于Spark SQL | 高 | 高 | | Apache Kafka Streams | <1s | 基于RocksDB,状态管理受限 | 与Kafka耦合高 | 中 | 中 |

综合考虑团队技术栈和运维成本,我们最终选定Spark Structured Streaming:

  • 与现有Spark Batch集群共用资源。
  • 编程模型统一,SQL/DS/Lambda API支持灵活。
  • Checkpoint与WAL机制简化状态管理,集成HDFS持久化状态。

三、实现方案详解

3.1 项目结构

├── pom.xml
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com.company.streaming
│   │   │       ├── App.java
│   │   │       └── utils
│   │   │           └── KafkaOffsetManager.java
│   │   └── resources
│   │       └── application.conf
└── README.md

3.2 核心配置(application.conf)

spark.app.name=RealTimeUserBehavior
spark.master=yarn
spark.sql.shuffle.partitions=200
spark.streaming.backpressure.enabled=true
spark.checkpoint.dir=hdfs://namenode:8020/app/checkpoints/structured-streaming
kafka.bootstrap.servers=broker1:9092,broker2:9092
kafka.topic.user=topic_user_behavior
kafka.group.id=user_behavior_group

3.3 主入口代码(App.java)

package com.company.streaming;import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.Trigger;public class App {public static void main(String[] args) throws Exception {SparkSession spark = SparkSession.builder().appName("RealTimeUserBehavior").getOrCreate();// 从Kafka读取原始数据Dataset<Row> raw = spark.readStream().format("kafka").option("kafka.bootstrap.servers", spark.sparkContext().getConf().get("kafka.bootstrap.servers")).option("subscribe", spark.sparkContext().getConf().get("kafka.topic.user")).option("startingOffsets", "latest").load();// 解析JSON并选取字段Dataset<Row> userEvents = raw.selectExpr("CAST(value AS STRING) as json").select(org.apache.spark.sql.functions.from_json(org.apache.spark.sql.functions.col("json"),DataSchema.eventSchema()).as("data")).select("data.*");// 实时会话聚合:10分钟无操作认为会话结束Dataset<Row> sessions = userEvents.withWatermark("eventTime", "2 minutes").groupBy(org.apache.spark.sql.functions.window(org.apache.spark.sql.functions.col("eventTime"),"10 minutes", "5 minutes"),org.apache.spark.sql.functions.col("userId")).agg(org.apache.spark.sql.functions.count("eventType").alias("eventCount"),org.apache.spark.sql.functions.min("eventTime").alias("startTime"),org.apache.spark.sql.functions.max("eventTime").alias("endTime"));// 输出到HDFS OR 更新到外部系统sessions.writeStream().outputMode(OutputMode.Update()).trigger(Trigger.ProcessingTime("30 seconds")).option("path", "hdfs://namenode:8020/app/output/user_sessions").option("checkpointLocation", spark.sparkContext().getConf().get("spark.checkpoint.dir") + "/sessions").start().awaitTermination();}
}

3.4 关键工具类(KafkaOffsetManager.java)

package com.company.streaming.utils;// 省略:管理Kafka手动提交offset、读写Zookeeper存储偏移量

四、踩过的坑与解决方案

  1. 状态膨胀导致Checkpoint文件过大:

    • 方案:定期做State TTL清理,结合Spark 3.1.1+的state cleanup策略。
  2. Kafka消费位点重复或丢失:

    • 方案:使用KafkaOffsetManager手动管理,结合幂等写入目标系统保证At-Least-Once语义。
  3. 延迟抖动:

    • 方案:开启backpressure,限制最大并行度,并合理调整Trigger频率。
  4. Driver内存溢出:

    • 方案:提升driver内存,拆分业务流程;或将部分轻量计算迁移至Executors。

五、总结与最佳实践

  • 合理规划Checkpoint和WAL存储目录,避免与业务数据混淆。
  • 利用Spark监控UI实时观察批次时长、shuffle写入、延迟指标。
  • 结合PeriodicStateCleanup+Watermark确保有状态算子状态可控。
  • 抽象共通工具类(KafkaOffsetManager、JSON解析、公用Schema),提高代码可维护性。
  • 复杂业务可拆分成多个流式子作业,下游合并结果,增强可扩展性。

通过以上实践,我们成功将平台数据实时处理延迟稳定在1.2秒左右,作业稳定运行10+节点集群一个季度零故障。

http://www.dtcms.com/a/356925.html

相关文章:

  • 【3D入门-指标篇下】 3D重建评估指标对比-附实现代码
  • SwiGLU激活函数的原理
  • 【原版系统】Windows 11 LTSC 2024
  • Blender中旋转与翻转纹理的实用方法教学
  • Java全栈工程师的面试实战:从技术细节到业务场景
  • 企业级数据库管理实战(三):数据库性能监控与调优的实战方法
  • 达梦数据库-数据缓冲区
  • React前端开发_Day5
  • OCELOT 2023:细胞 - 组织相互作用场景下的细胞检测挑战赛|文献速递-深度学习人工智能医疗图像
  • BSS138-7-F 电子元器件Diodes美台N沟道小信号增强型MOSFET晶体管
  • 基于MCP工具的开发-部署-上线与维护全流程技术实现与应用研究
  • Bert学习笔记
  • CSS scale函数详解
  • 基于BeautifulSoup库的简易爬虫实现:以大学排名为例
  • 【K8s】整体认识K8s之与集群外部访问--service
  • 机器学习回顾——逻辑回归
  • pcl封装6 connection_cloud 提取聚簇后的每个点云
  • 开源vs商用美颜sdk:美白滤镜功能在直播中的优劣对比
  • RoadMP3告别车载音乐烦恼,一键get兼容音频
  • FDTD_mie散射_项目研究(1)
  • 抖音电商首创最严珠宝玉石质检体系,推动行业规范与消费扩容
  • Shader开发(十八)实现纹理滚动效果
  • Shell 脚本基础教程
  • AARRR模型(用户生命周期模型)——用户怎么长大的?
  • 【人工智能99问】GPT4的原理是什么?(32/99)
  • 【备战2025数模国赛】(三)数模常见赛题类型及解决办法
  • 矩池云中LLaMA- Factory多机多卡训练
  • 介绍⼀下Llama的结构
  • 身份证实名认证API集成—身份核验接口-网络平台安全合规
  • GoogLeNet:深度学习中的“卷积网络变形金刚“