当前位置: 首页 > news >正文

【Flume】Flume Learning

目录

  • 1 架构
    • Event
    • Agent
    • Source
    • Channel
    • Sink
  • 2 可靠性
  • 3 案例说明
  • 4 数据持久化
  • 5 日志文件监控
  • 6 多日志文件监控
  • 7 多个agent组合使用
  • 8 拦截器
    • Host Interceptor
    • Timestamp Interceptor
    • Static Interceptor
    • UUID Interceptor
    • Search and Replace Interceptor
    • 自定义拦截器
  • 9 Channel 选择器
    • Replicating Channel Selector
    • Multiplexing Channel Selector
  • 10 Sink处理器
    • Default Sink Processor
    • Failover Sink Processor
    • Load balancing Sink Processor
  • 11 导出数据到HDFS
  • 12 使用多个agent导出数据到hdfs
  • 13 自定义Source
  • 14 自定义Sink
  • 15 Ganglia安装
    • 中心节点的安装
    • 中心节点的配置
  • 16 使用Ganglia监控Flume

Flume是一个分布式、高可用、高可靠的海量日志采集、聚合和传输的系统,支持在日志系统中定制各类数据发送方,用于收集数据、同时提供了对数据进行简单处理并写到各种数据接收方的能力

1 架构

在这里插入图片描述

Event

事件是Flume内部数据传输的最基本单元,将传输的数据进行封装,事件本身是由一个载有数据的字节数组和可选的headers头部信息构成,如下图所示,Flume以事件的形式将数据从源头传输到最终的目的地

在这里插入图片描述

Agent

Flume Agent是一个JVM进程,通过三个组件(source、channel、sink)将事件从一个外部数据源收集并发送给下一个目的地

Source

从数据发生器接收数据,并将数据以Flume的Event格式传递给一个或多个通道(Channel)
支持Source:

Avro Source
Exec Source
Kafka Source
NetCat TCP Source

Channel

一种短暂的存储容器,位于source和sink之间,起着桥梁的作用,channel将从source处接收到的event格式的数据缓存起来,当sink成功地将events发送到下一跳的channel或最终目的地后,events从channel移除

Channel是一个完整的事务,这一点保证了数据在收发的时候的一致性,可以把channel看成一个FIFO(先进先出)队列,当数据的获取速率超过流出速率时,将Event保存到队列中,再从队列中一个个出来

有以下几种Channel

  • Memory Channel 事件存储在可配置容量的内存队列中,队列容量即为可存储最大事件数量,适用于高吞吐量场景,在agent出现错误时有可能会丢失部分数据
  • File Channel 基于文件系统的持久化存储

Sink

获取Channel暂时保存的数据并进行处理,sinkchannel中移除事件,并将其发送到下一个agent(简称下一跳)或者
事件的最终目的地,比如HDFS

Sink分类

  • HDFS Sink
  • Hive Sink
  • Logger Sink
  • Avro Sink
  • Thrift Sink
  • Kafka Sink

外部数据源将Flume可识别的Event发送到Source
Source收到Event事件后存储到一个或多个Channel通道中
Channel保留Event直到Sink将其处理完毕
SInk从Channel中取出数据,并将其传输至外部存储(HDFS)

2 可靠性

事件在每个agent的channel中短暂存储,然后事件被发送到下一个agent或者最终目的地,事件只有在存储在下一个channel或者最终存储后才从当前的channel中删除

Flume使用事务的方法来保证Events的可靠传递,Source和Sink分别被封装在事务中,事务由保存Event的存储或者Channel提供

3 案例说明

使用Flume监听某个端口,使用Netcat向这个端口发送数据,Flume将接收到的数据打印到控制台
Netcat是一款TCP/UDP测试工具,可以通过以下命令安装

yum install -y nc

开启服务端

nc -lk localhost 6666

默认就是localhost
在这里插入图片描述

开启客户端

nc localhost 6666

在这里插入图片描述

客户端发送数据

在这里插入图片描述

服务端就会接收到数据

在这里插入图片描述

这里使用NetCat TCP Source
Memory Channel

