【Flume】Flume Learning
Contents
- 1 Architecture
- Event
- Agent
- Source
- Channel
- Sink
- 2 Reliability
- 3 Example
- 4 Data Persistence
- 5 Monitoring a Log File
- 6 Monitoring Multiple Log Files
- 7 Combining Multiple Agents
- 8 Interceptors
- Host Interceptor
- Timestamp Interceptor
- Static Interceptor
- UUID Interceptor
- Search and Replace Interceptor
- Custom Interceptors
- 9 Channel Selectors
- Replicating Channel Selector
- Multiplexing Channel Selector
- 10 Sink Processors
- Default Sink Processor
- Failover Sink Processor
- Load Balancing Sink Processor
- 11 Exporting Data to HDFS
- 12 Exporting Data to HDFS with Multiple Agents
- 13 Custom Source
- 14 Custom Sink
- 15 Installing Ganglia
- Installing the Central Node
- Configuring the Central Node
- 16 Monitoring Flume with Ganglia
Flume is a distributed, highly available, and reliable system for collecting, aggregating, and transporting large volumes of log data. It supports pluggable senders for collecting data from log systems, and it can apply simple in-flight processing before writing the data to a variety of receivers.
1 Architecture
Event
An event is the basic unit of data transfer inside Flume. It wraps the data being transported: an event consists of a byte array carrying the payload plus optional headers. Flume moves data from its origin to the final destination in the form of events.
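As a minimal illustration of that structure, here is a sketch against Flume's public Java API (the header key and value are invented for the example):
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.StandardCharsets;

public class EventDemo {
    public static void main(String[] args) {
        // An event is a byte-array body plus optional string headers
        Event event = EventBuilder.withBody("hello flume", StandardCharsets.UTF_8);
        // Headers are a mutable Map<String, String>
        event.getHeaders().put("host", "192.168.206.128");
        System.out.println(new String(event.getBody(), StandardCharsets.UTF_8));
    }
}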
Agent
A Flume agent is a JVM process that uses three components (source, channel, sink) to collect events from an external data source and forward them to the next destination.
Source
Receives data from a data generator and passes it to one or more channels as Flume Events.
Supported sources include:
Avro Source
Exec Source
Kafka Source
NetCat TCP Source
…
Channel
A transient store that sits between the source and the sink, acting as a bridge. The channel buffers the events it receives from the source; once the sink has successfully delivered the events to the next hop's channel or to the final destination, they are removed from the channel.
Channel operations are transactional, which guarantees consistency between sending and receiving. A channel can be thought of as a FIFO (first in, first out) queue: when events arrive faster than they drain, they queue up and are consumed one at a time.
Available channel types include:
- Memory Channel: events are held in an in-memory queue of configurable capacity (the capacity is the maximum number of events it can store). Suited to high-throughput workloads, but some data may be lost if the agent fails.
- File Channel: persistent storage backed by the file system.
…
Sink
Takes the data temporarily held in a channel and processes it: the sink removes events from the channel and sends them to the next agent (the "next hop") or to the events' final destination, such as HDFS.
Sink types:
- HDFS Sink
- Hive Sink
- Logger Sink
- Avro Sink
- Thrift Sink
- Kafka Sink
…
An external source sends events in a format Flume recognizes to the Source.
The Source stores the received events in one or more Channels.
The Channel retains each event until a Sink has finished processing it.
The Sink takes events from the Channel and writes them to external storage (e.g. HDFS).
2 Reliability
Events are staged in each agent's channel and then delivered to the next agent or to the final destination; an event is deleted from the current channel only after it has been stored in the next channel or in terminal storage.
Flume uses transactions to guarantee reliable delivery of events: the source and the sink each operate inside a transaction, provided by the channel (or by the store that holds the events).
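As a rough sketch of that transaction pattern (this mirrors Flume's public Channel/Transaction API; send() is a placeholder standing in for the actual delivery):
import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.Transaction;

public class DeliverySketch {
    // Take one event from the channel inside a transaction;
    // the event is only removed from the channel if the transaction commits.
    static void deliverOne(Channel channel) {
        Transaction tx = channel.getTransaction();
        tx.begin();
        try {
            Event event = channel.take();   // null if the channel is empty
            if (event != null) {
                send(event);                // hypothetical: forward to next hop or final store
            }
            tx.commit();                    // event is now removed from the channel
        } catch (Exception e) {
            tx.rollback();                  // event stays in the channel and will be retried
        } finally {
            tx.close();
        }
    }

    static void send(Event event) { /* placeholder */ }
}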
3 Example
Have Flume listen on a port, send data to that port with netcat, and print what Flume receives to the console.
Netcat is a TCP/UDP testing tool; install it with:
yum install -y nc
Start the server (localhost is the default):
nc -lk localhost 6666
Start the client:
nc localhost 6666
Whatever the client sends, the server receives.
This example uses the NetCat TCP Source and the Memory Channel. Create the example.conf file:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start it:
bin/flume-ng agent -n a1 -f conf/example.conf -Dflume.root.logger=INFO,console
Send data from the client and Flume logs the received events.
4 Data Persistence
This example uses the File Channel. Set its properties, create the checkpoint and data directories, then edit file-channel.conf; initially only c1 is consumed:
a1.sources = r1
a1.sinks = k1
a1.channels = c1 c2

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /usr/local/soft/flume/checkpoint
a1.channels.c2.dataDirs = /usr/local/soft/flume/data

a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
Start:
bin/flume-ng agent -n a1 -f file-channel.conf -Dflume.root.logger=INFO,console
-Dflume.root.logger=INFO,console can also be moved into the environment. Edit flume-env.sh and add:
export JAVA_OPTS="-Dflume.root.logger=INFO,console"
The flag then no longer needs to be passed on the command line; it is applied by default.
Send data from the client, then change the configuration to consume c2 (a1.sinks.k1.channel = c2) and restart Flume; the events persisted in the file channel are delivered immediately.
5 Monitoring a Log File
Edit the exec-log.conf file:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -f app.log

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
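To generate test data you can append lines to app.log, for example with a throwaway loop (not part of the original setup):
while true; do echo "log line $(date +%s)" >> app.log; sleep 1; done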
6 Monitoring Multiple Log Files
Flume 1.7 introduced the Taildir Source. Edit dir-log.conf:
# Name the components of this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe and configure the source: r1
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
# Absolute paths are required here
a1.sources.r1.filegroups.f1 = /usr/local/soft/apache-flume-1.9.0-bin/conf/app.log
a1.sources.r1.filegroups.f2 = /usr/local/soft/apache-flume-1.9.0-bin/conf/logs/.*log
a1.sources.r1.positionFile = /usr/local/soft/apache-flume-1.9.0-bin/conf/position.json

# Describe and configure the sink: k1
a1.sinks.k1.type = logger

# Describe and configure the channel; memory-backed here
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Wire source, channel, and sink together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
7 Combining Multiple Agents
Multiple Flume agents can be chained together: one agent's sink sends data to another agent's source. Avro is the standard mechanism Flume uses to send data over the network, for example to collect logs from multiple web servers and forward them to one aggregating agent.
Write avro.conf:
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666

a1.sinks.k1.type = logger

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 8888

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory

a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
Write dir-log.conf:
# Name the components of this agent
a1.sources = r1 r2
a1.sinks = k1
a1.channels = c1

# Describe and configure the source: r1
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /usr/local/soft/apache-flume-1.9.0-bin/conf/app.log
a1.sources.r1.filegroups.f2 = /usr/local/soft/apache-flume-1.9.0-bin/conf/logs/.*log
a1.sources.r1.positionFile = /usr/local/soft/apache-flume-1.9.0-bin/conf/position.json

a1.sources.r2.type = avro
a1.sources.r2.bind = localhost
a1.sources.r2.port = 8888

# Describe and configure the sink: k1
a1.sinks.k1.type = logger

# Describe and configure the channel; memory-backed here
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Wire source, channel, and sink together
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sinks.k1.channel = c1
Start the dir-log.conf agent first, then the avro.conf agent.
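For reference, the startup commands would look something like this (both files name their agent a1, so the two agents run as separate processes; -n selects the agent name inside each file):
bin/flume-ng agent -n a1 -f conf/dir-log.conf
bin/flume-ng agent -n a1 -f conf/avro.conf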
8 Interceptors
Interceptors can modify or drop events. Flume supports chaining interceptors, and interceptors are defined on a source.
Host Interceptor
This interceptor writes the hostname or IP address of the machine running the agent into the event headers.
Property | Default | Description |
---|---|---|
type | - | host |
preserveExisting | false | If a host header already exists, whether to keep it: true keeps the original, false overwrites it with the current machine |
useIP | true | true writes the IP address, false the hostname |
hostHeader | host | Name of the header key |
Edit the configuration: vim dir-log.conf
# Name the components of this agent
a1.sources = r1 r2
a1.sinks = k1
a1.channels = c1

# Describe and configure the source: r1
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /usr/local/soft/apache-flume-1.9.0-bin/conf/app.log
a1.sources.r1.filegroups.f2 = /usr/local/soft/apache-flume-1.9.0-bin/conf/logs/.*log
a1.sources.r1.positionFile = /usr/local/soft/apache-flume-1.9.0-bin/conf/position.json

a1.sources.r2.type = avro
a1.sources.r2.bind = localhost
a1.sources.r2.port = 7777

# Describe and configure the sink: k1
a1.sinks.k1.type = logger

# Describe and configure the channel; memory-backed here
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Wire source, channel, and sink together
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sinks.k1.channel = c1
vim avro.conf
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host

a1.sinks.k1.type = logger

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 7777

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory

a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
Start the dir-log.conf agent first, then the avro.conf agent. When the client sends data, the host information is visible in the event headers.
Timestamp Interceptor
vim avro.conf
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666

a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i2.type = timestamp

a1.sinks.k1.type = logger

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 7777

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory

a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
vim dir-log.conf
# Name the components of this agent
a1.sources = r1 r2
a1.sinks = k1
a1.channels = c1

# Describe and configure the source: r1
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /usr/local/soft/apache-flume-1.9.0-bin/conf/app.log
a1.sources.r1.filegroups.f2 = /usr/local/soft/apache-flume-1.9.0-bin/conf/logs/.*log
a1.sources.r1.positionFile = /usr/local/soft/apache-flume-1.9.0-bin/conf/position.json

a1.sources.r2.type = avro
a1.sources.r2.bind = localhost
a1.sources.r2.port = 7777

# Describe and configure the sink: k1
a1.sinks.k1.type = logger

# Describe and configure the channel; memory-backed here
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Wire source, channel, and sink together
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sinks.k1.channel = c1
The event headers now also contain a timestamp.
Static Interceptor
Allows the user to add a fixed header to every event.
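A minimal configuration sketch (the key and value here are arbitrary examples, not from the original):
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = BEIJING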
UUID Interceptor
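Assigns a universally unique identifier to every event. The original gives no example; a configuration sketch (the type is the fully qualified builder class used in Flume 1.9):
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.r1.interceptors.i1.headerName = id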
Search and Replace Interceptor
Performs regex search and replace on the event body; useful for masking sensitive data.
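A sketch that masks eleven-digit numbers, e.g. phone numbers (the pattern and replacement are illustrative, not from the original):
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = search_replace
a1.sources.r1.interceptors.i1.searchPattern = \d{11}
a1.sources.r1.interceptors.i1.replaceString = ***********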
Custom Interceptors
Create a new project and add the pom dependency:
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
</dependency>
Write the code:
package interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MyInterceptor implements Interceptor {

    private static final Logger logger = LoggerFactory.getLogger(MyInterceptor.class);

    private String name;

    // Initialization hook
    public void initialize() {
        this.name = "";
    }

    // Tag events coming from a specific host with an extra header
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        if ("192.168.206.128".equals(headers.get("host"))) {
            headers.put("key", "custom interceptor");
        }
        return event;
    }

    public List<Event> intercept(List<Event> events) {
        List<Event> eventList = new ArrayList<Event>();
        for (Event event : events) {
            Event intercepted = intercept(event);
            if (intercepted != null) {   // intercept(Event) may drop an event by returning null
                eventList.add(intercepted);
            }
        }
        return eventList;
    }

    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        public Interceptor build() {
            return new MyInterceptor();
        }

        public void configure(Context context) {
        }
    }
}
Package it, copy the resulting jar into Flume's lib directory, and edit the configuration.
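The configuration is not shown in the original; registering the interceptor would look like this (the source name r1 is assumed):
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = interceptor.MyInterceptor$Builder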
9 Channel Selectors
Replicating Channel Selector
The replicating selector copies each event to every channel; it is the default when no selector is specified.
Properties:
Property | Default | Description |
---|---|---|
selector.type | replicating | replicating |
selector.optional | - | Space-separated list of channels to mark as optional |
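For example (a sketch; the original does not show the configuration the next paragraph describes):
a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3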
In the configuration above, c3 is an optional channel: a failure to write to c3 is simply ignored. Since c1 and c2 are not marked optional, failing to write to either of them causes the transaction to fail.
Multiplexing Channel Selector
The multiplexing channel selector routes events to channels based on a header value. Properties:
Property | Default | Description |
---|---|---|
selector.type | replicating | Set to multiplexing |
selector.header | flume.selector.header | Header key to inspect |
selector.default | - | Channels used when the header value matches no mapping |
selector.mapping.* | - | Routes a header value to a set of channels |
Edit agent1.conf:
# Name the agent's components
a1.sources = r1
a1.sinks = k1 k2 k3 k4
a1.channels = c1 c2 c3 c4

# Configure the source (NetCat, listening on local port 44444)
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1 c2
a1.sources.r1.selector.mapping.US = c1 c3
a1.sources.r1.selector.default = c1 c4

# Sink 1 (logger, for log output)
a1.sinks.k1.type = logger

# Sink 2 (Avro, sends to the given host and port)
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.206.128
a1.sinks.k2.port = 4040

# Sink 3 (Avro, sends to the given host and port)
a1.sinks.k3.type = avro
a1.sinks.k3.hostname = 192.168.206.128
a1.sinks.k3.port = 4041

# Sink 4 (Avro, sends to the given host and port)
a1.sinks.k4.type = avro
a1.sinks.k4.hostname = 192.168.206.128
a1.sinks.k4.port = 4042

# Channels (memory-backed; capacity 1000 events, 100 events per transaction)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# c2: same settings as c1
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# c3: same settings as c1
a1.channels.c3.type = memory
a1.channels.c3.capacity = 1000
a1.channels.c3.transactionCapacity = 100

# c4: same settings as c1
a1.channels.c4.type = memory
a1.channels.c4.capacity = 1000
a1.channels.c4.transactionCapacity = 100

# Bindings: source r1 can write to all four channels
a1.sources.r1.channels = c1 c2 c3 c4

# Each sink reads from its own channel
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a1.sinks.k3.channel = c3
a1.sinks.k4.channel = c4
Edit agent2.conf:
# Name the agent's components
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Configure the source (Avro, receives data from another agent)
a2.sources.r1.type = avro
a2.sources.r1.bind = 192.168.206.128
a2.sources.r1.port = 4040

# Configure the sink (logger, prints to the console)
a2.sinks.k1.type = logger

# Configure the channel (memory-backed buffer)
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the components (source -> channel -> sink)
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
agent3.conf
# Name the agent's components
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Configure the source (Avro, receives data from another agent)
a3.sources.r1.type = avro
a3.sources.r1.bind = 192.168.206.128
a3.sources.r1.port = 4041

# Configure the sink (logger, prints to the console)
a3.sinks.k1.type = logger

# Configure the channel (memory-backed buffer)
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the components (source -> channel -> sink)
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
agent4.conf
# Name the agent's components
a4.sources = r1
a4.sinks = k1
a4.channels = c1

# Configure the source (Avro, receives data from another agent)
a4.sources.r1.type = avro
a4.sources.r1.bind = 192.168.206.128
a4.sources.r1.port = 4042

# Configure the sink (logger, prints to the console)
a4.sinks.k1.type = logger

# Configure the channel (memory-backed buffer)
a4.channels.c1.type = memory
a4.channels.c1.capacity = 1000
a4.channels.c1.transactionCapacity = 100

# Bind the components (source -> channel -> sink)
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1
At this point, with no state header set, only c1 and c4 receive events (the default mapping).
Add the following interceptor to agent1.conf:
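The interceptor itself is missing from the original; to route events via the US mapping it would be a static interceptor along these lines:
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = state
a1.sources.r1.interceptors.i1.value = US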
Run it again; now only c1 and c3 receive events.
Channel selector flow analysis
10 Sink Processors
Multiple sinks can be grouped together. A sink processor can load-balance across all sinks in a group, or fail over to another sink when one fails temporarily.
Required properties:
Property | Default | Description |
---|---|---|
sinks | - | Space-separated list of sinks in the group |
processor.type | default | default, failover, or load_balance |
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
Default Sink Processor
The default sink processor accepts only a single sink.
Failover Sink Processor
The failover processor maintains a prioritized list of sinks. When a sink fails it is moved to a cool-down pool; once it sends an event successfully it is returned to the active pool. Each sink can be assigned a priority, and the higher the number, the higher the priority. If a sink fails to send an event, the sink with the next-highest priority is tried instead (a sink with priority 100 is used before one with priority 80). If no priorities are set, the order in the configuration file decides.
Modify agent1.conf:
# Name the agent's components
a1.sources = r1
a1.sinks = k2 k3 k4 k1
a1.channels = c1

# Configure the source (NetCat, listening on local port 44444)
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Configure the sink group
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k2 k3 k4 k1
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.priority.k3 = 15
a1.sinkgroups.g1.processor.priority.k4 = 20
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Sink 1 (logger, for log output)
a1.sinks.k1.type = logger

# Sink 2 (Avro, sends to the given host and port)
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.206.128
a1.sinks.k2.port = 4040

# Sink 3 (Avro, sends to the given host and port)
a1.sinks.k3.type = avro
a1.sinks.k3.hostname = 192.168.206.128
a1.sinks.k3.port = 4041

# Sink 4 (Avro, sends to the given host and port)
a1.sinks.k4.type = avro
a1.sinks.k4.hostname = 192.168.206.128
a1.sinks.k4.port = 4042

# Channel (memory-backed; capacity 1000 events, 100 events per transaction)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1

a1.sinks.k2.channel = c1
a1.sinks.k3.channel = c1
a1.sinks.k4.channel = c1
a1.sinks.k1.channel = c1
Since k4 has the highest priority, agent4 receives the data. Shut down agent4 and send a message again; agent3 now receives it.
Load balancing Sink Processor
The load-balancing processor distributes events across the sinks in the group by round-robin or random selection; custom selection logic can be supplied by subclassing AbstractSinkSelector. The relevant properties are shown in the configurations below.
Round-robin: edit agent1.conf
# Name the agent's components
a1.sources = r1
a1.sinks = k2 k3 k4 k1
a1.channels = c1

# Configure the source (NetCat, listening on local port 44444)
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Configure the sink group
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k2 k3 k4 k1
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin

# Sink 1 (logger, for log output)
a1.sinks.k1.type = logger

# Sink 2 (Avro, sends to the given host and port)
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.206.128
a1.sinks.k2.port = 4040

# Sink 3 (Avro, sends to the given host and port)
a1.sinks.k3.type = avro
a1.sinks.k3.hostname = 192.168.206.128
a1.sinks.k3.port = 4041

# Sink 4 (Avro, sends to the given host and port)
a1.sinks.k4.type = avro
a1.sinks.k4.hostname = 192.168.206.128
a1.sinks.k4.port = 4042

# Channel (memory-backed; capacity 1000 events, 100 events per transaction)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1

a1.sinks.k2.channel = c1
a1.sinks.k3.channel = c1
a1.sinks.k4.channel = c1
a1.sinks.k1.channel = c1
Random:
# Name the agent's components
a1.sources = r1
a1.sinks = k2 k3 k4 k1
a1.channels = c1

# Configure the source (NetCat, listening on local port 44444)
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Configure the sink group
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k2 k3 k4 k1
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random

# Sink 1 (logger, for log output)
a1.sinks.k1.type = logger

# Sink 2 (Avro, sends to the given host and port)
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.206.128
a1.sinks.k2.port = 4040

# Sink 3 (Avro, sends to the given host and port)
a1.sinks.k3.type = avro
a1.sinks.k3.hostname = 192.168.206.128
a1.sinks.k3.port = 4041

# Sink 4 (Avro, sends to the given host and port)
a1.sinks.k4.type = avro
a1.sinks.k4.hostname = 192.168.206.128
a1.sinks.k4.port = 4042

# Channel (memory-backed; capacity 1000 events, 100 events per transaction)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1

a1.sinks.k2.channel = c1
a1.sinks.k3.channel = c1
a1.sinks.k4.channel = c1
a1.sinks.k1.channel = c1
11 Exporting Data to HDFS
Exporting data to HDFS uses the HDFS Sink, configured as in the example below.
Note: the HDFS Sink depends on several Hadoop jars, so it is easiest to run Flume on a host that has Hadoop installed. For a standalone Flume deployment, chain multiple agents so that the standalone agent forwards its log data to a Flume agent running on a Hadoop host.
Create hdfs.conf:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://master:9000/flumesink
a1.sinks.k1.hdfs.fileType = DataStream

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
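In practice the file-rolling behaviour would usually also be tuned. These are standard HDFS Sink properties, though the values here are only examples:
# roll a new file every 30 seconds, or at 1 MB, or after 1000 events
a1.sinks.k1.hdfs.rollInterval = 30
a1.sinks.k1.hdfs.rollSize = 1048576
a1.sinks.k1.hdfs.rollCount = 1000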
12 Exporting Data to HDFS with Multiple Agents
Write agent1.conf:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 9999

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 4040

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Write agent2.conf:
a2.sources = r1
a2.sinks = k1
a2.channels = c1

a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 4040

a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://192.168.206.128:9000/avroFlume
a2.sinks.k1.hdfs.fileType = DataStream

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
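The original does not show the startup step; following the pattern used elsewhere in these notes, the downstream agent (a2) would be started before the upstream one, roughly:
bin/flume-ng agent -n a2 -f conf/agent2.conf
bin/flume-ng agent -n a1 -f conf/agent1.conf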
13 Custom Source
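The original leaves this section empty. A custom source implements Configurable plus PollableSource (or EventDrivenSource). The sketch below emits a counter event once per second; the prefix property and the class/package names are made up for the example:
package source;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;

import java.nio.charset.StandardCharsets;

public class MySource extends AbstractSource implements Configurable, PollableSource {

    private String prefix;
    private long counter = 0;

    public void configure(Context context) {
        // Read the (hypothetical) "prefix" property from the agent configuration
        prefix = context.getString("prefix", "log-");
    }

    public Status process() throws EventDeliveryException {
        try {
            String body = prefix + counter++;
            // Hand the event to the channel processor, which writes it to the channel(s)
            getChannelProcessor().processEvent(
                    EventBuilder.withBody(body, StandardCharsets.UTF_8));
            Thread.sleep(1000);
            return Status.READY;
        } catch (Exception e) {
            return Status.BACKOFF;
        }
    }

    // Required by PollableSource since Flume 1.7: back-off timing hints
    public long getBackOffSleepIncrement() { return 1000; }
    public long getMaxBackOffSleepInterval() { return 5000; }
}
As with the custom interceptor, the jar would be packaged into Flume's lib directory and referenced with a1.sources.r1.type = source.MySource.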
14 Custom Sink
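Also empty in the original. A custom sink extends AbstractSink and does its work inside a channel transaction, mirroring the delivery pattern from section 2. This sketch just logs each event; the prefix property and class/package names are made up:
package sink;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink extends AbstractSink implements Configurable {

    private static final Logger logger = LoggerFactory.getLogger(MySink.class);

    private String prefix;

    public void configure(Context context) {
        // Read the (hypothetical) "prefix" property from the agent configuration
        prefix = context.getString("prefix", "my-sink");
    }

    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction tx = channel.getTransaction();
        tx.begin();
        try {
            Event event = channel.take();   // null if the channel is empty
            if (event != null) {
                logger.info("{}: {}", prefix, new String(event.getBody()));
            }
            tx.commit();
            // BACKOFF tells the runner to pause briefly when the channel was empty
            return event != null ? Status.READY : Status.BACKOFF;
        } catch (Exception e) {
            tx.rollback();                  // leave the event in the channel for retry
            return Status.BACKOFF;
        } finally {
            tx.close();
        }
    }
}
It would be registered with a1.sinks.k1.type = sink.MySink.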
15 Installing Ganglia
Ganglia is an open-source cluster monitoring project started at UC Berkeley, designed to scale to thousands of nodes. Its core consists of gmond (the monitoring daemon), gmetad (the meta daemon), and a web front end. It monitors system metrics such as CPU, memory, disk utilization, I/O load, and network traffic; the graphs make each node's state easy to read at a glance, which helps with tuning and allocating system resources and improving overall performance.
Installing the Central Node
- Install the EPEL repository (fixes packages that cannot otherwise be installed with yum): yum install -y epel-release
- Install gmetad: yum install -y ganglia-gmetad
- Install gmond: yum install -y ganglia-gmond
- Install rrdtool: yum install -y rrdtool
- Install the httpd server: yum install -y httpd
- Install ganglia-web and PHP: yum install -y ganglia-web php
Configuring the Central Node
Directory layout:
- Ganglia configuration directory: /etc/ganglia
- rrd database directory: /var/lib/ganglia/rrds
- ganglia-web install directory: /usr/share/ganglia
- ganglia-web configuration file: /etc/httpd/conf.d/ganglia.conf
Configuration file changes
Link the ganglia-web site directory into the httpd document root:
$ ln -s /usr/share/ganglia /var/www/html
Fix the permissions on the ganglia site directory under the httpd document root; it must be owned by apache:apache, otherwise the site errors out:
$ chown -R apache:apache /var/www/html/ganglia
$ chmod -R 755 /var/www/html/ganglia
Fix the permissions on the rrd database directory; it must be owned by nobody:nobody, otherwise it errors out:
$ chown -R nobody:nobody /var/lib/ganglia/rrds
Adjust ganglia-web access: in /etc/httpd/conf.d/ganglia.conf, change "Require local" to "Require all granted".
Fix the permissions under the dwoo directory:
chmod 777 /var/lib/ganglia/dwoo/compiled
chmod 777 /var/lib/ganglia/dwoo/cache
Tip: if the dwoo directory does not exist, create it manually:
mkdir -p /var/lib/ganglia/dwoo/{compiled,cache}
chown -R apache:apache /var/lib/ganglia/dwoo
chmod -R 755 /var/lib/ganglia/dwoo
Configure /etc/ganglia/gmetad.conf and /etc/ganglia/gmond.conf.
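The original does not show the contents. For a single central node, the essential lines would be roughly as follows (the cluster name and address are examples; gmond.conf contains many more sections that can keep their defaults):
# /etc/ganglia/gmetad.conf
data_source "flume-cluster" 192.168.206.128:8649

# /etc/ganglia/gmond.conf (the relevant sections)
cluster {
  name = "flume-cluster"
  owner = "unspecified"
  latlong = "unspecified"
  url = "unspecified"
}
udp_send_channel {
  host = 192.168.206.128
  port = 8649
}
udp_recv_channel {
  port = 8649
}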
Start the central node:
systemctl start httpd.service
systemctl start gmetad.service
systemctl start gmond.service
systemctl enable httpd.service
systemctl enable gmetad.service
systemctl enable gmond.service
Disable SELinux
Edit /etc/selinux/config and change SELINUX=enforcing to SELINUX=disabled; this method requires a reboot.
Alternatively, run setenforce 0 to disable SELinux without rebooting, then refresh the page and it becomes accessible. This is only a stopgap, though; for a permanent change, use the config-file method above.
Open the port in the firewall (skip this step if no firewall is running):
firewall-cmd --zone=public --add-port=80/tcp --permanent
firewall-cmd --reload
View the web UI:
192.168.206.128/ganglia
If the Ganglia web page reports "There was an error collecting ganglia data (127.0.0.1:8652): fsockopen error: Permission denied", temporarily disable SELinux enforcement (this happens when /etc/selinux/config was changed but the machine has not yet been rebooted):
setenforce 0
16 Monitoring Flume with Ganglia
../bin/flume-ng agent -n a1 -f example.conf -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=192.168.206.128:8649
These parameters can also be written into the configuration so they do not need to be passed at run time.
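Following the flume-env.sh approach from section 4, that would look something like this (host and port taken from the command above):
export JAVA_OPTS="-Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=192.168.206.128:8649"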