当前位置: 首页 > news >正文

分布式专题——20 Kafka快速入门

1 Kafka 介绍

1.1 MQ 的作用

  • MQ(MessageQueue,消息队列)是一种遵循先进先出(FIFO)的数据结构,用于跨进程传递消息。典型的MQ系统中,生产者发送消息到MQ排队,再按序由消费者处理;

  • MQ主要有三个作用:

    • 异步:提高了系统的响应速度和吞吐量;

    • 解耦:一方面,服务间解耦可减少相互影响,提升系统整体稳定性与可扩展性;另一方面,解耦后能实现数据分发,生产者发消息后,可由一个或多个消费者消费,且消费者增减不影响生产者;

    • 削峰:让系统以稳定资源应对突发的流量冲击。

1.2 Kafka 产品介绍

  • Kafka是目前最具影响力的开源MQ产品,官网:Apache Kafka;

  • 它是一个开源的分布式事件流处理平台,被数千家公司用于高性能数据管道、流分析、数据集成以及关键任务应用,并且超过80%的《财富》100强企业都信赖并使用Kafka。从行业应用来看,制造业前十的企业都在使用,银行业前十中有七家,保险业前十全部使用,电信业前十中有八家在使用;

    在这里插入图片描述

  • Kafka最初由LinkedIn开发并于2011年开源,主要解决大规模数据的实时流式处理和数据管道问题。它是分布式的发布-订阅消息系统,能快速处理高吞吐量的数据流,并将数据实时分发给多个消费者。Kafka消息系统由多个broker(服务器)组成,这些broker可在多个数据中心之间分布式部署,以提供高可用性和容错性;

  • Kafka采用高效的数据存储和管理技术,能轻松处理TB级的数据量,具备高吞吐量、低延迟、可扩展性、持久性和容错性等优点。在企业级应用中,Kafka被广泛用于实时流处理、日志聚合、监控和数据分析等方面,还能与Hadoop、Spark和Storm等其他大数据工具集成,构建完整的数据处理生态系统。

1.3 Kafka 的特点

  • Kafka最初诞生于LinkedIn公司,核心作用是收集并处理庞大复杂的应用日志。典型日志聚合应用场景如下:

    • Application(应用)作为Producer(生产者),nginx通过Flume、log4j通过LogStash,将数据发送到Kafka集群;
    • 接着Flink、Spark、Streams等组件从Kafka获取数据,分别用于微服务链路追踪、SEO统计、用户行为分析等;
    • 最后处理后的数据存储到ES(Elasticsearch)、Hadoop等系统中;

    在这里插入图片描述

  • 业务场景决定了Kafka的产品特点,其最典型的特点有:

    • 数据吞吐量很大:需要快速收集各个渠道的海量日志;

    • 集群容错性高:允许集群中少量节点崩溃;

    • 功能不需要太复杂:设计目标是高吞吐、低延迟和可扩展,主要关注消息传递而非消息处理,所以不支持死信队列、顺序消息等高级功能;

    • 允许少量数据丢失:在海量应用日志中,少量日志丢失不影响结果,设计初衷允许少量数据丢失,不过Kafka也在不断优化数据安全问题。

2 快速上手 Kafka

2.1 快速搭建单机服务

  • Kafka 的运行环境非常简单,只要有 JVM 就可以运行,下面使用一台安装了 JDK1.8 的 CentOS9 机器作演示;

  • 下载 Kafka:Apache Kafka。选择kafka_2.13-3.8.0.tgz版本;

    • 其中 2.13 是开发 Kafka 所用 Scala 语言的版本,3.8.0 是 Kafka 应用版本;
    • Scala 运行于JVM,运行时只需要 JDK,选择什么版本都可以。但是调试源码就需要对应的 Scala 版本(因为 Scala 版本不向后兼容);
  • 下载 ZooKeeper:Apache ZooKeeper。Zookeeper 的版本没有强制要求,此处选择3.8.4版本;

    Kafka 的安装程序中自带了 Zookeeper,可以在 Kafka 的安装包的libs目录下查看到 ZooKeeper 的客户端 jar 包。但是,通常情况下,为了让应用更好维护,我们会使用单独部署的 ZooKeeper,而不使用 Kafka 自带的 ZooKeeper;

  • 将下载的 Kafka 和 ZooKeeper 工具包上传到服务器,解压后分别放到/app/kafka/app/zookeeper目录;配置KAFKA_HOME环境变量指向 Kafka 安装目录,再把两个组件部署目录下的bin目录路径配置到path环境变量中;

  • 启动 ZooKeeper:启动 Kafka 前需先启动 Zookeeper,这里就先使用 Kafka 自带的 Zookeeper,启动脚本在bin目录下:

    cd $KAFKA_HOME
    nohup bin/zookeeper - server - start.sh config/zookeeper.properties &
    
    • 要注意脚本是否有执行权限;
    • 可从nohup.out中看到 ZooKeeper 默认在2181端口启动,用jps指令可以看到 QuorumPeerMain 进程,进而可以确认启动成功;
  • 启动 Kafka:

    nohup bin/kafka - server - start.sh config/server.properties &
    
    • 启动后用jps指令看到 Kafka 进程,且服务默认在9092端口启动,即表示启动成功。