编写example.conf文件

a1.sources = r1
a1.sinks = k1
a1.channels = c1a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666a1.sinks.k1.type = loggera1.channels.c1.type = memory
a.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动

bin/flume-ng agent -n a1 -f conf/example.conf -Dflume.root.logger=INFO,console

在这里插入图片描述

客户端发送数据

在这里插入图片描述

监听到数据

在这里插入图片描述

4 数据持久化

使用组件

File Channel

属性设置

在这里插入图片描述

创建目录

在这里插入图片描述

编辑 file-channel.conf,先只消费c1

a1.sources = r1
a1.sinks = k1
a1.channels = c1 c2a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666a1.sinks.k1.type = loggera1.channels.c1.type = memory
a.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.channels.c2.type = file
a1.channels.c2.checkpoint = /usr/local/soft/flume/checkpoint
a1.channels.c2.dataDirs = /usr/local/soft/flume/dataa1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1

启动:

bin/flume-ng agent -n a1 -f file-channel.conf -Dflume.root.logger=INFO,console

可以把-Dflume.root.logger=INFO,console添加到环境变量中

修改flume-env.sh,添加如下代码

export JAVA_OPTS="-Dflume.root.logger=INFO,console"

在这里插入图片描述

这样启动的时候就不需要添加-Dflume.root.logger=INFO,console了,默认就有了

客户端发送数据

在这里插入图片描述

在这里插入图片描述

修改配置文件,消费c2

在这里插入图片描述

再次启动flume

在这里插入图片描述

可以直接消费到数据

5 日志文件监控

编辑exec-log.conf文件

a1.sources = r1
a1.sinks = k1
a1.channels = c1a1.sources.r1.type = exec
a1.sources.r1.command = tail -f app.loga1.sinks.k1.type = loggera1.channels.c1.type = memory
a.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

6 多日志文件监控

Flume在1.7版本之后提供了Taildir Source

编辑dir-log.conf

# 定义这个agent各个组件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1# 描述和配置source组件:r1
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2# 这里要写绝对路径
a1.sources.r1.filegroups.f1 = /usr/local/soft/apache-flume-1.9.0-bin/conf/app.log
a1.sources.r1.filegroups.f2 = /usr/local/soft/apache-flume-1.9.0-bin/conf/logs/.*log
a1.sources.r1.positionFile = /usr/local/soft/apache-flume-1.9.0-bin/conf/position.json# 描述和配置sink组件:k1
a1.sinks.k1.type = logger# 描述和配置channel组件,此处使用是内存缓存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# 描述和配置source  channel   sink之间的连接关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

7 多个agent组合使用

可以将多个Flume agent程序连接在一起,其中一个agent的sink将数据发送到另一个agent的source,avro文件格式是使用flume通过网络发送数据的标准方法

从多个web服务器收集日志,发送

编辑配置文件

编写avro.conf

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666a1.sinks.k1.type = loggera1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 8888a1.channels.c1.type = memory
a.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.channels.c2.type = memorya1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

编写dir-log.conf

# 定义这个agent各个组件的名字
a1.sources = r1 r2
a1.sinks = k1
a1.channels = c1# 描述和配置source组件:r1
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /usr/local/soft/apache-flume-1.9.0-bin/conf/app.log
a1.sources.r1.filegroups.f2 = /usr/local/soft/apache-flume-1.9.0-bin/conf/logs/.*log
a1.sources.r1.positionFile = /usr/local/soft/apache-flume-1.9.0-bin/conf/position.jsona1.sources.r2.type = avro
a1.sources.r2.bind = localhost
a1.sources.r2.port = 8888# 描述和配置sink组件:k1
a1.sinks.k1.type = logger# 描述和配置channel组件,此处使用是内存缓存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# 描述和配置source  channel   sink之间的连接关系
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sinks.k1.channel = c1

先启动dir-log.conf,再启动avro.conf

8 拦截器

拦截器可以修改或者丢弃事件,Flume支持链式调用拦截器,拦截器定义在source中

Host Interceptor

