当前位置: 首页 > news >正文

Spring Boot + Spring Kafka 集成

Spring Boot 集成 org.springframework.kafka 的完整代码示例,包含 pom 依赖、配置类、生产者、消费者、测试入口,做到开箱即用。


1. pom.xml 依赖

<dependencies><!-- Spring Boot Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Spring for Apache Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>3.2.4</version> <!-- 选用最新稳定版本 --></dependency><!-- 日志 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId></dependency>
</dependencies>

2. application.yml 配置

spring:kafka:bootstrap-servers: localhost:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializeracks: allconsumer:group-id: test-groupkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerauto-offset-reset: earliest

3. Kafka 配置类(可选,定制用)

package com.example.kafkademo.config;import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class KafkaTopicConfig {// 自动创建一个 topic(分区=3,副本=1)@Beanpublic NewTopic topic() {return new NewTopic("test-topic", 3, (short) 1);}
}

4. 生产者 Service

package com.example.kafkademo.producer;import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaProducerService {private final KafkaTemplate<String, String> kafkaTemplate;private static final String TOPIC = "test-topic";public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}// 发送消息public void sendMessage(String key, String message) {kafkaTemplate.send(TOPIC, key, message).thenAccept(result -> {System.out.printf("Sent message=[%s] with key=[%s] to partition=[%d] offset=[%d]%n",message, key,result.getRecordMetadata().partition(),result.getRecordMetadata().offset());});}
}

5. 消费者 Listener

package com.example.kafkademo.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class KafkaConsumerListener {@KafkaListener(topics = "test-topic", groupId = "test-group")public void listen(ConsumerRecord<String, String> record) {System.out.printf("Consumed message: key=%s value=%s partition=%d offset=%d%n",record.key(), record.value(), record.partition(), record.offset());}
}

6. 控制器测试入口

package com.example.kafkademo.controller;import com.example.kafkademo.producer.KafkaProducerService;
import org.springframework.web.bind.annotation.*;@RestController
@RequestMapping("/kafka")
public class KafkaController {private final KafkaProducerService producerService;public KafkaController(KafkaProducerService producerService) {this.producerService = producerService;}@GetMapping("/send")public String send(@RequestParam String key, @RequestParam String msg) {producerService.sendMessage(key, msg);return "Message sent: " + msg;}
}

7. 启动类

package com.example.kafkademo;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class KafkaDemoApplication {public static void main(String[] args) {SpringApplication.run(KafkaDemoApplication.class, args);}
}

8. 使用方式

  1. 启动 Kafka & Zookeeper

    bin/zookeeper-server-start.sh config/zookeeper.properties
    bin/kafka-server-start.sh config/server.properties
    
  2. 启动 Spring Boot 应用

    mvn spring-boot:run
    
  3. 在浏览器/命令行发送消息

    curl "http://localhost:8080/kafka/send?key=1&msg=HelloKafka"
    
  4. 控制台会看到消费者打印出消费到的消息。


http://www.dtcms.com/a/337284.html

相关文章:

  • 深层语义知识图谱:提升NLP文本预处理效果的关键技术
  • 《基于改进 MobileNetV2 的轻量化茶叶病虫害检测方法》论文解析
  • Redis--day8--黑马点评--分布式锁(一)
  • HTML应用指南:利用POST请求获取全国华为旗舰店门店位置信息
  • Python函数:装饰器
  • c++最长上升子序列长度
  • 雷卯针对香橙派Orange Pi 5 Plus开发板防雷防静电方案
  • JavaWeb 请求与响应乱码问题全面解决方案
  • React diff——差异协调算法简介
  • 算法-决策树
  • 从决策树基础到熵与信息增益
  • 网络间的通用语言TCP/IP-网络中的通用规则1
  • 本地文件上传到gitee仓库的详细步骤
  • sem_post函数的定义及作用
  • 【81页PPT】国内某知名大型制药企业制药数字化转型项目汇报方案(附下载方式)
  • 无人设备遥控器之操控信号精度篇
  • 【68页PPT】MES系统数字化工厂解决方案(附下载方式)
  • 深入剖析以太坊虚拟机(EVM):区块链世界的计算引擎
  • go 多版本共存【goup + alias方案】
  • React diff Vue diff介绍
  • 【2025CVPR-目标检测方向】RaCFormer:通过基于查询的雷达-相机融合实现高质量的 3D 目标检测
  • 牛子图论进阶
  • TEST_
  • Linux系统启动原理及故障排除
  • 场外个股期权的行权日是t+多少个交易日?
  • 【牛客刷题】最大公约数与最小公倍数:算法详解与实现
  • linux中的hostpath卷与nfs卷以及静态持久卷的区别
  • JAiRouter 架构揭秘:一个面向 AI 时代的响应式网关设计
  • Flutter 自定义 Switch 切换组件完全指南
  • 技术经典场景之协议转换