当前位置: 首页 > news >正文

宁波网站建设流程有哪些网站设计考虑要素

宁波网站建设流程有哪些,网站设计考虑要素,网站开发有哪些术语,国家企业信用信息系统年报入口park-Streaming概述 Spark-Streaming是什么 Spark Streaming 用于流式数据的处理。Spark Streaming 支持的数据输入源很多,例如:Kafka、Flume、Twitter等,以及和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语如:…

park-Streaming概述
Spark-Streaming是什么
Spark Streaming 用于流式数据的处理。Spark Streaming 支持的数据输入源很多,例如:Kafka、Flume、Twitter等,以及和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等。
和 Spark 基于 RDD 的概念很相似,Spark Streaming 使用离散化流(discretized stream)作为抽象表示,叫作 DStream。
DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因此得名“离散化”)。
所以简单来将,DStream 就是对 RDD 在实时数据处理场景的一种封装。

Spark-Streaming的特点:易用、容错、易整合到spark体系。
易用性:Spark Streaming支持Java、Python、Scala等编程语言,可以像编写离线程序一样编写实时计算的程序
容错:Spark Streaming在没有额外代码和配置的情况下,可以恢复丢失的数据。对于实时计算来说,容错性至关重要。
易整合:Spark Streaming可以在Spark上运行,并且还允许重复使用相同的代码进行批处理。也就是说,实时处理可以与离线处理相结合,实现交互式的查询操作。
Spark-Streaming架构

背压机制:
在Spark 1.5 以前版本,用户如果要限制 Receiver 的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer 数据生产高于 maxRate,当前集群处理能力也高于 maxRate,这就会造成资源利用率下降等问题。

为了更好的协调数据接收速率与资源处理能力,1.5 版本开始 Spark Streaming 可以动态控制数据接收速率来适配集群数据处理能力。背压机制(即 Spark Streaming Backpressure): 根据JobScheduler 反馈作业的执行信息来动态调整 Receiver 数据接收率。通过属性“spark.streaming.backpressure.enabled”来控制是否启用 backpressure 机制,默认值为false,即不启用。

DStream实操
案例一:WordCount案例
需求:使用 netcat 工具向 9999 端口不断的发送数据,通过 SparkStreaming 读取端口数据并统计不同单词出现的次数

实验步骤:
1.添加依赖
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
2.编写代码
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object value26 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("streaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val lineStreams = ssc.socketTextStream("node01", 9999)
    val wordStreams = lineStreams.flatMap(_.split(" "))
    val wordAndOneStreams = wordStreams.map((_, 1))
    val wordAndCountStreams = wordAndOneStreams.reduceByKey(_ + _)
    wordAndCountStreams.print()

    ssc.start()
    ssc.awaitTermination()

3.启动netcat发送数据
nc -lk 9999
案例解析:
Discretized Stream 是 Spark Streaming 的基础抽象,代表持续性的数据流和经过各种 Spark 原语操作后的结果数据流。在内部实现上,DStream 是一系列连续的 RDD 来表示。每个 RDD 含有 一段时间间隔内的数据。
对数据的操作也是按照 RDD 为单位来进行的

DStream 创建
创建DStream的三种方式:RDD队列、自定义数据源、kafka数据源

RDD队列
可以通过使用 ssc.queueStream(queueOfRDDs)来创建 DStream,每一个推送到这个队列中的 RDD,都会作为一个DStream 处理。

案例:
需求:循环创建几个 RDD,将 RDD 放入队列。通过 SparkStream 创建 Dstream,计算 WordCount
代码:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable.Queue

object value27 {
  def main(args: Array[String]): Unit = {
    // 创建Spark配置,设置应用在本地模式运行,使用所有可用核心,应用名为"RDDStream"
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")
    // 创建StreamingContext,批处理间隔为4秒
    val ssc = new StreamingContext(sparkConf, Seconds(4))

    // 创建一个可变队列用于存放RDD
    val rddQueue = new mutable.Queue[org.apache.spark.rdd.RDD[Int]]()
    // 通过队列创建DStream,设置不每次处理一个RDD
    val inputStream = ssc.queueStream(rddQueue, oneAtATime = false)
    // 将DStream中的元素映射为键值对,值为1
    val mappedStream = inputStream.map((_, 1))
    // 按键对值进行累加
    val reducedStream = mappedStream.reduceByKey(_ + _)
    // 打印结果
    reducedStream.print()

    // 启动StreamingContext
    ssc.start()
    // 循环创建5个RDD并放入队列,每个RDD包含1到300的数字,分区数为10
    for (i <- 1 to 5) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
      try {
        Thread.sleep(2000)
      } catch {
        case e: InterruptedException => e.printStackTrace()
      }
    }
    // 等待StreamingContext终止
    ssc.awaitTermination()
  }
}