这个拦截器将运行agent的hostname或者ip地址写入到事件的headers

属性名默认值说明
type-host
preserveExistingfalse如果header已经存在host,是否要保留 - true保留原始的,false写入当前机器
userIPtruetrue为IP地址,false为hostname
hostHeaderhostheader中key的名称

编写代码vim dir-log.conf

# 定义这个agent各个组件的名字
a1.sources = r1 r2
a1.sinks = k1
a1.channels = c1# 描述和配置source组件:r1
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /usr/local/soft/apache-flume-1.9.0-bin/conf/app.log
a1.sources.r1.filegroups.f2 = /usr/local/soft/apache-flume-1.9.0-bin/conf/logs/.*log
a1.sources.r1.positionFile = /usr/local/soft/apache-flume-1.9.0-bin/conf/position.jsona1.sources.r2.type = avro
a1.sources.r2.bind = localhost
a1.sources.r2.port = 7777# 描述和配置sink组件:k1
a1.sinks.k1.type = logger# 描述和配置channel组件,此处使用是内存缓存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# 描述和配置source  channel   sink之间的连接关系
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sinks.k1.channel = c1

编写vim avro.conf

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = hosta1.sinks.k1.type = loggera1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 7777a1.channels.c1.type = memory
a.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.channels.c2.type = memorya1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

先启动dir-log.conf,再启动avro.conf

客户端发送数据

在这里插入图片描述

可以看到header中有主机信息
在这里插入图片描述

在这里插入图片描述

Timestamp Interceptor

vim avro.conf
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i2.type = timestampa1.sinks.k1.type = loggera1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 7777a1.channels.c1.type = memory
a.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.channels.c2.type = memorya1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
vim dir-log.conf
# 定义这个agent各个组件的名字
a1.sources = r1 r2
a1.sinks = k1
a1.channels = c1# 描述和配置source组件:r1
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /usr/local/soft/apache-flume-1.9.0-bin/conf/app.log
a1.sources.r1.filegroups.f2 = /usr/local/soft/apache-flume-1.9.0-bin/conf/logs/.*log
a1.sources.r1.positionFile = /usr/local/soft/apache-flume-1.9.0-bin/conf/position.jsona1.sources.r2.type = avro
a1.sources.r2.bind = localhost
a1.sources.r2.port = 7777# 描述和配置sink组件:k1
a1.sinks.k1.type = logger# 描述和配置channel组件,此处使用是内存缓存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# 描述和配置source  channel   sink之间的连接关系
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sinks.k1.channel = c1

在这里插入图片描述
此时头信息里面就会包含时间戳

Static Interceptor

运行用户对所有的事件添加固定的header

在这里插入图片描述

在这里插入图片描述

UUID Interceptor

在这里插入图片描述

在这里插入图片描述

Search and Replace Interceptor

查找和替换

可用于脱敏(脱离敏感)

在这里插入图片描述

在这里插入图片描述

自定义拦截器

新建工程,添加pom引用

        <dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version></dependency>

编写代码

package interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.HostInterceptor;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.ArrayList;
import java.util.List;
import java.util.Map;public class MyInterceptor implements Interceptor {private static final Logger logger = LoggerFactory.getLogger(HostInterceptor.class);private String name;// 初始化操作public void initialize() {this.name = "";}public Event intercept(Event event) {Map<String, String> headers = event.getHeaders();if("192.168.206.128".equals(headers.get("host"))){headers.put("key","自定义拦截器");}return event;}public List<Event> intercept(List<Event> events) {List<Event> eventList = new ArrayList<Event>();for (Event event : events) {Event intercept = intercept(event);if(event != null){eventList.add(event);}}return eventList;}public void close() {}public static class Builder implements Interceptor.Builder {public Interceptor build() {return new MyInterceptor();}public void configure(Context context) {}}
}

打包,将生成的jar包上传到flume的lib目录下

编辑配置文件

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

9 Channel 选择器

Replicating Channel Selector

复制选择器,如果没有指定,这个为默认选择器

可选属性如下:

