当前位置: 首页 > wzjs >正文

一级a做爰片免费网站体验百度seo关键词排名 s

一级a做爰片免费网站体验,百度seo关键词排名 s,青岛正规网站建设哪家好,微网站分享功能无状态转换操作与有状态转换操作 无状态转换操作: 无状态转换操作仅处理当前时间跨度内的数据。例如,设置的采集时间为三秒,则只处理这三秒内的数据。 有状态转换操作(UpdateStateByKey): 有状态转换操作可以跨批次处理数据。涉及…

无状态转换操作与有状态转换操作

无状态转换操作

        无状态转换操作仅处理当前时间跨度内的数据。例如,设置的采集时间为三秒,则只处理这三秒内的数据。

有状态转换操作(UpdateStateByKey)

        有状态转换操作可以跨批次处理数据。涉及跨批次的状态维护,如累加整个输入数据流的数据。        (主函数中获取当前数据和之前的状态,并进行累加合并更新状态。)

              

UpdateStateByKey 函数

功能

用于跨批次维护状态,记录历史记录。提供对状态变量的访问,更新每个键对应的状态。

  updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间对应的(键,状态)对组成的。

使用步骤

定义状态(任意数据类型)。定义状态更新函数,基于新数据和当前状态更新状态。

updateStateByKey 操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能,需要做下面两步:

                1. 定义状态,状态可以是一个任意的数据类型。

                2. 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。

代码示例

示例代码展示了如何定义和使用 updateStateByKey 函数进行累加操作。

(使用 updateStateByKey 需要对检查点目录进行配置,会使用检查点来保存状态。)

val updateFunc = (values:Seq[Int],state:Option[Int])=>{val currentCount = values.foldLeft(0)(_+_)val previousCount = state.getOrElse(0)Some(currentCount+previousCount)
}
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("update")
val ssc = new StreamingContext(sparkConf,Seconds(5))
ssc.checkpoint("./ck")val lines = ssc.socketTextStream("node01",9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_,1))
val stateDStream = pairs.updateStateByKey[Int](updateFunc)
stateDStream.print()ssc.start()
ssc.awaitTermination()

Window Operations

        功能

设置窗口大小和滑动窗口间隔,动态获取流数据的状态。需要两个参数:窗口时长和滑动步长。

        使用要求

窗口时长和滑动步长必须是采集周期大小的整数倍。示例代码展示了如何设置窗口时长为12秒,滑动步长为6秒。

注意:

        这两者都必须为采集周期大小的整数倍。

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("window")
val ssc = new StreamingContext(sparkConf,Seconds(3))
ssc.checkpoint("./ck")val lines = ssc.socketTextStream("node01",9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_,1))
val wordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(12),Seconds(6))
wordCounts.print()ssc.start()
ssc.awaitTermination()

DStream输出操作

常见方式

        打印在控制台上。保存成文本文件。保存成Java对象序列化形式。结合RDD进行输出。

输出操作如下:

 print():在运行流程序的驱动结点上打印 DStream 中每一批次数据的最开始 10 个元素。这用于开发和调试。

saveAsTextFiles(prefix, [suffix]):以 text 文件形式存储这个 DStream 的内容。每一批次的存储文件名基于参数中的 prefix 和 suffix。”prefix-Time_IN_MS[.suffix]”。

saveAsObjectFiles(prefix, [suffix]):以 Java 对象序列化的方式将 Stream 中的数据保存为SequenceFiles . 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]".

saveAsHadoopFiles(prefix, [suffix]):将 Stream 中的数据保存为 Hadoop files. 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"。

foreachRDD(func):这是最通用的输出操作,即将函数 func 用于产生于 stream 的每一个RDD。其中参数传入的函数 func 应该实现将每一个 RDD 中数据推送到外部系统,如将RDD 存入文件或者通过网络将其写入数据库。

        通用的输出操作 foreachRDD(),它用来对 DStream 中的 RDD 运行任意计算。这和 transform() 有些类似,都可以让我们访问任意 RDD。在 foreachRDD()中,可以重用我们在 Spark 中实现的所有行动操作。比如,常见的用例之一是把数据写到诸如 MySQL 的外部数据库中。

注意:

连接不能写在 driver 层面(序列化)

如果写在 foreach 则每个 RDD 中的每一条数据都创建,得不偿失;

增加 foreachPartition,在分区创建(获取)。

http://www.dtcms.com/wzjs/238108.html

相关文章:

  • 网站怎么做才算精致百度一下百度搜索百度一下
  • 新网站域名备案流程seo品牌
  • 湖南手机网站建设公司企业员工培训内容及计划
  • 深圳网站导航电商网络销售是做什么
  • 想办个网站怎么做厦门人才网手机版
  • 广州网站设计联系方式营销型网站的类型有哪些
  • 上海政府网站建设报告百度一下百度搜索入口
  • 个人网站怎么做详情页seo需求
  • 网站建设原创软文百度分析
  • 怎么创建平台卖自己的产品seo技术教程网
  • 永年哪做网站叶涛网站推广优化
  • 黄页网站推广下载免费怎么制作属于自己的网址
  • 郑州网站建设361国内新闻最新消息十条
  • devmyapp免费网站seo排名优化
  • 企业网站建设步骤是什么网络宣传渠道有哪些
  • 西北电力建设第一工程公司网站做网络推广怎么收费
  • 网络营销方式对比分析论文seo关键词布局案例
  • 舞钢网站建设苏州网站制作
  • 电子商务网站建设与运维论文引擎搜索优化
  • 网站建设学费网站推广的营销策划方案
  • 国外主流媒体网站网络营销成功的原因
  • 商城网站功能广州seo优化外包服务
  • 营销网站建设汉狮电话seo营销名词解释
  • 自己做鲜花网站怎么样网络广告推广平台
  • 企商网站建设我是做推广的怎么找客户
  • 网站结构组成部分有那些百度投放广告平台
  • 印度人通过什么网站做国际贸易手机优化软件排行
  • 网站建设需要的文案优化网站排名方法
  • 企业开发软件公司拓展方案seo的名词解释
  • 网络营销搜索引擎友情链接seo