当前位置: 首页 > news >正文

Spring Boot整合Kafka详细指南(JDK 1.8)

1. 基础知识

Spring Boot 是一个用于简化Spring应用开发的框架,它提供了自动配置、内嵌服务器、依赖管理等功能,让开发者能够快速构建应用。

Kafka 是一个分布式的流处理平台,具有高吞吐量、持久化、可扩展等特性,常用于构建实时数据管道和流式应用。

整合两者的优势

  • Spring Boot提供了对Kafka的自动配置支持
  • 简化了Kafka的配置和使用
  • 提供了便捷的注解式编程模型
  • 易于集成到Spring生态系统中的其他组件

2. 环境准备

在开始之前,确保您已经准备好以下环境:

  1. JDK 1.8

    • 如未安装,从Oracle官网下载并安装
    • 确认安装成功:在命令行输入 java -version
  2. Maven或Gradle

    • 推荐使用Maven 3.5+
    • 确认安装成功:在命令行输入 mvn -v
  3. Kafka服务器

    • 在Windows上安装Kafka(参考前文"Windows系统下Kafka的安装指南")
    • 确保Zookeeper和Kafka服务器已启动
    • 默认Kafka服务地址:localhost:9092
    • 默认Zookeeper地址:localhost:2181
  4. IDE

    • 推荐使用IntelliJ IDEA或Spring Tool Suite
    • 确保IDE能够支持Spring Boot和Maven/Gradle项目

3. 创建Spring Boot项目

有两种方式可以创建Spring Boot项目:

方式一:使用Spring Initializr网站

  1. 访问 Spring Initializr
  2. 选择以下配置:
    • Project: Maven
    • Language: Java
    • Spring Boot: 2.7.x(兼容JDK 1.8)
    • Group: com.example(或您喜欢的包名)
    • Artifact: kafka-demo(或您喜欢的项目名)
    • Packaging: Jar
    • Java: 8
  3. 添加依赖:
    • Spring for Apache Kafka
    • Spring Web
    • Spring Boot DevTools(可选,用于开发时自动重启)
  4. 点击"Generate"下载项目压缩包
  5. 解压并用IDE导入项目

方式二:使用IDE创建

以IntelliJ IDEA为例:

  1. 选择 File -> New -> Project...
  2. 选择Spring Initializr,配置与上方相同
  3. 点击Next,然后Finish

4. 添加依赖

如果您的项目已经包含了所需依赖,可以跳过这一步。如果需要手动添加,请在pom.xml文件中添加以下内容:

<dependencies>
    <!-- Spring Boot Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    
    <!-- Spring Boot Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <!-- Spring for Apache Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    
    <!-- Lombok (可选,用于简化代码) -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    
    <!-- Spring Boot DevTools (可选) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    
    <!-- Spring Boot Test -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    
    <!-- Spring Kafka Test -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

5. Kafka配置

application.yml(或application.properties)中添加Kafka相关配置:

YAML格式配置 (application.yml)

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      # 键和值的序列化器
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 生产者确认机制 (0:不等待确认, 1:等待leader确认, all:等待全部副本确认)
      acks: all
      # 重试次数
      retries: 3
      # 批量大小
      batch-size: 16384
      # 请求的最大字节数
      buffer-memory: 33554432
    consumer:
      # 消费者组ID
      group-id: order-group
      # 自动提交偏移量
      enable-auto-commit: true
      # 自动提交间隔
      auto-commit-interval: 1000ms
      # 键和值的反序列化器
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 当没有初始偏移量或当前偏移量不存在时的处理
      auto-offset-reset: earliest

# 应用自定义配置
app:
  kafka:
    order-topic: order-topic
    notification-topic: notification-topic

属性格式配置 (application.properties)

如果您更喜欢使用properties格式,可以使用以下配置:

# Kafka服务器地址
spring.kafka.bootstrap-servers=localhost:9092

# 生产者配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.acks=all
spring.kafka.producer.retries=3
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432

# 消费者配置
spring.kafka.consumer.group-id=order-group
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=1000ms
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.auto-offset-reset=earliest

# 应用自定义配置
app.kafka.order-topic=order-topic
app.kafka.notification-topic=notification-topic

6. 实际应用案例:订单通知系统

我们将创建一个简单的订单通知系统,它包含以下功能:

  1. 接收新订单请求
  2. 将订单信息发送到Kafka
  3. 消费订单消息并生成通知
  4. 将通知发送到另一个Kafka
http://www.dtcms.com/a/99440.html

相关文章:

  • Flutter环境搭建
  • JDK1.8和Maven、Git安装教程自用成功
  • 【MySQL基础】函数之字符串函数详解
  • JVM Java类加载 isInstance instanceof 的区别
  • 洛谷题单1-P5703 【深基2.例5】苹果采购-python-流程图重构
  • JDBC的详细使用
  • 【零基础入门unity游戏开发——2D篇】2D物理关节 —— Joint2D相关组件
  • [Lc4_dfs] 解数独 | 单词搜索
  • PyQt6实例_批量下载pdf工具_界面开发
  • MDK中结构体的对齐、位域、配合联合体等用法说明
  • C#:第一性原理拆解属性(property)
  • 分享一个Pyside6实现web数据展示界面的效果图
  • Springboot学习笔记3.20
  • SmolDocling文档处理模型介绍
  • Python 循环全解析:从语法到实战的进阶之路
  • 人工智能之数学基础:矩阵的相似变换的本质是什么?
  • DeepSeek网络拓扑设计解密:如何支撑千卡级AI训练的高效通信?
  • 缓存 vs 分布式锁:高并发场景下的并发控制之道
  • 【C++】类和对象(二)默认成员函数之拷贝构造函数、运算符重载、赋值运算符重载
  • <tauri><rust><GUI>基于rust和tauri,实现一个大寰电爪PGHL(串口设备)定制化控制程序
  • java pom文件加入这个可以将打包好的jar 双击运行
  • 积分赛——光敏控制多功系统设计
  • 区块链赋能,为木材货场 “智” 造未来
  • vue在线录音系统
  • Redis延时队列在订单超时未报到场景的应用补充说明
  • 利用 VSCode 配置提升 vibe coding 开发效率
  • 找python大数据就业,我应该学习Java哪些知识
  • dav_pg8_vacuum
  • c#的.Net Framework 的console 项目找不到System.Window.Forms 引用
  • VMware中新建Ubuntu虚拟机系统,并安装Anaconda