2.2 简单收发消息

  • 基础工作机制:消息发送者将消息发送到Kafka指定的topic,消息消费者从指定topic消费消息;

    在这里插入图片描述

  • 创建 Topic:名为test

    bin/kafka - topics.sh --create --topic test --bootstrap - server localhost:9092
    
  • 查看Topic

    bin/kafka - topics.sh --describe --topic test --bootstrap - server localhost:9092
    
  • 消息发送:启动消息发送者,往名为test的Topic发送消息,命令行出现>符号后可随意输入字符,按Ctrl + C退出命令行,完成发消息操作;

    bin/kafka - console - producer.sh --broker - list localhost:9092 --topic test
    

    若不提前创建Topic,第一次往不存在的Topic发消息时,消息能正常发送,但会抛出LEADER_NOT_AVAILABLE警告,这是因为Broker端创建主题后,会通知Clients端LEADER_NOT_AVAILABLE异常,Clients端收到后会主动更新元数据获取新主题信息;

  • 消息消费:启动消息消费者,从testTopic接收消息;

    • 生产者和消费者无需同时启动,二者相互解耦,没有生产者时消费者可正常工作,反之亦然;
    bin/kafka - console - consumer.sh --bootstrap - server localhost:9092 --topic test
    
  • 其他消费模式

    • 指定消费进度(从开头消费):若要消费之前发送的消息,可添加--from - beginning参数,命令如下:

      bin/kafka - console - consumer.sh --bootstrap - server localhost:9092 --from - beginning --topic test
      
    • 精确指定消费起始位置:如下,表示从第0号Partition上的第四个消息开始读取,后续会介绍Partition(分区)和Offset(偏移量)的概念:

      bin/kafka - console - consumer.sh --bootstrap - server localhost:9092 --partition 0 --offset 4 --topic test
      

2.3 消费者组

  • 消费者组的基本规则:每个消费者可指定所属的消费者组,Kafka 中的同一条消息只能被同一个消费者组里的某一个消费者消费,不属于该组的其他消费者也能消费这条消息。在kafka-console-consumer.sh脚本中,可通过--consumer-property group.id=testGroup指定消费者组,比如可启动三个消费者组验证分组消费机制;

    # 两个消费者实例属于同一个消费者组(执行两次下面的命令模拟两个实例)
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --topic test
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --topic test
    # 一个消费者实例属于不同的消费者组
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup2 --topic test
    
  • 查看消费者组的偏移量:使用kafka-consumer-groups.sh可观测消费者组情况,包括消费进度;

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup
    

    在这里插入图片描述

    • 从执行结果能看到,Kafka 以消费者组为单位,分别记录每个Partition(分区)上的消息偏移量;
    • 增加新的消费者组,不会影响 Kafka 的消息数据,只需新增一条偏移量记录,所以 Kafka 的消息复读效率很高。

2.4 Kafka 的消息传递机制

  • 从上面的例子中可以看到, Kafka 的消息发送者和消息消费者是通过 Topic 来进行业务沟通。但实际上,所有的消息是存在服务端的Partition这样⼀个数据结构中;

    在这里插入图片描述

  • 关键概念

    • 客户端(Client):包含消息生产者和消息消费者,之前的操作有简单涉及;
    • 消费者组:每个消费者可指定所属消费者组,同组消费者构成逻辑消费者组。一条消息会被多个感兴趣的消费者组消费,但在同一个消费者组内部,一条消息只会被消费一次;
    • 服务端Broker:一个 Kafka 服务器就是一个 Broker;
    • 话题(Topic):逻辑概念,被视为业务含义相同的一组消息,客户端通过绑定Topic来生产或消费感兴趣的消息;
    • 分区(Partition)Topic是逻辑概念,Partition是实际存储消息的组件,每个Partition是一个FIFO(先进先出)的队列结构,所有消息按此顺序保存在Partition分区中。

