当前位置: 首页 > news >正文

Spring Boot 集成 Kafka 详解

Spring Boot 集成 Kafka 详解

Kafka 作为一款高性能的分布式消息队列,常被用于系统间解耦、异步通信和大数据处理等场景。下面详细介绍 Spring Boot 如何集成 Kafka,并提供实用示例。

一、核心依赖与版本兼容

Spring Boot 集成 Kafka 主要依赖 spring-kafka,需注意版本兼容性:

Spring Boot 版本Spring Kafka 版本Kafka 客户端版本
2.7.x2.8.x3.0.x
3.0.x3.0.x3.0.x
3.1.x3.1.x3.1.x
3.2.x3.2.x3.3.x

Maven 依赖配置:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

二、详细配置说明

1. 基础配置(application.yml)

spring:kafka:# Kafka服务器地址,集群用逗号分隔bootstrap-servers: localhost:9092# 生产者配置producer:# 键序列化器key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值序列化器(JSON格式)value-serializer: org.springframework.kafka.support.serializer.JsonSerializer# 消息确认机制:0-不确认,1-首领确认,all-所有副本确认acks: 1# 批量发送配置batch-size: 16384# 批量发送缓冲区大小buffer-memory: 33554432# 重试次数retries: 3# 重试间隔(毫秒)retry-backoff-ms: 1000# 消费者配置consumer:# 消费者组IDgroup-id: my-consumer-group# 键反序列化器key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 值反序列化器(JSON格式)value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer# 偏移量重置策略:earliest/latest/noneauto-offset-reset: earliest# 是否自动提交偏移量enable-auto-commit: false# 自动提交偏移量的间隔(毫秒)auto-commit-interval: 1000# 反序列化信任的包properties:spring:json:trusted:packages: com.example.kafka.dto# 监听器配置listener:# 手动提交偏移量模式ack-mode: manual_immediate# 并发消费者数量concurrency: 3# 批量消费配置batch-listener: false# 记录监听异常log-container-config: true

2. 配置说明

  • 生产者配置

    • acks:控制消息持久化的确认级别,影响可靠性和吞吐量
    • batch-sizebuffer-memory:优化批量发送性能
    • retries:消息发送失败时的重试机制
  • 消费者配置

    • group-id:同一组内的消费者共同消费主题分区,避免重复消费
    • auto-offset-reset:无偏移量记录时的处理策略
    • enable-auto-commit:是否自动提交消费偏移量
  • 监听器配置

    • ack-mode:偏移量提交模式,manual_immediate 表示手动立即提交
    • concurrency:并发消费线程数,不应超过主题分区数

三、核心组件实现

1. 消息实体类

package com.example.kafka.dto;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;@Data
@NoArgsConstructor
@AllArgsConstructor
public class MessageDTO {private String id;private String content;private String sender;private LocalDateTime sendTime;
}

2. 生产者组件

package com.example.kafka.producer;import com.example.kafka.dto.MessageDTO;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;import java.util.UUID;
import java.time.LocalDateTime;@Component
public class KafkaMessageProducer {private final KafkaTemplate<String, MessageDTO> kafkaTemplate;// 主题名称,建议通过配置注入private static final String DEFAULT_TOPIC = "user-messages";public KafkaMessageProducer(KafkaTemplate<String, MessageDTO> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}/*** 发送消息到默认主题*/public void sendMessage(String content, String sender) {MessageDTO message = new MessageDTO(UUID.randomUUID().toString(),content,sender,LocalDateTime.now());sendMessageToTopic(DEFAULT_TOPIC, message);}/*** 发送消息到指定主题*/public void sendMessageToTopic(String topic, MessageDTO message) {// 发送消息,返回异步结果ListenableFuture<SendResult<String, MessageDTO>> future = kafkaTemplate.send(topic, message.getId(), message);// 处理发送结果future.addCallback(new ListenableFutureCallback<SendResult<String, MessageDTO>>() {@Overridepublic void onSuccess(SendResult<String, MessageDTO> result) {System.out.println("消息发送成功: " + "topic=" + result.getRecordMetadata().topic() + ", partition=" + result.getRecordMetadata().partition() +", offset=" + result.getRecordMetadata().offset());}@Overridepublic void onFailure(Throwable ex) {System.err.println("消息发送失败: " + ex.getMessage());// 可添加失败重试逻辑}});}
}

