当前位置: 首页 > news >正文

kafka-日志收集平台部署项目

一、部署项目说明

       本文档部署的是基于 Kafka+Filebeat+Nginx+Flask+Celery+Redis 的日志收集与处理平台,核心功能是实时收集 Nginx 反向代理服务器的访问日志与错误日志,通过 Kafka 实现日志高可用传输存储,搭配 Flask 模拟业务场景,Celery+Redis 实现定时任务处理,适用于中小型企业日志监控分析需求。

核心组件作用
组件       版本                核心作用
Kafka    3.6.1              分布式消息队列,接收 Filebeat 日志并存储传输
Filebeat  7.x               轻量级日志采集工具,采集 Nginx 日志发送至 Kafka
Nginx     -                 反向代理 Flask 服务,生成日志供采集
Flask     -                 Python 后端 Web 服务,模拟业务系统
Celery    -                 分布式任务队列,基于 Redis 实现定时任务
Redis     -                 缓存与消息中间件,支撑 Celery 任务调度
JDK       11                Kafka 运行依赖的 Java 环境

二、kafka环境部署

机器需求:
3 台 Rocky9.6 服务器:kafka1(192.168.75.145)、kafka2(192.168.75.142)、kafka3(192.168.75.143),未特别说明则 3 台均执行

第一部分:环境准备(3 台服务器均执行)

1. 配置 Yum 源(替换为阿里云源)

# 进入Yum源目录并备份原有源
cd /etc/yum.repos.d
mkdir repo
mv *.repo repo/

# 下载阿里云Rocky9源
curl -o /etc/yum.repos.d/Rocky-Base.repo http://mirrors.aliyun.com/repo/Rocky-9.repo

# 清理并生成Yum缓存
yum clean all
yum makecache


2. 安装依赖软件(JDK、wget、vim 等)

# 安装EPEL扩展源
yum install epel-release -y

# 安装JDK11、wget、vim
yum install wget vim java-11-openjdk.x86_64 -y

# 验证JDK安装
java -version


3. 配置静态 IP (配置详情:Linux学习-CSDN博客)

4. Host映射(可以不配)

5. 关闭防火墙与 SELinux


第二部分:部署 Kafka 集群(3 台服务器均执行)


1. 下载并解压 Kafka

# 进入/opt目录
cd /opt

# 下载Kafka 3.6.1
wget https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgz

# 解压安装包
tar xf kafka_2.13-3.6.1.tgz

# 进入Kafka目录
cd kafka_2.13-3.6.1

2. 修改 Kafka 配置文件(3 台分别修改node.id和listeners)

# 编辑Kraft配置文件
vim config/kraft/server.properties

kafka1具体配置:
node.id=1
controller.quorum.voters=1@192.168.75.145:9093,2@192.168.75.142:9093,3@192.168.75.143:9093 #这条是三个都一样的配置
listeners=PLAINTEXT://192.168.75.145:9092,CONTROLLER://192.168.75.145:9093  # 自己的ip
advertised.listeners=PLAINTEXT://192.168.75.145:9092   # 自己的ip
controller.listener.names=CONTROLLER

kafka2具体配置:
node.id=2
controller.quorum.voters=1@192.168.75.145:9093,2@192.168.75.142:9093,3@192.168.75.143:9093
listeners=PLAINTEXT://192.168.75.142:9092,CONTROLLER://192.168.75.142:9093
advertised.listeners=PLAINTEXT://192.168.75.142:9092
controller.listener.names=CONTROLLER

kafka3具体配置:
node.id=3
controller.quorum.voters=1@192.168.75.145:9093,2@192.168.75.142:9093,3@192.168.75.143:9093
listeners=PLAINTEXT://192.168.75.143:9092,CONTROLLER://192.168.75.143:9093
advertised.listeners=PLAINTEXT://192.168.75.143:9092
controller.listener.names=CONTROLLER

其他不动


# 创建日志存储目录(若修改了log.dirs,按以上教程下来是没有修改的,可以直接跳过)
mkdir -p /opt/kafka_2.13-3.6.1/datas

3. 初始化 Kafka 集群

# 步骤1:在1台服务器(如kafka1)生成集群UUID
cd /opt/kafka_2.13-3.6.1
bin/kafka-storage.sh random-uuid > tmp_random

# 查看生成的UUID
cat tmp_random

# 步骤2:3台服务器均执行初始化(替换UUID为实际生成值,并且三台机子使用同一个uuid)
bin/kafka-storage.sh format -t ZQOJlJMUTtOt09CFKzmX8A -c /opt/kafka_2.13-3.6.1/config/kraft/server.properties
最终显示Formatting complete(初始化成功)

