当前位置: 首页 > news >正文

[0689].第04节:Kafka与第三方的集成 – Kafka集成SpringBoot

Kafka笔记大纲


SpringBoot 是一个在 JavaEE 开发中非常常用的组件。可以用于 Kafka 的生产者,也可以用于 SpringBoot 的消费者
在这里插入图片描述


一、SpringBoot 环境准备

1.1.创建一个 Spring Initializr

在这里插入图片描述

1.2.引入场景启动器:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.1</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>


    <groupId>com.jianqun</groupId>
    <artifactId>springboot-kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-kafka</name>
    <description>springboot-kafka</description>


    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.6.13</spring-boot.version>
    </properties>


    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <configuration>
                    <mainClass>com.jianqun.springbootkafka.SpringbootKafkaApplication</mainClass>
                    <skip>true</skip>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

1.3.编写配置文件:

# 应用服务 WEB 访问端口
server.port=8080

# 应用名称
spring.application.name=jianqun_springboot_kafka

# 指定 kafka 的地址
spring.kafka.bootstrapservers=hadoop102:9092,hadoop103:9092,hadoop104:9092

# 生产者配置
# 指定 key 和 value 的序列化器
spring.kafka.producer.keyserializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.valueserializer=org.apache.kafka.common.serialization.StringSerializer


# 消费者配置
# 指定 key 和 value 的反序列化器
spring.kafka.consumer.keydeserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.valuedeserializer=org.apache.kafka.common.serialization.StringDeserializer
# 指定消费者组的 group_id
spring.kafka.consumer.group-id=jianqun
# =========消费者配置结束=========

二、编写生产端消息:

  • 1.编写controller接口,当收到请求的时候,模式发送一条数据到kafka中
package com.jianqun.springbootkafka.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;

public class ProducerController {

    // Kafka 模板用来向 kafka 发送数据
    @Autowired
    KafkaTemplate<String, String> kafka;

    @RequestMapping("/a")
    public String data() {
        String msg = "生产端消息";
        kafka.send("firstopic", msg);
        return "ok";
    }
}

三、编写消费者端消息:

  • 1.消费者代码:
package com.jianqun.springbootkafka.kafka;

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;

@Configuration
public class KafkaConsumer {
    // 指定要监听的 topic
    @KafkaListener(topics = "firstopic")
    public void consumeTopic(String msg) {
        // 参数: 收到的 value
        System.out.println("收到的信息: " + msg);
    }
}

四、测试:

在这里插入图片描述

http://www.dtcms.com/a/20101.html

相关文章:

  • 《Spring实战》(第6版)第1章 Spring起步
  • 【数据结构】(9) 优先级队列(堆)
  • floodfill算法系列一>太平洋大西洋水流问题
  • 【第3章:卷积神经网络(CNN)——3.6 CNN的高级特性与优化策略】
  • 如何使用 DeepSeek 和 Dexscreener 构建免费的 AI 加密交易机器人?
  • EasyExcel 复杂填充
  • DeepSeek接入网络安全领域,AI高效驱动,重新定义网络防御边界!
  • UniApp 中制作一个横向滚动工具栏
  • MyBatis:动态SQL高级标签使用方法指南
  • 数据管理的四大基石:通俗解读数据中台、数据仓库、数据治理和主数据
  • 《千多桃花一世开》:南胥月为何爱暮悬铃
  • 输电杆塔沉降智能监测系统:如何用数据守护电网安全
  • ASP.NET Core SixLabors.ImageSharp v1.0 的图像实用程序类 web示例
  • uniapp可视化-活动报名表单系统-代码生成器
  • 【云安全】云原生- K8S kubeconfig 文件泄露
  • java中map实现函数式编程的原理
  • LVDS接口总结--(5)IDELAY3仿真
  • CEF132 编译指南 Linux 篇 - 开启 CEF 之旅:框架概览(一)
  • 微信小程序的请求函数封装(ts版本,uniapp开发)
  • 【电路笔记】-双向计数器
  • 【VUE】前端工程化与前端工程化与webpack
  • css简介
  • 哈希表(C语言版)
  • 【stm32】定时器输出PWM波形(hal库)
  • Qt的QPushButton样式设置
  • 半遮挡检测算法 Detecting Binocular Half-Occlusions
  • 19.4.2 -19.4.4 新增、修改、删除数据
  • SpringBoot服务器的采购上,服务器的数量和性能配置如何取舍【无标题】
  • Day2 25/2/15 SAT
  • electron打包基本教程