当前位置: 首页 > news >正文

【SpringBoot】集成kafka之生产者、消费者、幂等性处理和消息积压

目录

  • 配置文件 application.properties
  • 启动类 Application
  • Kafka 配置
  • Message 消息实体类
  • MessageRepository 消息处理
  • 消息积压监控服务
  • Kafka消息消费者服务
  • Kafka消息生产者服务
  • API控制器提供测试接口
  • 关键特性说明
  • 生产环境建议

配置文件 application.properties

# 应用配置
server.port=8080 # 应用端口
spring.application.name=kafka-demo # 应用名称# Kafka配置
kafka.bootstrap-servers=localhost:9092 # Kafka服务器地址
kafka.consumer.group-id=demo-group # 消费者组ID
kafka.consumer.concurrency=5 # 消费者并发数# 日志配置
logging.level.root=INFO # 根日志级别
logging.level.org.springframework.kafka=INFO # Spring Kafka日志级别
logging.level.com.example=DEBUG # 应用自定义代码日志级别# 异步配置
spring.task.execution.pool.core-size=10 # 异步任务线程池核心大小
spring.task.execution.pool.max-size=20 # 异步任务线程池最大大小
spring.task.execution.pool.queue-capacity=100 # 异步任务队列容量     

启动类 Application

package com.example;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;/*** 应用启动类*/
@SpringBootApplication // 启用Spring Boot自动配置
@EnableScheduling // 启用定时任务支持,用于消息积压监控
public class DemoApplication {public static void main(String[] args) {// 启动Spring Boot应用SpringApplication.run(DemoApplication.class, args);}
}    

Kafka 配置

package com.example.config;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;import java.util.HashMap;
import java.util.Map;@Configuration
@EnableKafka // 启用Kafka注解支持
public class KafkaConfig {@Value("${kafka.bootstrap-servers}")private String bootstrapServers;@Value("${kafka.consumer.group-id}")private String groupId;@Value("${kafka.consumer.concurrency:5}")private int concurrency;// 生产者配置 - 确保消息可靠发送@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.ACKS_CONFIG, "all"); // 所有副本都确认后才认为消息发送成功configProps.put(ProducerConfig.RETRIES_CONFIG, 3); // 消息发送失败时的重试次数configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 开启幂等性,防止消息重复发送configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // 允许5个未确认请求return new DefaultKafkaProducerFactory<>(configProps);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}// 消费者配置 - 确保消息可靠消费@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 从最早的消息开始消费configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁用自动提交,使用手动提交configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // 每次拉取的最大消息数return new DefaultKafkaConsumerFactory<>(configProps);}// 普通消息消费者容器工厂@Beanpublic KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(concurrency); // 并发消费者数量,应根据分区数调整factory.getContainerProperties().setPollTimeout(3000); // 拉取超时时间factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); // 手动立即提交factory.setMessageConverter(new StringJsonMessageConverter()); // JSON消息转换器return factory;}// 用于处理积压消息的消费者配置 - 增加了并发度和批量处理能力@Beanpublic KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> backlogKafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(10); // 更高的并发数处理积压factory.getContainerProperties().setPollTimeout(3000);factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);factory.setBatchListener(true); // 启用批量消费模式return factory;}
}    

Message 消息实体类

package com.example.entity;import java.io.Serializable;
import java.util.UUID;/*** 消息实体类,用于封装消息内容和元数据*/
public class Message implements Serializable {private static final long serialVersionUID = 1L;private String id; // 消息唯一标识,用于幂等性检查private String content; // 消息内容private long timestamp; // 消息创建时间戳private String traceId; // 跟踪ID,用于分布式追踪public Message() {this.id = UUID.randomUUID().toString(); // 自动生成唯一IDthis.timestamp = System.currentTimeMillis();}public Message(String content) {this();this.content = content;}// Getters and Setterspublic String getId() { return id; }public void setId(String id) { this.id = id; }public String getContent() { return content; }public void setContent(String content) { this.content = content; }public long getTimestamp() { return timestamp; }public void setTimestamp(long timestamp) { this.timestamp = timestamp; }public String getTraceId() { return traceId; }public void setTraceId(String traceId) { this.traceId = traceId; }@Overridepublic String toString() {return "Message{id='" + id + "', content='" + content + "', timestamp=" + timestamp + '}';}
}    

MessageRepository 消息处理

package com.example.repository;import org.springframework.stereotype.Repository;
import java.util.concurrent.ConcurrentHashMap;/*** 消息处理记录仓库,用于存储和查询已处理的消息ID* 实际生产环境建议使用Redis或数据库实现,确保分布式环境下的幂等性*/
@Repository
public class MessageRepository {// 使用ConcurrentHashMap存储已处理的消息ID,键为消息ID,值为是否处理的标志private final ConcurrentHashMap<String, Boolean> processedMessages = new ConcurrentHashMap<>();// 检查消息是否已处理public boolean exists(String messageId) {return processedMessages.containsKey(messageId);}// 记录消息已处理public void save(String messageId) {processedMessages.put(messageId, true);}// 获取已处理消息数量public long count() {return processedMessages.size();}
}    

消息积压监控服务

package com.example.service;import org.apache.kafka.clients.admin.*;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;import java.util.*;
import java.util.concurrent.ExecutionException;/*** 消息积压监控服务,定期检查Kafka主题的积压情况并触发告警*/
@Service
public class BacklogMonitorService {private final AdminClient adminClient;public BacklogMonitorService() {// 创建Kafka管理客户端,用于获取主题和消费者组信息Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");this.adminClient = AdminClient.create(props);}// 定时任务:每5分钟检查一次消息积压情况@Scheduled(fixedRate = 300000)public void monitorBacklog() {try {// 获取所有主题ListTopicsResult topics = adminClient.listTopics();Collection<TopicDescription> topicDescriptions = adminClient.describeTopics(topics.names().get()).all().get().values();// 遍历每个主题,计算积压量for (TopicDescription topic : topicDescriptions) {if (topic.name().endsWith("-topic")) { // 只监控业务主题long totalBacklog = calculateTopicBacklog(topic.name());System.out.println("主题: " + topic.name() + ", 总积压量: " + totalBacklog);// 积压超过阈值触发告警if (totalBacklog > 100000) {triggerAlert(topic.name(), totalBacklog);}}}} catch (Exception e) {// 监控过程中发生异常,记录错误信息System.err.println("监控积压时发生异常: " + e.getMessage());}}// 计算指定主题的积压量private long calculateTopicBacklog(String topic) throws ExecutionException, InterruptedException {// 获取主题各分区的最新偏移量Map<TopicPartition, Long> endOffsets = getEndOffsets(topic);// 获取消费者组当前消费的偏移量Map<TopicPartition, Long> consumerOffsets = getConsumerGroupOffsets("demo-group", topic);// 计算总积压量:各分区最新偏移量减去消费者当前偏移量long totalBacklog = 0;for (TopicPartition partition : endOffsets.keySet()) {long endOffset = endOffsets.get(partition);long consumerOffset = consumerOffsets.getOrDefault(partition, 0L);totalBacklog += endOffset - consumerOffset;}return totalBacklog;}// 获取主题各分区的最新偏移量private Map<TopicPartition, Long> getEndOffsets(String topic) throws ExecutionException, InterruptedException {List<TopicPartition> partitions = new ArrayList<>();DescribeTopicsResult describeTopics = adminClient.describeTopics(Collections.singletonList(topic));TopicDescription topicDescription = describeTopics.values().get(topic).get();// 构建主题分区列表for (PartitionInfo partition : topicDescription.partitions()) {partitions.add(new TopicPartition(topic, partition.partition()));}// 获取各分区的最新偏移量return adminClient.endOffsets(partitions).get();}// 获取消费者组在指定主题各分区的当前偏移量private Map<TopicPartition, Long> getConsumerGroupOffsets(String groupId, String topic) throws ExecutionException, InterruptedException {Map<TopicPartition, Long> offsets = new HashMap<>();ListConsumerGroupOffsetsResult consumerGroupOffsets = adminClient.listConsumerGroupOffsets(groupId);// 提取指定主题的偏移量信息for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : consumerGroupOffsets.partitionsToOffsetAndMetadata().get().entrySet()) {if (entry.getKey().topic().equals(topic)) {offsets.put(entry.getKey(), entry.getValue().offset());}}return offsets;}// 触发积压告警,实际项目中可集成邮件、短信或监控系统private void triggerAlert(String topic, long backlogSize) {System.out.println("告警: 主题 " + topic + " 积压量达到 " + backlogSize + " 条消息!");// 可扩展实现:// 1. 发送邮件通知// 2. 调用短信API// 3. 集成监控系统如Prometheus和Grafana}
}    

Kafka消息消费者服务

package com.example.service;import com.example.entity.Message;
import com.example.repository.MessageRepository;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;import java.util.List;/*** Kafka消息消费者服务,负责从Kafka主题消费消息并进行业务处理*/
@Service
public class KafkaConsumerService {@Autowiredprivate MessageRepository messageRepository;private final ObjectMapper objectMapper = new ObjectMapper(); // JSON序列化/反序列化工具// 消费普通消息,单条处理模式@KafkaListener(topics = "demo-topic", groupId = "demo-group")public void listen(String messageJson, Acknowledgment acknowledgment) {try {// 1. 从JSON字符串解析消息对象Message message = objectMapper.readValue(messageJson, Message.class);// 2. 幂等性检查:检查消息是否已经处理过if (messageRepository.exists(message.getId())) {System.out.println("重复消息,跳过处理: " + message.getId());acknowledgment.acknowledge(); // 提交偏移量return;}// 3. 处理消息业务逻辑processMessage(message);// 4. 记录消息已处理messageRepository.save(message.getId());// 5. 手动提交偏移量,表示消息已成功处理acknowledgment.acknowledge();} catch (Exception e) {// 处理异常情况System.err.println("处理消息失败: " + e.getMessage());// 可以实现重试逻辑或发送到死信队列acknowledgment.acknowledge(); // 这里选择提交,避免重复消费导致处理失败}}// 消费积压消息,批量处理模式@KafkaListener(topics = "backlog-topic", groupId = "backlog-group", containerFactory = "backlogKafkaListenerContainerFactory")public void listenBatch(List<String> messages, Acknowledgment acknowledgment) {System.out.println("接收到批量消息: " + messages.size());for (String messageJson : messages) {try {// 解析消息Message message = objectMapper.readValue(messageJson, Message.class);// 幂等性检查并处理if (!messageRepository.exists(message.getId())) {processMessage(message);messageRepository.save(message.getId());}} catch (Exception e) {// 处理单条消息异常,不影响其他消息处理System.err.println("处理批量消息中的一条失败: " + e.getMessage());}}// 批量处理完成后,统一提交偏移量acknowledgment.acknowledge();}// 消息业务处理逻辑private void processMessage(Message message) {// 模拟业务处理逻辑System.out.println("处理消息: " + message);// 模拟耗时操作,实际项目中替换为真实业务逻辑try {Thread.sleep(10); // 每条消息处理10ms} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}    

Kafka消息生产者服务

package com.example.service;import com.example.entity.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;/*** Kafka消息生产者服务,负责发送消息到Kafka主题*/
@Service
public class KafkaProducerService {private static final String TOPIC = "demo-topic"; // 普通消息主题private static final String BACKLOG_TOPIC = "backlog-topic"; // 用于测试积压的主题@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;// 发送普通消息,带异步回调确认public void sendMessage(Message message) {// 发送消息,使用消息ID作为键,确保相同ID的消息发送到同一分区ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC, message.getId(), message.getContent());// 添加异步回调,处理发送结果future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onSuccess(SendResult<String, String> result) {// 消息发送成功,记录发送信息System.out.println("消息发送成功: " + message.getId() + " 分区: " + result.getRecordMetadata().partition() +" 偏移量: " + result.getRecordMetadata().offset());}@Overridepublic void onFailure(Throwable ex) {// 消息发送失败,记录错误信息System.err.println("消息发送失败: " + message.getId() + " 错误: " + ex.getMessage());// 可实现重试逻辑或记录失败消息到日志}});}// 发送大量消息用于测试积压情况public void sendBatchMessages(int count) {for (int i = 0; i < count; i++) {Message message = new Message("Batch message-" + i);kafkaTemplate.send(BACKLOG_TOPIC, message.getId(), message.getContent());// 每发送1000条消息打印一次进度if (i % 1000 == 0) {System.out.println("已发送 " + i + " 条消息");}}}
}    

API控制器提供测试接口

package com.example.controller;import com.example.entity.Message;
import com.example.service.KafkaProducerService;
import com.example.repository.MessageRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;/*** REST API控制器,提供测试接口*/
@RestController
@RequestMapping("/api")
public class DemoController {@Autowiredprivate KafkaProducerService producerService;@Autowiredprivate MessageRepository messageRepository;// 发送单条消息的API接口@PostMapping("/send")public String sendMessage(@RequestBody String content) {// 创建消息对象Message message = new Message(content);// 调用生产者服务发送消息producerService.sendMessage(message);return "消息已发送: " + message.getId();}// 发送批量消息测试积压的API接口@PostMapping("/send/batch/{count}")public String sendBatchMessages(@PathVariable int count) {// 调用生产者服务发送大量消息,用于测试积压情况producerService.sendBatchMessages(count);return "开始发送 " + count + " 条消息";}// 查询已处理消息数量的API接口@GetMapping("/processed/count")public String getProcessedCount() {// 查询已处理的消息数量,用于监控处理进度return "已处理消息数量: " + messageRepository.count();}
}    

关键特性说明

