当前位置: 首页 > news >正文

java kafka

安装

安装下载

在这里插入图片描述
导入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>kafka</artifactId><groupId>com.tt</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>kafkaProducer</artifactId><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.0</version> <!-- 根据需要选择合适的版本 --></dependency></dependencies></project>

创建producer项目

配置文件

# application.yaml
spring:kafka:bootstrap-servers: localhost:9092consumer:group-id: myGroup#auto.offset.reset = latest 的含义:#若一个 Topic 有历史数据,但消费者组是首次启动且未提交过 offset,则不会消费历史消息,只会接收启动后新产生的数据。#earliest:无 offset 时从头消费,适合需处理全量数据的场景。#none:无 offset 时抛出异常,需手动处理,适合对重复消费敏感的业务。auto-offset-reset: earliestenable-auto-commit: truekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerapplication:name: producer
server:port: 9124

发送

package com.tt.control;import com.tt.common.Rs;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import java.util.List;@RestController
public class Controller {@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;@GetMapping("/test")public Rs tt(){kafkaTemplate.send("tetete","6");return Rs.success();}}

创建consumer

依赖一样,配置文件一样
消费

package com.tt.control;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class HelloKafka {@KafkaListener(topics = "tetete")public void onMessage(String data){System.out.println(data);}
}

可视化工具

Kafka Tool

http://www.dtcms.com/a/152271.html

相关文章:

  • servlet-优化
  • 数据仓库是什么?数据仓库架构有哪些?
  • C++/Qt中QActionGroup类用法
  • 6.第六章:数据分类的技术体系
  • 形象解释 HTTP 的四种常见请求方式及其中的区别联系
  • DDD领域驱动与传统CRUD
  • Datawhale AI春训营——用AI帮助老人点餐
  • 前端跨端框架的开发以及IOS和安卓的开发流程和打包上架的详细流程
  • (done) 吴恩达版提示词工程 3. 迭代 (控制输出长度、提取特定细节、输出 HTML 格式)
  • Ubuntu下软件运行常见异常退出问题汇总分析
  • Qt本地化 - installTranslator不生效
  • HarmonyOs @hadss/hmrouter路由接入
  • 外观模式:简化复杂系统接口的设计模式
  • RS232转ProfibusDP网关:连接未来传感器的关键
  • 4.1 融合架构设计:LLM与Agent的协同工作模型
  • 2025上海车展:光峰科技全球首发“灵境”智能车载光学系统
  • 倚光科技:柱面透镜加工工艺详解,解锁光学新境界
  • 构建企业官方网站有哪些必备因素?
  • vue3--手写手机屏组件
  • java Springboot使用扣子Coze实现实时音频对话智能客服
  • dockercompose文件仓库
  • Ubuntu22学习记录
  • 部署本地deepseek并在调用的详细步骤以及解决一些可能出现的问题(Windows,Linux, WSL)
  • 【数据可视化-30】Netflix电影和电视节目数据集可视化分析
  • 【记录手贱bug日常】IDEA 配置vmoptions后打不开,重新安装,删注册表均无用
  • ESP32_IDF_VScode安装多版本共存
  • 解决VSCode每次SSH连接服务器时,都需要下载vscode-server
  • HTML5 详细学习笔记
  • 【AI】基于OllamaSharp与.NET Core API的高效LLM查询实现
  • Wan2.1和HunyuanVideo文生视频模型算法解析与功能体验丨前沿多模态模型开发与应用实战第六期