当前位置: 首页 > news >正文

kafka快速部署、集成、调优

kafka快速部署、集成、调优

1.部署

1.1 docker部署

参考:https://blog.csdn.net/taotao_guiwang/article/details/135508643
在这里插入图片描述

1.2 kafka部署

  • 资源见,百度网盘:https://pan.baidu.com/s/1qlabJ7m8BDm77GbDuHmbNQ?pwd=41ac
    在这里插入图片描述

  • 修改配置文件,说明要在那个机器上部署:
    在这里插入图片描述

  • 执行kafka_zk_install.sh,开始部署:

# 赋权,在kafka_zk_install目录,执行:
chmod -R 777 .
# 执行
./kafka_zk_install.sh

在这里插入图片描述

2. spring boot集成

核心代码,KafkaController.java:

package com.kafka;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaController {private final static String TOPIC_NAME = "my-replicated-topic";@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@RequestMapping("/send")public void send() {kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a msg");}}

MyConsumer.java:

package com.kafka;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaController {private final static String TOPIC_NAME = "my-replicated-topic";@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@RequestMapping("/send")public void send() {kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a msg");}}

application.yml:

server:port: 8080spring:kafka:bootstrap-servers: 10.86.97.210:9092,10.86.97.210:9093,10.86.97.210:9094producer: # 生产者retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送batch-size: 16384buffer-memory: 33554432acks: 1# 指定消息key和消息体的编解码方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: default-groupenable-auto-commit: falseauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:# 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交# RECORD# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交# BATCH# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交# TIME# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交# COUNT# TIME | COUNT 有一个条件满足时提交# COUNT_TIME# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交# MANUAL# 手动调用Acknowledgment.acknowledge()后立即提交# MANUAL_IMMEDIATEack-mode: MANUAL_IMMEDIATE

浏览器访问:
http://localhost:8080/send

控制台打印:
在这里插入图片描述

3.相关资源

百度网盘:https://pan.baidu.com/s/1qlabJ7m8BDm77GbDuHmbNQ?pwd=41ac

在这里插入图片描述

http://www.dtcms.com/a/309106.html

相关文章:

  • 超越 ChatGPT:智能体崛起,开启全自主 AI 时代
  • 中英混合的语音识别XPhoneBERT 监督的音频到音素的编码器结合 f0 特征LID
  • 阿里云微服务引擎 MSE 及 API 网关 2025 年 7 月产品动态
  • 单变量单步时序预测:CNN-LSTM卷积神经网络结合长短期记忆神经网络
  • MybatisPlus如何用wrapper语句灵活连接多查询条件
  • SpringBoot+LangChain4j解析pdf文档,不使用默认解析器
  • 解决VScode加载慢、保存慢,git加载慢,windows11系统最近异常卡顿的问题
  • 高端房产管理小程序
  • 【Ubuntu】安装使用pyenv - Python版本管理
  • ORACLE函数
  • JVM垃圾回收算法和分代收集算法的区别
  • 插件升级:Chat/Builder 合并,支持自定义 Agent、MCP、Rules
  • 深度学习(鱼书)day08--误差反向传播(后三节)
  • Day 28:类的定义和方法
  • 属性的运用和理解
  • 赛博算命之八字测算事业运势的Java实现(四柱、五行、十神、流年、格局详细测算)
  • Redisson实现Redis分布式锁的原理
  • Windows和Linux的tree工具
  • 【智能协同云图库】第七期:基于AI调用阿里云百炼大模型,实现AI图片编辑功能
  • 渗透测试报告通常包含哪些关键内容?
  • redis快速部署、集成、调优
  • Linux通用SPI作为Master——回环测试
  • Redis学习-----Redis的基本数据类型
  • Dify版本升级实操
  • Edge中如何找到原IE浏览器的Internet选项
  • 基于html,css,jquery,django,lstm,cnn,tensorflow,bert,推荐算法,mysql数据库
  • 8月1日RED指令强制生效,您的设备准备好了吗?
  • uniapp 开发微信小程序,获取经纬度(uni.getLocation)并且转化详细地址(‌高德地图逆地理编码API、‌腾讯地图逆地理编码)
  • 【华为机试】127. 单词接龙
  • Python match-case 模式匹配详解