3 Kafka 的集群工作机制

3.1 Kafka 集群

  • 集群架构

    • Kafka集群包含多个Broker(Kafka服务器),每个Topic下的Partition会分布在不同Broker上,且每个Partition有Leader(主节点)和Follower(从节点);
    • Zookeeper集群(如ZK1、ZK2、ZK3)负责Broker的注册节点以及Partition的Leader选举等工作,消息生产者(Producer)向Topic发送消息,不同消费者组(如groupA、groupB)从Topic消费消息;

    在这里插入图片描述

  • 使用集群的原因:即使单机版的 Kafka 已具备高性能(TPS可达百万级TPS),但在实际工作中使用时,单机搭建的 Kafka 会有很大的局限性;

    • 解决海量消息存储问题

      • Kafka 的设计目标是处理海量消息,但单机存储能力有限,难以承载一个 Topic 下的大量消息;
      • 集群模式通过将 Topic 的消息分割为多个 Partition(分区),并将这些 Partition 分布到不同的 Broker(服务器)上,实现了数据的分布式存储;
      • 每个 Broker 只需存储部分数据,大幅提升了整体存储容量,而 Partition 的数量即为分区数,可根据业务需求灵活配置;
    • 保障数据安全性与服务稳定性

      • 单机服务存在单点故障风险:一旦服务崩溃,数据可能永久丢失;
      • 集群模式为每个 Partition 配置一个或多个 Follower (从节点)作为备份。同时,借助 Zookeeper 集群进行 Leader (主节点)选举:
        • Leader 节点负责处理客户端的读写请求并保存消息;
        • Follower 节点实时同步 Leader 的数据,作为备用;
        • 当 Leader 故障时,Zookeeper 会从 Follower 中选举新的 Leader,确保服务不中断,数据不丢失;
    • 提升集群整体可用性

      • Kafka 集群的关键元数据(如 Broker 信息、Partition 的 Leader/Follower 分布、选举状态等)统一存储在 Zookeeper 集群中;
      • 即使部分 Broker 崩溃,Zookeeper 仍能维护集群的核心信息,保证剩余节点可继续协作,避免整个集群因局部故障而瘫痪,从而维持服务的持续可用;

    Kafka 也提供了另外一种不需要 ZooKeeper 的集群机制,即 Kraft 集群。这种方式会在后面进行介绍。

