Spring Boot 集成 Kafka 详解
Spring Boot 集成 Kafka 详解
Kafka 作为一款高性能的分布式消息队列,常被用于系统间解耦、异步通信和大数据处理等场景。下面详细介绍 Spring Boot 如何集成 Kafka,并提供实用示例。
一、核心依赖与版本兼容
Spring Boot 集成 Kafka 主要依赖 spring-kafka
,需注意版本兼容性:
Spring Boot 版本 | Spring Kafka 版本 | Kafka 客户端版本 |
---|---|---|
2.7.x | 2.8.x | 3.0.x |
3.0.x | 3.0.x | 3.0.x |
3.1.x | 3.1.x | 3.1.x |
3.2.x | 3.2.x | 3.3.x |
Maven 依赖配置:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
二、详细配置说明
1. 基础配置(application.yml)
spring:kafka:# Kafka服务器地址,集群用逗号分隔bootstrap-servers: localhost:9092# 生产者配置producer:# 键序列化器key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值序列化器(JSON格式)value-serializer: org.springframework.kafka.support.serializer.JsonSerializer# 消息确认机制:0-不确认,1-首领确认,all-所有副本确认acks: 1# 批量发送配置batch-size: 16384# 批量发送缓冲区大小buffer-memory: 33554432# 重试次数retries: 3# 重试间隔(毫秒)retry-backoff-ms: 1000# 消费者配置consumer:# 消费者组IDgroup-id: my-consumer-group# 键反序列化器key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 值反序列化器(JSON格式)value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer# 偏移量重置策略:earliest/latest/noneauto-offset-reset: earliest# 是否自动提交偏移量enable-auto-commit: false# 自动提交偏移量的间隔(毫秒)auto-commit-interval: 1000# 反序列化信任的包properties:spring:json:trusted:packages: com.example.kafka.dto# 监听器配置listener:# 手动提交偏移量模式ack-mode: manual_immediate# 并发消费者数量concurrency: 3# 批量消费配置batch-listener: false# 记录监听异常log-container-config: true
2. 配置说明
-
生产者配置:
acks
:控制消息持久化的确认级别,影响可靠性和吞吐量batch-size
和buffer-memory
:优化批量发送性能retries
:消息发送失败时的重试机制
-
消费者配置:
group-id
:同一组内的消费者共同消费主题分区,避免重复消费auto-offset-reset
:无偏移量记录时的处理策略enable-auto-commit
:是否自动提交消费偏移量
-
监听器配置:
ack-mode
:偏移量提交模式,manual_immediate
表示手动立即提交concurrency
:并发消费线程数,不应超过主题分区数
三、核心组件实现
1. 消息实体类
package com.example.kafka.dto;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;@Data
@NoArgsConstructor
@AllArgsConstructor
public class MessageDTO {private String id;private String content;private String sender;private LocalDateTime sendTime;
}
2. 生产者组件
package com.example.kafka.producer;import com.example.kafka.dto.MessageDTO;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;import java.util.UUID;
import java.time.LocalDateTime;@Component
public class KafkaMessageProducer {private final KafkaTemplate<String, MessageDTO> kafkaTemplate;// 主题名称,建议通过配置注入private static final String DEFAULT_TOPIC = "user-messages";public KafkaMessageProducer(KafkaTemplate<String, MessageDTO> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}/*** 发送消息到默认主题*/public void sendMessage(String content, String sender) {MessageDTO message = new MessageDTO(UUID.randomUUID().toString(),content,sender,LocalDateTime.now());sendMessageToTopic(DEFAULT_TOPIC, message);}/*** 发送消息到指定主题*/public void sendMessageToTopic(String topic, MessageDTO message) {// 发送消息,返回异步结果ListenableFuture<SendResult<String, MessageDTO>> future = kafkaTemplate.send(topic, message.getId(), message);// 处理发送结果future.addCallback(new ListenableFutureCallback<SendResult<String, MessageDTO>>() {@Overridepublic void onSuccess(SendResult<String, MessageDTO> result) {System.out.println("消息发送成功: " + "topic=" + result.getRecordMetadata().topic() + ", partition=" + result.getRecordMetadata().partition() +", offset=" + result.getRecordMetadata().offset());}@Overridepublic void onFailure(Throwable ex) {System.err.println("消息发送失败: " + ex.getMessage());// 可添加失败重试逻辑}});}
}
3. 消费者组件
package com.example.kafka.consumer;import com.example.kafka.dto.MessageDTO;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;@Component
public class KafkaMessageConsumer {/*** 监听默认主题的消息*/@KafkaListener(topics = "user-messages",groupId = "my-consumer-group",containerFactory = "kafkaListenerContainerFactory")public void consumeMessage(ConsumerRecord<String, MessageDTO> record, Acknowledgment acknowledgment) {try {MessageDTO message = record.value();System.out.println("接收到消息: " + "id=" + message.getId() + ", sender=" + message.getSender() +", content=" + message.getContent());// 处理消息业务逻辑processMessage(message);// 手动提交偏移量acknowledgment.acknowledge();} catch (Exception e) {System.err.println("消息处理失败: " + e.getMessage());// 可根据需求决定是否提交偏移量// 失败时不提交,会导致消息重新消费}}/*** 处理消息的业务逻辑*/private void processMessage(MessageDTO message) {// 实际业务处理逻辑// 例如:保存到数据库、调用其他服务等}/*** 监听多个主题的示例*/@KafkaListener(topics = {"system-notifications", "user-alerts"}, groupId = "notification-group")public void consumeNotifications(MessageDTO message) {System.out.println("接收到通知: " + message.getContent());}
}
四、高级特性使用
1. 批量消费配置
修改配置启用批量消费:
spring:kafka:listener:batch-listener: trueconsumer:max-poll-records: 50 # 每次拉取的最大记录数
批量消费实现:
@KafkaListener(topics = "batch-messages", groupId = "batch-group")
public void consumeBatch(List<ConsumerRecord<String, MessageDTO>> records, Acknowledgment acknowledgment) {System.out.println("接收到批量消息,数量: " + records.size());for (ConsumerRecord<String, MessageDTO> record : records) {try {processMessage(record.value());} catch (Exception e) {System.err.println("处理消息失败: " + e.getMessage());}}// 批量提交偏移量acknowledgment.acknowledge();
}
2. 事务消息
启用 Kafka 事务支持:
spring:kafka:producer:transaction-id-prefix: tx- # 事务ID前缀
事务消息使用:
@Service
public class TransactionalMessageService {private final KafkaTemplate<String, MessageDTO> kafkaTemplate;// 构造函数注入...@Transactionalpublic void sendTransactionalMessage(MessageDTO message) {// 发送事务消息kafkaTemplate.send("transactional-topic", message);// 数据库操作或其他事务操作// ...// 如果发生异常,消息会被回滚}
}
五、测试方法
1. 集成测试
package com.example.kafka;import com.example.kafka.dto.MessageDTO;
import com.example.kafka.producer.KafkaMessageProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;import static org.junit.jupiter.api.Assertions.assertTrue;@SpringBootTest
@EmbeddedKafka(partitions = 3,topics = {"test-topic", "user-messages"},brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"}
)
public class KafkaIntegrationTest {@Autowiredprivate KafkaMessageProducer producer;private CountDownLatch latch = new CountDownLatch(1);private MessageDTO receivedMessage;// 动态配置Kafka服务器地址@DynamicPropertySourcestatic void kafkaProperties(DynamicPropertyRegistry registry) {Map<String, Object> props = KafkaTestUtils.producerProps("test-group");registry.add("spring.kafka.bootstrap-servers", props::get("bootstrap.servers"));}@Testpublic void testMessageSendingAndReceiving() throws InterruptedException {// 准备测试消息String testContent = "Test message content";String testSender = "test-user";// 发送消息producer.sendMessage(testContent, testSender);// 等待消息被消费boolean messageReceived = latch.await(10, TimeUnit.SECONDS);// 验证结果assertTrue(messageReceived, "消息未被消费");assertTrue(receivedMessage.getContent().equals(testContent), "消息内容不匹配");}// 测试用的临时消费者@KafkaListener(topics = "user-messages", groupId = "test-group")public void testConsumer(MessageDTO message) {receivedMessage = message;latch.countDown(); // 通知测试线程消息已接收}
}
2. 控制器测试接口
@RestController
@RequestMapping("/kafka")
public class KafkaTestController {@Autowiredprivate KafkaMessageProducer producer;@PostMapping("/send")public ResponseEntity<String> sendMessage(@RequestParam String content, @RequestParam String sender) {producer.sendMessage(content, sender);return ResponseEntity.ok("消息已发送");}
}
使用 curl 测试:
curl -X POST "http://localhost:8080/kafka/send?content=Hello&sender=test"
六、性能优化建议
-
生产者优化:
- 启用批量发送(调整
batch-size
和linger.ms
) - 适当增大
buffer-memory
- 使用压缩(
compression-type: lz4
)
- 启用批量发送(调整
-
消费者优化:
- 合理设置
concurrency
(建议等于分区数) - 增大
fetch.min.bytes
和fetch.max.wait.ms
减少请求次数 - 处理消息逻辑尽量高效,避免阻塞
- 合理设置
-
主题设计:
- 根据业务合理划分主题
- 适当设置分区数(通常是broker数量的1-3倍)
- 配置合理的消息保留策略
通过以上内容,你可以构建一个稳定、高效的 Spring Boot 与 Kafka 集成应用,并根据实际需求进行扩展和优化。