  • 幂等性保障
    • 为每条消息生成全局唯一 ID(UUID)
    • 消费者通过MessageRepository检查消息是否已处理
    • 先检查再处理,处理成功后记录,确保不重复处理

  • 高性能设计
    • 生产者配置:
      • 启用幂等性(ENABLE_IDEMPOTENCE_CONFIG=true)
      • 批量发送
      • 异步发送带回调
    • 消费者配置:
      • 手动提交偏移量
      • 批量消费模式(处理积压)
      • 高并发消费者实例

  • 异常处理机制
    • 生产者端:
      • 异步发送失败时记录错误并支持重试
    • 消费者端:
      • 单条消息处理失败不影响其他消息
      • 提供失败消息处理策略(可扩展)

  • 消息积压解决方案
    • 独立的积压处理消费者组
    • 更高的并发度(10 个消费者实例)
    • 批量消费模式(每次处理 500 条)
    • 定时监控积压量并触发告警

  • 可观测性
    • 积压监控定时任务
    • 处理进度统计 API
    • 详细的日志记录

生产环境建议

  • 替换内存存储
    • 将MessageRepository替换为基于 Redis 或数据库的实现,确保幂等性检查的分布式一致性
  • 扩展积压处理能力
    • 增加分区数提高并行度
    • 部署多个消费者实例
    • 使用独立的积压处理 Topic 和消费者组
  • 完善监控告警
    • 集成 Prometheus 和 Grafana 可视化积压指标
    • 配置多渠道告警(邮件、短信、即时通讯)
  • 增强异常处理
    • 实现死信队列存储失败消息
    • 增加消息重试策略配置

相关文章:

  • (顺序表、单链表、双链表)==>一篇解决!(Java版)
  • 网安学途—流量分析 attack.pcap
  • 豌豆 760 收录泛滥现象深度解析与应对策略
  • 常见排序算法及复杂度分析
  • 中国区adsense接收pin码,身份验证和地址验证指南
  • Linux:进程控制2
  • django扩展练习记录
  • 【工作记录】Kong Gateway入门篇之简介
  • 用AI制作黑神话悟空质感教程,3D西游记裸眼效果,西游人物跳出书本
  • 大数据——解决Matplotlib 字体不足问题(Linux\mac\windows)
  • 考研复习全年规划
  • Java:final的作用和原理介绍
  • Vue 3.5 :新特性全解析与开发实践指南
  • Python作业练习2
  • 解锁生命周期评价密码:OpenLCA、GREET 与 R 语言的融合应用
  • 浅析AI大模型为何需要向量数据库?从记忆存储到认知进化
  • 图灵爬虫练习平台 第十四题 逆向
  • 2025年金融创新、区块链与信息技术国际会议(FRCIT 2025 2025)
  • aardio - 虚表 —— 绘制整行背景进度条功能
  • RASP的运行时注入与更新
  • 小耳朵等来了春天:公益义诊筛查专家走进安徽安庆
  • A股午后拉升,沪指收复3400点:大金融发力,两市成交超1.3万亿元
  • 远如《月球背面》,近似你我内心
  • 中国巴西关于乌克兰危机的联合声明
  • 印称印巴军事行动总指挥同意将局势降级
  • 云南一男子持刀致邻居3死1重伤案二审开庭,未当庭宣判