3. 消费者组件

package com.example.kafka.consumer;import com.example.kafka.dto.MessageDTO;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;@Component
public class KafkaMessageConsumer {/*** 监听默认主题的消息*/@KafkaListener(topics = "user-messages",groupId = "my-consumer-group",containerFactory = "kafkaListenerContainerFactory")public void consumeMessage(ConsumerRecord<String, MessageDTO> record, Acknowledgment acknowledgment) {try {MessageDTO message = record.value();System.out.println("接收到消息: " + "id=" + message.getId() + ", sender=" + message.getSender() +", content=" + message.getContent());// 处理消息业务逻辑processMessage(message);// 手动提交偏移量acknowledgment.acknowledge();} catch (Exception e) {System.err.println("消息处理失败: " + e.getMessage());// 可根据需求决定是否提交偏移量// 失败时不提交,会导致消息重新消费}}/*** 处理消息的业务逻辑*/private void processMessage(MessageDTO message) {// 实际业务处理逻辑// 例如:保存到数据库、调用其他服务等}/*** 监听多个主题的示例*/@KafkaListener(topics = {"system-notifications", "user-alerts"}, groupId = "notification-group")public void consumeNotifications(MessageDTO message) {System.out.println("接收到通知: " + message.getContent());}
}

四、高级特性使用

1. 批量消费配置

修改配置启用批量消费:

spring:kafka:listener:batch-listener: trueconsumer:max-poll-records: 50  # 每次拉取的最大记录数

批量消费实现:

@KafkaListener(topics = "batch-messages", groupId = "batch-group")
public void consumeBatch(List<ConsumerRecord<String, MessageDTO>> records, Acknowledgment acknowledgment) {System.out.println("接收到批量消息,数量: " + records.size());for (ConsumerRecord<String, MessageDTO> record : records) {try {processMessage(record.value());} catch (Exception e) {System.err.println("处理消息失败: " + e.getMessage());}}// 批量提交偏移量acknowledgment.acknowledge();
}

2. 事务消息

启用 Kafka 事务支持:

spring:kafka:producer:transaction-id-prefix: tx-  # 事务ID前缀

事务消息使用:

@Service
public class TransactionalMessageService {private final KafkaTemplate<String, MessageDTO> kafkaTemplate;// 构造函数注入...@Transactionalpublic void sendTransactionalMessage(MessageDTO message) {// 发送事务消息kafkaTemplate.send("transactional-topic", message);// 数据库操作或其他事务操作// ...// 如果发生异常,消息会被回滚}
}

五、测试方法

1. 集成测试

package com.example.kafka;import com.example.kafka.dto.MessageDTO;
import com.example.kafka.producer.KafkaMessageProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;import static org.junit.jupiter.api.Assertions.assertTrue;@SpringBootTest
@EmbeddedKafka(partitions = 3,topics = {"test-topic", "user-messages"},brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"}
)
public class KafkaIntegrationTest {@Autowiredprivate KafkaMessageProducer producer;private CountDownLatch latch = new CountDownLatch(1);private MessageDTO receivedMessage;// 动态配置Kafka服务器地址@DynamicPropertySourcestatic void kafkaProperties(DynamicPropertyRegistry registry) {Map<String, Object> props = KafkaTestUtils.producerProps("test-group");registry.add("spring.kafka.bootstrap-servers", props::get("bootstrap.servers"));}@Testpublic void testMessageSendingAndReceiving() throws InterruptedException {// 准备测试消息String testContent = "Test message content";String testSender = "test-user";// 发送消息producer.sendMessage(testContent, testSender);// 等待消息被消费boolean messageReceived = latch.await(10, TimeUnit.SECONDS);// 验证结果assertTrue(messageReceived, "消息未被消费");assertTrue(receivedMessage.getContent().equals(testContent), "消息内容不匹配");}// 测试用的临时消费者@KafkaListener(topics = "user-messages", groupId = "test-group")public void testConsumer(MessageDTO message) {receivedMessage = message;latch.countDown(); // 通知测试线程消息已接收}
}

2. 控制器测试接口

@RestController
@RequestMapping("/kafka")
public class KafkaTestController {@Autowiredprivate KafkaMessageProducer producer;@PostMapping("/send")public ResponseEntity<String> sendMessage(@RequestParam String content, @RequestParam String sender) {producer.sendMessage(content, sender);return ResponseEntity.ok("消息已发送");}
}

使用 curl 测试:

curl -X POST "http://localhost:8080/kafka/send?content=Hello&sender=test"

六、性能优化建议

