当前位置: 首页 > news >正文

Spring Boot + Kafka 全面实战案例

🌟 Spring Boot + Kafka 全面实战案例(覆盖市面 99% 场景)

本文使用 Spring Boot 3 + Kafka 3.x,演示 Kafka 在微服务中的完整应用场景,包括生产者、消费者、事务、幂等、批量消费、Streams、延迟消息、死信队列等。


📖 目录

  1. 项目简介
  2. 环境准备
  3. 项目结构
  4. 配置 Kafka
  5. Kafka 生产者示例
  6. Kafka 消费者示例
  7. 事务与幂等生产
  8. 批量消费与并发消费
  9. Kafka Streams 实时处理
  10. 延迟消息 & Dead Letter Queue
  11. 消息序列化与自定义对象传输
  12. 测试与调试
  13. 总结
  14. 源码

📝 项目简介

  • Spring Boot 版本:3.2.x
  • Kafka 版本:3.x
  • JDK:17
  • 覆盖场景:
    • 普通生产者/消费者
    • 事务消息 & 幂等生产
    • 批量 & 并发消费
    • Streams 流处理
    • 延迟消息 & 死信队列
    • 对象序列化与 JSON 消息
    • Spring Cloud Stream 方式集成

💻 环境准备

  • Kafka 集群(单机或多节点)
  • Zookeeper(Kafka 依赖)

Docker Compose 示例

version: '3.8'services:zookeeper:image: bitnami/zookeeper:3.8container_name: zookeeperports:- "2181:2181"environment:ALLOW_ANONYMOUS_LOGIN: "yes"restart: unless-stoppedkafka:image: bitnami/kafka:3.5container_name: kafkaports:- "9092:9092"- "29092:29092"environment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXTKAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXTKAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1depends_on:- zookeeperrestart: unless-stopped

📂 项目结构

springboot-kafka-demo
├─ src/main/java/com/example/kafka
│  ├─ config
│  │   ├─ KafkaAdminConfig.java
│  │   ├─ KafkaConsumerConfig.java
│  │   ├─ KafkaProducerConfig.java
│  │   └─ KafkaStreamConfig.java
│  ├─ controller
│  │   └─ KafkaController.java
│  ├─ service
│  │   ├─ KafkaConsumerService.java
│  │   ├─ KafkaLogService.java
│  │   ├─ KafkaProducerService.java
│  │   └─ KafkaTopicService.java
│  ├─ dto
│  │   └─ UserQueryLog.java
│  └─ SpringbootKafkaDemoApplication.java
└─ src/main/resources└─ application.yml

⚙️ 配置 Kafka

server:port: 8081# kafka
spring:kafka:bootstrap-servers: 192.168.3.150:29092 # Kafka broker 地址列表,客户端连接 Kafka 集群的入口,格式 host:port,多节点用逗号分隔# Producer(生产者)配置producer:# 生产者发送消息时 key 的序列化器,将 Java 对象转换为字节数组key-serializer: org.apache.kafka.common.serialization.StringSerializer# 生产者发送消息时 value 的序列化器value-serializer: org.springframework.kafka.support.serializer.JsonSerializer# 消息发送确认机制: all, none, lead (default: all),"all" 表示等待所有 ISR 副本确认,保证最高可靠性acks: all# 发送失败时重试次数,默认是 0(不重试)retries: 3# 批量发送的消息大小(字节),达到该大小后批量发送batch-size: 16384# 消息压缩类型,支持 none(无)、gzip、snappy、lz4、zstd,提高网络传输效率compression-type: snappy# 生产者网络请求缓冲区大小,单位字节(default: 131072)buffer-memory: 131072# 生产者的其他属性properties:# 是否开启幂等性,防止重复消息发送,确保消息不重复写入enable.idempotence: true# 最大的未确认请求数,设置为 1 可以保证严格的顺序写入 (default: 5)max.in.flight.requests.per.connection: 1# Consumer(消费者)配置consumer:# 消费者组 ID,Kafka 通过该 ID 管理消费者组协调消费分区group-id: my-group-id# key 的反序列化器,将字节数组转换为 Java 对象key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# value 的反序列化器value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer# 自动提交偏移量的时间间隔(毫秒)auto-commit-interval: 1000# 是否开启自动提交偏移量,true 表示 Kafka 客户端自动提交消费进度(default: true)enable-auto-commit: false  # 禁用自动提交# 新消费组第一次消费时从哪里开始消费,earliest 表示从头开始消费,latest 表示最新消息开始消费auto-offset-reset: earliest# 每次 poll 拉取的最大记录数max-poll-records: 10# 消费者其他属性properties:# 会话超时时间(毫秒),超过该时间未收到心跳则认为消费者离线(default: 10000)session.timeout.ms: 10000# 心跳间隔(毫秒),定期发送心跳保持与 Kafka 协调器连接(default: 3000)heartbeat.interval.ms: 3000# 最大轮询间隔(毫秒),超过此时间未调用 poll 方法则认为消费者失效(default: 5000)max.poll.interval.ms: 5000# 指定消息值的反序列化器,将 Kafka 中的 JSON 数据转换为 Java 对象。spring.json.trusted.packages: '*'# Listener(消息监听器)配置listener:# 消费者监听器的确认模式,RECORD 表示每条消息确认,其他还有 MANUAL、TIME 等 (MANUAL, MANUAL_IMMEDIATE, RECORD, TIME)ack-mode: MANUAL  # 手动 ack# 监听器的并发线程数,提高消费吞吐量concurrency: 3# 监听消息的类型,batch 表示批量消息消费,single 表示单条消息消费type: batch# KafkaTemplate 默认配置template:# 发送消息时默认的主题名default-topic: default-topicstreams:application-id: log-aggregation-appproperties:default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerdedefault.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerdelogging:level:org.springframework.web: INFOorg.example.microservice: DEBUG

