当前位置: 首页 > news >正文

Spring Boot整合Kafka的详细步骤

1. 安装Kafka

  1. 下载Kafka:从Kafka官网下载最新版本的Kafka。

  2. 解压并启动

    • 解压Kafka文件后,进入bin目录。

    • 启动ZooKeeper:./zookeeper-server-start.sh ../config/zookeeper.properties

    • 启动Kafka:./kafka-server-start.sh ../config/server.properties

    • 确认启动成功后,Kafka服务即可使用。

2. 创建Spring Boot项目

  1. 在Spring Initializr创建一个新项目,选择需要的依赖(如Spring Web和Spring Kafka)。

  2. 下载并解压项目,导入到IDE中。

3. 添加Kafka依赖

pom.xml中添加以下依赖:

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

这个依赖会自动配置Spring Kafka的相关组件。

4. 配置Kafka

application.yml中添加Kafka的配置:

spring:kafka:bootstrap-servers: localhost:9092consumer:group-id: my-groupauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer

这里配置了Kafka服务器地址、消费者组、序列化器等。

5. 创建Kafka生产者

  1. 创建生产者配置类

@Configuration
public class KafkaProducerConfig {@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return new DefaultKafkaProducerFactory<>(configProps);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}
}
  1. 创建生产者服务类

@Service
public class KafkaProducerService {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message).addCallback(success -> System.out.println("Message sent successfully: " + message),failure -> System.err.println("Failed to send message: " + failure.getMessage()));}
}

通过KafkaTemplate发送消息。

6. 创建Kafka消费者

  1. 创建消费者服务类

@Service
public class KafkaConsumerService {@KafkaListener(topics = "my-topic", groupId = "my-group")public void consume(String message) {System.out.println("Received message: " + message);}
}

使用@KafkaListener注解监听指定主题并接收消息。

7. 测试应用

  1. 创建一个控制器,用于发送消息:

@RestController
public class KafkaController {private final KafkaProducerService kafkaProducerService;public KafkaController(KafkaProducerService kafkaProducerService) {this.kafkaProducerService = kafkaProducerService;}@GetMapping("/send")public String sendMessage(@RequestParam String message) {kafkaProducerService.sendMessage("my-topic", message);return "Message sent";}
}
  1. 启动Spring Boot应用,通过访问http://localhost:8080/send?message=HelloKafka发送消息。

通过以上步骤,你可以在Spring Boot中成功集成并使用Kafka。

 

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.dtcms.com/a/135970.html

相关文章:

  • 局域网内Docker镜像共享方法
  • 【李宏毅深度学习——分类模型的PyTorch架构】Homework 2:Phoneme Classification
  • Docker镜像迁移指南:从Windows构建到Ubuntu运行
  • halcon模板匹配(五)find_shape_model_clutter
  • Jetpack Compose 跨组件通信:全面指南与最佳实践
  • 数据库勒索病毒威胁升级:企业数据安全防线如何用安当RDM组件重构
  • 光刻机研发与市场现状分析报告
  • 关于k8s的部署
  • shell 编程之正则表达式与文本处理器
  • 【Web API系列】Web Shared Storage API之WorkletSharedStorage深度解析与实践指南
  • 下篇:《高阶排序算法:分治思想与性能突破》
  • 在多系统环境中实现授权闭环,Tetra Pak 借助CodeMeter打造食品工业的安全自动化体系
  • 使用 Azure AKS 保护 Kubernetes 部署的综合指南
  • 使用 PyTorch 构建 UNet 图像去噪模型:从数据加载到模型训练的完整流程
  • C++ 文件操作(文本文件)
  • 【Android学习记录】工具使用
  • DAY08:【pytorch】模型容器
  • 数据结构学习笔记 :基本概念、算法特性与线性表实现
  • PyTorch的benchmark模块
  • 基于量子扩散模型的3D场景实时生成:突破传统渲染的次世代技术
  • 【blender小技巧】使用blender的Cats Blender Plugin插件将3D人物模型快速绑定或者修复为标准的人形骨骼
  • 《解锁计算机专业:从入门到未来》
  • TextIn ParseX文档解析参数使用指南(第一期)
  • 点评项目回顾
  • 【linux】命令收集
  • 智能运维新范式
  • Redis字符串类型实战:解锁五大高频应用场景
  • MCP简介:重构人机交互底层逻辑
  • 【Linux网络与网络编程】11.数据链路层mac帧协议ARP协议
  • 博客文章文件名该怎么取?