当前位置: 首页 > news >正文

SpringBoot整合Kafka

SpringBoot整合Kafka

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。

Windows版本下载地址:https://kafka.apache.org/downloads

启动服务器

Kafka服务器的功能相当于RocketMQ中broker,kafka运行还需要一个类似于命名服务器的服务。在kafka安装目录中自带一个类似于命名服务器的工具,叫做zookeeper,他的作用是注册中心,相关知识请自行百度zookeeper,后续我也会补充zookeeper学习心得。

zookeeper-server-start.bat ..\..\config\zookeeper.properties   #启动zookeeper
kafka-server-start.bat ..\..\config\server.properties          #启动Kafka

运行bin目录下的windows目录下的zookeeper-server-start命令即可启动注册中心,默认对外服务端口:2181。

运行bin目录下的windows目录下的kafka-server-start命令即可启动kafka服务器,默认对外服务端口:9092。

创建主题

和之前操作其他MQ产品相似,Kafka也是基于主题操作,操作之前需要先初始化topic。

#创建topic
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
#查询topic
kafka-topics.bat --zookeeper 127.0.0.1:2181 --list
#删除topic
kafka-topic.bat --delete --zookeeper localhost:2181 --topic test
测试服务器启动状态

Kafka提供有一套测试服务器功能的测试程序,运行bin目录下的windows目录下的命令即可使用。

kafka-console-producer.bat --broker-list localhost:9092 --topic test    #测试生产消费
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning   #测试消息消费
Spring Boot整合Kafka
1、引入spring boot整合Kafka的依赖
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
2、配置Kafka的服务器地址
spring:
	kafka:
		bootstrap-servers: localhost:9092   #配置服务器地址
		consumer:
			group-id: order                 #配置消费组
3、使用KafkaTemplate操作Kafka
@Service
public class KafkaMessageServiceImpl implements MessageService{
    @Autowried
    private KafkaTemplate<String, String> template;
    
    //同步消息生产推送
    @Override
    void sendMessage(String id){
        System.out.println("准备推送订单消息,已纳入队列 :" + id);
        //定义topic主题test
        template.send( "test", id);
    }
}
4、使用KafkaListener监听器监听消息,处理消息消费.
//消息监听器
@Component
class KafkaMessageListener {
    
    //Kafka监听,监听对应的topic主题
    @KafkaListener(topics="test")
    void onMessage(ConsumerRecord<String, String> record){
        //消费业务处理
        System.out.println("已经完成业务消息消费,消费的数据是:" + record.value());
    }
}

相关文章:

  • 【一句话经验】ubuntu vi/vim 模式自动设置为paste
  • 网络安全之tcpdump工具
  • Spring Boot3整合Knife4j(4.5.0)
  • 一、docker的安装
  • LVTTL(Low Voltage Transistor-Transistor Logic)电平详解
  • Torch 模型 model => .onnx => .trt 及利用 TensorTR 在 C++ 下的模型部署教程
  • opencv-显示图片
  • python连接数据库速查示例
  • 【python实战】-- 选择解压汇总mode进行数据汇总20250310更新
  • 探讨如何在AS上构建webrtc(3)分享WebRTC-CMakeBuild轻量化工程
  • C语言处理字符串的十个函数(附带大量实例)
  • 前端生成二维码 + 条形码
  • Python 异步编程入门:从零到上手 asyncio 协程(附实战案例)
  • Taro 面试题
  • 前端项目Axios封装Vue3详细教程(附源码)
  • 基于 Qwen2.5-14B + Elasticsearch RAG 的大数据知识库智能问答系统
  • 23种设计模式
  • 用物理信息神经网络(PINN)解决实际优化问题:全面解析与实践
  • 途游游戏25届AI算法岗内推
  • AMIS低代码构建系统,定制界面
  • 网站建设ppt模板/武汉seo和网络推广
  • 积分商城网站建设/什么是网络营销的核心
  • 上海做网站设计/怎样建立网站平台
  • 自己做一个模版网站是怎么做的/佛山网站建设模板
  • 360检测网站开发语言的工具/网络整合营销4i原则
  • 城管网站建设材料/网络营销推广的方法