🚀 Kafka 生产者示例

package com.example.kafka.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;import java.util.concurrent.CompletableFuture;/*** Kafka生产者服务*/
@Service
@Slf4j
public class KafkaProducerService {private final KafkaTemplate<String, String> kafkaTemplate;public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}/*** 发送消息到指定主题** @param topic   目标主题* @param message 消息内容*/public void sendMessage(String topic, String message) {log.info("Producing message to topic {}: {}", topic, message);// 发送消息,不关心结果kafkaTemplate.send(topic, message);}/*** 发送消息并异步处理结果** @param topic   目标主题* @param key     消息键* @param message 消息内容*/public void sendMessageWithCallback(String topic, String key, String message) {log.info("Producing message to topic {} with key {}: {}", topic, key, message);kafkaTemplate.executeInTransaction(kt -> {// 发送消息并添加回调CompletableFuture<SendResult<String, String>> future = kt.send(topic, key, message);future.thenAccept(result -> {log.info("Sent message successfully to topic {} with offset {}",result.getRecordMetadata().topic(),result.getRecordMetadata().offset());}).exceptionally(ex -> {log.error("Failed to send message to topic {}", topic, ex);return null;});return true;});}
}

调用示例:

package com.example.kafka.controller;import com.alibaba.fastjson2.JSONObject;
import com.example.kafka.service.KafkaLogService;
import com.example.kafka.service.KafkaProducerService;
import com.example.kafka.service.KafkaTopicService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.TopicDescription;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;import java.util.Map;
import java.util.Set;@RestController
@RequestMapping("/kafka")
@Slf4j
public class KafkaController {@AutowiredKafkaProducerService producerService;@Autowiredprivate KafkaTopicService topicService;@Autowiredprivate KafkaLogService logService;@GetMapping("/send/{message}")public String sendMessage(@PathVariable String message) {producerService.sendMessage("default-topic", message);return "Message sent: " + message;}@GetMapping("/send")public String sendMessage() {JSONObject json = new JSONObject();json.put("level", "ERROR");producerService.sendMessage("application-logs", json.toString());return "Message sent: " + json;}/*** 发送消息*/@GetMapping("/send/{topic}/{key}/{message}")public String sendMessage(@PathVariable String topic, @PathVariable String key, @PathVariable String message) {producerService.sendMessageWithCallback(topic, key, message);return "topic sent: " + topic + ",key sent: " + key + ",Message sent: " + message;}}

发送消息:http://localhost:8081/kafka/send/测试发送
在这里插入图片描述


📥 Kafka 消费者示例

