当前位置: 首页 > news >正文

Kafka集成Flume

目录

  • Flume生产者
  • Flume消费者

Flume生产者

启动kafka集群

在这里插入图片描述

启动kafka消费者

在这里插入图片描述

修改flume的conf目录下的log4j.properties配置文件,配置日志文件路径
在这里插入图片描述

编写配置文件

vim file_to_kafka.conf

在这里插入图片描述

# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1# 2 配置 source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/app.*# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# 4 配置 sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动flume
flume/bin/flume-ng agent -c flume/conf/ -n a1 -f file_to_kafka.conf

在监控目录下创建app.log文件

使用echo命令往该文件添加内容
在这里插入图片描述

可以看到kafka消费到了数据

在这里插入图片描述

Flume消费者

编辑配置文件

在这里插入图片描述

# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1# 2 配置 source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 50
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers = master:9092
a1.sources.r1.kafka.topics = first
a1.sources.r1.kafka.consumer.group.id = custom.g.id# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# 4 配置 sink
a1.sinks.k1.type = logger# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动Flume

flume/bin/flume-ng agent -c flume/conf/ -n a1 -f kafka_to_file.conf -Dflume.root.logger=INFO,console

使用kafka生产者发送数据:

在这里插入图片描述

使用tail -f监控日志文件

在这里插入图片描述

http://www.dtcms.com/a/342096.html

相关文章:

  • 人工智能 -- 循环神经网络day1 -- 自然语言基础、NLP基础概率、NLP基本流程、NLP特征工程、NLP特征输入
  • 算法 之 拓 扑 排 序
  • LeetCode 回文链表
  • 桥梁设计模式
  • RabbitMQ事务消息原理是什么
  • RabbitMQ:延时消息(死信交换机、延迟消息插件)
  • 领域专用AI模型训练指南:医疗、法律、金融三大垂直领域微调效果对比
  • 28、工业网络资产漏洞扫描与风险评估 (模拟) - /安全与维护组件/industrial-network-scanner
  • 深度解析Atlassian 团队协作套件(Jira、Confluence、Loom、Rovo)如何赋能全球分布式团队协作
  • Whisk for Mac 网页编辑器 PHP开发
  • 牛客:链表的回文结构详解
  • NewsNow搭建喂饭级教程
  • SQL中对视图的操作命令汇总
  • STM32H750 CoreMark跑分测试
  • [最新]Dify v1.7.2版本更新:工作流可视化和节点搜索
  • 2025 年 8 月《GPT-5 家族 SQL 能力评测报告》发布
  • SQL视图、存储过程和触发器
  • OBCP第四章 OceanBase SQL 调优学习笔记:通俗解读与实践指南
  • CentOS 7安装FFmpeg
  • QT QProcess, WinExec, ShellExecute中文路径带空格程序或者脚本执行并带参数
  • Qt实现TabWidget通过addTab函数添加的页,页内控件自适应窗口大小
  • Qt文件压缩工具项目开发教程
  • 【Bug】CentOS 7 使用vim命令报错vim: command not found
  • 开源 C++ QT Widget 开发(四)文件--二进制文件查看编辑
  • Elasticsearch官方文档学习-未完待续
  • java项目:如何优化JVM参数?
  • 【深入理解 Linux 网络】收包原理与内核实现(下) 从 TCP 传输层到应用
  • 遥感机器学习入门实战教程|Sklearn案例⑤:集成学习方法全览
  • ES_flattened
  • Nacos部署微服务