当前位置: 首页 > news >正文

kafka服务端和springboot中使用

kafka服务端和springboot中使用

一、kafka-sever安装使用

下载kafka-server

https://kafka.apache.org/downloads.html

启动zookeeper
zookeeper-server-start.bat config\zookeeper.properties	
启动kafka-server
kafka-server-start.bat config\server.properties
创建主题
kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
查看主题列表
kafka-topics.bat --list --bootstrap-server localhost:9092
创建生产者
kafka-console-producer.bat --broker-list localhost:9092 --topic test

在生产者控制台可以发消息

创建消费者
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning   

在消费者端可以收消息

查看主题详情
kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic test
删除主题
kafka-delete-records.bat --bootstrap-server localhost:9092 --offset-json-file d:\delete_script.json

二、springboot中使用kafka

在Spring Boot中使用Kafka,你可以通过集成spring-kafka库来实现。这个库提供了对Apache Kafka的全面支持,包括生产者和消费者的功能。

添加依赖

首先,在你的pom.xml文件中添加以下依赖:

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
配置Kafka属性

在application.properties或application.yml文件中配置Kafka的相关属性。这里以application.properties为例:

# Kafka Producer Configuration
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer# Kafka Consumer Configuration
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
创建消息生产者

创建一个类用于发送消息到Kafka主题:

package space.goldchen.kafka.kafka;import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
/*** kafka生产者* @author 2021* @create 2025-04-16 16:27*/
@Service
@Slf4j
public class KafkaProducer {@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;/*** 发送消息* @param topic 主题* @param message 要发送的消息*/public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);// 用log.info重写下边的日志log.info("Message sent to topic: {}, Message:{}" , topic, message);}
}
创建消息消费者

创建一个类用于从Kafka主题接收消息:

package space.goldchen.kafka.kafka;import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
/*** kafka消费者** @author 2021* @create 2025-04-16 16:28*/@Service
@Slf4j
public class KafkaConsumer {/*** my-topic 消费者消费消息* @param message*/@KafkaListener(topics = "my-topic", groupId = "my-consumer-group")public void consume(String message) {log.info("[my-topic]Received message: {}", message);}
}
测试Kafka集成

最后,编写一个简单的测试类来验证Kafka的生产和消费功能:

package space.goldchen.kafka.kafka;/*** kafka接口,调用生产者发送消息* @author 2021* @create 2025-04-16 16:28*/import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaController {@Resourceprivate KafkaProducer kafkaProducer;/*** 使用KafkaProducer发送消息* @param message* @return*/@GetMapping("/send")public String sendMessage(@RequestParam String message) {kafkaProducer.sendMessage("my-topic", message);return "Message sent: " + message;}
}

以上代码展示了如何在Spring Boot应用程序中配置和使用Kafka。确保你已经在本地或其他环境中启动了Kafka服务器,并且监听端口为9092(或者根据实际情况调整配置)

http://www.dtcms.com/a/136136.html

相关文章:

  • Excel数据自动填充到Word自定义表格
  • OpenCV day4
  • ESP-ADF外设子系统深度解析:esp_peripherals组件架构与核心设计(输入类外设之按键Button)
  • Spark-SQL核心编程3
  • Python爬虫第15节-2025今日头条街拍美图抓取实战
  • jupyter 文件浏览器,加强版,超好用,免费exe
  • 工业数据治理范式革新:时序数据库 TDengine虚拟表技术解析
  • 【Web APIs】JavaScript 操作多个元素 ③ ( 鼠标经过高亮显示 | onmouseover 事件设置 | onmouseout 事件设置 )
  • docker 安装TDengine 时序数据库
  • ARINC818协议(二)
  • 并查集(力扣1971)
  • 如何在爬虫中合理使用海外代理?在爬虫中合理使用海外ip
  • SpringCloud Alibaba微服务工程搭建
  • HOW - 企业团队自建 npm 仓库
  • 大模型——理解Crawl4AI 中的爬取请求参数
  • 【数据分享】全球1200多个城市的建筑高度矢量数据(不包括中国/免费获取)
  • Sonatype Nexus Repository Docker部署
  • Windows环境下,Jenkins+Gitee的CICD
  • Servlet简单示例
  • idea报错java: 非法字符: ‘\ufeff‘解决方案
  • 如何在 IntelliJ IDEA 中安装通义灵码 - AI编程助手提升开发效率
  • Python(17)Python字符编码完全指南:从存储原理到乱码终结实战
  • VScode+OpenOCD+HTOS栈回溯在国产mcu芯片上完全调试
  • 十倍开发效率 - IDEA 插件之RestfulBox - API
  • LangGraph中预构件,creat_react_agent的实现流程
  • 解决 Kubernetes 调度器启动报错:缺少 Bind 插件
  • PFDF-SPWM(并联续流-倍频正弦脉宽调制)
  • 尚硅谷-react[1-6集]
  • Vue 3 中 ref和reactive的详细使用场景
  • 使用阿里云创建公司官网(使用wordpress)