package com.example.kafka.service;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;import java.nio.charset.StandardCharsets;
import java.util.List;/*** | 概念       | 作用                               |* | --------- | -------------------------------- |* | **Topic** | 消息分类容器(生产者写入、消费者读取)              |* | **Group** | 消费者逻辑分组,控制消费并发与 offset 提交        |* | **Key**   | 控制消息落入哪个 Partition(分区),从而控制消费顺序性 |* 丨* Kafka消费者服务** @author wkj*/
@Service
@Slf4j
public class KafkaConsumerService {private final KafkaTemplate<String, String> kafkaTemplate;public KafkaConsumerService(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;this.kafkaTemplate.setTransactionIdPrefix("txn-");}/*** 监听单个消息** @param consumeRecord 消费的消息记录*/@KafkaListener(topics = "${spring.kafka.template.default-topic}", groupId = "${spring.kafka.consumer.group-id}")public void listenSingleMessage(ConsumerRecord<String, String> consumeRecord) {log.info("监听单个消息,主题:{},key:{},value: {}",consumeRecord.topic(), consumeRecord.key(), consumeRecord.value());}/*** 监听批量消息** @param consumeRecords 批量消息记录* @param ack            手动确认对象(当配置为手动确认时使用)*//*containerFactory = "kafkaListenerContainerFactory":使用哪个 Kafka 监听容器工厂(ConcurrentKafkaListenerContainerFactory)来创建消费者监听容器。该容器 在 KafkaConsumerConfig 配置中配置了批量消费,因此这里使用 kafkaListenerContainerFactory*/@KafkaListener(topics = "my-batch", containerFactory = "kafkaListenerContainerFactory")public void listenBatchMessages(List<ConsumerRecord<String, String>> consumeRecords, Acknowledgment ack) {try {log.info("批量消息数量: {}", consumeRecords.size());for (ConsumerRecord<String, String> consumeRecord : consumeRecords) {log.info("监听单个消息,主题:{},key:{},value: {},偏移量:{},分区:{}",consumeRecord.topic(), consumeRecord.key(), consumeRecord.value(), consumeRecord.offset(), consumeRecord.partition());// 处理逻辑 ...}// 处理成功后手动 ackack.acknowledge();} catch (Exception e) {log.error("处理失败,提交 offset,失败原因:{}", e.getMessage());// 不调用 ack.acknowledge(),Kafka 将在下一次重新投递}}/*** 监听特定主题的消息 (类似于 监听单个消息, 只需要自己设置topics,和groupId)** @param message 消息内容*/@KafkaListener(topics = "custom-topic", groupId = "custom-group")public void listenCustomTopic(String message) {log.info("Received message from custom topic: {}", message);}/*** 手动提交偏移量示例** @param consumeRecord 消费者记录* @param ack           手动确认对象*/@KafkaListener(topics = "reliable-topic", groupId = "reliable-group")public void consumeReliably(ConsumerRecord<String, String> consumeRecord, Acknowledgment ack) {try {log.info("手动提交偏移量示例,主题:{},key:{},value: {}",consumeRecord.topic(), consumeRecord.key(), consumeRecord.value());// 模拟业务处理// ..........// 手动提交偏移量ack.acknowledge();log.info("已提交偏移量");} catch (Exception e) {// 不提交偏移量,消息会被重新消费log.error("不提交偏移量,消息会被重新消费,失败原因:{}", e.getMessage());}}@KafkaListener(topics = "dead-letter-topic", groupId = "dlt-group")public void listenDLT(ConsumerRecord<String, String> consumeRecord) {log.warn("死信队列接收到失败消息:Key:{},value:{}", consumeRecord.key(), consumeRecord.value());}private static final int MAX_RETRIES = 3;private static final String DEAD_LETTER_TOPIC = "dead-letter-topic";/*** 消费者重试机制,适用于消费失败的消息,会根据重试次数进行重试。* 超过最大重试次数后,消息会被发送到死信队列。** @param consumeRecord 消费者记录* @param ack           手动确认对象*/@KafkaListener(topics = "retryable-topic", groupId = "retryable-group")public void consumeWithRetry(ConsumerRecord<String, String> consumeRecord, Acknowledgment ack) {// 从消息头中提取重试次数int retryCount = extractRetryCount(consumeRecord.headers());try {log.info("手动提交偏移量示例,主题:{},key:{},value: {},重试次数:{}",consumeRecord.topic(), consumeRecord.key(), consumeRecord.value(), retryCount);// 业务处理// .......ack.acknowledge();} catch (Exception e) {retryCount++;if (retryCount <= MAX_RETRIES) {log.info("重试失败,失败次数:{},消息:{}", retryCount, consumeRecord.value());// 更新消息头中的重试次数并重新发送到原主题sendWithRetryHeader(consumeRecord.topic(), consumeRecord.key(), consumeRecord.value(), retryCount);} else {log.error("超过最大重试次数,消息:{}", consumeRecord.value());// 发送到死信队列kafkaTemplate.send(DEAD_LETTER_TOPIC, consumeRecord.key(), consumeRecord.value());ack.acknowledge(); // ❗转发到 DLQ 后,手动 ack,防止 Kafka 一直重复投递}}}/*** Kafka支持事务性消息** @param consumeRecord 消费者记录* @param ack           手动确认对象*//*Kafka 能做到的是“事务性生产 + 最终一致性消费”,不等于数据库级别的全局分布式事务。如果你要跨 Kafka 与数据库、Redis、HTTP 服务等实现一致性,请考虑:✅ 幂等性设计(业务唯一 key)✅ 手动提交 offset(控制消费确认时机)✅ 本地消息表 + 补偿机制(适合 DDD/Event Sourcing)✅ 事务出错转 DLQ + 监控*/@KafkaListener(topics = "transactional-topic", groupId = "transactional-group")public void consumeTransactional(ConsumerRecord<String, String> consumeRecord, Acknowledgment ack) {// 开启事务kafkaTemplate.executeInTransaction(operations -> {try {log.info("处理事务消息,主题:{},key:{},value: {}",consumeRecord.topic(), consumeRecord.key(), consumeRecord.value());// 业务处理// .......// 发送到其他主题operations.send("processed-topic1", consumeRecord.key(), consumeRecord.value()); // ✅ 第一步:发送到其他主题operations.send("processed-topic2", consumeRecord.key(), consumeRecord.value()); // ✅ 第一步:发送到其他主题operations.send("processed-topic3", consumeRecord.key(), consumeRecord.value()); // ✅ 第一步:发送到其他主题// 手动提交偏移量(事务中会自动处理)ack.acknowledge();  // ✅ 第二步:提交 offset,表示成功处理return true; // 事务成功} catch (Exception e) {log.error("事务失败,失败原因:{}", e.getMessage());// 事务回滚throw new RuntimeException(e);}});}/*** 发送带有重试次数的消息** @param topic      目标主题* @param key        消息的 key* @param value      消息的内容* @param retryCount 重试次数*/private void sendWithRetryHeader(String topic, String key, String value, int retryCount) {// 创建带有重试次数的消息头// 使用 Kafka 的 RecordHeaders 添加重试次数RecordHeaders headers = new RecordHeaders();headers.add(new RecordHeader("retry-count", String.valueOf(retryCount).getBytes(StandardCharsets.UTF_8)));// 构造 Kafka 原生 ProducerRecordProducerRecord<String, String> producerRecord =new ProducerRecord<>(topic, null, null, key, value, headers);kafkaTemplate.send(producerRecord);}/*** 从消息头中提取重试次数** @param headers 消息头* @return 重试次数*/public int extractRetryCount(Headers headers) {Header retryHeader = headers.lastHeader("retry-count");if (retryHeader != null) {try {// 假设 retry-count 是以字符串形式存的String value = new String(retryHeader.value(), StandardCharsets.UTF_8);return Integer.parseInt(value);} catch (Exception e) {// 解析失败默认返回 0return 0;}}return 0;}
}

🔄 事务与幂等生产

package com.example.kafka.config;import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;@Configuration
public class KafkaProducerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 可选配置configProps.put(ProducerConfig.ACKS_CONFIG, "all");configProps.put(ProducerConfig.RETRIES_CONFIG, 3);// 开启事务// 保证幂等性
//        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);// 唯一事务 ID
//        configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txn-producer-1");return new DefaultKafkaProducerFactory<>(configProps);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}}
/*** Kafka支持事务性消息** @param consumeRecord 消费者记录* @param ack           手动确认对象*//*Kafka 能做到的是“事务性生产 + 最终一致性消费”,不等于数据库级别的全局分布式事务。如果你要跨 Kafka 与数据库、Redis、HTTP 服务等实现一致性,请考虑:✅ 幂等性设计(业务唯一 key)✅ 手动提交 offset(控制消费确认时机)✅ 本地消息表 + 补偿机制(适合 DDD/Event Sourcing)✅ 事务出错转 DLQ + 监控*/@KafkaListener(topics = "transactional-topic", groupId = "transactional-group")public void consumeTransactional(ConsumerRecord<String, String> consumeRecord, Acknowledgment ack) {// 开启事务kafkaTemplate.executeInTransaction(operations -> {try {log.info("处理事务消息,主题:{},key:{},value: {}",consumeRecord.topic(), consumeRecord.key(), consumeRecord.value());// 业务处理// .......// 发送到其他主题operations.send("processed-topic1", consumeRecord.key(), consumeRecord.value()); // ✅ 第一步:发送到其他主题operations.send("processed-topic2", consumeRecord.key(), consumeRecord.value()); // ✅ 第一步:发送到其他主题operations.send("processed-topic3", consumeRecord.key(), consumeRecord.value()); // ✅ 第一步:发送到其他主题// 手动提交偏移量(事务中会自动处理)ack.acknowledge();  // ✅ 第二步:提交 offset,表示成功处理return true; // 事务成功} catch (Exception e) {log.error("事务失败,失败原因:{}", e.getMessage());// 事务回滚throw new RuntimeException(e);}});}

📦 批量消费与并发消费

 /*** 监听批量消息** @param consumeRecords 批量消息记录* @param ack            手动确认对象(当配置为手动确认时使用)*//*containerFactory = "kafkaListenerContainerFactory":使用哪个 Kafka 监听容器工厂(ConcurrentKafkaListenerContainerFactory)来创建消费者监听容器。该容器 在 KafkaConsumerConfig 配置中配置了批量消费,因此这里使用 kafkaListenerContainerFactory*/@KafkaListener(topics = "my-batch", containerFactory = "kafkaListenerContainerFactory")public void listenBatchMessages(List<ConsumerRecord<String, String>> consumeRecords, Acknowledgment ack) {try {log.info("批量消息数量: {}", consumeRecords.size());for (ConsumerRecord<String, String> consumeRecord : consumeRecords) {log.info("监听单个消息,主题:{},key:{},value: {},偏移量:{},分区:{}",consumeRecord.topic(), consumeRecord.key(), consumeRecord.value(), consumeRecord.offset(), consumeRecord.partition());// 处理逻辑 ...}// 处理成功后手动 ackack.acknowledge();} catch (Exception e) {log.error("处理失败,提交 offset,失败原因:{}", e.getMessage());// 不调用 ack.acknowledge(),Kafka 将在下一次重新投递}}

⚡ Kafka Streams 实时处理

package com.example.kafka.config;import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;import java.time.Duration;/***  Kafka Streams 实现 日志实时聚合*  每分钟统计一次不同日志级别(如 INFO、ERROR、WARN)的日志数量,并输出到 Kafka 的新主题 log-level-stats,同时打印控制台信息。* @author wkj*/
//@Configuration
// EnableKafkaStreams:启用 Spring Kafka Streams 功能
//@EnableKafkaStreams
public class KafkaStreamConfig {/*** 定义一个 Kafka 流处理逻辑的 Bean,处理来自 Kafka 的日志流(KStream)。* StreamsBuilder 是 Kafka Streams 构建处理拓扑的类。*/
//    @Beanpublic KStream<String, String> kStream(StreamsBuilder builder) {// 从名为 application-logs 的 Kafka topic 中读取数据(格式为 JSON 字符串)KStream<String, String> stream = builder.stream("application-logs");ObjectMapper objectMapper = new ObjectMapper();// 提取 level 字段并统计stream.map((key, value) -> {try {JsonNode node = objectMapper.readTree(value);// 提取 "level" 字段String level = node.get("level").asText();// 返回键为 level,值为 1return new KeyValue<>(level, "1");} catch (Exception e) {// 解析失败则标记为 UNKNOWNreturn new KeyValue<>("UNKNOWN", "1");}})// 分组 + 滑动窗口聚合统计// .groupByKey():按日志级别(INFO、ERROR 等)分组。.groupByKey()// .windowedBy(...):基于时间窗口(这里是 1 分钟)统计。.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))// .count():对每个日志级别在每个 1 分钟内出现的次数进行计数。.count().toStream()// 输出控制台调试信息:每分钟内每种日志级别的数量。.peek((key, count) -> System.out.printf("窗口[%s] 等级[%s] 数量: %d%n",key.window().startTime(),key.key(),count))// 写回 Kafka:发送到 log-level-stats 主题.map((key, count) -> new KeyValue<>(key.key(), count.toString()))// 写入到 Kafka 的 log-level-stats topic 中,供下游消费或存入 Elasticsearch。.to("log-level-stats", Produced.with(Serdes.String(), Serdes.String()));// 返回源数据流(不是聚合流),通常用于后续处理或测试,不影响核心逻辑。return stream;}
}

⏰ 延迟消息 & Dead Letter Queue

    @KafkaListener(topics = "dead-letter-topic", groupId = "dlt-group")public void listenDLT(ConsumerRecord<String, String> consumeRecord) {log.warn("死信队列接收到失败消息:Key:{},value:{}", consumeRecord.key(), consumeRecord.value());}

🛠 消息序列化与自定义对象传输

@Data
@AllArgsConstructor
@NoArgsConstructor
public class UserDTO implements Serializable {private String username;private int age;
}@Autowired
private KafkaTemplate<String, UserDTO> userTemplate;userTemplate.send("user-topic", new UserDTO("Alice", 20));

🧪 测试与调试

  1. 使用 kafka-console-producer.shkafka-console-consumer.sh 验证消息发送接收
  2. 使用 Postman 或 Controller 调用生产者接口发送消息
  3. 监控 Kafka UI(如 Kafka Manager、CMAK)

✅ 总结

本文提供了 Spring Boot + Kafka 的全面实战案例,覆盖:

  • 普通生产者/消费者
  • 批量与并发消费
  • 事务 & 幂等生产
  • Kafka Streams 流处理
  • 延迟消息 & 死信队列
  • 自定义对象消息 & JSON

本教程可直接用于生产环境参考,覆盖市面上 99% Kafka 使用场景。


🔄 源码

源码下载

http://www.dtcms.com/a/490113.html

相关文章:

  • MATLAB基于GWO-BP神经网络对某拨叉件锻造金属流动性的参数分析
  • 建网站教学视频wordpress外汇
  • Ubuntu-8卡H20服务器升级nvidia驱动+cuda版本
  • 2.6 代码注释与编码规
  • html css js网页制作成品——饮料官网html+css+js 4页网页设计(4页)附源码
  • Langchain流式自定义生成器函数
  • 基于单片机的智能收银机模拟系统设计
  • ssh连接本地虚拟机
  • PyInstaller多模块项目打包指南
  • access 网站后台汕头自助建站
  • 从图纸到三维模型:智能装配指导的突破之路
  • 「JMM+Java锁+AQS」 知识图谱
  • 【广州公共资源交易-注册安全分析报告-无验证方式导致安全隐患】
  • C++ 学习日记
  • 晶晨S905L3SB芯片_安卓9.0_高安版_支持外置WIFI_线刷固件包
  • 4G5G 移动代理实战:什么时候必须用移动 IP?
  • 【OpenHarmony】传感器轻量级服务模块架构
  • 面向服务架构(SOA)模式全解析:设计、实践与价值
  • HTML 零基础入门到实战:从骨架到页面的完整指南
  • 【Java EE进阶 --- SpringBoot】Mybatis操作数据库(进阶)
  • 成都海鸥手表网站crm系统的销售管理功能包括
  • 『 QT 』QT信号机制深度解析
  • stp,rstp,mstp的区别
  • 海外盲盒APP开发:从“未知”到“精准”的用户体验革命
  • 网站建设yuanmus站长工具seo综合查询5g
  • 使用 IntelliJ IDEA 结合 DBeaver 连接 MySQL 数据库并实现数据增删查改的详细步骤:
  • 零知IDE——基于STM32F407VET6和ESP-01的SHT2X温湿度监测与云传输系统
  • 记一次生产服务器磁盘I/O性能瓶颈与负载过高分析与处理
  • MEMS加速度计深度解析:从智能手机到结构健康监测
  • LLMs-from-scratch(dataloader)