自定义数据源
自定义数据源需要继承 Receiver,并实现 onStart、onStop 方法来自定义数据源采集。
案例:自定义数据源,实现监控某个端口号,获取该端口号内容。

1)自定义数据源
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.storage.StorageLevel
import java.net.Socket
import java.io.{BufferedReader, InputStreamReader}
import java.nio.charset.StandardCharsets

Class CustomerReceiver(host:String,port:Int)extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  override def onStart(): Unit = {
    new Thread("Socket - Receiver") {
      override def run(): Unit = {
        receive()
      }
    }.start()
  }

  def receive(): Unit = {
    var socket: Socket = null
    var input: String = null
    var reader: BufferedReader = null
    try {
      socket = new Socket(host, port)
      reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      while (!isStopped && {input = reader.readLine(); input != null}) {
        store(input)
      }
    } catch {
      case e: Exception =>
        restart("Error receiving data", e)
    } finally {
      if (reader != null) reader.close()
      if (socket != null) socket.close()
      restart("restart")
    }
  }

  override def onStop(): Unit = {
    // 可添加停止相关逻辑,目前为空实现
  }
}
2)使用自定义的数据源采集数据
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object value28 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("stream")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // 使用自定义数据源创建DStream
    val customStream = ssc.receiverStream(new CustomerReceiver("localhost", 9999))
    customStream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

http://www.dtcms.com/a/504290.html

相关文章:

  • 新手学做免费网站西安 网站搭建
  • floodfill 算法(dfs)
  • node做网站岳阳做网站推荐
  • 成都营销型网站建设公司网站设计中新闻版块怎么做
  • 基于LM2904A(3PEAK)的5V~24V电源电压检测电路完整设计笔记
  • 如何获取PyTorch中间层的值:两种实用方法详解
  • 电容式传感器
  • 网络授时与授时概念解析
  • 设计师投稿网站自媒体平台大全
  • 计算机网站怎么做wordpress 软件价格
  • Linux中I2C常见问题二
  • 做泰迪狗网站的意义下载全网搜
  • linux 5.10 移植kfence调试踩内存纪要
  • 网站游戏网站建设动漫设计与制作主要学什么
  • [嵌入式系统-146]:五次工业革命对应的机器人形态的演进、主要功能的演进以及操作系统的演进
  • 网站开发工程师前景分析网站推广经验杂谈
  • 做网站行业统称叫什么行业那些网站可以做0首付分期手机号
  • 百度网址大全 官网首页seoul是什么国家
  • 山西忻州市忻府区怎么把做的网站优化到百度
  • Shell脚本入门:从基础到实战
  • 【Linux】深入理解线程同步与互斥
  • 山东青岛网站建设公司排名网站空白页黑链
  • C++学习之变量、常量、关键字、标识符命名规则、数据类型
  • BMS电池管理系统学习笔记_SOC算法
  • 浅谈信创数据库改造重难点
  • 建设银行唐山分行网站上海专业网站建设服务
  • 算法沉淀第七天(AtCoder Beginner Contest 428 和 小训练赛)
  • 温州做网站定制车载互联系统网站建设
  • 迅当网络深圳外贸网站建设竞价网络推广
  • 【GESP】C++四级真题 luogu-B4006 [GESP202406 四级] 宝箱