当前位置: 首页 > news >正文

Kafka消息中间件

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

window中的安装

①、下载并解压kafka压缩包,进入config目录下修改zookeeper.properties配置文件

因为kafka内置了zookeeper,所以不需安装zookeeper。设置zookeeper数据存储位置,如果该路径不存在,则自动创建

dataDir = E:/kafka/data/zk

②、进入bin目录下(linux脚本命令),进入window下属于window命令

cd bin/windows

执行:zookeeper-server-start.bat …/…/config/zookeeper.properties

为了后续的方便启动,可以创建一个zk.cmd文件

文件内容如下:
在这里插入图片描述

③、进入kafka的config目录下,修改server.properties配置文件

log.dirs=E:/kafka/local/data/

④、进入bin/windows目录下启动:kafka-server-start.bat …/…/config/server.properties

为了后续的方便启动,可以创建一个kfk.cmd文件

文件内容如下:
在这里插入图片描述

⑤、查看启动后的进程
在这里插入图片描述

集群规划安装

①、准备三台服务器
在这里插入图片描述
②、在其中一台服务器上操作:
在这里插入图片描述
server.properties主要修改的内容:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

③、通过命令xsync kafka 将其分发到其他服务器,并修改每台服务器的server.properties中broker.id的值
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
④、配置kafka环境变量
在这里插入图片描述
在这里插入图片描述
将xsync脚本分发到其他服务器
在这里插入图片描述

④、启动zookeeper:sk.sh start

启动kafka:
在这里插入图片描述

启动、停止脚本
在这里插入图片描述

#!/bin/bashcase $1 in
"start")for i in hadoop102 hadoop103 hadoop104 doecho "---启动 $i kafka ---"ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -ddaemon /opt/module/kafka/config/server.properties" done
;;"stop")for i in hadoop102 hadoop103 hadoop103doecho "---停止 $i kafka ---"ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh" done
;;
esac

在这里插入图片描述

命令行操作:

  • 主题 kafka-topic.sh
    (1) --bootstrap-server hadoop102:9092,hadoop103:9092 连接服务器
    (2) --topic first
    (3) --create
    (4) --delete
    (5) --partitions 分区
    (6) --replication-factor 副本

  • 生产者 kafka-console-producer.sh
    (1) --bootstrap-server hadoop102:9092,hadoop103:9092 连接服务器
    (2) --topic first

  • 消费者 kafka-console-consumer.sh
    (1) --bootstrap-server hadoop102:9092,hadoop103:9092 连接服务器
    (2) --topic first

window命令行

主题创建
在这里插入图片描述
如果JDK版本过低,会出现很多日志,所以提高JDK的版本,再创建主题
在这里插入图片描述
可以修改以下文件,重新设置JDK环境变量:
在这里插入图片描述

查看(包括详细查看):
在这里插入图片描述
修改:
在这里插入图片描述

生产者和消费者
在这里插入图片描述

linux命令行
在这里插入图片描述

bin/kafka-topics.sh # 显示所有操作选项bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list #查看当前服务器所有的topic#创建first主题,指定分区,指定副本数
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --create --partitions 1 --replication-factor 3

在这里插入图片描述

# 修改分区数,只能增加,不能减少
bin/kafka-topics.sh --bootstrap=server hadoop102:9092 --topic first --alter --partitions 3
# 生产者连接 hadoop102:9092的first主题,生产消息
bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
>hello# 消费者相对应地消费消息
bin/kafla-console-consumer.sh --bootstrap-server hadoop102:9092 --topic firstbin/kafla-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first --from-beginning #查看消费所有的历史记录

代码操作

①、创建项目并添加依赖
在这里插入图片描述
在这里插入图片描述

②、生产者

public class KafkaProducerTest{public static void main(String[] args){//创建配置对象Map<String,Object> configMap = new HashMap<>();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//对key/value进行序列化configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//创建生产者KafkaProducer<String,String> producer = new KafkaProducer<>(configMap);//创建数据ProducerRecord<String,String> record = new ProducerRecord<>("test","key","value");//通过生产者对象将数据发送到kafkaproducer.send(record);//关闭生产者对象producer.close();}
}

③、消费者

public class KafkaConsumerTest{public static void main(String[] args){Map<String,Object> consumerConfig = new HashMap<>();consumerConfig.put(ConsumerConfig.BOOTSTRAP_ERVERS_CONFIG,"localhost:9092");consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//对key/value进行反序列化consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"atguigu")KafkaConsumer<String,String> consumer = new KafkaConsumer<>(consumerConfig);//订阅主题consumer.subscribe(Collections.singletonList("test"));//从kafka的主题中获取数据,参数为超时时间final ConsumerRecords<String,String> datas = consumer.poll(100);for(ConsumerRecord<String,String> data:datas){System.out.println(data);}//关闭消费者对象consumer.close();}
}

Kafka工具

在这里插入图片描述
双击安装之后
在这里插入图片描述
在这里插入图片描述
创建主题
在这里插入图片描述
添加数据
在这里插入图片描述

查看数据
在这里插入图片描述

生产者

在这里插入图片描述
生产者三种发送方式:

  • 异步发送 kafkaProducer.send(new ProducerRecord<>(“first”,“atguigu”+i));
  • 异步回调发送 kafkaProducer.send(new ProducerRecord<>(“first”,“atguigu” + i),new Callback(){}
  • 同步发送 kafkaProducer.send(new ProducerRecord<>(“first”,“atguigu”+i)).get();
public class CustomProducer{public static void main(){//配置Properteis properties = new Properties();//连接集群properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");properties.put(PorducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(PorducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//1.创建Kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<>(properties);//2.发送数据for(int i=0;i<5;i++){//异步发送kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));	//回调异步发送kafkaProducer.send(new ProducerRecord<>("first","atguigu" + i),new Callback(){@Overridepublic void onCompletion(RecordMetadata metadata,Exception exception){if(exception == null){System.out.println("主题:"+metadata.topic()+"分区:"+metadata.partition());}}});//发送同步数据kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i)).get();}//3.关闭资源kafkaProducer.close();}
}

分区

  • 默认分区
  • 自定义分区
//参数中指定分区
kafkaProducer.send(new ProducerRecord<>("first",1,"","atguigu"+i),new Callback(){@Overridepublic void onCompletion(RecordMetadata metadata,Exception exception){if(exception == null){System.out.println("主题:"+metadata.topic()+" 分区: "+metadata.partition());}}
});

在这里插入图片描述

自定义分区器

①、发送过来的数据如果包含atguigu,就发往0号分区;不包含atguigu,就发往1号区

②、分区策略配置

public class MyPartitioner implements Partitioner{@Overridepublic int partition(String topic,Object key,byte[] keyBytes,Object value,byte[] valueBytes,cluster cluster){String msgValues = value.toString();int patition;if(msgValues.contains("atguigu")){partition = 0;}else{partition = 1;}return partition;}@Overridepublic void close(){}@Overridepublic void configure(Map<String,?> configs){}
}

③、生产者

//配置
Properteis properties = new Properties();
//连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");properties.put(PorducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
properties.put(PorducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//关联自定义分区策略
properties.put(ProducerConfig.APRITIONER_CLASS_CONFIG,"MyPartitioner");KafkaProducer<String,String> kafkaProducer = new KafkaProducer<>(properties);//消息发送
//........

提供生产者吞吐量

四个提高吞吐量的参数

public class CustomProducerParameters{public static void main(){//配置Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//缓存区大小properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);//批次大小properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);//linger.msproperties.put(ProducerConfig.LINGER-MS_CONFIG,1);//压缩properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");//生产者KafkaProducer<String,String> kafkaProducer = new KafkaProducer<>(properties);//发送数据for(int i = 0;i<5;i++){kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));}//关闭资源kafkaProducer.close();}
}

数据可靠性
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

public class CustomProducerParameters{public static void main(){//配置Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//acks,数据的可靠性properties.put(ProducerConfig.ACKS_CONFIG,"1");//重复次数properties.put(ProduceConfig.RETRIES_CONFIG,3);//发送数据for(int i = 0;i<5;i++){kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));}//关闭资源kafkaProducer.close();}
}

事务
在这里插入图片描述

Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERRIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactional_id_01");//指定事务idKafkaProducer<String,String> kafkaProducer = new KafkaProducer<>(properties);kafkaProducer.initTransactions();//初始化
kafkaProducer.beginTransaction();//开始事务try{for(int i = 0;i < 5;i++){kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));kafkaProducer.commitTransaction();//提交事务
}
}catch(Exception e){kafkaProducer.abortTransaction();//终止事务
}finally{kafkaProducer.close();
}

数据乱序
在这里插入图片描述

Kafka_Broker

连接zookeeper工具
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

服役新节点

①、关闭hadoop104(克隆的方式,不需重新安装JDK,Kafka),开启hadoop105(修改IP)
在这里插入图片描述

在这里插入图片描述

②、hadoop105上,修改主机名称hadoop105
在这里插入图片描述
删除克隆后,原先主机的内容
在这里插入图片描述
修改kafka/config目录下的配置文件server.properties内容

  • broker.id=3

③、启动hadoop105的kafka

bin/kafka-server-start.sh -daema config/server.properties

④、执行负载均衡操作

创建一个要均衡的主题(可以对多个主题):vim topics-to-move.json

{"topics":[{"topic":"first"}],"version":1
}

在这里插入图片描述
为4台服务器生成负载均衡计划
在这里插入图片描述

⑤、创建副本存储计划(所有副本存储在broker0,broker1,broker2,broker3)

将第④步的生成计划复制到lication-factor.json文件中
在这里插入图片描述

执行副本存储计划
在这里插入图片描述

验证副本存储计划
在这里插入图片描述

退役旧节点

①、执行负载均衡操作,创建一个要均衡的主题
在这里插入图片描述

②、创建执行计划
在这里插入图片描述
将生成的内容复制到:vim increase-replication-factor.json

③、创建副本存储计划(所有副本存储在broker0 broker1 broker2)
在这里插入图片描述

④、验证副本计划
在这里插入图片描述

副本
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
依照命令创建主题atguigu2,并查看详细情况
在这里插入图片描述
进入对应的服务器中,停用对应的kafka,再查看详细情况
在这里插入图片描述
进入对应的服务器中,恢复对应的kafka,再查看详细情况
在这里插入图片描述

分区副本分配

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

生产环境中,每台服务器的配置和性能不一致,但是kafka只会根据自己的代码规则创建对应的分区副本
就会导致个别的服务器存储压力较大,所以需要手动调整分区副本的存储

需求:创建一个新的topic,4个分区,两个副本,名称为three。
将该topic所有副本存储到broker0和broker1两台服务器上
在这里插入图片描述
手动调整分区副本存储:
在这里插入图片描述
在这里插入图片描述

增加副本因子

通过命令无法提高副本,需要通过文件计划的方式
在这里插入图片描述

文件存储

在这里插入图片描述

topic数据存储的位置
在这里插入图片描述
在这里插入图片描述

文件清理策略
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

消费者

在这里插入图片描述

public class CustomerConsumer{public static void main(String[] args){//配置Properties properteis = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");//连接properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//反序列化properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//配置消费组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");//创建一个消费者KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties);//订阅主题ArrayList<String> topics = new ArrayList<>();topics.add("first");kafkaConsumer.subscribe(topics);//消费数据while(true){ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for(ConsumerRecord<String,String> consumerRecord:consumerRecords){System.out.println(consumerRecord);}}}
}

在这里插入图片描述

public class CustomerConsumer{public static void main(String[] args){//配置Properties properteis = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");//连接properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//反序列化properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//配置消费组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");//创建一个消费者KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties);//订阅主题对应的分区ArrayList<String> topicPartitions = new ArrayList<>();topicPartitions.add(new TopicPartition("first",0));//指定分区发送数据kafkaConsumer.subscribe(topicPartitions);//消费数据while(true){ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for(ConsumerRecord<String,String> consumerRecord:consumerRecords){System.out.println(consumerRecord);}}}
}

分区的分配以及再平衡

  • Range以及再平衡
  • RoundRobin以及再平衡
  • Sticky以及再平衡

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

自动提交offset
在这里插入图片描述

//自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);//提交时间间隔
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);

手动提交offset
在这里插入图片描述

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//.......while(true){ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for(ConsumerRecord<String,String> consumerRecord:consumerRecords){System.out.println(consumerRecord);}//手动提交offsetkafkaConsumer.commitSync();//同步提交//kafkaConsumer.commitAsync(); 异步提交
}

指定offset消费
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

按照时间进行消费
在这里插入图片描述

重复消费:已经消费的数据,但是offset没提交
漏消费:先提交offset后消费,有可能会造成数据的漏消费
在这里插入图片描述

消费者事务

  • 生产端——》集群
  • 集群——》消费者
  • 消费者——》下游的框架

数据积压(消费者如何提高吞吐量)

  • 增加分区,增加消费者个数
  • 生产=》集群 4个参数

在这里插入图片描述

Kafka-Eagle框架监控

监控kafka集群

①、准备MySQL
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

②、Kafka环境准备
在这里插入图片描述

③、Kafka-Eagle安装

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
访问用户名admin 密码123456
在这里插入图片描述

Kafka-Kafka模式

取代Zookeeper

在这里插入图片描述
在这里插入图片描述

集群部署
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Kafka-Kafka模式集群启动和停止脚本

在这里插入图片描述

相关文章:

  • K 值选对,准确率翻倍:KNN 算法调参的黄金法则
  • 【Python进阶】元编程、并发
  • 《STL--stack 和 queue 的使用及其底层实现》
  • 《数据结构初阶》【番外篇:二路归并的外排史诗】
  • 流媒体基础解析:视频清晰度的关键因素
  • 当前用户的Git全局配置情况:git config --global --list
  • MySQL 读懂explain 执行计划
  • 性能优化 - 理论篇:常见指标及切入点
  • M4Pro安装ELK(ElasticSearch+LogStash+Kibana)踩坑记录
  • uniapp调试,设置默认展示的toolbar内容
  • Java 单例模式详解
  • 通过mqtt 点灯
  • 【Kotlin】数字字符串数组集合
  • go|channel源码分析
  • 视频监控联网系统GB28181协议中事件通知流程详解以及通知失败常见原因
  • 如何避免 N+1 查询问题
  • Acrobat DC v25.001 最新专业版已破,像word一样编辑PDF!
  • 4.2.5 Spark SQL 分区自动推断
  • 使用MCP和Ollama本地创建AI代理:实操教程
  • elasticsearch低频字段优化
  • 相亲网站男人拉我做外汇/广点通投放平台
  • 做企业网站的合同/随州seo
  • 做网站的伪原创怎么弄/搜索引擎原理
  • 女人做绿叶网站相亲拉人/seo舆情优化
  • 学校展示型网站建设方案书/重庆 seo
  • 网站主题定位/运营商大数据精准营销