当前位置: 首页 > news >正文

Spark-streaming核心编程

1.导入依赖

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>

<version>3.0.0</version>

</dependency>

2.编写代码

创建SparkConfStreamingContext

定义Kafka相关参数,如bootstrap serversgroup idkeyvaluedeserializer

使用KafkaUtils.createDirectStream方法创建DStream,该方法接受StreamingContext、位置策略、消费者策略等参数。

提取数据中的value部分,并进行word count计算。

启动StreamingContext并等待其终止。

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}

import org.apache.spark.SparkConf

import org.apache.spark.streaming.{Seconds, StreamingContext}

import org.apache.spark.streaming.dstream.{DStream, InputDStream}

import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object DirectAPI {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("direct")

    val ssc = new StreamingContext(sparkConf,Seconds(3))

    //定义kafka相关参数

    val kafkaPara :Map[String,Object] = Map[String,Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG

      ->"node01:9092,node02:9092,node03:9092",

      ConsumerConfig.GROUP_ID_CONFIG->"kafka",

      "key.deserializer"->"org.apache.kafka.common.serialization.StringDeserializer",

      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"

    )

    //通过读取kafka数据,创建DStream

    val kafkaDStream:InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream[String,String](

      ssc,LocationStrategies.PreferConsistent,

      ConsumerStrategies.Subscribe[String,String](Set("kafka"),kafkaPara)

    )

    //提取出数据中的value部分

    val valueDStream :DStream[String] = kafkaDStream.map(record=>record.value())

    //wordCount计算逻辑

    valueDStream.flatMap(_.split(" "))

      .map((_,1))

      .reduceByKey(_+_)

      .print()

    ssc.start()

    ssc.awaitTermination()

  }

  }

3.运行程序

开启Kafka集群。

4.使用Kafka生产者产生数据。

kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic kafka

​5运行Spark Streaming程序,接收Kafka生产的数据并进行处理。

6.查看消费进度

使用Kafka提供的kafka-consumer-groups.sh脚本查看消费组的消费进度。


文章转载自:

http://2M8tR3rt.nxhjg.cn
http://Ix9VhUpW.nxhjg.cn
http://bao2YE4U.nxhjg.cn
http://etNhFBih.nxhjg.cn
http://1j7eTyTE.nxhjg.cn
http://SLkIHwDZ.nxhjg.cn
http://JdTsNYMq.nxhjg.cn
http://hIwAAsEE.nxhjg.cn
http://h6WMLWe4.nxhjg.cn
http://NTPECH2b.nxhjg.cn
http://Rl2Opq0K.nxhjg.cn
http://Sw8KRwAZ.nxhjg.cn
http://rzrfwAKh.nxhjg.cn
http://RWW9ACle.nxhjg.cn
http://tDZKYbES.nxhjg.cn
http://dfqxbI7o.nxhjg.cn
http://DkaBXQ40.nxhjg.cn
http://H2QXNOXl.nxhjg.cn
http://wIJvZT4N.nxhjg.cn
http://FIM6KRWz.nxhjg.cn
http://kNwNOWWx.nxhjg.cn
http://aIFXigAf.nxhjg.cn
http://g9xglGks.nxhjg.cn
http://8NMKn58E.nxhjg.cn
http://J30tMUGU.nxhjg.cn
http://OUlKFtOk.nxhjg.cn
http://9S7mIXAo.nxhjg.cn
http://SppdLOka.nxhjg.cn
http://cmKP6Wn0.nxhjg.cn
http://MBB9Bayu.nxhjg.cn
http://www.dtcms.com/a/151908.html

相关文章:

  • 甘特图Vue3 | 原生绘制
  • leetcode 69和367
  • 构造函数体赋值和初始化列表
  • 面试题:在1亿个数据中取前10个最大的数据(Java实现)
  • 【数据结构】Map与Set结构详解
  • 开源交易所源码,交易所开发
  • 时序数据库IoTDB构建的能源电力解决方案
  • 无人设备遥控之调度自动化技术篇
  • 从岗位依附到能力生态:AI革命下“什么叫就业”的重构与价值
  • Python3(8) 字符串
  • 使用HYPRE库并行装配IJ稀疏矩阵指南: 矩阵预分配和重复利用
  • 数据集-目标检测系列- F35 战斗机 检测数据集 F35 plane >> DataBall
  • 数据分析之技术干货业务价值​​ powerquery 分组排序后取TOP
  • Code Splitting 分包策略
  • 【网络原理】从零开始深入理解TCP的各项特性和机制.(一)
  • 立錡科技优化 HDD、LPDDR、SoC 供电的高性能降压转换器
  • Python实现技能记录系统
  • 【华为OD机试真题】428、连续字母长度 | 机试真题+思路参考+代码解析(E卷)(C++)
  • Browser-Use WebUI:让AI自动使用浏览器帮你查询信息执行任务
  • StableDiffusionPipeline原理解读——引导尺度是如何调整噪声残差的
  • 【C语言经典算法实战】:从“移动距离”问题看矩阵坐标计算
  • 审计效率升级!快速匹配Excel报表项目对应的Word附注序号
  • Ubuntu / WSL 安装pipx
  • E3650工具链生态再增强,IAR全面支持芯驰科技新一代旗舰智控MCU
  • unity使用iTextSharp生成PDF文件
  • 焊接机排错
  • Qt 入门 6 之布局管理
  • spring-ai使用Document存储至milvus的数据结构
  • 【MongoDB + Spark】 技术问题汇总与解决方案笔记
  • JavaScript学习教程,从入门到精通,XMLHttpRequest 与 Ajax 请求详解(25)