当前位置: 首页 > news >正文

构建日志采集和分析平台

Flume环境搭建

操作步骤:

  1. 下载安装包

    • 访问地址 http://archive.apache.org/dist/flume/
    • 找到稳定版本(如 1.8.0)的目录,下载 apache-flume-1.8.0-bin.tar.gz 文件。
  2. 上传安装包

    • 使用 scp、FTP 工具或任何其他方式,将下载好的 apache-flume-1.8.0-bin.tar.gz 文件上传到服务器 hadoop01/home/hadoop/app 目录下。
  3. 登录服务器并解压

    • 使用 SSH 登录到 hadoop01 节点。
    • 执行以下命令:
# 1. 切换到目标目录
[hadoop@hadoop01 ~]$ cd /home/hadoop/app# 2. 查看目录内容,确认安装包已上传
[hadoop@hadoop01 app]$ ls
apache-flume-1.8.0-bin.tar.gz# 3. 解压安装包
[hadoop@hadoop01 app]$ tar -zxvf apache-flume-1.8.0-bin.tar.gz# 4. (可选) 解压后删除压缩包以节省空间
[hadoop@hadoop01 app]$ rm -rf apache-flume-1.8.0-bin.tar.gz# 5. 创建一个软链接,方便以后使用和配置,避免路径因版本号而改变
[hadoop@hadoop01 app]$ ln -s apache-flume-1.8.0-bin flume# 6. 最终查看目录,确认解压和软链接创建成功
[hadoop@hadoop01 app]$ ls
apache-flume-1.8.0-bin flume

在这里插入图片描述

2. 修改 Flume 配置文件

操作步骤:

  1. 进入配置目录并准备配置文件
[hadoop@hadoop01 app]$ cd flume/conf
[hadoop@hadoop01 conf]$ ls
flume-conf.properties.template flume-env.sh.template ...
[hadoop@hadoop01 conf]$ mv flume-conf.properties.template flume-conf.properties
  1. 欣赏配置文件
[hadoop@hadoop01 conf]$ vim flume-conf.properties
  • 内容如下
# 定义Agent中各组件的名称
agent.sources = seqGenSrc
agent.channels = memoryChannel
agent.sinks = loggerSink# 配置Source(数据源):这里使用一个序列生成器,它只会不断产生自增的数字事件,用于测试
agent.sources.seqGenSrc.type = seq
agent.sources.seqGenSrc.channels = memoryChannel# 配置Sink(输出目的地):这里使用logger,将事件内容记录到日志文件(或控制台)
agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel# 配置Channel(缓冲通道):这里使用内存通道,性能好但断电会丢失数据
agent.channels.memoryChannel.type = memory
# 设置内存通道的容量(可存储的事件数量)
agent.channels.memoryChannel.capacity = 100

重要提示

  • 这个配置定义了一个最简单的 Flume Agent,它包含:
    • 一个Sourceseq (序列生成器),用于测试,它不停地产生数字。
    • 一个Channelmemory (内存通道),临时存储事件。
    • 一个Sinklogger (日志记录),将事件内容输出到日志。

3. 启动运行 Flume Agent

操作步骤:

  1. 返回到 Flume 的安装目录
cd /home/hadoop/app/flume
  1. 执行启动命令
bin/flume-ng agent \
-n agent \
-c conf \
-f conf/flume-conf.properties \
-Dflume.root.logger=INFO,console

命令行参数解释(严格按照您的说明):

  • flume-ng:Flume 的执行脚本。
  • agent:表示要启动一个 Flume Agent 进程。
  • -n agent-n 指定了 Agent 的名称。这个名称必须与配置文件中顶部的组件名称前缀(即 agent.sources 中的 agent)保持一致。
  • -c conf-c 指定了配置文件所在的目录(conf 是相对当前路径的目录)。
  • -f conf/flume-conf.properties-f 指定了具体的配置文件路径。
  • -Dflume.root.logger=INFO,console:这是一个 Java 系统属性,它覆盖了日志配置文件中的设置,强制将日志输出到控制台(console),并且日志级别为 INFO。这非常便于我们实时查看测试结果。

预期结果:
执行命令后,Flume Agent 会启动并开始工作。您将在控制台上看到大量日志输出,其中会夹杂着类似以下的内容,这证明 Flume 正在正常运行:

... INFO ... Event: { headers:{} body: 48 65 6C 6C 6F 20 57 6F 72 6C 64 21 Hello World! }
... INFO ... Event: { headers:{} body: 48 65 6C 6C 6F 20 46 6C 75 6D 65 21 Hello Flume! }

在这里插入图片描述

这些就是 loggerSink 打印出的事件内容。body 部分同时显示了字节序列(十六进制)和其对应的字符串格式。

停止 Flume:
要停止 Flume Agent,只需在控制台按下 Ctrl + C 组合键即可。

构建Flume集群

环境准备

假设有三台服务器:

  • hadoop01: Web服务器/采集节点
  • hadoop02: 聚合节点1
  • hadoop03: 聚合节点2

分发:

