当前位置: 首页 > news >正文

SpringCloud 项目阶段九:Kafka 接入实战指南 —— 从基础概念、安装配置到 Spring Boot 实战及高可用设计

前言:

项目地址:https://gitee.com/whltaoin_admin/hmtt_cloud-project.git

阶段七进度版本号:90fb42d23a7f3dd1045d0d8c2a70e936a41eb45e

本文末尾附带:Fakfa生产者和消费者的配置属性说明书

kafka介绍

Kafka是一种高吞吐量的分布式发布订阅消息系统

有如下特性:通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。

其核心架构包含Broker服务器集群、主题(Topic)及分区(Partition),通过ZooKeeper协调分布式节点状态。消息持久化采用顺序追加与分段索引机制,配合零拷贝技术和批量压缩实现高吞吐量,副本机制保障数据可靠性。平台提供Producer API、Consumer API及Streams API,支持事件发布订阅、流式处理和跨系统数据集成,主要应用于实时金融交易、物联网数据分析、日志收集和微服务通信等领域。当前稳定版本为3.9.0,支持多语言客户端与主流大数据框架对接。

kafla名称解释

kafla安装配置

安装zookeeper

afka依赖zookeeper,所以需要先安装zookeeper

下载镜像:

docker pull zookeeper:3.4.14

创建容器

docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14

效果:

安装kafka

下载镜像:

docker pull wurstmeister/kafka:2.12-2.3.1

创建容器

docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=varin.cn \
--env KAFKA_ZOOKEEPER_CONNECT=varin.cn:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://varin.cn:9999 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9999 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
-p 9999:9999 \
wurstmeister/kafka:2.12-2.3.1
参数/配置项说明
docker runDocker启动容器的核心命令
-d容器在后台(detached mode)运行,不阻塞当前终端
--name kafka为容器指定名称为kafka,方便后续通过名称管理容器(如停止、删除等)
--env KAFKA_ADVERTISED_HOST_NAME=varin.cn指定Kafka对外公布的主机名(客户端可通过此主机名连接),新版本中逐渐被KAFKA_ADVERTISED_LISTENERS替代,此处可能用于兼容旧逻辑
--env KAFKA_ZOOKEEPER_CONNECT=varin.cn:2181指定Kafka依赖的Zookeeper服务地址(需确保该地址有可用的Zookeeper服务,Kafka依赖其存储元数据)
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://varin.cn:9999关键参数,指定Kafka向客户端“宣告”的连接地址,客户端将使用此地址建立连接。其中PLAINTEXT为无加密传输协议,varin.cn:9999为客户端实际连接的主机和端口
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9999关键参数,指定Kafka在容器内部实际监听的地址和端口。0.0.0.0表示监听容器内所有网络接口,9999为容器内Kafka绑定的端口
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M"设置Kafka运行的JVM堆内存参数,-Xms256M为初始堆内存,-Xmx256M为最大堆内存(控制内存占用)
-p 9999:9999端口映射,将宿主机的9999端口映射到容器内的9999端口,使外部客户端可通过“宿主机IP:9999”访问容器内的Kafka服务
wurstmeister/kafka:2.12-2.3.1指定使用的Kafka镜像及版本,2.12为Scala版本(Kafka依赖Scala编译),2.3.1为Kafka版本

效果

kaka 入门案例

依赖

<dependency>
<groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency>

生产者

package cn.varin.kafka;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.logging.log4j.core.config.properties.PropertiesConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;import java.util.Properties;/*** 生产者*/
public class PropertiesObject {public static void main(String[] args) {// kafka 配置Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "varin.cn:9999");props.put(ProducerConfig.RETRIES_CONFIG, 10);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 建立生产者对象KafkaProducer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {producer.send(new ProducerRecord<String,String>("topic1", "name", "varin"+i));}// 发送消息// 关闭连接producer.close();}
}

消费者

package cn.varin.kafka;// 消费者import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;public class ConsumerObject {public static void main(String[] args) {// kafka 配置Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "varin.cn:9999");props.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");// 反序列化props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//key.serializer.encoding" : "value.serializer.encoding";// 建立消费者对象KafkaConsumer<String, String> consurmer = new KafkaConsumer<String,String >(props);// 订阅主题consurmer.subscribe(Collections.singletonList("topic1"));while (true) {//4.获取消息ConsumerRecords<String, String> consumerRecords = consurmer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.printf(consumerRecord.key());System.out.println(consumerRecord.value());}}}
}

效果

总结

  1. 如果想要在在一个生产者和多个消费者中,实现一对一效果:
    1. 将所有消费者限定在同一个组中即可
  2. 实现一对多的话:
    1. 消费者分成多个组别即可。

kafka高可用设计详解

