当前位置: 首页 > news >正文

Kafka集成Flume/Spark/Flink(大数据)/SpringBoot

Kafka集成Flume

在这里插入图片描述

Flume生产者

在这里插入图片描述
③、安装Flume,上传apache-flume的压缩包.tar.gz到Linux系统的software,并解压到/opt/module目录下,并修改其名称为flume
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Flume消费者

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Kafka集成Spark

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

生产者

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

object SparkKafkaProducer{def main(args:Array[String]):Unit = {//配置信息val properties  = new Properties()properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092")properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer])properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer])//创建一个生产者var producer = new KafkaProducer[String,String](properties)//发送数据for(i <- 1 to 5){producer.send(new ProducerRecord[String,String]("first","atguigu"+i))}//关闭资源producer.close()}
}

在这里插入图片描述

消费者
在这里插入图片描述

Object SparkKafkaConsumer{def main(args:Array[String]):Unit = {//初始化上下文环境val conf = new SparkConf().setMaster("local[*]").setAppName("spark-kafka")val ssc = new StreamingContext(conf,Seconds(3))//消费数据val kafkapara = Map[String,Object](ConsumerConfig.BOOT_STRAP_SERVERS_CONFIG->"hadoop102:9092,hadoop103:9092",ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer],ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer],ConsumerConfig.GROUP_ID_CONFIG->"test")val kafkaDStream = KafkaUtils.createDirectStream(ssc,LocationStrategies.PreFerConsistent,ConsumerStrategies.Subscribe[String,String](Set("first"),kafkapara))val valueDStream = kafkaDStream.map(record=>record.value())valueDStream.print()//执行代码,并阻塞ssc.start()ssc.awaitTermination()}
}

Kafka集成Flink

在这里插入图片描述

创建maven项目,导入以下依赖
在这里插入图片描述
resources里面添加log4j.properties文件,可以更改打印日志的级别为error
在这里插入图片描述

Flink生产者

public class FlinkafkaProducer1{public static void main(String[] args){//获取环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);//准备数据源ArrayList<String> wordList = new ArrayList<>();wordList.add("hello");wordList.add("atguigu");DataStreamSource<String> stream = env.fromCollection();//创建一个kafka生产者Properties properteis = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("first",new SimpleStringSchema(),properties);//添加数据源Kafka生产者stream.addSink(kafkaProducer);//执行env.execute();}
}

在这里插入图片描述

Flink消费者

public class FlinkafkaConsumer1{public static void main(String[] args){//获取环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);//创建一个消费者Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("first",new SimpleSStringSchema(),properties);//关联消费者和flink流env.addSource(kafkaConsumer).print();//执行env.execute();}
}

Kafka集成SpringBoot

在这里插入图片描述
在这里插入图片描述

生产者
在这里插入图片描述
在这里插入图片描述
通过浏览器发送
在这里插入图片描述
在这里插入图片描述

消费者

在这里插入图片描述

在这里插入图片描述

相关文章:

  • Flink CDC将MySQL数据同步到数据湖
  • 结构型设计模式之装饰模式
  • TDengine 高级功能——读缓存
  • 3,信号与槽机制
  • React 18新特性介绍
  • [ Qt ] | 与系统相关的操作(一):鼠标相关事件
  • LangGraph framework
  • FFmpeg移植教程(linux平台)
  • Webpack依赖
  • 【git-首次初始化本地项目、关联远程仓库】
  • 基于Qt的app开发的过渡期
  • 数据库系统概论(十六)数据库安全性(安全标准,控制,视图机制,审计与数据加密)
  • Linux运维笔记:服务器感染 netools 病毒案例
  • PostgreSQL不同的等级认证体系
  • 【Android】MT6835 + MT6631 WiFi进入Meta模式出现WiFi_HQA_OpenAdapter failed
  • BUUCTF[HCTF 2018]WarmUp 1题解
  • 【iOS】ARC 与 Autorelease
  • (未解决)日历清单-扩展屏壁纸显示问题
  • 代码随想录60期day54
  • 定制开发开源AI智能名片驱动下的海报工厂S2B2C商城小程序运营策略——基于社群口碑传播与子市场细分的实证研究
  • wordpress图片位置/济南seo网站优化公司
  • 做网站设计的公司排名/seo管理系统
  • 织梦网站如何做seo/凡科建站怎么收费
  • 网站建设与维护banner/网络营销活动推广方式
  • 建设银行网站app/百度广告代理
  • html简单百度网页代码/想找搜索引擎优化