当前位置: 首页 > news >正文

基于 Fluent-Bit 和 Fluentd 的分布式日志采集与处理方案

#作者:任少近

文章目录

  • 需求描述
  • 系统目标
  • 系统组件
    • Fluent Bit
    • Fluentd
    • Kafka
  • 数据流与处理流程
    • 日志采集
    • 日志转发到 Fluentd
    • 日志处理与转发到 Kafka
    • Kafka 作为消息队列
  • 具体配置
    • Fluent-Bit的CM配置
    • Fluent-Bit的DS配置
    • Fluentd的CM配置
    • Fluentd的DS配置
    • Kafka查询结果

需求描述

Fluent Bit 将日志传输到 Fluentd,Fluentd 再将日志写入 Kafka
背景: 随着系统日志量的增加,尤其是在微服务架构下,日志的收集、处理和传输变得愈加复杂。为了实现高效的日志收集和处理,需要一个可靠且可扩展的日志管道。Fluent Bit、Fluentd 和 Kafka 的结合可以为这一需求提供强大的支持。

系统目标

Fluent Bit 作为日志收集器,从不同来源(如容器、应用程序、系统日志文件等)收集日志。
将收集到的日志实时转发到 Fluentd,Fluentd 对日志进行进一步的处理(如解析、过滤、增强等)。
Fluentd 将处理后的日志通过 Kafka 传输,Kafka 作为消息队列,提供日志的高吞吐量传输和存储。

系统组件

Fluent Bit

功能:负责日志的采集和转发,能够高效地从各种日志源收集日志数据,并将其发送到 Fluentd。
特点:轻量级、高效、低资源消耗,适用于边缘设备和容器环境。

Fluentd

功能:接收来自 Fluent Bit 的日志数据,对日志进行进一步的处理,如过滤、格式转换、增强等。
特点:支持丰富的插件生态系统,能够灵活地扩展和配置,适用于复杂的日志处理和存储需求。

Kafka

功能:作为日志数据的消息队列,提供高吞吐量、可靠的日志传输机制。Fluentd 将日志数据发送到 Kafka,Kafka 作为缓冲区存储和传递日志数据,确保日志的可靠性和可扩展性。
特点:高吞吐量、可扩展、容错能力强。

数据流与处理流程

日志采集

Fluent Bit 部署在日志源所在的节点或容器中,实时监控指定的日志文件(如 /var/log/test.log 等)。
Fluent Bit 使用 tail 插件采集日志,并将其转换为指定的格式(如 JSON)。

日志转发到 Fluentd

Fluent Bit 使用 forward 输出插件将日志数据转发到 Fluentd,通过指定 Fluentd 的 IP 地址和端口进行连接。

日志处理与转发到 Kafka

Fluentd 接收到日志后,可以进行各种处理(如过滤、解析、增强、格式转换等)。
处理后的日志通过 Kafka 输出插件将日志发送到指定的 Kafka 集群。
Kafka 将日志存储在其主题中,以便进行后续的分析、查询和处理。

Kafka 作为消息队列

Kafka 将日志数据持久化到其分区中,提供可靠的消息存储和高吞吐量的数据传输。
消费者 可以从 Kafka 中读取数据进行进一步处理或存储到其他系统(如 Elasticsearch、OpenSearch、数据库等)。

具体配置

Fluent-Bit的CM配置

 fluent-bit.conf: |
    [SERVICE]
        Flush 1
        Parsers_File parsers.conf
        HTTP_Server  On
        HTTP_Listen  0.0.0.0
        HTTP_PORT    3302
    [INPUT]
        Name         tail
        Tag          regex-fluent
        DB           /var/log/regex-fluent.db
        Read_from_Head true
        Path  /var/log/test.log
        Path_Key  pod_log_path
    [OUTPUT]
        Name        forward
        Match       *
        Host        fluentd-service.logging.svc
        Port        24224
        Retry_Limit 5

Fluent-Bit的DS配置

注意:日志目录的挂载

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluent-bit
  namespace: logging
  labels:
    k8s-app: fluent-bit-logging
    version: v1
    kubernetes.io/cluster-service: "true"