# 分发 Flume 安装包到 hadoop02 的 /home/hadoop/app 目录
scp -r /home/hadoop/app/apache-flume-1.8.0-bin hadoop02:/home/hadoop/app/# 分发 Flume 软链接到 hadoop02(避免后续路径不一致)
scp -r /home/hadoop/app/flume hadoop02:/home/hadoop/app/# 同理,分发到 hadoop03
scp -r /home/hadoop/app/apache-flume-1.8.0-bin hadoop03:/home/hadoop/app/
scp -r /home/hadoop/app/flume hadoop03:/home/hadoop/app/

1. 在 hadoop01(采集节点)配置

1.1 创建配置文件
cd /home/hadoop/app/flume/conf/vim taildir-file-selector-avro.properties
1.2 配置内容

将以下配置复制到文件中:

# 定义Source、Channel、Sink的名称
agent1.sources = taildirSource
agent1.channels = fileChannel
agent1.sinkgroups = g1
agent1.sinks = k1 k2# 配置TAILDIR Source
agent1.sources.taildirSource.type = TAILDIR
agent1.sources.taildirSource.positionFile = /home/hadoop/data/flume/taildir_position.json
agent1.sources.taildirSource.filegroups = f1
agent1.sources.taildirSource.filegroups.f1 = /home/hadoop/data/flume/logs/sogou.log
agent1.sources.taildirSource.channels = fileChannel# 配置File Channel
agent1.channels.fileChannel.type = file
agent1.channels.fileChannel.checkpointDir = /home/hadoop/data/flume/checkpointDir
agent1.channels.fileChannel.dataDirs = /home/hadoop/data/flume/dataDirs# 配置Sink组
agent1.sinkgroups.g1.sinks = k1 k2
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.backoff = true
agent1.sinkgroups.g1.processor.selector = round_robin
agent1.sinkgroups.g1.processor.selector.maxTimeOut = 10000# 配置第一个Sink(发送到hadoop02)
agent1.sinks.k1.type = avro
agent1.sinks.k1.channel = fileChannel
agent1.sinks.k1.batchSize = 1
agent1.sinks.k1.hostname = hadoop02
agent1.sinks.k1.port = 1234# 配置第二个Sink(发送到hadoop03)
agent1.sinks.k2.type = avro
agent1.sinks.k2.channel = fileChannel
agent1.sinks.k2.batchSize = 1
agent1.sinks.k2.hostname = hadoop03
agent1.sinks.k2.port = 1234

在这里插入图片描述

1.3 创建必要的目录
# 创建日志文件目录
mkdir -p /home/hadoop/data/flume/logs# 创建Channel数据目录
mkdir -p /home/hadoop/data/flume/checkpointDir
mkdir -p /home/hadoop/data/flume/dataDirs# 创建初始日志文件
touch /home/hadoop/data/flume/logs/sogou.log

在这里插入图片描述

2. 在 hadoop02 和 hadoop03(聚合节点)配置

2.1 创建配置文件

在两台节点上执行相同操作:

# 进入 Flume 配置目录
[hadoop@hadoop02 ~]$ cd /home/hadoop/app/flume/conf/
[hadoop@hadoop02 conf]$ vim avro-file-selector-logger.properties
2.2 配置内容

将以下配置复制到文件中:

# 定义Source、Channel、Sink的名称
agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1# 配置avro Source
agent1.sources.r1.type = avro
agent1.sources.r1.channels = c1
agent1.sources.r1.bind = 0.0.0.0
agent1.sources.r1.port = 1234# 配置File Channel
agent1.channels.c1.type = file
agent1.channels.c1.checkpointDir = /home/hadoop/data/flume/checkpointDir
agent1.channels.c1.dataDirs = /home/hadoop/data/flume/dataDirs# 配置logger Sink
agent1.sinks.k1.type = logger
agent1.sinks.k1.channel = c1

在这里插入图片描述

2.3 创建必要的目录

在两台节点上执行:

# 创建Channel数据目录
[hadoop@hadoop02 ~]$ mkdir -p /home/hadoop/data/flume/checkpointDir
[hadoop@hadoop02 ~]$ mkdir -p /home/hadoop/data/flume/dataDirs

3. 启动 Flume 集群服务

3.1 先启动聚合服务(hadoop02 和 hadoop03)

在 hadoop02 节点:

cd /home/hadoop/app/flume
bin/flume-ng agent -n agent1 -c conf -f conf/avro-file-selector-logger.properties -Dflume.root.logger=INFO,console

在这里插入图片描述

在 hadoop03 节点:

cd /home/hadoop/app/flume
bin/flume-ng agent -n agent1 -c conf -f conf/avro-file-selector-logger.properties -Dflume.root.logger=INFO,console

在这里插入图片描述

3.2 再启动采集服务(hadoop01)

在 hadoop01 节点:

cd /home/hadoop/app/flume
bin/flume-ng agent -n agent1 -c conf -f conf/taildir-file-selector-avro.properties -Dflume.root.logger=INFO,console

4. 测试 Flume 集群

