当前位置: 首页 > news >正文

spark-streaming(二)

DStream创建(kafka数据源)

1.在idea中的 pom.xml 中添加依赖

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.0.0</version>
</dependency>

2.创建一个新的object,并写入以下代码

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.clients.consumer.ConsumerRecord/*** 通过 DirectAPI 0 - 10 消费 Kafka 数据* 消费的 offset 保存在 _consumer_offsets 主题中*/
object DirectAPI {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("direct")val ssc = new StreamingContext(sparkConf, Seconds(3))// 定义 Kafka 相关参数val kafkaPara: Map[String, Object] = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "node01:9092,node02:9092,node03:9092",ConsumerConfig.GROUP_ID_CONFIG -> "kafka","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer])// 通过读取 Kafka 数据,创建 DStreamval kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Set("kafka"), kafkaPara))// 提取出数据中的 value 部分val valueDStream = kafkaDStream.map(record => record.value())// WordCount 计算逻辑valueDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()ssc.start()ssc.awaitTermination()}
}    

3.在虚拟机中,开启kafka、zookeeper、yarn、dfs集群

4.创建一个新的topic---kafka,用于接下来的操作

查看所有的topic(是否创建成功)

开启kafka生产者,用于产生数据

启动idea中的代码,在虚拟机中输入数据

输入后可以在idea中查看到

查看消费进度


文章转载自:

http://9vkh5wBc.wxLzr.cn
http://Mg65mf4C.wxLzr.cn
http://yOFQmfc1.wxLzr.cn
http://Fst31Zpd.wxLzr.cn
http://4cpffHcq.wxLzr.cn
http://adEkHBlY.wxLzr.cn
http://LEmvfIK8.wxLzr.cn
http://sbaLyiw3.wxLzr.cn
http://2JGduD6R.wxLzr.cn
http://r6VLLCom.wxLzr.cn
http://9X3pRbsp.wxLzr.cn
http://swLuB2Qz.wxLzr.cn
http://zbEy533P.wxLzr.cn
http://C2fZFg1G.wxLzr.cn
http://B02wn2Gy.wxLzr.cn
http://El8RMr5S.wxLzr.cn
http://L4BTEm6K.wxLzr.cn
http://XmtnsNjU.wxLzr.cn
http://59JSMWeb.wxLzr.cn
http://ExdHXWx7.wxLzr.cn
http://muOAWlrB.wxLzr.cn
http://RHlXu7Yj.wxLzr.cn
http://y72agRrc.wxLzr.cn
http://hMNTpmxp.wxLzr.cn
http://RiGvMto9.wxLzr.cn
http://6r7wwquL.wxLzr.cn
http://a1yzwdCy.wxLzr.cn
http://eeO6OPUk.wxLzr.cn
http://UY0GfVhc.wxLzr.cn
http://fBW8S1oL.wxLzr.cn
http://www.dtcms.com/a/151974.html

相关文章:

  • NeRF:原理 + 实现 + 实践全流程配置+数据集测试【Ubuntu20.04 】【2025最新版】
  • 【1区SCI】Fusion entropy融合熵,多尺度,复合多尺度、时移多尺度、层次 + 故障识别、诊断-matlab代码
  • CE第一次作业
  • 协作开发攻略:Git全面使用指南 — 第一部分 Git基础
  • 3台CentOS虚拟机部署 StarRocks 1 FE+ 3 BE集群
  • 与终端同居日记:Shell交响曲の终极共舞指南
  • 海量聊天消息处理:ShardingJDBC分库分表、ClickHouse冷热数据分离、ES复合查询方案、Flink实时计算与SpringCloud集成
  • C++ RPC以及cmake
  • Oracle 11g RAC ASM磁盘组剔盘、加盘实施过程
  • 基于 CentOS 的 Docker Swarm 集群管理实战指南
  • CentOS 7 基于 Nginx 的 HTML 部署全流程指南
  • 智能吸顶灯/摄影补光灯专用!FP7195双通道LED驱动,高效节能省空间 !
  • 保姆级教程:用EndNote 20让参考文献自动分组排序(中文在前,英文在后)
  • 【bug修复】一次诡异的接口数据显示 bug 排查之旅
  • Java高频面试之并发编程-07
  • Docker部署一款开源的极简服务器监控工具Ward内网穿透远程使用
  • 23种设计模式-行为型模式之策略模式(Java版本)
  • 记录学习的第三十一天
  • 基于PHP+Uniapp的互联网医院源码:电子处方功能落地方案
  • IDEA启动报错Failed to create JVM. JVM path的解决办法
  • 矩阵运算和线性代数操作开源库
  • 深入浅出学会函数(下)
  • 深入理解MVP架构:让UI层与业务逻辑完美分离的设计模式
  • Java 使用 RabbitMQ 消息处理(快速上手指南)
  • 【前端】【业务场景】【面试】在前端开发中,如何实现文件的上传与下载功能,并且处理可能出现的错误情况?
  • 大数据运维面试题
  • 蓝牙 LE:安全模式和程序说明(蓝牙中的网络安全)
  • 【数据可视化-27】全球网络安全威胁数据可视化分析(2015-2024)
  • 系统与网络安全------弹性交换网络(2)
  • Spring Boot常用注解详解:实例与核心概念