4. Systemd 管理启动 Kafka 集群

# 创建Kafka服务配置文件(三台都做,一模一样)
vim /usr/lib/systemd/system/kafka.service
[Unit]
  Description=Apache Kafka server (KRaft mode)
  Documentation=http://kafka.apache.org/documentation.html
  After=network.target
  [Service]
  Type=forking
  User=root
  Group=root
  Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin:/usr/lib/jvm/java-11-openjdk-11.0.23.0.9-2.el7_9.x86_64/bin/"
  ExecStart=/opt/kafka_2.13-3.6.1/bin/kafka-server-start.sh -daemon /opt/kafka_2.13-3.6.1/config/kraft/server.properties
  ExecStop=/opt/kafka_2.13-3.6.1/bin/kafka-server-stop.sh
  Restart=on-failure
  [Install]
  WantedBy=multi-user.target

# 重新加载Systemd配置
systemctl daemon-reload

# 启动Kafka并设置开机自启
systemctl start kafka
systemctl enable kafka


5. 测试 Kafka 集群(在 kafka3 执行)

# 创建nginxlog主题
bin/kafka-topics.sh --create --bootstrap-server kafka3:9092 --replication-factor 3 --partitions 3 --topic nginxlog

# 查看主题列表
bin/kafka-topics.sh --list --bootstrap-server kafka3:9092
 
# 启动生产者发送测试消息,这里选择的是kafka3为生产者
bin/kafka-console-producer.sh --broker-list kafka3:9092 --topic nginxlog
# 输入test nginx log  # 这是测试信息,随便输什么都行

# 启动消费者接收消息,这里选择的是kafka1为消费者(三台随便选一个)
bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic nginxlog --from-beginning
输出:test nginx log  # 正常接收到生产者的测试信息

第三部分:部署 Filebeat(在 kafka3 执行)

1. 安装 Filebeat

# 导入Elastic GPG密钥
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

# 创建Filebeat Yum源
vim /etc/yum.repos.d/fb.repo
[elastic-7.x]
name=Elastic repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md


# 安装Filebeat
yum install filebeat -y

# 验证安装
filebeat version


2. 配置并验证 Filebeat

# 编辑Filebeat配置文件
vim /etc/filebeat/filebeat.yml
清空这个文件只需要添加以下配置(建议先备份原文件):
# ============================== Filebeat inputs ===============================
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
    - /var/log/nginx/error.log

# ============================== Filebeat outputs ==============================
output.kafka:
  hosts: ["192.168.75.145:9092", "192.168.75.142:9092", "192.168.75.143:9092"]
  topic: nginxlog
  keep_alive: 10s


# 验证配置格式
filebeat test config
输出:Config OK(配置格式正确)

3. 启动 Filebeat

# 启动并设置开机自启
systemctl start filebeat
systemctl enable filebeat

# 验证状态
systemctl status filebeat

第四部分:部署 Nginx 反向代理(在 kafka3 执行)


1. 安装 Nginx

# 安装Nginx
yum install nginx -y

# 验证安装
nginx -v

2. 配置并验证 Nginx

# 创建虚拟主机配置
vim /etc/nginx/conf.d/sc.conf
upstream flask {
        server 192.168.75.142:5000;  
        server 192.168.75.143:5000;  
  
}  
server {
        server_name www.mykafka-nginx.com;
        location / {
          proxy_pass http://flask;
      }
  
}

# 验证Nginx配置
nginx -t

# 重启Nginx并设置开机自启
systemctl restart nginx
systemctl enable nginx

# 验证状态
systemctl status nginx

第五部分:部署 Flask 后端服务(在 kafka2 和 kafka3 执行)

1. 安装 Flask 环境

# 安装Python3和pip3(若未安装)
yum install python3 python3-pip -y

# 升级pip3
pip3 install --upgrade pip -i https://mirrors.aliyun.com/pypi/simple/

# 安装Flask
pip3 install flask -i https://pypi.tuna.tsinghua.edu.cn/simple

2. 编写并启动 Flask 程序

# 创建Flask目录并编写程序
mkdir -p /opt/python-flask
vim /opt/python-flask/app.py
from flask import Flask  
app = Flask(__name__)
@app.route("/")
def index():
    return "this is flask web kafka2"
app.run(host = "0.0.0.0")


# 后台启动Flask
nohup python3 /opt/python-flask/app.py > /opt/python-flask/app.log 2>&1 &

# 验证进程
ps aux | grep app.py

# 测试Flask服务
curl http://192.168.20.162:5000  # kafka2执行,输出this is flask web kafka2
curl http://192.168.20.163:5000  # kafka3执行,输出this is flask web kafka3