3.2 搭建 Kafka 集群

  • 服务器准备:需三台安装好JDK且关闭防火墙的CentOS服务器;

    # 停止防火墙服务
    service firewalld stop
    # 禁用防火墙开机自启
    systemctl disable firewalld
    
  • 机器名配置:分别将三台机器命名为worker1、worker2、worker3;

    vi /etc/hosts192.168.232.128 worker1
    192.168.232.129 worker2
    192.168.232.130 worker3
    
  • 部署Zookeeper集群(因为 Zookeeper 的选举机制,所以建议部署奇数台机器,三台机器上都要做以下操作)

    • 解压与配置文件准备:使用之前单独下载的 Zookeeper,将其解压到/app/zookeeper目录。进入conf目录,将zoo_sample.cfg复制为zoo.cfg,修改关键配置:

      # 设置Zookeeper本地数据目录,避免用默认/tmp(临时目录易被清理)
      dataDir=/app/zookeeper/data
      # 指定Zookeeper对客户端开放的服务端口
      clientPort=2181
      # 集群节点配置。server.x中x是节点在集群中的myid,2888是集群内部数据传输端口,3888是集群内部选举端口
      server.1=192.168.232.128:2888:3888
      server.2=192.168.232.129:2888:3888
      server.3=192.168.232.130:2888:3888
      
    • 分发与myid文件创建

      # 进入/app/zookeeper/data目录
      cd /app/zookeeer/data
      # 生成myid文件,myid内容对应zoo.cfg中配置的server.id
      echo 1 > myid
      
    • 启动与验证:在三台机器上都启动Zookeeper服务,启动后用jps指令查看,若有QuorumPeerMain进程则启动成功

      bin/zkServer.sh --config conf start
      
    • 三台机器都启动后,用bin/zkServer.sh status查看集群状态,Modeleader表示主节点,follower表示从节点;

      [root@hadoop02 zookeeper-3.5.8]# bin/zkServer.sh status
      ZooKeeper JMX enabled by default
      Using config: /app/zookeeper/zookeeper-3.5.8/bin/../conf/zoo.cfg
      Client port found: 2181. Client address: localhost.
      Mode: leader
      
  • 部署Kafka集群(三台机器上都要做以下操作)

    • 解压与配置文件修改:Kafka服务无需选举,无奇数台服务建议。将Kafka解压到/app/kafka目录,进入config目录修改server.properties,关键配置有:

      # Broker的全局唯一编号,每个服务器上需不同,分发时要修改
      broker.id=0
      # 服务监听地址
      listeners=PLAINTEXT://worker1:9092
      # 数据文件地址,避免默认使用 /tmp 目录
      log.dirs=/app/kafka/logs
      # 默认每个Topic的分区数
      num.partitions=1
      # Zookeeper的服务地址,若为集群用逗号连接
      zookeeper.connect=worker1:2181,worker2:2181,worker3:2181
      

      下面是server.properties文件中比较重要的核心配置:

      PropertyDefaultDescription
      broker.id0broker的“名字”,你可以选择任意你喜欢的数字作为id,只要id是唯每个broker都可以用一个唯一的非负整数id进行标识;这个id可以作为一的即可
      log.dirs/tmp/kafka-logsKafka存放数据的路径。这个路径并不是唯一的,可以是多个,路径之间只需要使用逗号分隔即可;每当创建新partition时,都会选择在包含最少partitions的路径下进行
      listenersPLAINTEXT://127.0.0.1:9092server接受客户端连接的端口,ip配置Kafka本机ip即可
      zookeeper.connectlocalhost:2181zookeeper连接地址。hostname:port。如果是ZooKeeper集群,用逗号连接
      log.retention.hours168每个日志文件删除之前保存的时间
      num.partitions1创建topic的默认分区数
      default.replication.factor1自动创建topic的默认副本数量
      min.insync.replicas1当producer设置acks为-1时,min.insync.replicas指定replicas的最小数目(必须确认每一个repica的写数据都是成功的),如果这个数目没有达到,producer发送消息会产生异常
      delete.topic.enablefalse是否允许删除主题
    • 启动与验证:多个Kafka服务注册到同一个Zookeeper集群节点会自动组成集群。启动Kafka服务(-daemon表示后台启动),启动后可以用jps指令查看Kafka进程;

      bin/kafka-server-start.sh -daemon config/server.properties
      

3.3 服务端的Topic、Partition和Broker

  • Kafka 集群下基本操作:

    # 创建一个分布式Topic(disTopic)
    # --partitions指定分区数为 4,意味着该 Topic 下的消息会被划分为 4 个部分
    # --replication-factor指定每个分区有 2 个备份。大部分创建参数可在配置文件中指定默认值
    [oper@worker1 bin]$ ./kafka-topics.sh --bootstrap-server worker1:9092 --create --replication-factor 2 --partitions 4 --topic disTopic
    Created topic disTopic.# 列出所有Topic
    [oper@worker1 bin]$ ./kafka-topics.sh --bootstrap-server worker1:9092 --list
    __consumer_offsets
    disTopic# 查看disTopic的详细信息
    # Partition:列出 4 个分区及编号,用于标识分区
    # Leader:每个分区的主节点,负责响应客户端请求,这样可分散客户端请求
    # Replicas:分区的多个备份所在的 Broker(对应配置的broker.id),是逻辑上的分配情况,即使节点故障也会列出节点 ID,也称为 AR
    # ISR:分区的实际分配情况,是 AR 的子集,只列出当前存活且能正常同步数据的 Broker 节点
    [oper@worker1 bin]$ ./kafka-topics.sh --bootstrap-server worker1:9092 --describe --topic disTopic
    Topic: disTopic TopicId: vX4ohhIER6aDpDZgTy10tQ PartitionCount: 4       ReplicationFactor: 2    Configs: segment.bytes=1073741824Topic: disTopic Partition: 0    Leader: 2       Replicas: 2,1   Isr: 2,1Topic: disTopic Partition: 1    Leader: 1       Replicas: 1,0   Isr: 1,0Topic: disTopic Partition: 2    Leader: 0       Replicas: 0,2   Isr: 0,2Topic: disTopic Partition: 3    Leader: 2       Replicas: 2,0   Isr: 2,0
    
  • 进入配置Kafka集群时指定的log.dirs(日志目录),可看到每个Broker的实际数据承载情况。Broker上的一个Partition对应日志目录中的一个目录,该Partition的所有消息就存储在对应目录中;

    在这里插入图片描述

  • 整体逻辑与物理关系Topic是数据集合的逻辑单元,同一Topic的数据实际存储在Partition(数据存储的物理单元)中,BrokerPartition的物理载体,Partition会尽量均匀分配到不同Broker上,offset是每个消息在Partition上的偏移量;

    在这里插入图片描述

  • 这样的额设计解决了什么问题?

    • 支持海量数据与扩展吞吐量:Kafka 需处理海量数据,单个 Broker 存不下,拆分成多个Partition,每个 Broker 存部分数据,极大扩展了集群吞吐量;

    • 保证数据安全与提高并发读:每个Partition有部分消息副本,若只在一个 Broker 上易出现单点故障,设计Follower节点进行数据备份保证数据安全,多备份的Partition设计也提高了读取消息的并发度;

    • 请求响应与数据分发:同一Topic的多个Partition会产生一个Leader Partition,负责响应客户端请求并将数据分发给其他Partition