属性名默认值说明
selector.typereplicatingreplicating
selector.optional-optional

在这里插入图片描述
上面的配置中,c3是一个可选的channel,写入c3失败的话会被忽略,c1和c2没有标记为可选,如果写入c1和c2失败会导致事务的失败

Multiplexing Channel Selector

多路channel选择器,可选属性如下

属性名默认值说明
selector.typereplicatingmultiplexing
selector.headerflume.selector.header键值key
selector.default-
selector.mapping.*-路由

编辑agent1.conf

# 定义 Agent 名称
a1.sources = r1
a1.sinks = k1 k2 k3 k4
a1.channels = c1 c2 c3 c4# 配置 SourceNetCat 类型,监听本地 44444 端口)
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1 c2
a1.sources.r1.selector.mapping.US = c1 c3
a1.sources.r1.selector.default = c1 c4# 配置 Sink 1Logger 类型,用于日志输出)
a1.sinks.k1.type = logger# 配置 Sink 2Avro 类型,发送到指定主机和端口)
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.206.128
a1.sinks.k2.port = 4040# 配置 Sink 3Avro 类型,发送到指定主机和端口)
a1.sinks.k3.type = avro
a1.sinks.k3.hostname = 192.168.206.128
a1.sinks.k3.port = 4041# 配置 Sink 4Avro 类型,发送到指定主机和端口)
a1.sinks.k4.type = avro
a1.sinks.k4.hostname = 192.168.206.128
a1.sinks.k4.port = 4042# 配置 Channel(内存型,统一设置队列容量和事务大小)
# c1 通道:内存类型,最大容量 1000 个事件,事务最大处理 100 个事件
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# c2 通道:同 c1 配置
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100# c3 通道:同 c1 配置
a1.channels.c3.type = memory
a1.channels.c3.capacity = 1000
a1.channels.c3.transactionCapacity = 100# c4 通道:同 c1 配置
a1.channels.c4.type = memory
a1.channels.c4.capacity = 1000
a1.channels.c4.transactionCapacity = 100# 组件绑定关系:SourceChannelSinkChannel 关联
# 源 r1 同时向 c1、c2、c3、c4 四个通道写入数据
a1.sources.r1.channels = c1 c2 c3 c4# 各个 Sink 分别绑定到对应的 Channel
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a1.sinks.k3.channel = c3
a1.sinks.k4.channel = c4

编辑agent2.conf

# 定义 Agent 名称及组件
a2.sources = r1
a2.sinks = k1
a2.channels = c1# 配置 SourceAvro 类型,接收来自其他 Agent 的数据)
a2.sources.r1.type = avro
a2.sources.r1.bind = 192.168.206.128
a2.sources.r1.port = 4040# 配置 SinkLogger 类型,用于控制台输出数据)
a2.sinks.k1.type = logger# 配置 Channel(内存型,作为数据临时缓冲区)
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# 绑定组件关系(SourceChannelSink)
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

agent3.conf

# 定义 Agent 名称及组件
a3.sources = r1
a3.sinks = k1
a3.channels = c1# 配置 SourceAvro 类型,接收来自其他 Agent 的数据)
a3.sources.r1.type = avro
a3.sources.r1.bind = 192.168.206.128
a3.sources.r1.port = 4041# 配置 SinkLogger 类型,用于控制台输出数据)
a3.sinks.k1.type = logger# 配置 Channel(内存型,作为数据临时缓冲区)
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100# 绑定组件关系(SourceChannelSink)
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

agent4.conf

# 定义 Agent 名称及组件
a4.sources = r1
a4.sinks = k1
a4.channels = c1# 配置 SourceAvro 类型,接收来自其他 Agent 的数据)
a4.sources.r1.type = avro
a4.sources.r1.bind = 192.168.206.128
a4.sources.r1.port = 4042# 配置 SinkLogger 类型,用于控制台输出数据)
a4.sinks.k1.type = logger# 配置 Channel(内存型,作为数据临时缓冲区)
a4.channels.c1.type = memory
a4.channels.c1.capacity = 1000
a4.channels.c1.transactionCapacity = 100# 绑定组件关系(SourceChannelSink)
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1

