Contents
- Configuration file: application.properties
- Startup class: DemoApplication
- Kafka configuration
- Message entity class
- MessageRepository: processed-message tracking
- Backlog monitoring service
- Kafka consumer service
- Kafka producer service
- API controller with test endpoints
- Key features
- Production recommendations
Configuration file: application.properties
# Application settings
# application port
server.port=8080
# application name
spring.application.name=kafka-demo

# Kafka settings
# Kafka broker address
kafka.bootstrap-servers=localhost:9092
# consumer group ID
kafka.consumer.group-id=demo-group
# number of concurrent consumers per listener
kafka.consumer.concurrency=5

# Logging
# root log level
logging.level.root=INFO
# Spring Kafka log level
logging.level.org.springframework.kafka=INFO
# application code log level
logging.level.com.example=DEBUG

# Async task executor
# core thread pool size
spring.task.execution.pool.core-size=10
# max thread pool size
spring.task.execution.pool.max-size=20
# task queue capacity
spring.task.execution.pool.queue-capacity=100
Startup class: DemoApplication
package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling // enables the @Scheduled backlog monitor below
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
Kafka configuration
package com.example.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.consumer.group-id}")
    private String groupId;

    @Value("${kafka.consumer.concurrency:5}")
    private int concurrency;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");                       // wait for all in-sync replicas
        configProps.put(ProducerConfig.RETRIES_CONFIG, 3);                        // retry transient send failures
        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);          // broker-side dedup of retried sends
        configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // the maximum allowed with idempotence
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // new groups start from the beginning
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);     // offsets are committed manually
        configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);         // up to 500 records per poll
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    // Default factory: record-at-a-time processing, manual ack after each record
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(3000);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // No message converter is registered: the listeners receive the raw JSON string
        // and deserialize it with ObjectMapper themselves
        return factory;
    }

    // Backlog factory: higher concurrency plus batch listening for draining a backlog
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> backlogKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(10);
        factory.getContainerProperties().setPollTimeout(3000);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setBatchListener(true); // hand the listener each poll as a whole List
        return factory;
    }
}
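Both topics are referenced by name but never declared. If broker-side auto-creation is disabled, Spring Kafka can create them at startup from NewTopic beans. A minimal sketch of hypothetical additions to KafkaConfig (bean names, partition counts, and the replication factor of 1 are illustrative; replication factor 1 assumes a single-broker development setup). The explicit KafkaAdmin bean is there only because this demo binds custom kafka.* properties rather than Boot's spring.kafka.* ones; it requires the imports org.apache.kafka.clients.admin.AdminClientConfig, org.apache.kafka.clients.admin.NewTopic, and org.springframework.kafka.core.KafkaAdmin.

@Bean
public KafkaAdmin kafkaAdmin() {
    // Point topic creation at the same brokers as the custom kafka.bootstrap-servers property
    Map<String, Object> adminProps = new HashMap<>();
    adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    return new KafkaAdmin(adminProps);
}

@Bean
public NewTopic demoTopic() {
    // 5 partitions so each of the 5 concurrent consumers can own one
    return new NewTopic("demo-topic", 5, (short) 1);
}

@Bean
public NewTopic backlogTopic() {
    // 10 partitions to match the backlog factory's concurrency of 10
    return new NewTopic("backlog-topic", 10, (short) 1);
}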
Message entity class
package com.example.entity;

import java.io.Serializable;
import java.util.UUID;

public class Message implements Serializable {

    private static final long serialVersionUID = 1L;

    private String id;       // globally unique ID, used for the idempotency check
    private String content;
    private long timestamp;
    private String traceId;

    public Message() {
        this.id = UUID.randomUUID().toString();
        this.timestamp = System.currentTimeMillis();
    }

    public Message(String content) {
        this();
        this.content = content;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getContent() { return content; }
    public void setContent(String content) { this.content = content; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    public String getTraceId() { return traceId; }
    public void setTraceId(String traceId) { this.traceId = traceId; }

    @Override
    public String toString() {
        return "Message{id='" + id + "', content='" + content + "', timestamp=" + timestamp + '}';
    }
}
MessageRepository: processed-message tracking
package com.example.repository;

import org.springframework.stereotype.Repository;

import java.util.concurrent.ConcurrentHashMap;

@Repository
public class MessageRepository {

    // In-memory record of processed message IDs; grows without bound, so this is demo-only
    private final ConcurrentHashMap<String, Boolean> processedMessages = new ConcurrentHashMap<>();

    public boolean exists(String messageId) {
        return processedMessages.containsKey(messageId);
    }

    public void save(String messageId) {
        processedMessages.put(messageId, true);
    }

    public long count() {
        return processedMessages.size();
    }
}
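Note that exists() followed by save() is not atomic: two consumers in different JVMs could both pass the check before either records the ID. That is acceptable for this single-instance, in-memory demo; the Redis-backed variant sketched under "Production recommendations" collapses the two calls into one atomic operation.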
Backlog monitoring service
package com.example.service;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import javax.annotation.PreDestroy;
import java.util.*;
import java.util.concurrent.ExecutionException;

@Service
public class BacklogMonitorService {

    private final AdminClient adminClient;

    public BacklogMonitorService(@Value("${kafka.bootstrap-servers}") String bootstrapServers) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        this.adminClient = AdminClient.create(props);
    }

    @PreDestroy
    public void close() {
        adminClient.close();
    }

    // Runs every 5 minutes
    @Scheduled(fixedRate = 300000)
    public void monitorBacklog() {
        try {
            ListTopicsResult topics = adminClient.listTopics();
            Collection<TopicDescription> topicDescriptions =
                    adminClient.describeTopics(topics.names().get()).all().get().values();
            for (TopicDescription topic : topicDescriptions) {
                if (topic.name().endsWith("-topic")) { // only the demo's own topics
                    long totalBacklog = calculateTopicBacklog(topic.name());
                    System.out.println("Topic: " + topic.name() + ", total backlog: " + totalBacklog);
                    if (totalBacklog > 100000) {
                        triggerAlert(topic.name(), totalBacklog);
                    }
                }
            }
        } catch (Exception e) {
            System.err.println("Backlog monitoring failed: " + e.getMessage());
        }
    }

    // Backlog = sum over all partitions of (log end offset - committed consumer offset)
    private long calculateTopicBacklog(String topic) throws ExecutionException, InterruptedException {
        // backlog-topic is consumed by backlog-group; everything else by demo-group
        String groupId = "backlog-topic".equals(topic) ? "backlog-group" : "demo-group";
        Map<TopicPartition, Long> endOffsets = getEndOffsets(topic);
        Map<TopicPartition, Long> consumerOffsets = getConsumerGroupOffsets(groupId, topic);
        long totalBacklog = 0;
        for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
            long consumerOffset = consumerOffsets.getOrDefault(entry.getKey(), 0L);
            totalBacklog += entry.getValue() - consumerOffset;
        }
        return totalBacklog;
    }

    private Map<TopicPartition, Long> getEndOffsets(String topic)
            throws ExecutionException, InterruptedException {
        DescribeTopicsResult describeTopics = adminClient.describeTopics(Collections.singletonList(topic));
        TopicDescription topicDescription = describeTopics.values().get(topic).get();
        // AdminClient has no endOffsets(); ask for each partition's latest offset via listOffsets
        Map<TopicPartition, OffsetSpec> request = new HashMap<>();
        for (TopicPartitionInfo partition : topicDescription.partitions()) {
            request.put(new TopicPartition(topic, partition.partition()), OffsetSpec.latest());
        }
        Map<TopicPartition, Long> endOffsets = new HashMap<>();
        for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> entry :
                adminClient.listOffsets(request).all().get().entrySet()) {
            endOffsets.put(entry.getKey(), entry.getValue().offset());
        }
        return endOffsets;
    }

    private Map<TopicPartition, Long> getConsumerGroupOffsets(String groupId, String topic)
            throws ExecutionException, InterruptedException {
        Map<TopicPartition, Long> offsets = new HashMap<>();
        ListConsumerGroupOffsetsResult consumerGroupOffsets = adminClient.listConsumerGroupOffsets(groupId);
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry :
                consumerGroupOffsets.partitionsToOffsetAndMetadata().get().entrySet()) {
            if (entry.getKey().topic().equals(topic)) {
                offsets.put(entry.getKey(), entry.getValue().offset());
            }
        }
        return offsets;
    }

    private void triggerAlert(String topic, long backlogSize) {
        System.out.println("ALERT: topic " + topic + " backlog has reached " + backlogSize + " messages!");
    }
}
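For a quick manual cross-check of these numbers, the stock Kafka CLI reports the same per-partition lag: kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group demo-group.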
Kafka consumer service
package com.example.service;

import com.example.entity.Message;
import com.example.repository.MessageRepository;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class KafkaConsumerService {

    @Autowired
    private MessageRepository messageRepository;

    private final ObjectMapper objectMapper = new ObjectMapper();

    @KafkaListener(topics = "demo-topic", groupId = "demo-group")
    public void listen(String messageJson, Acknowledgment acknowledgment) {
        try {
            Message message = objectMapper.readValue(messageJson, Message.class);
            // Idempotency check: skip anything that has already been processed
            if (messageRepository.exists(message.getId())) {
                System.out.println("Duplicate message, skipping: " + message.getId());
                acknowledgment.acknowledge();
                return;
            }
            processMessage(message);
            messageRepository.save(message.getId());
            acknowledgment.acknowledge();
        } catch (Exception e) {
            System.err.println("Failed to process message: " + e.getMessage());
            acknowledgment.acknowledge(); // ack anyway so one bad message cannot block the partition
        }
    }

    // Batch listener on the dedicated backlog topic; one ack per poll
    @KafkaListener(topics = "backlog-topic", groupId = "backlog-group",
            containerFactory = "backlogKafkaListenerContainerFactory")
    public void listenBatch(List<String> messages, Acknowledgment acknowledgment) {
        System.out.println("Received batch of " + messages.size() + " messages");
        for (String messageJson : messages) {
            try {
                Message message = objectMapper.readValue(messageJson, Message.class);
                if (!messageRepository.exists(message.getId())) {
                    processMessage(message);
                    messageRepository.save(message.getId());
                }
            } catch (Exception e) {
                System.err.println("Failed to process one message in the batch: " + e.getMessage());
            }
        }
        acknowledgment.acknowledge();
    }

    private void processMessage(Message message) {
        System.out.println("Processing message: " + message);
        try {
            Thread.sleep(10); // simulate work
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
Kafka producer service
package com.example.service;

import com.example.entity.Message;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Service
public class KafkaProducerService {

    private static final String TOPIC = "demo-topic";
    private static final String BACKLOG_TOPIC = "backlog-topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private final ObjectMapper objectMapper = new ObjectMapper();

    public void sendMessage(Message message) {
        // Serialize the whole Message, not just its content, so the consumer
        // can deserialize it and read the ID for the idempotency check
        String payload = toJson(message);
        ListenableFuture<SendResult<String, String>> future =
                kafkaTemplate.send(TOPIC, message.getId(), payload);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                System.out.println("Message sent: " + message.getId()
                        + " partition: " + result.getRecordMetadata().partition()
                        + " offset: " + result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                System.err.println("Failed to send message: " + message.getId()
                        + " error: " + ex.getMessage());
            }
        });
    }

    public void sendBatchMessages(int count) {
        for (int i = 0; i < count; i++) {
            Message message = new Message("Batch message-" + i);
            kafkaTemplate.send(BACKLOG_TOPIC, message.getId(), toJson(message));
            if (i % 1000 == 0) {
                System.out.println("Sent " + i + " messages");
            }
        }
    }

    private String toJson(Message message) {
        try {
            return objectMapper.writeValueAsString(message);
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("Failed to serialize message " + message.getId(), e);
        }
    }
}
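sendBatchMessages fires asynchronously in a tight loop, so actual throughput is governed by the producer's internal batching, which the configuration above leaves at its defaults. A hedged tuning sketch for producerFactory() in KafkaConfig; the values are illustrative starting points, not benchmarked recommendations:

// Optional batching knobs for producerFactory() (illustrative values):
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);   // accumulate up to 32 KB per partition batch
configProps.put(ProducerConfig.LINGER_MS_CONFIG, 20);           // wait up to 20 ms for a batch to fill
configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // compress whole batches on the wire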
API controller with test endpoints
package com.example.controller;

import com.example.entity.Message;
import com.example.service.KafkaProducerService;
import com.example.repository.MessageRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api")
public class DemoController {

    @Autowired
    private KafkaProducerService producerService;

    @Autowired
    private MessageRepository messageRepository;

    @PostMapping("/send")
    public String sendMessage(@RequestBody String content) {
        Message message = new Message(content);
        producerService.sendMessage(message);
        return "Message sent: " + message.getId();
    }

    @PostMapping("/send/batch/{count}")
    public String sendBatchMessages(@PathVariable int count) {
        producerService.sendBatchMessages(count);
        return "Started sending " + count + " messages";
    }

    @GetMapping("/processed/count")
    public String getProcessedCount() {
        return "Processed message count: " + messageRepository.count();
    }
}
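With the application running on port 8080 (per application.properties), the endpoints can be exercised with curl, for example:

curl -X POST http://localhost:8080/api/send -H "Content-Type: text/plain" -d "hello kafka"
curl -X POST http://localhost:8080/api/send/batch/10000
curl http://localhost:8080/api/processed/count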
Key features
- Idempotency guarantees
  - Every message carries a globally unique ID (a UUID)
  - The consumer asks MessageRepository whether a message has already been processed
  - Check first, then process, then record the ID on success, so nothing is processed twice
- Performance-oriented design
  - Producer configuration:
    - Idempotence enabled (ENABLE_IDEMPOTENCE_CONFIG=true)
    - Batched sends (see the tuning sketch after the producer service above)
    - Asynchronous sends with callbacks
  - Consumer configuration:
    - Manual offset commits
    - Batch consumption mode (for draining backlogs)
    - Multiple concurrent consumer instances
- Error handling
  - Producer side: the async send callback logs every failed send; retries plus idempotence absorb transient broker errors
  - Consumer side:
    - A failure on one message does not affect the others
    - The failure-handling strategy is extensible (see the dead-letter sketch after this list)
- Backlog handling
  - A dedicated consumer group for backlog processing
  - Higher concurrency (10 consumer instances)
  - Batch consumption (up to 500 records per poll)
  - Scheduled backlog monitoring that triggers alerts
- Observability
  - A scheduled backlog-monitoring task
  - A processing-progress statistics API
  - Detailed logging
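The listener above acknowledges messages even when processing fails, which keeps the partition moving but silently drops the failure. One way to extend the failure-handling strategy, sketched here assuming Spring Kafka 2.8+ (the bean name dlqErrorHandler is mine), is to let the exception propagate and register an error handler that retries briefly and then publishes the record to a dead-letter topic, named <topic>.DLT by default:

// Hypothetical bean for KafkaConfig, assuming Spring Kafka 2.8+.
// Requires imports: org.springframework.kafka.listener.DeadLetterPublishingRecoverer,
// org.springframework.kafka.listener.DefaultErrorHandler, org.springframework.util.backoff.FixedBackOff.
@Bean
public DefaultErrorHandler dlqErrorHandler(KafkaTemplate<String, String> kafkaTemplate) {
    // After 2 retries spaced 1 second apart, publish the failing record to <topic>.DLT
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
    return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 2));
}

To wire it in, inject the handler into kafkaListenerContainerFactory(), call factory.setCommonErrorHandler(dlqErrorHandler), and remove the catch-and-acknowledge around processMessage() in listen() so failures actually reach the handler.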
Production recommendations
- Replace the in-memory store
  - Swap MessageRepository for a Redis- or database-backed implementation so the idempotency check stays consistent across distributed instances (see the sketch after this list)
- Scale out backlog processing
  - Add partitions to raise parallelism
  - Deploy more consumer instances
  - Use a dedicated backlog topic and consumer group
- Improve monitoring and alerting
  - Integrate Prometheus and Grafana to visualize backlog metrics
  - Configure multi-channel alerts (email, SMS, instant messaging)
- Strengthen error handling
  - For example, route repeatedly failing messages to a dead-letter topic, as sketched under "Key features" above
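A minimal sketch of the Redis-backed replacement, assuming spring-boot-starter-data-redis is on the classpath; the class name, method name, key prefix, and 7-day TTL are all illustrative choices of mine. It collapses the exists()/save() pair into a single atomic SETNX, closing the check-then-act race noted earlier:

package com.example.repository;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Repository;

import java.time.Duration;

@Repository
public class RedisMessageRepository {

    private static final String KEY_PREFIX = "processed:";  // illustrative key namespace
    private static final Duration TTL = Duration.ofDays(7); // bounds memory, unlike the in-memory map

    private final StringRedisTemplate redisTemplate;

    public RedisMessageRepository(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    // Atomic SETNX: returns true only for the first caller across all application instances
    public boolean markProcessedIfFirst(String messageId) {
        Boolean first = redisTemplate.opsForValue()
                .setIfAbsent(KEY_PREFIX + messageId, "1", TTL);
        return Boolean.TRUE.equals(first);
    }
}

The consumer would then replace its exists()/save() pair with a single markProcessedIfFirst() call before processing. The trade-off is that a crash between marking and processing drops that one message, so pipelines needing stronger guarantees record the ID transactionally after processing instead.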