4 总结:Kafka 集群的消息流转模型

在这里插入图片描述

  • Topic的逻辑作用:Topic是逻辑概念,生产者(Producer)和消费者(Consumer)通过Topic进行业务沟通,它是两者之间业务交互的“桥梁”;

  • Topic与Partition的关系:

    • Topic本身不存储数据,其下的数据被分为多组Partition,且这些Partition尽量平均分散到各个Broker上;
    • 每组Partition包含Topic下的一部分消息,同时每组Partition有一个Leader Partition,还有若干Follower Partition进行备份,每组Partition的备份个数称为备份因子(replica factor);
  • 消息生产与消费的进度记录:生产者将消息发送到对应的Partition上,消费者通过Partition上的Offset(偏移量),记录自己所属消费者组(Group)在当前Partition上消费消息的进度;

  • 消息对消费者组的投递规则:

    • 生产者发送给一个Topic的消息,会由Kafka推送给所有订阅了这个Topic的消费者组进行处理;
    • 但在每个消费者组内部,同一条消息只会有一个消费者实例进行处理;
  • Broker集群与Controller:

    • Kafka的Broker通过Zookeeper组成集群。在这些Broker中,需要选举产生一个担任Controller角色的Broker;
    • Controller的主要任务是负责Topic的分配以及后续的管理工作,在上面搭建的集群中,这个Controller是通过ZooKeeper产生的。
http://www.dtcms.com/a/403693.html

相关文章:

  • SSH公钥私钥!进阶!SSH与Git!
  • 网站必须兼容哪些浏览器中核正式员工年收入
  • 珠海网站品牌设计公司简介网络新闻专题做的最好的网站
  • keepalived服务器
  • AI写的超级好用的课堂互动系统
  • 山东建设机械协会网站课程网站建设的设计报告
  • 第四部分:Netty核心源码解析(下)
  • 攻克 大 Excel 上传难题:从异步处理到并发去重的全链路解决方案
  • 【双光相机配准】红外相机与可见光相机配准方案
  • 中国建设银行网站个人客户wordpress 主题显示
  • 开源超级终端PuTTY改进之:增加点对点网络协议IocHub,实现跨网段远程登录
  • 帮别人做网站如何备案wordpress video plugin
  • 118. 杨辉三角(dp)
  • 济宁网站开发招聘威海建设集团官方网站
  • 【QT】QPainter的使用
  • 北京代理网站备案成都市建设工程交易中心网站
  • PyTorch 数据处理工具箱与可视化工具
  • python的高阶函数
  • Python请求示例JD商品评论API接口,json数据返回
  • Json格式化处理碰到的问题
  • 驱动开发(4)|鲁班猫rk356x镜像编译,及启用SPI控制器驱动
  • Rust语言了解
  • 深圳成交型网站建设天元建设集团有限公司企业号
  • 织梦系统做的网站忘记登录密码semir是什么品牌
  • Python实现ETF网格自动化交易集成动量阈值判断
  • 使用c语言连接数据库
  • 网站在百度找不到了王占山人物简介
  • Windows Server 定时备份 MySQL 数据升级版:单表备份 + 压缩功能 + 运维统计
  • gpt-4o+deepseek+R生成热力图表
  • 管理系统前端模板河北seo网络推广