集群模式

  • Kafka 的服务器端由被称为 Broker 的服务进程构成,即一个 Kafka 集群由多个 Broker 组成
  • 这样如果集群中某一台机器宕机,其他机器上的 Broker 也依然能够对外提供服务。

备份机制

在kafka的备份机制中,定义了两种副本方式:

领导者副本

追随者副本

当信息发送过来时,会存到领导者中,领导者在存储到追随者(追随者可能有多个)

在追随者中有分为了两类:

ISR:同步存储leader中的数据(和leader存储的数据是高度一致的)

普通:异步存储leader中的数据

同步方式

情况一:leader死亡,在追随者中会先使用isr作为新的leader

情况二:如果isr中的都不行,再选择普通

  1. 假设所有副本都失效了选择方式:

保证一致性:等待irs

保证可用性:谁先活谁最为leader

kafka生成者同步和异步发送信息

同步发送

缺陷:如果发送数据量大的话,会一直占用线程,造成阻塞。

producer.send(new ProducerRecord<String,String>("topic1", "name", "varin"+i));

异步发送

通过在send方法中多加一个callback进行异步发送,可以在异步中添加日志等操作

for (int i = 0; i < 10; i++) {ProducerRecord<String, String>record= new ProducerRecord<>("topic1", "name", "varin" + i);producer.send(record,new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception e) {if (e != null) {System.out.println("会记录错误信息");}// 打印偏移量System.out.println(metadata.offset());}});}

kafka消费者详解

消费者组

  • 消费者组(Consumer Group) :指的就是由一个或多个消费者组成的群体
  • 一个发布在Topic上消息被分发给此消费者组中的一个消费者
    • 所有的消费者都在一个组中,那么这就变成了**queue模型**
    • 所有的消费者都在不同的组中,那么就完全变成了**发布-订阅模型**

消息有序性

应用场景:

  • 即时消息中的单对单聊天和群聊,保证发送方消息发送顺序与接收方的顺序一致
  • 充值转账两个渠道在同一个时间进行余额变更,短信通知必须要有顺序

topic分区中消息只能由消费者组中的唯一一个消费者处理,所以消息肯定是按照先后顺序进行处理的。但是它也仅仅是保证Topic的一个分区顺序处理,不能保证跨分区的消息先后处理顺序。 所以,如果你想要顺序的处理Topic的所有消息,那就只提供一个分区。

消费者手提提交的三种模式

kafka不会像其他JMS队列那样需要得到消费者的确认,消费者可以使用kafka来追踪消息在分区的位置(偏移量)

消费者会往一个叫做_consumer_offset的特殊主题发送消息,消息里包含了每个分区的偏移量。如果消费者发生崩溃或有新的消费者加入群组,就会触发再均衡。

提交偏移量的方式有两种,分别是自动提交偏移量和手动提交

  • 自动提交偏移量

当enable.auto.commit被设置为true,提交方式就是让消费者自动提交偏移量,每隔5秒消费者会自动把从poll()方法接收的最大偏移量提交上去

  • 手动提交 ,当enable.auto.commit被设置为false可以有以下三种提交方式
    • 提交当前偏移量(同步提交)
    • 异步提交
    • 同步和异步组合提交

手动提交:同步

// 设置为手动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//key.serializer.encoding" : "value.serializer.encoding";// 建立消费者对象
KafkaConsumer<String, String> consurmer = new KafkaConsumer<String,String >(props);
// 订阅主题
consurmer.subscribe(Collections.singletonList("topic1"));
while (true) {//4.获取消息ConsumerRecords<String, String> consumerRecords = consurmer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.printf(consumerRecord.key());System.out.println(consumerRecord.value());try {consurmer.commitSync();}catch (Exception e){e.printStackTrace();}}
}

手动提交:异步

// 设置为手动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//key.serializer.encoding" : "value.serializer.encoding";// 建立消费者对象
KafkaConsumer<String, String> consurmer = new KafkaConsumer<String,String >(props);
// 订阅主题
consurmer.subscribe(Collections.singletonList("topic1"));
while (true) {//4.获取消息ConsumerRecords<String, String> consumerRecords = consurmer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.printf(consumerRecord.key());System.out.println(consumerRecord.value());}consurmer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {if (exception != null) {System.out.println("error");}System.out.println(offsets);}});
}

手动提交:同步异步组合

// 设置为手动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//key.serializer.encoding" : "value.serializer.encoding";// 建立消费者对象
KafkaConsumer<String, String> consurmer = new KafkaConsumer<String,String >(props);
// 订阅主题
consurmer.subscribe(Collections.singletonList("topic1"));
try {while (true) {//4.获取消息ConsumerRecords<String, String> consumerRecords = consurmer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.printf(consumerRecord.key());System.out.println(consumerRecord.value());}// 异步consurmer.commitSync();}
}catch (Exception e){e.printStackTrace();System.out.println(e.getMessage());
}finally {//同步consurmer.commitSync();
}

kafkaj接入SpringBoot示例

  1. 依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><!--        kafka--><!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency>
</dependencies>
  1. 配置项
server:port: 10000
spring:application:name: kafka-testkafka:bootstrap-servers: "varin.cn:9999"producer:key-serializer: "org.apache.kafka.common.serialization.StringSerializer"value-serializer: "org.apache.kafka.common.serialization.StringSerializer"consumer:key-deserializer: "org.apache.kafka.common.serialization.StringDeserializer"value-deserializer: "org.apache.kafka.common.serialization.StringDeserializer"group-id: ${spring.application.name}
  1. Controller
package cn.varin.kafka.controller;import org.apache.kafka.clients.producer.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class SendKafkaController {@Autowiredprivate KafkaTemplate kafkaTemplate;@GetMapping("/send")public void send(){kafkaTemplate.send("topic1", "varin");}
}
  1. listener建立
package cn.varin.kafka.listener;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class ConsumerListener {@KafkaListener(topics = "topic1")public void onMessage(String message){System.out.println(message);}
}
  1. 测试
  1. 访问路径:http://localhost:10000/send

传递对象为对象解析方式

  1. 说明

因为我们在配置文件中以及限定了kafka解析key和value的类型了。

所以当我们需要传递对象时,需要先将对象转换成String字符串,

接收到消息时,再将对应的JSON字符串转成对应的对象。

示例

  1. 自定义类
package cn.varin.kafka.pojo;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {private String name;private Integer age;}
  1. Controller类
package cn.varin.kafka.controller;import cn.varin.kafka.pojo.User;
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class SendKafkaController {@Autowiredprivate KafkaTemplate kafkaTemplate;@GetMapping("/send")public void send(){kafkaTemplate.send("topic1", "varin");}@GetMapping("/sendUser")public void sendUser(){kafkaTemplate.send("topic_user", JSON.toJSONString(new User("varin",1)));}
}
  1. listener类
package cn.varin.kafka.listener;import cn.varin.kafka.pojo.User;
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class ConsumerListener {@KafkaListener(topics = "topic1")public void onMessage(String message){System.out.println(message);}@KafkaListener(topics = "topic_user")public void onMessageToUser(String message){System.out.println(JSON.parseObject(message, User.class));}
}
  1. 效果


加油呀~

http://www.dtcms.com/a/406167.html

相关文章:

  • 徐州企业建站模板一个网站的制作过程
  • phpmysql网站开发项目式教程房地产开发公司招聘
  • python+springboot+uniapp基于微信小程序的巴马旅居养老系统 旅游养老小程序
  • 阿里云无影发布首个Agentic Computer形态的个人计算产品
  • PHP 8.0+ 元编程与编译时优化:构建下一代PHP框架
  • aws用ami新创建之后用密码登录不了
  • 安科瑞Acrel-1000DP分布式光伏监控系统:赋能光储充一体化,光功率预测助力电站高效运维与收益提升
  • 网站建设引擎旅游网站开发系统
  • 建设银行网站点不进去了怎么办网站全站开发
  • 【Java后端】Spring Boot 比 Spring 的优势:以 RESTful 接口开发为例 一文详解
  • 计算机软件工程毕设项目推荐—基于协同过滤算法的理财产品推荐系统(采用余弦相似度计算推荐,Python,Flask,Vue,Mysql,B/S架构)
  • docker-卷
  • 电子行业如何通过MES管理系统实现柔性制造,应对订单波动?
  • ​​[硬件电路-324]:芯片根据功能、信号类型、应用场景、制造工艺、集成度及设计理念等多个维度进行分类
  • 扶沟县建设局网站网络规划与设计教程
  • 文化传播公司网站模版网站建设哪好
  • Charles 抓包 HTTPS 原理详解,从 CONNECT 到 SSL Proxying、常见问题与真机调试实战(含 Sniffmaster 补充方案)
  • LeetCode 135.分发糖果
  • 计算机视觉:OpenCV+Dlib 人脸检测
  • 开源 C# 快速开发(二)基础控件
  • 安庆公司做网站国外开源商城系统
  • 烟台哪家公司可以做网站灌云县建设局网站
  • 基于sprintboot+vue的智慧辅助学习系统(源码+论文+部署+安装)
  • 基于阿里云系列平台的python微服务设计与DevOps实践
  • 山东临沂网站开发免费的推广网站
  • PAT乙级_1047 编程团体赛_Python_AC解法_无疑难点
  • SystemVerilog小白入门1, iverilog+VScode
  • 微算法科技(NASDAQ: MLGO)融合二次矩阵变换模型,研发基于区块链的可溯源IP版权保护算法
  • 示范校建设验收网站做ppt图片用的网站
  • 新宁县建设局网站沭阳网站建设多少钱