第六部分:部署 Celery+Redis(在 kafka1 执行)

1. 安装并配置 Redis

# 安装Redis
yum install redis -y

# 修改Redis配置
vim /etc/redis/redis.conf
bind 0.0.0.0   #监听本机任意ip

# 启动并设置开机自启
systemctl start redis
systemctl enable redis

# 验证Redis
redis-cli -h 192.168.20.161 ping
输出PONG(Redis 连接正常)

2. 安装 Celery 依赖

# 安装Celery和Redis库
pip3 install celery redis -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple

# 验证Celery
celery --version

3. 配置 Celery 定时任务

# 创建Celery目录结构
mkdir -p /opt/monitor/celery_app

# 创建config.py、__init__.py、task.py(分别编辑)
vim /opt/monitor/celery_app/config.py
from celery.schedules import crontab
BROKER_URL = 'redis://192.168.20.161:6379/0' # Broker配置,使用Redis作为消息中间件
CELERY_RESULT_BACKEND = 'redis://192.168.20.161:6379/1' # BACKEND配置,这里使用redis
CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间
CELERY_TIMEZONE='Asia/Shanghai'   # 时区配置
CELERY_IMPORTS = (     # 指定导入的任务模块,可以指定多个
    'celery_app.task',
)
  
CELERYBEAT_SCHEDULE = {
      'celery_app.task.test': {
          'task': 'celery_app.task.test',
          'schedule': crontab(minute='*/1'),
          'args': (-3, 10)
      }
}

vim /opt/monitor/celery_app/__init__.py
from celery import Celery
app = Celery('task')
app.config_from_object('celery_app.config')


vim /opt/monitor/celery_app/task.py
from . import app
@app.task
def test(a,b):
    print("task test start ...")
    result = abs(a) + abs(b)
    print("task test end....")
    return result

4. 启动 Celery(打开两个终端,由于电脑配置不行,所以可以直接打开两个kafka1进行部署)


终端 1:启动 Celery Beat(调度器)
cd /opt/monitor
celery -A celery_app beat
输出调度器启动日志,示例:
plaintext
celery beat v5.3.6 (emerald-rush) is starting.
__    -    ... __   -        _
LocalTime -> 2024-05-20 15:30:00
Configuration ->
  . broker -> redis://192.168.20.161:6379/0
  . loader -> celery.loaders.app.AppLoader
  . scheduler -> celery.beat.PersistentScheduler
  . db -> celerybeat-schedule
  . logfile -> [stderr]@%WARNING
  . maxinterval -> 5.00 seconds (5s)
[2024-05-20 15:30:00,000: INFO/MainProcess] Scheduler: Sending due task celery_app.task.test (celery_app.task.test)

终端 2:启动 Celery Worker(执行器)
bash
cd /opt/monitor
celery -A celery_app worker -l info -c 4
输出 Worker 启动与任务执行日志,终端每分钟输出任务执行日志,稳定显示:
[2024-05-20 15:30:00,000: INFO/MainProcess] Connected to redis://192.168.20.161:6379/0
[2024-05-20 15:30:00,001: INFO/MainProcess] mingle: searching for neighbors
[2024-05-20 15:30:01,002: INFO/MainProcess] mingle: all alone
[2024-05-20 15:30:01,003: INFO/MainProcess] celery@kafka1 ready.
[2024-05-20 15:31:00,000: INFO/MainProcess] Received task: celery_app.task.test[12345678-1234-1234-1234-1234567890ab]
[2024-05-20 15:31:00,001: WARNING/ForkPoolWorker-1] task test start ...
[2024-05-20 15:31:00,001: WARNING/ForkPoolWorker-1] task test end.... result: 13
[2024-05-20 15:31:00,002: INFO/ForkPoolWorker-1] Task celery_app.task.test[12345678-1234-1234-1234-1234567890ab] succeeded in 0.001s: 13

二. 简单效果图


┌───────────────┐        ┌───────────────┐        ┌───────────────┐
│  客户端浏览器  │  → 访问→│  Nginx反向代理 │  → 代理→│  Flask服务集群 │
│  (192.168.20.1)│        │(192.168.20.163)│        │(kafka2/kafka3:5000)│
│  输出:交替显示  │        │  输出:access.log日志 │        │  输出:curl返回指定内容 │
└───────────────┘        └───────┬───────┘        └───────────────┘
                                  │
                                  ▼
                          ┌───────────────┐        ┌───────────────┐
                          │  Filebeat日志采集 │  → 发送→│  Kafka集群     │
                          │(192.168.20.163)│        │(3节点高可用) │
                          │  输出:无显式日志 │        │  输出:消费者接收日志 │
                          └───────────────┘        └───────┬───────┘
                                                          │
                                                          ▼
                                                  ┌───────────────┐
                                                  │  Kafka消费者  │
                                                  │  输出:JSON格式日志 │
                                                  └───────────────┘