spec:
  selector:
    matchLabels:
      k8s-app: fluent-bit-logging
  template:
    metadata:
      labels:
        k8s-app: fluent-bit-logging
        version: v1
        kubernetes.io/cluster-service: "true"
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "2020"
        prometheus.io/path: /api/v1/metrics/prometheus
    spec:
      nodeSelector:
        zk-app: app
      containers:
      - name: fluent-bit
        image: registry.cn-hangzhou.aliyuncs.com/ali_cloud_images/fluent-bit:1.9
        imagePullPolicy: Never
        ports:
          - containerPort: 2020
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: fluent-bit-config
          mountPath: /fluent-bit/etc/
        - name: db
          mountPath: /tail-db/
      terminationGracePeriodSeconds: 10
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: fluent-bit-config
        configMap:
          name: fluent-bit-config
      - name: db
        hostPath:
          path: /home/chb/hundun/fluent-bit
          type: Directory
      serviceAccountName: fluent-bit
      tolerations:
      - key: node-role.kubernetes.io/master
        operator: Exists
        effect: NoSchedule
      - operator: "Exists"
        effect: "NoExecute"
      - operator: "Exists"
        effect: "NoSchedule"

Fluentd的CM配置

 fluent.conf: |-
    <source>
      @type forward
      port 24224
      bind 0.0.0.0
      tag test
    </source>
    <match test>
      @type kafka2
      brokers 192.168.123.100:9092  # Kafka broker 地址
      topic fluentd_topic
      <format>
       @type json
      </format>
    </match>

Fluentd的DS配置

注意:端口的暴露

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd-kafka
  namespace: logging
  labels:
    k8s-app: fluentd-kafka
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
spec:
  selector:
    matchLabels:
      k8s-app: fluentd-kafka
  template:
    metadata:
      labels:
        k8s-app: fluentd-kafka
        kubernetes.io/cluster-service: "true"
      # 此注释确保如果节点被驱逐,fluentd不会被驱逐,支持关键的基于 pod 注释的优先级方案。
      annotations:
        scheduler.alpha.kubernetes.io/critical-pod: ''
    spec:
      initContainers:
        - name: init-permission
          image: busybox
          imagePullPolicy: Never
          command: ["sh", "-c", "mkdir -p /var/log/td-agent && chown -R 1000:1000 /var/log/td-agent"]
      nodeSelector:
        zk-app: app
      serviceAccountName: fluentd-kafka
      containers:
      - name: fluentd-kafka
        image: fluentd-kafka:latest
        imagePullPolicy: Never
        securityContext:
          runAsUser: 0
        ports: 
          - containerPort: 24224
            name: forward-port
        volumeMounts:
          - name: fluentd-config-volume
            mountPath: /opt/bitnami/fluentd/conf/fluentd.conf
            subPath: fluent.conf
          - name: varlog
            mountPath: /var/log
          - name: pos
            mountPath: /var/log/td-agent
      volumes:
      - name: fluentd-config-volume
        configMap:
          name: fluentd-config
      - name: varlog
        hostPath:
          path: /home/chb/test
      - name: pos
        hostPath:
          path: /var/log/td-agent
      imagePullSecrets:
        - name: default-secret
      tolerations:
      - operator: Exists
      terminationGracePeriodSeconds: 30
---
apiVersion: v1
kind: Service
metadata:
  name: fluentd-service
  namespace: logging
spec:
  selector:
    k8s-app: fluentd-kafka
  ports:
    - protocol: TCP
      port: 24224
      targetPort: 24224
  clusterIP: None

Kafka查询结果

在这里插入图片描述

相关文章:

  • 【零基础入门unity游戏开发——2D篇】SpriteMask精灵遮罩组件
  • 【蓝桥杯】单片机设计与开发,温度传感器DS18B20
  • TPS入门DAY01 服务器篇
  • US112S-ASEMI智能家居专用US112S
  • 深入理解 IntersectionObserver:让前端滚动监听更高效
  • [AI] 如何将 ComfyUI 的作图能力融合到 OpenWebUI
  • Scala:大数据时代的多面手
  • stm32面试
  • Go+Gin实现安全多文件上传:带MD5校验的完整解决方案
  • 使用Java爬虫按关键字搜索淘宝商品?
  • 用matlab探索卷积神经网络(Convolutional Neural Networks)-3
  • 2025年- G33-Lc107-150. 评估逆波兰表示法--java版
  • 电脑办公之文件(夹)操作
  • CentOS-查询实时报错日志-查询前1天业务报错gz压缩日志
  • 当AI开始“思考“:揭秘大语言模型的文字认知三部曲题
  • 使用RKNN进行yolo11-cls部署
  • Java的Stream流
  • 大量意图识别方案
  • 目标跟踪综合知识
  • 前端抽象化,打破框架枷锁:Http请求也许该一样
  • 面对非专业人士,科学家该如何提供建议
  • 巴基斯坦军方:印度导弹袭击巴首都附近空军基地
  • “80后”赵亮出任上海普陀区委副书记
  • 47本笔记、2341场讲座,一位普通上海老人的阅读史
  • 中美“第二阶段”贸易协定是否会在会谈中提出?商务部回应
  • 动物只有在被认为对人类有用时,它们的建筑才会被特别设计