此时,默认只有c1c4可以收到

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

agent1.conf中添加如下拦截器

在这里插入图片描述

再次执行

此时只有c1和c3可以接收到

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

通道选择器流程分析

在这里插入图片描述

10 Sink处理器

可以将多个sink放入到一个组中,sink处理器能够对一个组中所有的sink进行负载均衡,在一个sink出现临时错误时进行故障转移

必须设置属性

属性名默认值说明
sinks-组中多个sink使用空格分隔
processor.typedefaultdefault,failover或load_balance
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover

Default Sink Processor

默认的Sink处理器只支持单个Sink

Failover Sink Processor

故障转移处理器维护了一个带有优先级的sink列表,故障转移机制将失败的sink放入到一个冷却池中,如果sink成功发送了事件,将其放入到活跃池中,sink可以设置优先级,数字越高,优先级越高,如果一个sink发送事件失败,下一个有更高优先级的sink将被用来发送事件,比如,优先级100的比优先级80的先被使用,如果没有设置优先级,按配置文件中配置的顺序决定

在这里插入图片描述

修改agent1.conf

# 定义 Agent 名称
a1.sources = r1
a1.sinks =  k2 k3 k4 k1
a1.channels = c1# 配置 SourceNetCat 类型,监听本地 44444 端口)
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444#配置sink组
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks =  k2 k3 k4 k1
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.priority.k3 = 15
a1.sinkgroups.g1.processor.priority.k4 = 20
a1.sinkgroups.g1.processor.maxpenalty = 10000# 配置 Sink 1Logger 类型,用于日志输出)
a1.sinks.k1.type = logger# 配置 Sink 2Avro 类型,发送到指定主机和端口)
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.206.128
a1.sinks.k2.port = 4040# 配置 Sink 3Avro 类型,发送到指定主机和端口)
a1.sinks.k3.type = avro
a1.sinks.k3.hostname = 192.168.206.128
a1.sinks.k3.port = 4041# 配置 Sink 4Avro 类型,发送到指定主机和端口)
a1.sinks.k4.type = avro
a1.sinks.k4.hostname = 192.168.206.128
a1.sinks.k4.port = 4042# 配置 Channel(内存型,统一设置队列容量和事务大小)
# c1 通道:内存类型,最大容量 1000 个事件,事务最大处理 100 个事件
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.sources.r1.channels = c1a1.sinks.k2.channel = c1
a1.sinks.k3.channel = c1
a1.sinks.k4.channel = c1
a1.sinks.k1.channel = c1

在这里插入图片描述
由于agent4优先级最高,agent4消费到数据

在这里插入图片描述

关闭agent4后再发送消息

在这里插入图片描述

此时agent3消费
在这里插入图片描述

Load balancing Sink Processor

负载均衡处理器,可以通过轮询或者随机的方式进行负载均衡,也可以通过继承AbstractSinkSelector自定义负载均衡,设置属性如下:

在这里插入图片描述

轮询,编辑agent1.conf

# 定义 Agent 名称
a1.sources = r1
a1.sinks =  k2 k3 k4 k1
a1.channels = c1# 配置 SourceNetCat 类型,监听本地 44444 端口)
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444#配置sink组
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks =  k2 k3 k4 k1
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin# 配置 Sink 1Logger 类型,用于日志输出)
a1.sinks.k1.type = logger# 配置 Sink 2Avro 类型,发送到指定主机和端口)
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.206.128
a1.sinks.k2.port = 4040# 配置 Sink 3Avro 类型,发送到指定主机和端口)
a1.sinks.k3.type = avro
a1.sinks.k3.hostname = 192.168.206.128
a1.sinks.k3.port = 4041# 配置 Sink 4Avro 类型,发送到指定主机和端口)
a1.sinks.k4.type = avro
a1.sinks.k4.hostname = 192.168.206.128
a1.sinks.k4.port = 4042# 配置 Channel(内存型,统一设置队列容量和事务大小)
# c1 通道:内存类型,最大容量 1000 个事件,事务最大处理 100 个事件
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.sources.r1.channels = c1a1.sinks.k2.channel = c1
a1.sinks.k3.channel = c1
a1.sinks.k4.channel = c1
a1.sinks.k1.channel = c1

