Spring Boot + Kafka 全面实战案例
🌟 Spring Boot + Kafka 全面实战案例(覆盖市面 99% 场景)
本文使用 Spring Boot 3 + Kafka 3.x,演示 Kafka 在微服务中的完整应用场景,包括生产者、消费者、事务、幂等、批量消费、Streams、延迟消息、死信队列等。
📖 目录
- 项目简介
- 环境准备
- 项目结构
- 配置 Kafka
- Kafka 生产者示例
- Kafka 消费者示例
- 事务与幂等生产
- 批量消费与并发消费
- Kafka Streams 实时处理
- 延迟消息 & Dead Letter Queue
- 消息序列化与自定义对象传输
- 测试与调试
- 总结
- 源码
📝 项目简介
- Spring Boot 版本:3.2.x
- Kafka 版本:3.x
- JDK:17
- 覆盖场景:
- 普通生产者/消费者
- 事务消息 & 幂等生产
- 批量 & 并发消费
- Streams 流处理
- 延迟消息 & 死信队列
- 对象序列化与 JSON 消息
- Spring Cloud Stream 方式集成
💻 环境准备
- Kafka 集群(单机或多节点)
- Zookeeper(Kafka 依赖)
Docker Compose 示例
version: '3.8'services:zookeeper:image: bitnami/zookeeper:3.8container_name: zookeeperports:- "2181:2181"environment:ALLOW_ANONYMOUS_LOGIN: "yes"restart: unless-stoppedkafka:image: bitnami/kafka:3.5container_name: kafkaports:- "9092:9092"- "29092:29092"environment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXTKAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXTKAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1depends_on:- zookeeperrestart: unless-stopped
📂 项目结构
springboot-kafka-demo
├─ src/main/java/com/example/kafka
│ ├─ config
│ │ ├─ KafkaAdminConfig.java
│ │ ├─ KafkaConsumerConfig.java
│ │ ├─ KafkaProducerConfig.java
│ │ └─ KafkaStreamConfig.java
│ ├─ controller
│ │ └─ KafkaController.java
│ ├─ service
│ │ ├─ KafkaConsumerService.java
│ │ ├─ KafkaLogService.java
│ │ ├─ KafkaProducerService.java
│ │ └─ KafkaTopicService.java
│ ├─ dto
│ │ └─ UserQueryLog.java
│ └─ SpringbootKafkaDemoApplication.java
└─ src/main/resources└─ application.yml
⚙️ 配置 Kafka
server:port: 8081# kafka
spring:kafka:bootstrap-servers: 192.168.3.150:29092 # Kafka broker 地址列表,客户端连接 Kafka 集群的入口,格式 host:port,多节点用逗号分隔# Producer(生产者)配置producer:# 生产者发送消息时 key 的序列化器,将 Java 对象转换为字节数组key-serializer: org.apache.kafka.common.serialization.StringSerializer# 生产者发送消息时 value 的序列化器value-serializer: org.springframework.kafka.support.serializer.JsonSerializer# 消息发送确认机制: all, none, lead (default: all),"all" 表示等待所有 ISR 副本确认,保证最高可靠性acks: all# 发送失败时重试次数,默认是 0(不重试)retries: 3# 批量发送的消息大小(字节),达到该大小后批量发送batch-size: 16384# 消息压缩类型,支持 none(无)、gzip、snappy、lz4、zstd,提高网络传输效率compression-type: snappy# 生产者网络请求缓冲区大小,单位字节(default: 131072)buffer-memory: 131072# 生产者的其他属性properties:# 是否开启幂等性,防止重复消息发送,确保消息不重复写入enable.idempotence: true# 最大的未确认请求数,设置为 1 可以保证严格的顺序写入 (default: 5)max.in.flight.requests.per.connection: 1# Consumer(消费者)配置consumer:# 消费者组 ID,Kafka 通过该 ID 管理消费者组协调消费分区group-id: my-group-id# key 的反序列化器,将字节数组转换为 Java 对象key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# value 的反序列化器value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer# 自动提交偏移量的时间间隔(毫秒)auto-commit-interval: 1000# 是否开启自动提交偏移量,true 表示 Kafka 客户端自动提交消费进度(default: true)enable-auto-commit: false # 禁用自动提交# 新消费组第一次消费时从哪里开始消费,earliest 表示从头开始消费,latest 表示最新消息开始消费auto-offset-reset: earliest# 每次 poll 拉取的最大记录数max-poll-records: 10# 消费者其他属性properties:# 会话超时时间(毫秒),超过该时间未收到心跳则认为消费者离线(default: 10000)session.timeout.ms: 10000# 心跳间隔(毫秒),定期发送心跳保持与 Kafka 协调器连接(default: 3000)heartbeat.interval.ms: 3000# 最大轮询间隔(毫秒),超过此时间未调用 poll 方法则认为消费者失效(default: 5000)max.poll.interval.ms: 5000# 指定消息值的反序列化器,将 Kafka 中的 JSON 数据转换为 Java 对象。spring.json.trusted.packages: '*'# Listener(消息监听器)配置listener:# 消费者监听器的确认模式,RECORD 表示每条消息确认,其他还有 MANUAL、TIME 等 (MANUAL, MANUAL_IMMEDIATE, RECORD, TIME)ack-mode: MANUAL # 手动 ack# 监听器的并发线程数,提高消费吞吐量concurrency: 3# 监听消息的类型,batch 表示批量消息消费,single 表示单条消息消费type: batch# KafkaTemplate 默认配置template:# 发送消息时默认的主题名default-topic: default-topicstreams:application-id: log-aggregation-appproperties:default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerdedefault.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerdelogging:level:org.springframework.web: INFOorg.example.microservice: DEBUG
🚀 Kafka 生产者示例
package com.example.kafka.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;import java.util.concurrent.CompletableFuture;/*** Kafka生产者服务*/
@Service
@Slf4j
public class KafkaProducerService {private final KafkaTemplate<String, String> kafkaTemplate;public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}/*** 发送消息到指定主题** @param topic 目标主题* @param message 消息内容*/public void sendMessage(String topic, String message) {log.info("Producing message to topic {}: {}", topic, message);// 发送消息,不关心结果kafkaTemplate.send(topic, message);}/*** 发送消息并异步处理结果** @param topic 目标主题* @param key 消息键* @param message 消息内容*/public void sendMessageWithCallback(String topic, String key, String message) {log.info("Producing message to topic {} with key {}: {}", topic, key, message);kafkaTemplate.executeInTransaction(kt -> {// 发送消息并添加回调CompletableFuture<SendResult<String, String>> future = kt.send(topic, key, message);future.thenAccept(result -> {log.info("Sent message successfully to topic {} with offset {}",result.getRecordMetadata().topic(),result.getRecordMetadata().offset());}).exceptionally(ex -> {log.error("Failed to send message to topic {}", topic, ex);return null;});return true;});}
}
调用示例:
package com.example.kafka.controller;import com.alibaba.fastjson2.JSONObject;
import com.example.kafka.service.KafkaLogService;
import com.example.kafka.service.KafkaProducerService;
import com.example.kafka.service.KafkaTopicService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.TopicDescription;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;import java.util.Map;
import java.util.Set;@RestController
@RequestMapping("/kafka")
@Slf4j
public class KafkaController {@AutowiredKafkaProducerService producerService;@Autowiredprivate KafkaTopicService topicService;@Autowiredprivate KafkaLogService logService;@GetMapping("/send/{message}")public String sendMessage(@PathVariable String message) {producerService.sendMessage("default-topic", message);return "Message sent: " + message;}@GetMapping("/send")public String sendMessage() {JSONObject json = new JSONObject();json.put("level", "ERROR");producerService.sendMessage("application-logs", json.toString());return "Message sent: " + json;}/*** 发送消息*/@GetMapping("/send/{topic}/{key}/{message}")public String sendMessage(@PathVariable String topic, @PathVariable String key, @PathVariable String message) {producerService.sendMessageWithCallback(topic, key, message);return "topic sent: " + topic + ",key sent: " + key + ",Message sent: " + message;}}
发送消息:http://localhost:8081/kafka/send/测试发送
📥 Kafka 消费者示例
package com.example.kafka.service;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;import java.nio.charset.StandardCharsets;
import java.util.List;/*** | 概念 | 作用 |* | --------- | -------------------------------- |* | **Topic** | 消息分类容器(生产者写入、消费者读取) |* | **Group** | 消费者逻辑分组,控制消费并发与 offset 提交 |* | **Key** | 控制消息落入哪个 Partition(分区),从而控制消费顺序性 |* 丨* Kafka消费者服务** @author wkj*/
@Service
@Slf4j
public class KafkaConsumerService {private final KafkaTemplate<String, String> kafkaTemplate;public KafkaConsumerService(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;this.kafkaTemplate.setTransactionIdPrefix("txn-");}/*** 监听单个消息** @param consumeRecord 消费的消息记录*/@KafkaListener(topics = "${spring.kafka.template.default-topic}", groupId = "${spring.kafka.consumer.group-id}")public void listenSingleMessage(ConsumerRecord<String, String> consumeRecord) {log.info("监听单个消息,主题:{},key:{},value: {}",consumeRecord.topic(), consumeRecord.key(), consumeRecord.value());}/*** 监听批量消息** @param consumeRecords 批量消息记录* @param ack 手动确认对象(当配置为手动确认时使用)*//*containerFactory = "kafkaListenerContainerFactory":使用哪个 Kafka 监听容器工厂(ConcurrentKafkaListenerContainerFactory)来创建消费者监听容器。该容器 在 KafkaConsumerConfig 配置中配置了批量消费,因此这里使用 kafkaListenerContainerFactory*/@KafkaListener(topics = "my-batch", containerFactory = "kafkaListenerContainerFactory")public void listenBatchMessages(List<ConsumerRecord<String, String>> consumeRecords, Acknowledgment ack) {try {log.info("批量消息数量: {}", consumeRecords.size());for (ConsumerRecord<String, String> consumeRecord : consumeRecords) {log.info("监听单个消息,主题:{},key:{},value: {},偏移量:{},分区:{}",consumeRecord.topic(), consumeRecord.key(), consumeRecord.value(), consumeRecord.offset(), consumeRecord.partition());// 处理逻辑 ...}// 处理成功后手动 ackack.acknowledge();} catch (Exception e) {log.error("处理失败,提交 offset,失败原因:{}", e.getMessage());// 不调用 ack.acknowledge(),Kafka 将在下一次重新投递}}/*** 监听特定主题的消息 (类似于 监听单个消息, 只需要自己设置topics,和groupId)** @param message 消息内容*/@KafkaListener(topics = "custom-topic", groupId = "custom-group")public void listenCustomTopic(String message) {log.info("Received message from custom topic: {}", message);}/*** 手动提交偏移量示例** @param consumeRecord 消费者记录* @param ack 手动确认对象*/@KafkaListener(topics = "reliable-topic", groupId = "reliable-group")public void consumeReliably(ConsumerRecord<String, String> consumeRecord, Acknowledgment ack) {try {log.info("手动提交偏移量示例,主题:{},key:{},value: {}",consumeRecord.topic(), consumeRecord.key(), consumeRecord.value());// 模拟业务处理// ..........// 手动提交偏移量ack.acknowledge();log.info("已提交偏移量");} catch (Exception e) {// 不提交偏移量,消息会被重新消费log.error("不提交偏移量,消息会被重新消费,失败原因:{}", e.getMessage());}}@KafkaListener(topics = "dead-letter-topic", groupId = "dlt-group")public void listenDLT(ConsumerRecord<String, String> consumeRecord) {log.warn("死信队列接收到失败消息:Key:{},value:{}", consumeRecord.key(), consumeRecord.value());}private static final int MAX_RETRIES = 3;private static final String DEAD_LETTER_TOPIC = "dead-letter-topic";/*** 消费者重试机制,适用于消费失败的消息,会根据重试次数进行重试。* 超过最大重试次数后,消息会被发送到死信队列。** @param consumeRecord 消费者记录* @param ack 手动确认对象*/@KafkaListener(topics = "retryable-topic", groupId = "retryable-group")public void consumeWithRetry(ConsumerRecord<String, String> consumeRecord, Acknowledgment ack) {// 从消息头中提取重试次数int retryCount = extractRetryCount(consumeRecord.headers());try {log.info("手动提交偏移量示例,主题:{},key:{},value: {},重试次数:{}",consumeRecord.topic(), consumeRecord.key(), consumeRecord.value(), retryCount);// 业务处理// .......ack.acknowledge();} catch (Exception e) {retryCount++;if (retryCount <= MAX_RETRIES) {log.info("重试失败,失败次数:{},消息:{}", retryCount, consumeRecord.value());// 更新消息头中的重试次数并重新发送到原主题sendWithRetryHeader(consumeRecord.topic(), consumeRecord.key(), consumeRecord.value(), retryCount);} else {log.error("超过最大重试次数,消息:{}", consumeRecord.value());// 发送到死信队列kafkaTemplate.send(DEAD_LETTER_TOPIC, consumeRecord.key(), consumeRecord.value());ack.acknowledge(); // ❗转发到 DLQ 后,手动 ack,防止 Kafka 一直重复投递}}}/*** Kafka支持事务性消息** @param consumeRecord 消费者记录* @param ack 手动确认对象*//*Kafka 能做到的是“事务性生产 + 最终一致性消费”,不等于数据库级别的全局分布式事务。如果你要跨 Kafka 与数据库、Redis、HTTP 服务等实现一致性,请考虑:✅ 幂等性设计(业务唯一 key)✅ 手动提交 offset(控制消费确认时机)✅ 本地消息表 + 补偿机制(适合 DDD/Event Sourcing)✅ 事务出错转 DLQ + 监控*/@KafkaListener(topics = "transactional-topic", groupId = "transactional-group")public void consumeTransactional(ConsumerRecord<String, String> consumeRecord, Acknowledgment ack) {// 开启事务kafkaTemplate.executeInTransaction(operations -> {try {log.info("处理事务消息,主题:{},key:{},value: {}",consumeRecord.topic(), consumeRecord.key(), consumeRecord.value());// 业务处理// .......// 发送到其他主题operations.send("processed-topic1", consumeRecord.key(), consumeRecord.value()); // ✅ 第一步:发送到其他主题operations.send("processed-topic2", consumeRecord.key(), consumeRecord.value()); // ✅ 第一步:发送到其他主题operations.send("processed-topic3", consumeRecord.key(), consumeRecord.value()); // ✅ 第一步:发送到其他主题// 手动提交偏移量(事务中会自动处理)ack.acknowledge(); // ✅ 第二步:提交 offset,表示成功处理return true; // 事务成功} catch (Exception e) {log.error("事务失败,失败原因:{}", e.getMessage());// 事务回滚throw new RuntimeException(e);}});}/*** 发送带有重试次数的消息** @param topic 目标主题* @param key 消息的 key* @param value 消息的内容* @param retryCount 重试次数*/private void sendWithRetryHeader(String topic, String key, String value, int retryCount) {// 创建带有重试次数的消息头// 使用 Kafka 的 RecordHeaders 添加重试次数RecordHeaders headers = new RecordHeaders();headers.add(new RecordHeader("retry-count", String.valueOf(retryCount).getBytes(StandardCharsets.UTF_8)));// 构造 Kafka 原生 ProducerRecordProducerRecord<String, String> producerRecord =new ProducerRecord<>(topic, null, null, key, value, headers);kafkaTemplate.send(producerRecord);}/*** 从消息头中提取重试次数** @param headers 消息头* @return 重试次数*/public int extractRetryCount(Headers headers) {Header retryHeader = headers.lastHeader("retry-count");if (retryHeader != null) {try {// 假设 retry-count 是以字符串形式存的String value = new String(retryHeader.value(), StandardCharsets.UTF_8);return Integer.parseInt(value);} catch (Exception e) {// 解析失败默认返回 0return 0;}}return 0;}
}
🔄 事务与幂等生产
package com.example.kafka.config;import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;@Configuration
public class KafkaProducerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 可选配置configProps.put(ProducerConfig.ACKS_CONFIG, "all");configProps.put(ProducerConfig.RETRIES_CONFIG, 3);// 开启事务// 保证幂等性
// configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);// 唯一事务 ID
// configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txn-producer-1");return new DefaultKafkaProducerFactory<>(configProps);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}}
/*** Kafka支持事务性消息** @param consumeRecord 消费者记录* @param ack 手动确认对象*//*Kafka 能做到的是“事务性生产 + 最终一致性消费”,不等于数据库级别的全局分布式事务。如果你要跨 Kafka 与数据库、Redis、HTTP 服务等实现一致性,请考虑:✅ 幂等性设计(业务唯一 key)✅ 手动提交 offset(控制消费确认时机)✅ 本地消息表 + 补偿机制(适合 DDD/Event Sourcing)✅ 事务出错转 DLQ + 监控*/@KafkaListener(topics = "transactional-topic", groupId = "transactional-group")public void consumeTransactional(ConsumerRecord<String, String> consumeRecord, Acknowledgment ack) {// 开启事务kafkaTemplate.executeInTransaction(operations -> {try {log.info("处理事务消息,主题:{},key:{},value: {}",consumeRecord.topic(), consumeRecord.key(), consumeRecord.value());// 业务处理// .......// 发送到其他主题operations.send("processed-topic1", consumeRecord.key(), consumeRecord.value()); // ✅ 第一步:发送到其他主题operations.send("processed-topic2", consumeRecord.key(), consumeRecord.value()); // ✅ 第一步:发送到其他主题operations.send("processed-topic3", consumeRecord.key(), consumeRecord.value()); // ✅ 第一步:发送到其他主题// 手动提交偏移量(事务中会自动处理)ack.acknowledge(); // ✅ 第二步:提交 offset,表示成功处理return true; // 事务成功} catch (Exception e) {log.error("事务失败,失败原因:{}", e.getMessage());// 事务回滚throw new RuntimeException(e);}});}
📦 批量消费与并发消费
/*** 监听批量消息** @param consumeRecords 批量消息记录* @param ack 手动确认对象(当配置为手动确认时使用)*//*containerFactory = "kafkaListenerContainerFactory":使用哪个 Kafka 监听容器工厂(ConcurrentKafkaListenerContainerFactory)来创建消费者监听容器。该容器 在 KafkaConsumerConfig 配置中配置了批量消费,因此这里使用 kafkaListenerContainerFactory*/@KafkaListener(topics = "my-batch", containerFactory = "kafkaListenerContainerFactory")public void listenBatchMessages(List<ConsumerRecord<String, String>> consumeRecords, Acknowledgment ack) {try {log.info("批量消息数量: {}", consumeRecords.size());for (ConsumerRecord<String, String> consumeRecord : consumeRecords) {log.info("监听单个消息,主题:{},key:{},value: {},偏移量:{},分区:{}",consumeRecord.topic(), consumeRecord.key(), consumeRecord.value(), consumeRecord.offset(), consumeRecord.partition());// 处理逻辑 ...}// 处理成功后手动 ackack.acknowledge();} catch (Exception e) {log.error("处理失败,提交 offset,失败原因:{}", e.getMessage());// 不调用 ack.acknowledge(),Kafka 将在下一次重新投递}}
⚡ Kafka Streams 实时处理
package com.example.kafka.config;import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;import java.time.Duration;/*** Kafka Streams 实现 日志实时聚合* 每分钟统计一次不同日志级别(如 INFO、ERROR、WARN)的日志数量,并输出到 Kafka 的新主题 log-level-stats,同时打印控制台信息。* @author wkj*/
//@Configuration
// EnableKafkaStreams:启用 Spring Kafka Streams 功能
//@EnableKafkaStreams
public class KafkaStreamConfig {/*** 定义一个 Kafka 流处理逻辑的 Bean,处理来自 Kafka 的日志流(KStream)。* StreamsBuilder 是 Kafka Streams 构建处理拓扑的类。*/
// @Beanpublic KStream<String, String> kStream(StreamsBuilder builder) {// 从名为 application-logs 的 Kafka topic 中读取数据(格式为 JSON 字符串)KStream<String, String> stream = builder.stream("application-logs");ObjectMapper objectMapper = new ObjectMapper();// 提取 level 字段并统计stream.map((key, value) -> {try {JsonNode node = objectMapper.readTree(value);// 提取 "level" 字段String level = node.get("level").asText();// 返回键为 level,值为 1return new KeyValue<>(level, "1");} catch (Exception e) {// 解析失败则标记为 UNKNOWNreturn new KeyValue<>("UNKNOWN", "1");}})// 分组 + 滑动窗口聚合统计// .groupByKey():按日志级别(INFO、ERROR 等)分组。.groupByKey()// .windowedBy(...):基于时间窗口(这里是 1 分钟)统计。.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))// .count():对每个日志级别在每个 1 分钟内出现的次数进行计数。.count().toStream()// 输出控制台调试信息:每分钟内每种日志级别的数量。.peek((key, count) -> System.out.printf("窗口[%s] 等级[%s] 数量: %d%n",key.window().startTime(),key.key(),count))// 写回 Kafka:发送到 log-level-stats 主题.map((key, count) -> new KeyValue<>(key.key(), count.toString()))// 写入到 Kafka 的 log-level-stats topic 中,供下游消费或存入 Elasticsearch。.to("log-level-stats", Produced.with(Serdes.String(), Serdes.String()));// 返回源数据流(不是聚合流),通常用于后续处理或测试,不影响核心逻辑。return stream;}
}
⏰ 延迟消息 & Dead Letter Queue
@KafkaListener(topics = "dead-letter-topic", groupId = "dlt-group")public void listenDLT(ConsumerRecord<String, String> consumeRecord) {log.warn("死信队列接收到失败消息:Key:{},value:{}", consumeRecord.key(), consumeRecord.value());}
🛠 消息序列化与自定义对象传输
@Data
@AllArgsConstructor
@NoArgsConstructor
public class UserDTO implements Serializable {private String username;private int age;
}@Autowired
private KafkaTemplate<String, UserDTO> userTemplate;userTemplate.send("user-topic", new UserDTO("Alice", 20));
🧪 测试与调试
- 使用
kafka-console-producer.sh
与kafka-console-consumer.sh
验证消息发送接收 - 使用 Postman 或 Controller 调用生产者接口发送消息
- 监控 Kafka UI(如 Kafka Manager、CMAK)
✅ 总结
本文提供了 Spring Boot + Kafka 的全面实战案例,覆盖:
- 普通生产者/消费者
- 批量与并发消费
- 事务 & 幂等生产
- Kafka Streams 流处理
- 延迟消息 & 死信队列
- 自定义对象消息 & JSON
本教程可直接用于生产环境参考,覆盖市面上 99% Kafka 使用场景。
🔄 源码
源码下载