当前位置: 首页 > news >正文

spring boot3 kafka集群搭建到使用

首先自行安装docker,通过docker容器安装kafka
CentOS 系统 docker安装地址

 1.pom.xml和application.properties或者application.yml文件配置

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
spring:
  kafka:
    bootstrap-servers: [fafka地址1,fafka地址2,....]
#    producer序列化设置
    producer:
      #key序列化设置,设置成json对象
#      key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
#    val序列化设置,设置成json对象
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

2.博主安装了kafka ui插件,就直接创建主题了

当前一个集群,因为博主只搭建了一台服务器,也可以称为一个节点

创建主题

没有安装kafka ui,就再main那里启动项目时创建

package com.atguigu.boot3_08_kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.TopicBuilder;

@EnableKafka //扫描kafka注解,开启基于注解的模式
@SpringBootApplication
public class Boot308KafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(Boot308KafkaApplication.class, args);
        TopicBuilder.name("my-new-topic")//主题
                .partitions(3)//分区
                .replicas(2)//副本
                .build();
    }

}

副本就是备份,有几节点就可以创建几个副本,副本数量一般采取分区数量-1,只有一个节点就N分区1副本


 3.在main 加上这个注解@EnableKafka

package com.atguigu.boot3_08_kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

@EnableKafka //扫描kafka注解,开启基于注解的模式
@SpringBootApplication
public class Boot308KafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(Boot308KafkaApplication.class, args);

    }

}

4.生产者发送消息

package com.atguigu.boot3_08_kafka.controller;

import com.atguigu.boot3_08_kafka.entity.Person;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @GetMapping("/jjj") 
    public String hello() {
        kafkaTemplate.send("tach", 0,"hello","急急急132");//send("主题", 分区号,"key","val")
        return "ok";
    }

    @GetMapping("/odj")
    public String odj() {
        kafkaTemplate.send("tach", 0,"hello",new Person(1L,"odj",19));//对象json需要序列化,可用配置文件配置,也可以在对象中序列化对象
        return "OK";
    }
}

5.消费者监听消息

package com.atguigu.boot3_08_kafka.listener;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

@Component
public class MykafkaListener {

    /**
     * 默认的监听是从最后一个消息开始拿,也就是只会拿新消息,不会拿历史的
     * @KafkaListener(topics = "主题",groupId = "用户组")
     * ConsumerRecord 消费者从 Kafka 获取消息的各种元数据和实际的消息
     * @param record
     */
    @KafkaListener(topics = "tach",groupId = "teach")
    public void listen(ConsumerRecord<?, ?> record) {
        Object key = record.key();
        Object val = record.value();
        System.out.println("收到值key:"+key+"收到值val:"+val);
    }

    /**
     *  想要到历史的消息或者全部消息,只能设置偏移量
     *  @KafkaListener(groupId = "用户组" ,topicPartitions = {设置分区,设置偏移量})
     *  @TopicPartition(topic = "主题" ,partitionOffsets 设置偏移量)
     *  @PartitionOffset(partition = "哪个分区", initialOffset = "从第几个偏移量开始")
     *
     * @param record
     */
    @KafkaListener(groupId = "teach" ,topicPartitions = {
            @TopicPartition(topic = "tach" ,partitionOffsets = {
                    @PartitionOffset(partition = "0", initialOffset = "0")
            })
    })
    public void listens(ConsumerRecord<?, ?> record) {
        Object key = record.key();
        Object val = record.value();
        System.out.println("收到值key:"+key+"收到值val:"+val);
    }
}

最后查看结果


最后补充一个小知识

groupId = "用户组"

组里的成员是竞争模式

用户组和用户组之间是发布/订阅模式

由zookeeper分配管理

好了可以和面试官吹牛逼了


课外话

如果是传对象json需要序列化,创建对象时序列化,不推荐太原始重要是很占资源

因为开始我们都配置好了,有对象就会自动序列化

package com.atguigu.boot3_08_kafka.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;


@AllArgsConstructor
@NoArgsConstructor
@Data
public class Person implements Serializable {//不推荐implements Serializable 
    private Long id;
    private String name;
    private Integer age;
}

相关文章:

  • AI大模型测试用例生成平台
  • mysql 到 doris 挪移数据
  • IDEA中链接使用mysql数据库
  • 如何在 React 中实现错误边界?
  • 一场因黄焖鸡引发的技术方案大作战
  • js版本之ES12(2021)、ES13(2022)新特性(九)
  • 封装Axios拦截器实现用户无感刷新AccessToken实践指南
  • 简单创建一个Django项目并配置neo4j数据库
  • Scratch 3.0安装包,支持Win7/10/11、Mac电脑手机平板、少儿便编程的启蒙软件。
  • SQL99 多表查询
  • 成功破解加密机制,研究人员解锁LinuxESXi Akira勒索软件
  • 单片机技术
  • C++复试笔记(三)
  • flutter实践:断点调试踩坑
  • 循环遍历 Java 集合中元素的方法总结
  • 前端开发:混合技术栈的应用
  • 基于异构特征融合与轻量级集成学习的软件漏洞挖掘方案设计与Python实现
  • Spring Boot + InfluxDB 实现高效数据存储与查询
  • 总结 HTTPS 的加密流程
  • markdown 转 word 工具 ‌Pandoc‌
  • 鸿海下调全年营收展望:AI服务器业务强劲,预计今年营收增超50%
  • 30平米的无障碍酒吧里,我们将偏见折叠又摊开
  • 气候多米诺:厄尔尼诺与东南亚跨境害虫或威胁中国粮食安全
  • GDP逼近五千亿,向海图强,对接京津,沧州剑指沿海经济强市
  • 四部门:到2025年底,全国行政村5G通达率超过90%
  • 秦洪看盘|交易新逻辑,银行股成A股稳定器