当前位置: 首页 > news >正文

Spark自定义累加器实现高效WordCount

目录

1. 代码功能概述

2. 代码逐段解析

主程序逻辑

自定义累加器 MyAccumulator

3. Spark累加器原理

累加器的作用

AccumulatorV2 vs AccumulatorV1

累加器执行流程

4. 代码扩展与优化建议

支持多词统计

线程安全优化

使用内置累加器

5. Spark累加器的适用场景

6. 总结


package core.bcimport org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutableobject AccWordCount {def main(args: Array[String]): Unit = {val sparkConf=new SparkConf().setMaster("local").setAppName("AccWordCount")val sc = new SparkContext(sparkConf)val value = sc.makeRDD(List("hello","spark","hello"))//累加器:WordCount//创建累加器对象val wcAcc=new MyAccumulator()//向Spark进行注册sc.register(wcAcc, "wordCountAcc")value.foreach(word=>{//数据的累加(使用累加器)wcAcc.add(word)})//获取累加器结果println(wcAcc.value)sc.stop()}/*** 自定义数据累加器* 1、继承AccumulatorV2。定义泛型*  IN:累加器输入的数据类型*  OUT:返回的数据类型* 2、重写方法*/class MyAccumulator extends AccumulatorV2[String,mutable.Map[String,Long]]{val wcMap = mutable.Map[String, Long]()override def isZero: Boolean = wcMap.isEmpty//判断知否为初始状态override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = new MyAccumulator()//复制一个新的累加器override def reset(): Unit = wcMap.clear()//重置累加器override def add(word: String): Unit ={   //获取累加器需要计算的值val newcount=wcMap.getOrElse(word,0L)+1LwcMap.update(word,newcount)}override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {//Driver合并多个累加器val map1=this.wcMapval map2=other.valuemap2.foreach {case (word, count) => {val newCount = map1.getOrElse(word, 0L) + countwcMap.update(word, newCount)}}}override def value: mutable.Map[String, Long] = wcMap //获取累加器结果}
}
1. 代码功能概述

该代码使用Apache Spark实现了一个基于自定义累加器的单词计数(WordCount)程序。通过自定义MyAccumulator类(继承AccumulatorV2),统计RDD中每个单词的出现次数,并利用累加器的分布式聚合特性将结果汇总到驱动程序。


2. 代码逐段解析
主程序逻辑
object AccWordCount {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local").setAppName("AccWordCount")val sc = new SparkContext(sparkConf)val value = sc.makeRDD(List("hello", "spark", "hello"))// 创建并注册累加器val wcAcc = new MyAccumulator()sc.register(wcAcc, "wordCountAcc")// 遍历RDD,累加单词value.foreach(word => wcAcc.add(word))// 输出结果println(wcAcc.value) // 预期输出:Map(hello -> 2, spark -> 1)sc.stop()}
}
  • RDD创建sc.makeRDD生成包含3个单词的RDD。
  • 累加器注册MyAccumulator实例通过sc.register注册到SparkContext,名称为wordCountAcc
  • 累加操作foreach遍历RDD中的每个单词,调用wcAcc.add(word)累加计数。
  • 结果获取wcAcc.value返回最终的单词计数Map。

自定义累加器 MyAccumulator
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {val wcMap = mutable.Map[String, Long]()override def isZero: Boolean = wcMap.isEmptyoverride def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = new MyAccumulator()override def reset(): Unit = wcMap.clear()override def add(word: String): Unit = {val newCount = wcMap.getOrElse(word, 0L) + 1LwcMap.update(word, newCount)}override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {val map1 = this.wcMapval map2 = other.valuemap2.foreach { case (word, count) =>val newCount = map1.getOrElse(word, 0L) + countwcMap.update(word, newCount)}}override def value: mutable.Map[String, Long] = wcMap
}
  • 核心字段wcMap用于存储单词及其计数。
  • 关键方法
    • isZero:判断累加器是否为空(初始状态)。
    • copy:创建累加器的副本(用于任务节点本地计算)。
    • reset:清空累加器状态。
    • add:累加单个单词的计数。
    • merge:合并其他累加器的统计结果(分布式汇总)。
    • value:返回最终结果。

3. Spark累加器原理
累加器的作用
  • 分布式聚合:在多个任务节点上独立计算局部结果,最后汇总到驱动程序。
  • 高效通信:避免频繁的Shuffle操作,减少网络开销。
  • 线程安全:Spark保证每个任务节点内的累加器操作是串行的。
AccumulatorV2 vs AccumulatorV1
  • AccumulatorV1:仅支持简单数据类型(如LongDouble),适用于计数、求和等场景。
  • AccumulatorV2:支持复杂数据类型(如Map、List),需自定义addmerge方法,适用于更灵活的聚合需求(如WordCount)。
累加器执行流程
  1. 任务节点本地计算:每个任务节点维护累加器的本地副本,通过add方法累加数据。
  2. 结果汇总:任务完成后,Spark将各节点的累加器副本发送到驱动程序,调用merge方法合并结果。
  3. 驱动程序获取结果:通过value方法获取全局聚合结果。

4. 代码扩展与优化建议
支持多词统计

当前代码统计单次出现的单词,若需统计多个单词(如键值对),可修改add方法:

override def add(input: String): Unit = {val words = input.split("\\s+") // 按空格分割多词words.foreach(word => {val newCount = wcMap.getOrElse(word, 0L) + 1LwcMap.update(word, newCount)})
}

线程安全优化

add方法可能被多线程并发调用(如在复杂算子中),需添加同步锁:

override def add(word: String): Unit = this.synchronized {val newCount = wcMap.getOrElse(word, 0L) + 1LwcMap.update(word, newCount)
}
使用内置累加器

对于简单场景(如全局计数),可直接使用Spark内置的LongAccumulator

val countAcc = sc.longAccumulator("countAcc")
value.foreach(_ => countAcc.add(1))
println(countAcc.value) // 输出总记录数

5. Spark累加器的适用场景
  • 全局计数:统计任务处理的总记录数、错误数等。
  • 分组统计:如WordCount、用户行为分类统计。
  • 指标监控:实时计算平均值、最大值等(需结合自定义逻辑)。
  • 调试与日志:在不中断作业的情况下收集分布式运行状态。

6. 总结

该代码通过自定义AccumulatorV2实现了分布式单词计数,展示了累加器的核心原理:任务节点本地计算 + 驱动程序全局汇总。通过合理设计addmerge方法,累加器可支持复杂聚合逻辑,是Spark中高效的分布式统计工具。

http://www.dtcms.com/a/361135.html

相关文章:

  • 众擎机器人开源代码解读
  • 液态神经网络(LNN)2:LTC改进成CFC详细推导过程
  • Linux 孤儿进程 (Orphan Process)
  • 动作指令活体检测通过动态交互验证真实活人,保障安全
  • 【大模型】大模型微调-RLHF(强化学习)
  • 技术速递|构建你的第一个 MCP 服务器:如何使用自定义功能扩展 AI 工具
  • 分享智能电动窗帘方案
  • 串口通讯个人见解
  • 智能核心:机器人芯片的科技革新与未来挑战
  • 【STM32】贪吃蛇 [阶段 8] 嵌入式游戏引擎通用框架设计
  • 山东教育报省级报刊简介
  • Axios拦截器:前端通信的交通警察[特殊字符]
  • 手机网络IP归属地更改方法总结
  • 人工智能-python-深度学习-项目全流程解析
  • LeetCode刷题记录----74.搜索二维矩阵(Medium)
  • 2025年中国GEO优化服务商全景分析:技术演进、核心能力与选型指南
  • 设计模式14-组合模式
  • 内存管理 - 从虚拟到物理
  • ADSL 代理 Proxy API 申请与使用指南
  • 前端安全防护深度实践:从XSS到CSRF的完整安全解决方案
  • T507 音频调试
  • 在 Qt 中:QString 好,还是 std::string 好?
  • DVWA靶场通关笔记-Weak Session IDs (Impossible级别)
  • 【Flask】测试平台开发,实现全局邮件发送工具 第十二篇
  • 【SpringBoot】20 - SpringBoot中的Ajax和MyBatis究竟是什么?
  • 【lucene核心】impacts的由来
  • 【Web安全】CRLF注入攻击深度解析:原理、场景与安全测试防御指南
  • hive表不显示列注释column comment的问题解决
  • 【Proteus仿真】蜂鸣器控制系列仿真——蜂鸣器控制/蜂鸣器播放音乐/蜂鸣器播放多种音乐/蜂鸣器和LED组成报警装置
  • UE5 C++ 第三方动态库的使用