┌───────────────┐        ┌───────────────┐        ┌───────────────┐
│  Redis消息中间件 │  ← 调度→│  Celery Beat  │        │  定时任务配置  │
│(192.168.20.161)│        │(任务调度器)  │        │(每分钟执行)  │
│  输出:ping返回PONG │        │  输出:调度任务日志 │        │  输出:无显式内容 │
└───────┬───────┘        └───────────────┘        └───────────────┘
        │
        ▼
┌───────────────┐        ┌───────────────┐
│  Celery Worker │  ← 执行→│  任务结果输出  │
│(4进程处理任务)│        │(result:13)   │
│  输出:任务执行日志 │        │  输出:Worker终端显示 │
└───────────────┘        └───────────────┘

3. 总结
整套平台实现 “日志采集→传输→存储” 与 “业务服务→反向代理→定时任务” 全流程,每步均通过显式输出验证正确性,可基于此扩展日志分析(Elasticsearch+Kibana)、服务监控(Prometheus+Grafana)等功能。

全流程逻辑:

  1. 步骤 1:用户触发业务本地浏览器访问 www.mykafka-nginx.com → 请求被解析到 Nginx(kafka3:80)。
  2. 步骤 2:Nginx 代理与日志生成Nginx 按负载均衡规则,将请求转发到 kafka2 或 kafka3 的 Flask 服务(5000 端口)→ Flask 返回对应内容(如 “this is flask web kafka2”)→ Nginx 生成访问日志(access.log)或错误日志(error.log)。
  3. 步骤 3:Filebeat 采集日志Filebeat(kafka3)实时监听 Nginx 日志文件 → 读取新日志并封装为消息 → 发送到 Kafka 集群的 nginxlog 主题(3 副本存储,确保不丢失)。
  4. 步骤 4:Celery 定时任务调度Celery Beat(kafka1)按 “每分钟” 规则生成任务指令 → 发送到 Redis(kafka1:6379/0)→ Celery Worker(kafka1)从 Redis 读取任务 → 执行任务(如计算 abs (-3)+abs (10)=13)→ 结果回存到 Redis(6379/1)。
  5. 步骤 5:日志消费扩展(可选)后续可新增 Kafka 消费者(如 Logstash)→ 从 Kafka 读取 nginxlog 日志 → 清洗后写入 Elasticsearch 用于分析。

http://www.dtcms.com/a/411107.html

相关文章:

  • 郑州建站推广公司太原市制作网站
  • 学习:uniapp全栈微信小程序vue3后台(28)
  • 如何提高网站流量公众号推广代理
  • 怎么自己做淘宝客网站吗.net响应式网站模板
  • AI投资决策Agent系列——沃伦·巴菲特Agent
  • 网站开发流程知乎深圳线上注册公司
  • PSG数据集概述
  • 《考研408数据结构》第二章《线性表(顺序表、链表)》复习笔记
  • 网站程序语言那个好网页设计的就业和发展前景
  • SpringBoot 日志报错 No static resource favicon.ico
  • TOGAF® 与新兴技术:区块链、物联网与量子计算
  • 提升网站访问量wordpress %postname%
  • 环评登记表在哪个网站做做网站和淘宝美工 最低电脑
  • C++ QT 实现自定义事件
  • 郑州做网站的企业wordpress插件内链
  • 安卓接入Kwai广告源
  • 专业建站lhznkj怎么做下载网站吗
  • 机器学习/深度学习名词理解
  • 无人机散热系统技术要点与难点
  • 使用Weston(Wayland 显示服务器的参考实现)小记
  • 可以制作网站的软件是什么房地产行业网站建设报价方案
  • 潍坊企业免费建站网站建设 应酷
  • Django + Vue3 前后端分离技术实现自动化测试平台从零到有系列 <第三章> 之 基础架构搭建
  • 深入解析:什么是矩阵系统源码搭建定制化开发,支持OEM贴牌
  • Nginx高并发原理与Tomcat实战全解析:从IO模型到HTTPS配置与故障排查(第七周)
  • 网站推广一般在哪个网做百度快照优化推广
  • STM32 外设驱动模块:Tracking 循迹模块
  • 新版发布!“零讯”微信小程序版本更新
  • 广西建设厅微信网站网站正在建设中的
  • 模板板网站wordpress前台登录插件