kafka-日志收集平台部署项目
一、部署项目说明
本文档部署的是基于 Kafka+Filebeat+Nginx+Flask+Celery+Redis 的日志收集与处理平台,核心功能是实时收集 Nginx 反向代理服务器的访问日志与错误日志,通过 Kafka 实现日志高可用传输存储,搭配 Flask 模拟业务场景,Celery+Redis 实现定时任务处理,适用于中小型企业日志监控分析需求。
核心组件作用
组件 版本 核心作用
Kafka 3.6.1 分布式消息队列,接收 Filebeat 日志并存储传输
Filebeat 7.x 轻量级日志采集工具,采集 Nginx 日志发送至 Kafka
Nginx - 反向代理 Flask 服务,生成日志供采集
Flask - Python 后端 Web 服务,模拟业务系统
Celery - 分布式任务队列,基于 Redis 实现定时任务
Redis - 缓存与消息中间件,支撑 Celery 任务调度
JDK 11 Kafka 运行依赖的 Java 环境
二、kafka环境部署
机器需求:
3 台 Rocky9.6 服务器:kafka1(192.168.75.145)、kafka2(192.168.75.142)、kafka3(192.168.75.143),未特别说明则 3 台均执行
第一部分:环境准备(3 台服务器均执行)
1. 配置 Yum 源(替换为阿里云源)
# 进入Yum源目录并备份原有源
cd /etc/yum.repos.d
mkdir repo
mv *.repo repo/# 下载阿里云Rocky9源
curl -o /etc/yum.repos.d/Rocky-Base.repo http://mirrors.aliyun.com/repo/Rocky-9.repo# 清理并生成Yum缓存
yum clean all
yum makecache
2. 安装依赖软件(JDK、wget、vim 等)
# 安装EPEL扩展源
yum install epel-release -y# 安装JDK11、wget、vim
yum install wget vim java-11-openjdk.x86_64 -y# 验证JDK安装
java -version
3. 配置静态 IP (配置详情:Linux学习-CSDN博客)
4. Host映射(可以不配)
5. 关闭防火墙与 SELinux
第二部分:部署 Kafka 集群(3 台服务器均执行)
1. 下载并解压 Kafka
# 进入/opt目录
cd /opt# 下载Kafka 3.6.1
wget https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgz# 解压安装包
tar xf kafka_2.13-3.6.1.tgz# 进入Kafka目录
cd kafka_2.13-3.6.1
2. 修改 Kafka 配置文件(3 台分别修改node.id和listeners)
# 编辑Kraft配置文件
vim config/kraft/server.propertieskafka1具体配置:
node.id=1
controller.quorum.voters=1@192.168.75.145:9093,2@192.168.75.142:9093,3@192.168.75.143:9093 #这条是三个都一样的配置
listeners=PLAINTEXT://192.168.75.145:9092,CONTROLLER://192.168.75.145:9093 # 自己的ip
advertised.listeners=PLAINTEXT://192.168.75.145:9092 # 自己的ip
controller.listener.names=CONTROLLERkafka2具体配置:
node.id=2
controller.quorum.voters=1@192.168.75.145:9093,2@192.168.75.142:9093,3@192.168.75.143:9093
listeners=PLAINTEXT://192.168.75.142:9092,CONTROLLER://192.168.75.142:9093
advertised.listeners=PLAINTEXT://192.168.75.142:9092
controller.listener.names=CONTROLLERkafka3具体配置:
node.id=3
controller.quorum.voters=1@192.168.75.145:9093,2@192.168.75.142:9093,3@192.168.75.143:9093
listeners=PLAINTEXT://192.168.75.143:9092,CONTROLLER://192.168.75.143:9093
advertised.listeners=PLAINTEXT://192.168.75.143:9092
controller.listener.names=CONTROLLER其他不动
# 创建日志存储目录(若修改了log.dirs,按以上教程下来是没有修改的,可以直接跳过)
mkdir -p /opt/kafka_2.13-3.6.1/datas
3. 初始化 Kafka 集群
# 步骤1:在1台服务器(如kafka1)生成集群UUID
cd /opt/kafka_2.13-3.6.1
bin/kafka-storage.sh random-uuid > tmp_random# 查看生成的UUID
cat tmp_random# 步骤2:3台服务器均执行初始化(替换UUID为实际生成值,并且三台机子使用同一个uuid)
bin/kafka-storage.sh format -t ZQOJlJMUTtOt09CFKzmX8A -c /opt/kafka_2.13-3.6.1/config/kraft/server.properties
最终显示Formatting complete(初始化成功)
4. Systemd 管理启动 Kafka 集群
# 创建Kafka服务配置文件(三台都做,一模一样)
vim /usr/lib/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (KRaft mode)
Documentation=http://kafka.apache.org/documentation.html
After=network.target
[Service]
Type=forking
User=root
Group=root
Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin:/usr/lib/jvm/java-11-openjdk-11.0.23.0.9-2.el7_9.x86_64/bin/"
ExecStart=/opt/kafka_2.13-3.6.1/bin/kafka-server-start.sh -daemon /opt/kafka_2.13-3.6.1/config/kraft/server.properties
ExecStop=/opt/kafka_2.13-3.6.1/bin/kafka-server-stop.sh
Restart=on-failure
[Install]
WantedBy=multi-user.target# 重新加载Systemd配置
systemctl daemon-reload# 启动Kafka并设置开机自启
systemctl start kafka
systemctl enable kafka
5. 测试 Kafka 集群(在 kafka3 执行)
# 创建nginxlog主题
bin/kafka-topics.sh --create --bootstrap-server kafka3:9092 --replication-factor 3 --partitions 3 --topic nginxlog# 查看主题列表
bin/kafka-topics.sh --list --bootstrap-server kafka3:9092
# 启动生产者发送测试消息,这里选择的是kafka3为生产者
bin/kafka-console-producer.sh --broker-list kafka3:9092 --topic nginxlog
# 输入test nginx log # 这是测试信息,随便输什么都行# 启动消费者接收消息,这里选择的是kafka1为消费者(三台随便选一个)
bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic nginxlog --from-beginning
输出:test nginx log # 正常接收到生产者的测试信息
第三部分:部署 Filebeat(在 kafka3 执行)
1. 安装 Filebeat
# 导入Elastic GPG密钥
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch# 创建Filebeat Yum源
vim /etc/yum.repos.d/fb.repo
[elastic-7.x]
name=Elastic repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
# 安装Filebeat
yum install filebeat -y# 验证安装
filebeat version
2. 配置并验证 Filebeat
# 编辑Filebeat配置文件
vim /etc/filebeat/filebeat.yml
清空这个文件只需要添加以下配置(建议先备份原文件):
# ============================== Filebeat inputs ===============================
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/nginx/access.log
- /var/log/nginx/error.log# ============================== Filebeat outputs ==============================
output.kafka:
hosts: ["192.168.75.145:9092", "192.168.75.142:9092", "192.168.75.143:9092"]
topic: nginxlog
keep_alive: 10s
# 验证配置格式
filebeat test config
输出:Config OK(配置格式正确)
3. 启动 Filebeat
# 启动并设置开机自启
systemctl start filebeat
systemctl enable filebeat# 验证状态
systemctl status filebeat
第四部分:部署 Nginx 反向代理(在 kafka3 执行)
1. 安装 Nginx
# 安装Nginx
yum install nginx -y# 验证安装
nginx -v
2. 配置并验证 Nginx
# 创建虚拟主机配置
vim /etc/nginx/conf.d/sc.conf
upstream flask {
server 192.168.75.142:5000;
server 192.168.75.143:5000;
}
server {
server_name www.mykafka-nginx.com;
location / {
proxy_pass http://flask;
}
}# 验证Nginx配置
nginx -t# 重启Nginx并设置开机自启
systemctl restart nginx
systemctl enable nginx# 验证状态
systemctl status nginx
第五部分:部署 Flask 后端服务(在 kafka2 和 kafka3 执行)
1. 安装 Flask 环境
# 安装Python3和pip3(若未安装)
yum install python3 python3-pip -y# 升级pip3
pip3 install --upgrade pip -i https://mirrors.aliyun.com/pypi/simple/# 安装Flask
pip3 install flask -i https://pypi.tuna.tsinghua.edu.cn/simple
2. 编写并启动 Flask 程序
# 创建Flask目录并编写程序
mkdir -p /opt/python-flask
vim /opt/python-flask/app.py
from flask import Flask
app = Flask(__name__)
@app.route("/")
def index():
return "this is flask web kafka2"
app.run(host = "0.0.0.0")
# 后台启动Flask
nohup python3 /opt/python-flask/app.py > /opt/python-flask/app.log 2>&1 &# 验证进程
ps aux | grep app.py# 测试Flask服务
curl http://192.168.20.162:5000 # kafka2执行,输出this is flask web kafka2
curl http://192.168.20.163:5000 # kafka3执行,输出this is flask web kafka3
第六部分:部署 Celery+Redis(在 kafka1 执行)
1. 安装并配置 Redis
# 安装Redis
yum install redis -y# 修改Redis配置
vim /etc/redis/redis.conf
bind 0.0.0.0 #监听本机任意ip# 启动并设置开机自启
systemctl start redis
systemctl enable redis# 验证Redis
redis-cli -h 192.168.20.161 ping
输出PONG(Redis 连接正常)
2. 安装 Celery 依赖
# 安装Celery和Redis库
pip3 install celery redis -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple# 验证Celery
celery --version
3. 配置 Celery 定时任务
# 创建Celery目录结构
mkdir -p /opt/monitor/celery_app# 创建config.py、__init__.py、task.py(分别编辑)
vim /opt/monitor/celery_app/config.py
from celery.schedules import crontab
BROKER_URL = 'redis://192.168.20.161:6379/0' # Broker配置,使用Redis作为消息中间件
CELERY_RESULT_BACKEND = 'redis://192.168.20.161:6379/1' # BACKEND配置,这里使用redis
CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间
CELERY_TIMEZONE='Asia/Shanghai' # 时区配置
CELERY_IMPORTS = ( # 指定导入的任务模块,可以指定多个
'celery_app.task',
)
CELERYBEAT_SCHEDULE = {
'celery_app.task.test': {
'task': 'celery_app.task.test',
'schedule': crontab(minute='*/1'),
'args': (-3, 10)
}
}vim /opt/monitor/celery_app/__init__.py
from celery import Celery
app = Celery('task')
app.config_from_object('celery_app.config')
vim /opt/monitor/celery_app/task.py
from . import app
@app.task
def test(a,b):
print("task test start ...")
result = abs(a) + abs(b)
print("task test end....")
return result
4. 启动 Celery(打开两个终端,由于电脑配置不行,所以可以直接打开两个kafka1进行部署)
终端 1:启动 Celery Beat(调度器)
cd /opt/monitor
celery -A celery_app beat
输出调度器启动日志,示例:
plaintext
celery beat v5.3.6 (emerald-rush) is starting.
__ - ... __ - _
LocalTime -> 2024-05-20 15:30:00
Configuration ->
. broker -> redis://192.168.20.161:6379/0
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%WARNING
. maxinterval -> 5.00 seconds (5s)
[2024-05-20 15:30:00,000: INFO/MainProcess] Scheduler: Sending due task celery_app.task.test (celery_app.task.test)终端 2:启动 Celery Worker(执行器)
bash
cd /opt/monitor
celery -A celery_app worker -l info -c 4
输出 Worker 启动与任务执行日志,终端每分钟输出任务执行日志,稳定显示:
[2024-05-20 15:30:00,000: INFO/MainProcess] Connected to redis://192.168.20.161:6379/0
[2024-05-20 15:30:00,001: INFO/MainProcess] mingle: searching for neighbors
[2024-05-20 15:30:01,002: INFO/MainProcess] mingle: all alone
[2024-05-20 15:30:01,003: INFO/MainProcess] celery@kafka1 ready.
[2024-05-20 15:31:00,000: INFO/MainProcess] Received task: celery_app.task.test[12345678-1234-1234-1234-1234567890ab]
[2024-05-20 15:31:00,001: WARNING/ForkPoolWorker-1] task test start ...
[2024-05-20 15:31:00,001: WARNING/ForkPoolWorker-1] task test end.... result: 13
[2024-05-20 15:31:00,002: INFO/ForkPoolWorker-1] Task celery_app.task.test[12345678-1234-1234-1234-1234567890ab] succeeded in 0.001s: 13
二. 简单效果图
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ 客户端浏览器 │ → 访问→│ Nginx反向代理 │ → 代理→│ Flask服务集群 │
│ (192.168.20.1)│ │(192.168.20.163)│ │(kafka2/kafka3:5000)│
│ 输出:交替显示 │ │ 输出:access.log日志 │ │ 输出:curl返回指定内容 │
└───────────────┘ └───────┬───────┘ └───────────────┘
│
▼
┌───────────────┐ ┌───────────────┐
│ Filebeat日志采集 │ → 发送→│ Kafka集群 │
│(192.168.20.163)│ │(3节点高可用) │
│ 输出:无显式日志 │ │ 输出:消费者接收日志 │
└───────────────┘ └───────┬───────┘
│
▼
┌───────────────┐
│ Kafka消费者 │
│ 输出:JSON格式日志 │
└───────────────┘
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ Redis消息中间件 │ ← 调度→│ Celery Beat │ │ 定时任务配置 │
│(192.168.20.161)│ │(任务调度器) │ │(每分钟执行) │
│ 输出:ping返回PONG │ │ 输出:调度任务日志 │ │ 输出:无显式内容 │
└───────┬───────┘ └───────────────┘ └───────────────┘
│
▼
┌───────────────┐ ┌───────────────┐
│ Celery Worker │ ← 执行→│ 任务结果输出 │
│(4进程处理任务)│ │(result:13) │
│ 输出:任务执行日志 │ │ 输出:Worker终端显示 │
└───────────────┘ └───────────────┘
3. 总结
整套平台实现 “日志采集→传输→存储” 与 “业务服务→反向代理→定时任务” 全流程,每步均通过显式输出验证正确性,可基于此扩展日志分析(Elasticsearch+Kibana)、服务监控(Prometheus+Grafana)等功能。
全流程逻辑:
- 步骤 1:用户触发业务本地浏览器访问
www.mykafka-nginx.com
→ 请求被解析到 Nginx(kafka3:80)。 - 步骤 2:Nginx 代理与日志生成Nginx 按负载均衡规则,将请求转发到 kafka2 或 kafka3 的 Flask 服务(5000 端口)→ Flask 返回对应内容(如 “this is flask web kafka2”)→ Nginx 生成访问日志(access.log)或错误日志(error.log)。
- 步骤 3:Filebeat 采集日志Filebeat(kafka3)实时监听 Nginx 日志文件 → 读取新日志并封装为消息 → 发送到 Kafka 集群的
nginxlog
主题(3 副本存储,确保不丢失)。 - 步骤 4:Celery 定时任务调度Celery Beat(kafka1)按 “每分钟” 规则生成任务指令 → 发送到 Redis(kafka1:6379/0)→ Celery Worker(kafka1)从 Redis 读取任务 → 执行任务(如计算 abs (-3)+abs (10)=13)→ 结果回存到 Redis(6379/1)。
- 步骤 5:日志消费扩展(可选)后续可新增 Kafka 消费者(如 Logstash)→ 从 Kafka 读取
nginxlog
日志 → 清洗后写入 Elasticsearch 用于分析。