随机

# 定义 Agent 名称
a1.sources = r1
a1.sinks =  k2 k3 k4 k1
a1.channels = c1# 配置 SourceNetCat 类型,监听本地 44444 端口)
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444#配置sink组
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks =  k2 k3 k4 k1
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random# 配置 Sink 1Logger 类型,用于日志输出)
a1.sinks.k1.type = logger# 配置 Sink 2Avro 类型,发送到指定主机和端口)
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.206.128
a1.sinks.k2.port = 4040# 配置 Sink 3Avro 类型,发送到指定主机和端口)
a1.sinks.k3.type = avro
a1.sinks.k3.hostname = 192.168.206.128
a1.sinks.k3.port = 4041# 配置 Sink 4Avro 类型,发送到指定主机和端口)
a1.sinks.k4.type = avro
a1.sinks.k4.hostname = 192.168.206.128
a1.sinks.k4.port = 4042# 配置 Channel(内存型,统一设置队列容量和事务大小)
# c1 通道:内存类型,最大容量 1000 个事件,事务最大处理 100 个事件
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.sources.r1.channels = c1a1.sinks.k2.channel = c1
a1.sinks.k3.channel = c1
a1.sinks.k4.channel = c1
a1.sinks.k1.channel = c1

11 导出数据到HDFS

数据导出到HDFS需要使用HDFS Sink,需要配置属性如下

在这里插入图片描述

注:使用HDFS Sink需要用到Hadoop的多个包,可以在装有Hadoop的主机上运行Flume,如果是单独部署的Flume,可以通过多个Agent的形式将单独部署的Flume Agent日志数据发送到装有Hadoop的Flume Agent上

创建hdfs.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://master:9000/flumesink
a1.sinks.k1.hdfs.fileType = DataStreama1.channels.c1.type = memory
a.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

12 使用多个agent导出数据到hdfs

编写agent1.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 9999a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 4040a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

编写agent2.conf

a2.sources = r1
a2.sinks = k1
a2.channels = c1a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 4040a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://192.168.206.128:9000/avroFlume
a2.sinks.k1.hdfs.fileType = DataStreama2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

13 自定义Source

14 自定义Sink

15 Ganglia安装

Ganglia是UC Berkeley发起的一个开源集群监视项目,设计用于测量数以千计的节点。Ganglia的核心包含gmond(监控守护进程)、gmetad(元数据守护进程)以及一个Web前端。主要是用来监控系统性能,如:cpu、mem、硬盘利用率,I/O负载、网络流量情况等,通过曲线很容易见到每个节点的工作状态,对合理调整、分配系统资源,提高系统整体性能起到重要作用。

中心节点的安装

  • epel包的安装:yum install -y epel-release
    (解决不能yum安装某些安装包的问题)
  • gmetad的安装:yum install -y ganglia-gmetad
  • gmond的安装:yum install -y ganglia-gmond
  • rrdtool的安装:yum install -y rrdtool
  • httpd服务器的安装:yum install -y httpd
  • ganglia-web及php安装:yum install -y ganglia-web php

中心节点的配置

安装目录说明

  • ganglia配置文件目录:/etc/ganglia
  • rrd数据库存放目录:/var/lib/ganglia/rrds
  • ganglia-web安装目录:/usr/share/ganglia
  • ganglia-web配置目录:/etc/httpd/conf.d/ganglia.conf

相关配置文件修改

将ganglia-web的站点目录连接到httpd主站点目录

$ ln -s /usr/share/ganglia /var/www/html

修改httpd主站点目录下ganglia站点目录的访问权限
将ganglia站点目录访问权限改为apache:apache,否则会报错

$ chown -R apache:apache /var/www/html/ganglia
$ chmod -R 755 /var/www/html/ganglia