4.1 生成测试数据
# 在 hadoop01 节点添加测试数据
cd /home/hadoop/data/flume/logs
echo '00:00:10 0971413028304674 [火炬传递路线时间] 1 2 www.olympic.cn/news/beijing/2008-03-19/1417291.html' >> sogou.log
echo '00:00:11 19400215479348182 [天津工业大学\] 1 65 www.tjpu.edu.cn/' >> sogou.log
echo '00:00:12 30567412345678901 [测试数据] 1 3 www.example.com/' >> sogou.log
4.2 验证结果

在 hadoop02 和 hadoop03 的控制台,您应该能看到类似这样的输出:

Event: { headers:{} body: 00:00:10 0971413028304674 [火炬传递路线时间] 1 2 www.olym... }
Event: { headers:{} body: 00:00:11 19400215479348182 [天津工业大学\] 1 65 www.tjpu... }

在这里插入图片描述
在这里插入图片描述

5. 关键点说明

  1. 启动顺序:必须先启动聚合节点(hadoop02、hadoop03),再启动采集节点(hadoop01)

  2. 负载均衡:配置中的 load_balanceround_robin 会让数据轮询发送到两个聚合节点

  3. 目录权限:确保所有创建的目录对 hadoop 用户有读写权限

  4. 网络连通:确保三台服务器之间的网络连通,特别是 1234 端口

  5. 故障处理:如果某个聚合节点宕机,采集节点会自动将数据发送到另一个正常节点

6. 停止服务

在任何节点的控制台按 Ctrl + C 即可停止该节点的 Flume 服务。

停止顺序建议:先停止采集节点,再停止聚合节点。

Flume与Kafka集成测试

1. 配置 Flume 聚合服务(Kafka Sink)

在 hadoop02 和 hadoop03 节点上操作:

# 进入 Flume 配

文章转载自:

http://nypR6c5H.jjzrh.cn
http://3R6ylcBt.jjzrh.cn
http://X3SBLgB6.jjzrh.cn
http://qvjYNQcR.jjzrh.cn
http://2KvMJ4Kg.jjzrh.cn
http://fMgWaaOy.jjzrh.cn
http://DkllCP7F.jjzrh.cn
http://i1M11r6S.jjzrh.cn
http://Lf6CFgq9.jjzrh.cn
http://uQr4dIyD.jjzrh.cn
http://Gxpz9387.jjzrh.cn
http://5iEMcfKW.jjzrh.cn
http://SXO9oFNa.jjzrh.cn
http://sT590mzK.jjzrh.cn
http://cG50G400.jjzrh.cn
http://egmvMbNq.jjzrh.cn
http://lIVcun1y.jjzrh.cn
http://yXU1dpBA.jjzrh.cn
http://lyyBAUf1.jjzrh.cn
http://DJOFELp6.jjzrh.cn
http://rtk1k8Od.jjzrh.cn
http://OJ9WjS0Z.jjzrh.cn
http://goZDZJiE.jjzrh.cn
http://GsRLz5EZ.jjzrh.cn
http://6WIsfKbE.jjzrh.cn
http://TriCsU9l.jjzrh.cn
http://DUpm1zr6.jjzrh.cn
http://ujYW9pJl.jjzrh.cn
http://4EvujyQd.jjzrh.cn
http://pgd2bnWX.jjzrh.cn
http://www.dtcms.com/a/383659.html

相关文章:

  • 《Unity+腾讯云TRTC故障排查指南:从日志盲区到线程死锁的全链路解析》
  • 笔记25.9.14(QueryWrapper,Builder ,Stream流处理,forEach)
  • 深入理解MySQL主从架构中的Seconds_Behind_Master指标
  • systemverilog如何解决不能使用变量索引来进行位选择的范围指定
  • 多语言编码Agent解决方案(1)-项目概述与架构
  • 【深度学习踩坑实录】从 Checkpoint 报错到 TrainingArguments 精通:QNLI 任务微调全流程复盘
  • 【愚公系列】《人工智能70年》019-语音识别的历史性突破(铲平技术高门槛)
  • webpack 配置文件中 mode 有哪些模式?
  • AI推理范式:从CoT到ReAct再到ToT的进化之路
  • webpack和Module Federation区别分析
  • Knockout.js Virtual Elements 详解
  • 【JavaSE五天速通|第三篇】常用API与日期类篇
  • JavaWeb-Session和ServletContext
  • HTML 编码规范
  • 深度学习(九):逻辑回归
  • 【LeetCode 每日一题】36. 有效的数独
  • 单表查询要点概述
  • 【Trans2025】计算机视觉|即插即用|WSC:即插即用!WSC模块,高光谱图像分类新SOTA!
  • Java面试小册(3)
  • 微服务项目测试接口一次成功一次失败解决办法
  • GPIO 之 EMIO 按键控制 LED 实验
  • centos安装 GNOME 桌面环境
  • 高并发投票功能设计
  • (B2B/工业/医疗行业)GEO优化服务商有哪些?哪家好?供应商推荐
  • unordered_map使用MFC的CString作为键值遇到C2056和C2064错误
  • MFC_Install_Create
  • 大数据知识框架思维导图(构造知识学习框架)
  • Spring Boot 集成第三方 API 时,常见的超时与重试机制设计
  • 设计模式——创建型模式
  • Nginx_Tomcat综合案例