当前位置: 首页 > news >正文

flume抽取kafka数据到kafka,数据无法从topicA抽取到topicB

最近做一个项目的时候,发现一个特别诡异的问题,kafka中有topicA,topicB两个主题,然后我通过flume抽取kafka中topicA中的数据,能够成功,flume的conf如下:

tier1.sources = source1
tier1.channels = c1
tier1.sinks = k1
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = c1
tier1.sources.source1.batchSize = 100
tier1.sources.source1.kafka.bootstrap.servers = bigdata01:9092
tier1.sources.source1.kafka.topics = topicA
tier1.sources.source1.kafka.consumer.group.id = yuekaotier1.sources.source1.kafka.consumer.auto.offset.reset=earliesttier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = com.bigdata.YueKaoInterceptor$BuilderEventtier1.channels.c1.type = memory
tier1.channels.c1.capacity = 10000
tier1.channels.c1.transactionCapacity = 1000tier1.sinks.k1.type = logger
tier1.sinks.k1.channel = c1

然后,接着将结果抽取到topicB中,配置文件如下:

tier1.sources = source1
tier1.channels = c1
tier1.sinks = k1
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = c1
tier1.sources.source1.batchSize = 100
tier1.sources.source1.kafka.bootstrap.servers = bigdata01:9092
tier1.sources.source1.kafka.topics = topicA
tier1.sources.source1.kafka.consumer.group.id = yuekao3tier1.sources.source1.kafka.consumer.auto.offset.reset=latesttier1.channels.c1.type = memory
tier1.channels.c1.capacity = 100000
tier1.channels.c1.transactionCapacity = 10000
tier1.channels.c1.byteCapacityBufferPercentage = 20
tier1.channels.c1.byteCapacity = 800000tier1.sinks.k1.channel = c1
tier1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.k1.kafka.topic = topicB
tier1.sinks.k1.kafka.bootstrap.servers = bigdata01:9092

发现一个问题:

flume将topicA中的数据抽取出来之后,继续发送给topicA,topicA自循环啦,topicB中压根没数据,于是发现官网中有这么一段解释:

也就是说如果想把结果发送topicB, 可以在event的header中添加  topic=topicB,

那就只剩两个办法啦,一个是自定义拦截器,一个是使用静态拦截器,自定义拦截器,步骤长,麻烦,可以使用静态拦截器,于是conf修改如下:

tier1.sources = source1
tier1.channels = c1
tier1.sinks = k1
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = c1
tier1.sources.source1.batchSize = 100
tier1.sources.source1.kafka.bootstrap.servers = bigdata01:9092
tier1.sources.source1.kafka.topics = topicA
tier1.sources.source1.kafka.consumer.group.id = yuekao3tier1.sources.source1.kafka.consumer.auto.offset.reset=latesttier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = static
tier1.sources.source1.interceptors.i1.key = topic
tier1.sources.source1.interceptors.i1.preserveExisting = false
tier1.sources.source1.interceptors.i1.value = topicBtier1.channels.c1.type = memory
tier1.channels.c1.capacity = 100000
tier1.channels.c1.transactionCapacity = 10000
tier1.channels.c1.byteCapacityBufferPercentage = 20
tier1.channels.c1.byteCapacity = 800000tier1.sinks.k1.channel = c1
tier1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.k1.kafka.topic = topicB
tier1.sinks.k1.kafka.bootstrap.servers = bigdata01:9092

相当于添加了如下方法:

tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = static
tier1.sources.source1.interceptors.i1.key = topic
tier1.sources.source1.interceptors.i1.preserveExisting = false
tier1.sources.source1.interceptors.i1.value = topicB

问题解决!!!!

http://www.dtcms.com/a/613488.html

相关文章:

  • 基于最小权限原则的云计算Amazon VPC多层应用安全架构设计
  • 11.2 FastGPT部署指南:Docker一键部署企业级RAG框架
  • 网站建设结课总结贵阳网络推广优化
  • 网络安全态势报告,网络安全风险评估报告文档
  • R包fastEnrich预开发一 -- 快速GO富集分析、自动化报告、优化气泡图
  • 企业做网站设置哪些模块网站存储空间
  • Java 集合面试核心:ArrayList/LinkedList 底层数据结构,HashMap扩容机制详解
  • 突破AI视频一致性瓶颈:“无废话”四步电影级工作流
  • Python 编程实战 · 实用工具与库 — Django 项目结构简介
  • Dify-SSE流式及速率限制实现
  • 【ros2】ROS2中添加资源文件(图片、声音、视频等)的完整指南
  • 数据分析笔记08:Python编程基础-数据类型与变量
  • 北仑网站网页建设个人网站实现与设计论文
  • 网站在线留言怎么做做网站大优惠
  • 温州做网站哪家比较好镇江公司网站建设
  • QT/C++使用QMessageBox实现一个简单的登陆窗口
  • 前端CI/CD 流程
  • Visual Basic 挑选颜色
  • Java性能调优工具篇:JMH基准测试与Profiler(JProfiler/Async-Profiler)使用指南
  • ASC学习笔记0020:用于定义角色或Actor的默认属性值
  • 第十篇 扫雷游戏 下(末版·精简)
  • 开发者获取Claude API Key 申请指南:从注册到 Python 调用的实战教程
  • pyinstaller 介绍
  • 建设网站与服务器专业网页设计哪家好
  • 【大语言模型 125】开放域对话实战:自然流畅的闲聊系统完全指南
  • FastAPI基础项目:实现用户管理系统,实现基本的搜索和增删改查功能
  • 小众做的好的网站手机下载工具app
  • Qt for HarmonyOS 3D图片轮播组件开源鸿蒙开发实战
  • Evolution_07_环境
  • MinIO 不再“开放”,RustFS 能否成为更优选择?