修改rrd数据库存放目录访问权限
将rrd数据库存放目录访问权限改为nobody:nobody,否则会报错

$ chown -R nobody:nobody /var/lib/ganglia/rrds

修改ganglia-web的访问权限:
修改/etc/httpd/conf.d/ganglia.conf

local改为all granted

在这里插入图片描述

修改 dwoo 下面的权限

chmod 777 /var/lib/ganglia/dwoo/compiled
chmod 777 /var/lib/ganglia/dwoo/cache

tips:
如果没有dwoo目录
可以手动创建

mkdir -p /var/lib/ganglia/dwoo/{compiled,cache}

chown -R apache:apache /var/lib/ganglia/dwoo
chmod -R 755 /var/lib/ganglia/dwoo

配置/etc/ganglia/gmetad.conf

在这里插入图片描述

在这里插入图片描述

配置/etc/ganglia/gmond.conf

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

中心节点启动

systemctl start httpd.service
systemctl start gmetad.service
systemctl start gmond.service
systemctl enable httpd.service
systemctl enable gmetad.service
systemctl enable gmond.service

关闭selinux
vi /etc/selinux/config,把SELINUX=enforcing改成SELINUX=disable;该方法需要重启机器。
可以使用命令setenforce 0来关闭selinux而不需要重启,刷新页面,即可访问;不过此法只是权宜之计,如果想永久修改selinux设置,还是要使用第一种方法

开启防火墙(如果没有防火墙就不需要这一步了)

firewall-cmd --zone=public --add-port=80/tcp --permanent
firewall-cmd --reload

web端查看

192.168.206.128/ganglia

在这里插入图片描述

如果查看ganglia web页面报错:There was an error collecting ganglia data (127.0.0.1:8652): fsockopen error: Permission denied

临时关闭 SELinux 验证(是上面修改/etc/selinux/config后未重启导致)

setenforce 0

16 使用Ganglia监控Flume

../bin/flume-ng agent -n a1 -f example.conf -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=192.168.206.128:8649

在这里插入图片描述

可将参数写入配置,运行时就可以不用添加

在这里插入图片描述

http://www.dtcms.com/a/503682.html

相关文章:

  • 和网站设计人员谈价要注意什么那些网站是asp做的
  • 网站视频提取软件app天河做网站设计
  • GPU服务器存储选型指南:SFF与LFF硬盘深度解析与实战策略
  • 学校网站建设介绍吉林网络推广代运营
  • 集合性能基准测试报告:ArrayList vs LinkedList、HashMap vs TreeMap、并发 Map 四兄弟
  • 黄石做网站公司二手商品网站制作
  • c2c网站功能关于网站建设的英文歌
  • k8s(十)Helm详解
  • 建设部执业资格注册中心网站查询免签约收款WordPress
  • 百度推广用户注册单页网站如何优化
  • 数据库--视图、索引
  • 硅基计划5.0 MySQL 叁 E-R关系图联合/多表查询三大连接子查询合并查询
  • 网站设计连接数据库怎么做如何做好网站推广营销
  • Langgraph译文2:多智能体系统
  • 太原云起时网站建设工作室logo设计免费生成
  • 基于Trae/Whisper/FFmpeg与Knowledge Graph MCP技术开发语音生成会议纪要智能应用
  • [嵌入式系统-144]:“智能体机器人”操作系统
  • iis网站开发教程河西区做网站的公司
  • FFmpeg 基本API av_find_input_format函数内部调用流程分析
  • 制作网页的网站2023电商排行榜前十名
  • 【课堂笔记】复变函数-5
  • nas可以做视频网站吗深圳律师网站建设
  • 抑制高电压浪涌芯片LT4356
  • LLMs之RAG之Benchmark:面向真实场景的检索嵌入基准(RTEB)—理论、设计与实践指南
  • 从网址怎么看网站的域名有模板怎么做网站
  • 做建网站的公司wordpress点播主题
  • ES6 箭头函数
  • [FIH][GMS] 2025-04 Google announcement Part1
  • 建站论坛北京有什么网上推广的网站吗
  • mbedtls(not finished)