  1. 生产者优化

    • 启用批量发送(调整 batch-sizelinger.ms
    • 适当增大 buffer-memory
    • 使用压缩(compression-type: lz4
  2. 消费者优化

    • 合理设置 concurrency(建议等于分区数)
    • 增大 fetch.min.bytesfetch.max.wait.ms 减少请求次数
    • 处理消息逻辑尽量高效,避免阻塞
  3. 主题设计

    • 根据业务合理划分主题
    • 适当设置分区数(通常是broker数量的1-3倍)
    • 配置合理的消息保留策略

通过以上内容,你可以构建一个稳定、高效的 Spring Boot 与 Kafka 集成应用,并根据实际需求进行扩展和优化。

http://www.dtcms.com/a/427205.html

相关文章:

  • MQTT数据集成
  • 网站的会员系统怎么做电商小程序价格
  • Redis 进阶:跳出缓存局限!7 大核心场景的原理与工程化实践
  • 数据结构——LinkedList和链表
  • 一学一做专题网站哈尔滨黑大主题邮局
  • 基于类的四种设计模式
  • 用ChatGPT修改论文,如何在提升质量的同时降低AI检测风险?
  • 实验指导-基于阿里云Serverless应用引l擎SAE的服务部署迀移
  • 黔西县住房和城乡建设局网站个人网页制作方法
  • 长沙网站推广系统动态wordpress动态主题
  • 基于Matlab实现路径规划
  • 给定数据规模的ACM风格笔试题-子矩阵的最大累加和问题
  • 一站式服务图片wordpress博客整站源码
  • 明星粉丝网站怎么做建设银行手机银行官方网站下载安装
  • Spring boot中 限制 Mybatis SQL日志的大字段输出
  • SQL Server数据库事务日志问题的诊断与解法(从膨胀到瘦身)
  • Postgresql CLOG文件及其从库同步解析
  • wordpress 授权一个空间两个网站对seo
  • 正规的招聘网站永州市网站建设
  • 加强教育信息网站建设昆山建设工程安监站网站
  • EndoChat:面向内镜手术的基于事实依据的多模态大型语言模型|文献速递-文献分享
  • 零基础学AI大模型之ChatModel聊天模型与ChatPromptTemplate实战
  • 产生式规则对自然语言处理深层语义分析的影响与启示研究
  • web渗透之Python反序列化漏洞
  • 做办公用品网站工作计划黄页网站是什么
  • 论文阅读 (1) :Control Flow Management in Modern GPUs
  • 吉林省软环境建设网站网络营销属于哪个专业
  • iOS 26 系统流畅度检测 从视觉特效到帧率稳定的实战策略
  • 2025云栖大会,机器人商业时代降临
  • C++面向对象编程三大特性之一:多态