当前位置: 首页 > news >正文

【Spark征服之路-4.5-Spark-Streaming核心编程(三)】

DStream转换

DStream 上的操作与 RDD 的类似,分为 Transformations(转换)和 Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种 Window 相关的原语。

无状态转化操作

无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的每一个 RDD。部分无状态转化操作列在了下表中。

注意,针对键值对的 DStream 转化操作(比如reduceByKey())要添加

import StreamingContext._才能在 Scala 中使用。

需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个 DStream 在内部是由许多 RDD(批次)组成,且无状态转化操作是分别应用到每个 RDD 上的。

例如:reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据。

Transform

Transform 允许 DStream 上执行任意的 RDD-to-RDD 函数。即使这些函数并没有在 DStream的 API 中暴露出来,通过该函数可以方便的扩展 Spark API。该函数每一批次调度一次。其实也就是对 DStream 中的 RDD 应用转换。

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("transform")
val ssc = new StreamingContext(sparkConf,Seconds(3))

val lineDStream :ReceiverInputDStream[String] = ssc.socketTextStream("node01",9999)
val wordAndCountDStream :DStream[(String,Int)] = lineDStream.transform(rdd => {
val words :RDD[String] = rdd.flatMap(_.split(" "))
val wordAndOne :RDD[(String,Int)] = words.map((_,1))
val value :RDD[(String,Int)] = wordAndOne.reduceByKey(_+_)
  value
})
wordAndCountDStream.print()

ssc.start()
ssc.awaitTermination()

join

两个流之间的 join 需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的 RDD 进行 join,与两个 RDD 的 join 效果相同。

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("join")
val ssc = new StreamingContext(sparkConf,Seconds(3))

val lineDStream1 :ReceiverInputDStream[String] = ssc.
  socketTextStream("node01",9999)
val lineDStream2 :ReceiverInputDStream[String] = ssc.
  socketTextStream("node02",8888)

val wordToOneDStream :DStream[(String,Int)] = lineDStream1
  .flatMap(_.split(" ")).map((_,1))

val wordToADstream :DStream[(String,String)] = lineDStream2
  .flatMap(_.split(" ")).map((_,"a"))

val joinDStream :DStream[(String,(Int,String))]=wordToOneDStream
  .join(wordToADstream)

joinDStream.print()

ssc.start()
ssc.awaitTermination()

http://www.dtcms.com/a/314442.html

相关文章:

  • debian 时间同步 设置ntp服务端 客户端
  • FastAPI入门:中间件、CORS跨域资源共享、SQL数据库
  • 【笔记】ROS1|4 Turtlebot3仿真Waffle循线跟踪【旧文转载】
  • Linux 磁盘管理与分区配置
  • open-webui pipelines报404, ‘Filter pipeline.exporter not found‘
  • 【测试工程思考】云平台测试可重用性和场景覆盖度的平衡术
  • 遮天(太古篇)
  • windows内核研究(软件调试-软件断点)
  • [QMT量化交易小白入门]-七十六、从tick数据中获取高频交易的量价背离信号
  • Java开发时出现的问题---语言特性与基础机制陷阱
  • 使用AI IDE编程,如Cursor
  • 小迪安全v2023学习笔记(五十一讲)—— 持续更新中
  • Ubuntu 下编译 SQLCipher 4.8.0
  • yolo实现基于深度学习的龋齿检测系统pyqt
  • Java 发送 HTTP POST请求教程
  • 回归预测 | MATLAB实现BP神经网络多输入单输出回归预测+SHAP可解释分析
  • 基于Matlab的聚类彩色图像分割系统
  • 基于MATLAB实现的心电图自动诊断系统
  • 各种信号分解、模态分解方法合集【MATLAB实现】
  • 链表问题解决分析框架
  • python与C++
  • 【RH124知识点问答题】第7章 控制对文件的访问
  • 【秋招笔试】2025.08.03虾皮秋招笔试-第一题
  • 芯片行业中的EDA(电子设计自动化)是什么?
  • 房屋租赁小程序租房小程序房产信息发布系统房屋租赁微信小程序源码
  • 用户管理——配置文件和命令
  • 探索机器学习在医疗领域的应用与挑战
  • Visual Studio 2022安装与快捷键全攻略
  • AI产品经理面试宝典第61天:AI产品体验、数据安全与架构实战解析
  • Linux中netstat详细使用指南