SpringCloud 项目阶段九:Kafka 接入实战指南 —— 从基础概念、安装配置到 Spring Boot 实战及高可用设计
前言:
项目地址:https://gitee.com/whltaoin_admin/hmtt_cloud-project.git
阶段七进度版本号:90fb42d23a7f3dd1045d0d8c2a70e936a41eb45e
本文末尾附带:Fakfa生产者和消费者的配置属性说明书
kafka介绍
Kafka是一种高吞吐量的分布式发布订阅消息系统
有如下特性:通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。
其核心架构包含Broker服务器集群、主题(Topic)及分区(Partition),通过ZooKeeper协调分布式节点状态。消息持久化采用顺序追加与分段索引机制,配合零拷贝技术和批量压缩实现高吞吐量,副本机制保障数据可靠性。平台提供Producer API、Consumer API及Streams API,支持事件发布订阅、流式处理和跨系统数据集成,主要应用于实时金融交易、物联网数据分析、日志收集和微服务通信等领域。当前稳定版本为3.9.0,支持多语言客户端与主流大数据框架对接。
kafla名称解释
kafla安装配置
安装zookeeper
afka依赖zookeeper,所以需要先安装zookeeper
下载镜像:
docker pull zookeeper:3.4.14
创建容器
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
效果:
安装kafka
下载镜像:
docker pull wurstmeister/kafka:2.12-2.3.1
创建容器
docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=varin.cn \
--env KAFKA_ZOOKEEPER_CONNECT=varin.cn:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://varin.cn:9999 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9999 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
-p 9999:9999 \
wurstmeister/kafka:2.12-2.3.1
参数/配置项 | 说明 |
---|---|
docker run | Docker启动容器的核心命令 |
-d | 容器在后台(detached mode)运行,不阻塞当前终端 |
--name kafka | 为容器指定名称为kafka ,方便后续通过名称管理容器(如停止、删除等) |
--env KAFKA_ADVERTISED_HOST_NAME=varin.cn | 指定Kafka对外公布的主机名(客户端可通过此主机名连接),新版本中逐渐被KAFKA_ADVERTISED_LISTENERS 替代,此处可能用于兼容旧逻辑 |
--env KAFKA_ZOOKEEPER_CONNECT=varin.cn:2181 | 指定Kafka依赖的Zookeeper服务地址(需确保该地址有可用的Zookeeper服务,Kafka依赖其存储元数据) |
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://varin.cn:9999 | 关键参数,指定Kafka向客户端“宣告”的连接地址,客户端将使用此地址建立连接。其中PLAINTEXT 为无加密传输协议,varin.cn:9999 为客户端实际连接的主机和端口 |
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9999 | 关键参数,指定Kafka在容器内部实际监听的地址和端口。0.0.0.0 表示监听容器内所有网络接口,9999 为容器内Kafka绑定的端口 |
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" | 设置Kafka运行的JVM堆内存参数,-Xms256M 为初始堆内存,-Xmx256M 为最大堆内存(控制内存占用) |
-p 9999:9999 | 端口映射,将宿主机的9999端口映射到容器内的9999端口,使外部客户端可通过“宿主机IP:9999”访问容器内的Kafka服务 |
wurstmeister/kafka:2.12-2.3.1 | 指定使用的Kafka镜像及版本,2.12 为Scala版本(Kafka依赖Scala编译),2.3.1 为Kafka版本 |
效果
kaka 入门案例
依赖
<dependency>
<groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency>
生产者
package cn.varin.kafka;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.logging.log4j.core.config.properties.PropertiesConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;import java.util.Properties;/*** 生产者*/
public class PropertiesObject {public static void main(String[] args) {// kafka 配置Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "varin.cn:9999");props.put(ProducerConfig.RETRIES_CONFIG, 10);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 建立生产者对象KafkaProducer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {producer.send(new ProducerRecord<String,String>("topic1", "name", "varin"+i));}// 发送消息// 关闭连接producer.close();}
}
消费者
package cn.varin.kafka;// 消费者import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;public class ConsumerObject {public static void main(String[] args) {// kafka 配置Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "varin.cn:9999");props.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");// 反序列化props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//key.serializer.encoding" : "value.serializer.encoding";// 建立消费者对象KafkaConsumer<String, String> consurmer = new KafkaConsumer<String,String >(props);// 订阅主题consurmer.subscribe(Collections.singletonList("topic1"));while (true) {//4.获取消息ConsumerRecords<String, String> consumerRecords = consurmer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.printf(consumerRecord.key());System.out.println(consumerRecord.value());}}}
}
效果
总结
- 如果想要在在一个生产者和多个消费者中,实现一对一效果:
- 将所有消费者限定在同一个组中即可
- 实现一对多的话:
- 消费者分成多个组别即可。
kafka高可用设计详解
集群模式
- Kafka 的服务器端由被称为 Broker 的服务进程构成,即一个 Kafka 集群由多个 Broker 组成
- 这样如果集群中某一台机器宕机,其他机器上的 Broker 也依然能够对外提供服务。
备份机制
在kafka的备份机制中,定义了两种副本方式:
领导者副本
追随者副本
当信息发送过来时,会存到领导者中,领导者在存储到追随者(追随者可能有多个)
在追随者中有分为了两类:
ISR:同步存储leader中的数据(和leader存储的数据是高度一致的)
普通:异步存储leader中的数据
同步方式
情况一:leader死亡,在追随者中会先使用isr作为新的leader
情况二:如果isr中的都不行,再选择普通
- 假设所有副本都失效了选择方式:
保证一致性:等待irs
保证可用性:谁先活谁最为leader
kafka生成者同步和异步发送信息
同步发送
缺陷:如果发送数据量大的话,会一直占用线程,造成阻塞。
producer.send(new ProducerRecord<String,String>("topic1", "name", "varin"+i));
异步发送
通过在send方法中多加一个callback进行异步发送,可以在异步中添加日志等操作
for (int i = 0; i < 10; i++) {ProducerRecord<String, String>record= new ProducerRecord<>("topic1", "name", "varin" + i);producer.send(record,new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception e) {if (e != null) {System.out.println("会记录错误信息");}// 打印偏移量System.out.println(metadata.offset());}});}
kafka消费者详解
消费者组
- 消费者组(Consumer Group) :指的就是由一个或多个消费者组成的群体
- 一个发布在Topic上消息被分发给此消费者组中的一个消费者
- 所有的消费者都在一个组中,那么这就变成了**queue模型**
- 所有的消费者都在不同的组中,那么就完全变成了**发布-订阅模型**
消息有序性
应用场景:
- 即时消息中的单对单聊天和群聊,保证发送方消息发送顺序与接收方的顺序一致
- 充值转账两个渠道在同一个时间进行余额变更,短信通知必须要有顺序
topic分区中消息只能由消费者组中的唯一一个消费者处理,所以消息肯定是按照先后顺序进行处理的。但是它也仅仅是保证Topic的一个分区顺序处理,不能保证跨分区的消息先后处理顺序。 所以,如果你想要顺序的处理Topic的所有消息,那就只提供一个分区。
消费者手提提交的三种模式
kafka不会像其他JMS队列那样需要得到消费者的确认,消费者可以使用kafka来追踪消息在分区的位置(偏移量)
消费者会往一个叫做_consumer_offset的特殊主题发送消息,消息里包含了每个分区的偏移量。如果消费者发生崩溃或有新的消费者加入群组,就会触发再均衡。
提交偏移量的方式有两种,分别是自动提交偏移量和手动提交
- 自动提交偏移量
当enable.auto.commit被设置为true,提交方式就是让消费者自动提交偏移量,每隔5秒消费者会自动把从poll()方法接收的最大偏移量提交上去
- 手动提交 ,当enable.auto.commit被设置为false可以有以下三种提交方式
- 提交当前偏移量(同步提交)
- 异步提交
- 同步和异步组合提交
手动提交:同步
// 设置为手动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//key.serializer.encoding" : "value.serializer.encoding";// 建立消费者对象
KafkaConsumer<String, String> consurmer = new KafkaConsumer<String,String >(props);
// 订阅主题
consurmer.subscribe(Collections.singletonList("topic1"));
while (true) {//4.获取消息ConsumerRecords<String, String> consumerRecords = consurmer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.printf(consumerRecord.key());System.out.println(consumerRecord.value());try {consurmer.commitSync();}catch (Exception e){e.printStackTrace();}}
}
手动提交:异步
// 设置为手动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//key.serializer.encoding" : "value.serializer.encoding";// 建立消费者对象
KafkaConsumer<String, String> consurmer = new KafkaConsumer<String,String >(props);
// 订阅主题
consurmer.subscribe(Collections.singletonList("topic1"));
while (true) {//4.获取消息ConsumerRecords<String, String> consumerRecords = consurmer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.printf(consumerRecord.key());System.out.println(consumerRecord.value());}consurmer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {if (exception != null) {System.out.println("error");}System.out.println(offsets);}});
}
手动提交:同步异步组合
// 设置为手动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//key.serializer.encoding" : "value.serializer.encoding";// 建立消费者对象
KafkaConsumer<String, String> consurmer = new KafkaConsumer<String,String >(props);
// 订阅主题
consurmer.subscribe(Collections.singletonList("topic1"));
try {while (true) {//4.获取消息ConsumerRecords<String, String> consumerRecords = consurmer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.printf(consumerRecord.key());System.out.println(consumerRecord.value());}// 异步consurmer.commitSync();}
}catch (Exception e){e.printStackTrace();System.out.println(e.getMessage());
}finally {//同步consurmer.commitSync();
}
kafkaj接入SpringBoot示例
- 依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><!-- kafka--><!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency>
</dependencies>
- 配置项
server:port: 10000
spring:application:name: kafka-testkafka:bootstrap-servers: "varin.cn:9999"producer:key-serializer: "org.apache.kafka.common.serialization.StringSerializer"value-serializer: "org.apache.kafka.common.serialization.StringSerializer"consumer:key-deserializer: "org.apache.kafka.common.serialization.StringDeserializer"value-deserializer: "org.apache.kafka.common.serialization.StringDeserializer"group-id: ${spring.application.name}
- Controller
package cn.varin.kafka.controller;import org.apache.kafka.clients.producer.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class SendKafkaController {@Autowiredprivate KafkaTemplate kafkaTemplate;@GetMapping("/send")public void send(){kafkaTemplate.send("topic1", "varin");}
}
- listener建立
package cn.varin.kafka.listener;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class ConsumerListener {@KafkaListener(topics = "topic1")public void onMessage(String message){System.out.println(message);}
}
- 测试
- 访问路径:http://localhost:10000/send
传递对象为对象解析方式
- 说明
因为我们在配置文件中以及限定了kafka解析key和value的类型了。
所以当我们需要传递对象时,需要先将对象转换成String字符串,
接收到消息时,再将对应的JSON字符串转成对应的对象。
示例
- 自定义类
package cn.varin.kafka.pojo;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {private String name;private Integer age;}
- Controller类
package cn.varin.kafka.controller;import cn.varin.kafka.pojo.User;
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class SendKafkaController {@Autowiredprivate KafkaTemplate kafkaTemplate;@GetMapping("/send")public void send(){kafkaTemplate.send("topic1", "varin");}@GetMapping("/sendUser")public void sendUser(){kafkaTemplate.send("topic_user", JSON.toJSONString(new User("varin",1)));}
}
- listener类
package cn.varin.kafka.listener;import cn.varin.kafka.pojo.User;
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class ConsumerListener {@KafkaListener(topics = "topic1")public void onMessage(String message){System.out.println(message);}@KafkaListener(topics = "topic_user")public void onMessageToUser(String message){System.out.println(JSON.parseObject(message, User.class));}
}
- 效果
加油呀~