Kafka Integration with Flume
Contents
- Flume as Producer
- Flume as Consumer
Flume as Producer
Start the Kafka cluster
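A minimal sketch of bringing up the brokers, assuming Kafka is installed under /opt/module/kafka on each node and ZooKeeper is already running (the install path is an assumption; the hostnames match the sink config below):
# run on master, slave1, and slave2
/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties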
Start a Kafka consumer
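For example, with the console consumer that ships with Kafka; the topic name first matches the sink config below, and the topic is assumed to already exist:
/opt/module/kafka/bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic first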
Modify the log4j.properties configuration file under Flume's conf directory to set the log file path
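In Flume 1.x, conf/log4j.properties controls the log location through the flume.log.dir and flume.log.file properties; a sketch of the relevant lines (the directory shown is an assumption):
# flume/conf/log4j.properties
flume.log.dir=/opt/module/flume/logs
flume.log.file=flume.log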
Write the configuration file
vim file_to_kafka.conf
# 1 Define the components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 2 Configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/app.*

# 3 Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 4 Configure the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# 5 Wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start Flume
flume/bin/flume-ng agent -c flume/conf/ -n a1 -f file_to_kafka.conf
Create an app.log file in the monitored directory and append content to it with the echo command
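A minimal example; the directory matches the filegroups pattern in the source config above:
echo hello >> /opt/module/applog/app.log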
You can see that the Kafka consumer has received the data
Flume as Consumer
Edit the configuration file
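The start command below reads kafka_to_file.conf, so save the following config under that name:
vim kafka_to_file.conf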
# 1 Define the components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 2 Configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 50
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers = master:9092
a1.sources.r1.kafka.topics = first
a1.sources.r1.kafka.consumer.group.id = custom.g.id

# 3 Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 4 Configure the sink
a1.sinks.k1.type = logger

# 5 Wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start Flume
flume/bin/flume-ng agent -c flume/conf/ -n a1 -f kafka_to_file.conf -Dflume.root.logger=INFO,console
Use a Kafka producer to send data:
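For example, with the console producer that ships with Kafka (releases before Kafka 2.5 use --broker-list instead of --bootstrap-server):
/opt/module/kafka/bin/kafka-console-producer.sh --bootstrap-server master:9092 --topic first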
Use tail -f to monitor the log file
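Assuming the log path from the log4j.properties sketch earlier; note that the -Dflume.root.logger=INFO,console flag above routes events to the console, so drop it if you want them written to the file:
tail -f /opt/module/flume/logs/flume.log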