当前位置: 首页 > news >正文

202534 | KafKa简介+应用场景+集群搭建+快速入门

Apache Kafka 简介


一、什么是 Kafka?

Apache Kafka 是一个高吞吐量、分布式、可扩展的流处理平台,用于构建实时数据管道和流应用程序。它最初由 LinkedIn 开发,并于 2011 年开源,目前由 Apache 软件基金会进行维护。

Kafka 具备以下核心特性:

  • 发布-订阅消息系统:支持生产者向主题(Topic)发送消息,消费者从主题中读取消息。
  • 高吞吐量与低延迟:可处理百万级每秒消息,延迟低于几毫秒。
  • 持久化存储:消息以日志形式存储在磁盘上,可设定保留时间。
  • 可水平扩展:通过分区(Partition)机制轻松扩展读写能力。
  • 高容错性:副本机制保障在节点故障时依旧能够正常运行。

Kafka 不仅是一个消息队列,更是一个用于流数据处理的统一平台


二、Kafka 的应用场景

Kafka 在大数据和分布式系统领域具有广泛应用,主要包括:

1. 日志收集与传输

Kafka 可统一收集来自不同服务或服务器的日志,作为日志系统的核心组件,将数据传输至后端处理系统(如 Hadoop、Elasticsearch 等)。

2. 实时数据分析

结合 Apache Flink、Spark Streaming 等流处理框架,Kafka 可用于构建实时分析平台,实现实时用户行为分析、实时监控等。

3. 事件驱动架构(EDA)

Kafka 作为微服务架构中的事件总线,使服务之间通过事件解耦,从而提高系统灵活性与可维护性。

4. 数据管道(Data Pipeline)

Kafka 能将数据从数据库、日志系统等源系统传输到数据仓库或数据湖,是构建高效可靠数据管道的核心工具。

5. 替代传统消息队列

在对吞吐量、可扩展性有更高要求的系统中,Kafka 可替代传统消息中间件(如 RabbitMQ、ActiveMQ)作为消息传递通道。


三、Kafka 的诞生背景

Kafka 的诞生源于 LinkedIn 内部对于日志处理和数据传输系统的性能瓶颈

  • LinkedIn 的业务快速增长,系统需要处理海量的用户行为数据与日志。
  • 传统的消息队列系统难以满足高吞吐量与高可用性的要求。
  • 工程团队设计了一种新的架构,将“分布式日志”作为核心思想,构建出一个同时支持日志收集、传输与处理的统一平台。

Kafka 在设计上融合了以下理念:

  • 以持久化日志为核心数据结构:每条消息即为一条日志记录,可重复读取。
  • 分布式架构支持高可用性与高扩展性:通过集群部署和分区副本机制实现。
  • 支持批处理和流处理双模式:既可用于数据采集与离线分析,也适合实时流处理。

这一创新架构为 Kafka 后来的广泛应用打下了坚实基础,也推动了现代数据架构的演进。

好的!以下是 Kafka Java 快速入门指南,适合初学者快速了解如何在 Java 程序中使用 Kafka 实现消息的生产与消费。


明白了!以下是使用 Mermaid 格式图形 重新整理的 Kafka 集群搭建指南,清晰展示了 Kafka + ZooKeeper 的集群结构和搭建步骤。


Kafka 集群搭建指南(ZooKeeper 模式)


一、Kafka 集群架构图(Mermaid 格式)

Kafka集群
ZooKeeper集群
Kafka Broker 1
192.168.1.101:9092
Kafka Broker 2
192.168.1.102:9092
Kafka Broker 3
192.168.1.103:9092
ZooKeeper 节点1
192.168.1.101:2181
ZooKeeper 节点2
192.168.1.102:2181
ZooKeeper 节点3
192.168.1.103:2181
Kafka 客户端

Topic 分区布局
Leader
Replica
Replica
Leader
Replica
Replica
Leader
Replica
Replica
Partition 0
Leader: Kafka-1
Replica: Kafka-2, Kafka-3
Partition 1
Leader: Kafka-2
Replica: Kafka-1, Kafka-3
Partition 2
Leader: Kafka-3
Replica: Kafka-1, Kafka-2
Kafka Broker 1
192.168.1.101:9092
Kafka Broker 2
192.168.1.102:9092
Kafka Broker 3
192.168.1.103:9092

二、准备工作

1. 系统要求

  • Linux/CentOS/Ubuntu(或容器)
  • Java 8+(推荐 Java 11)
  • 至少 3 台机器或虚拟节点

2. 下载 Kafka 安装包(每台机器)

wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
tar -xzf kafka_2.13-3.6.0.tgz
cd kafka_2.13-3.6.0

三、配置 ZooKeeper 集群

1. 修改 config/zookeeper.properties

dataDir=/tmp/zookeeper
clientPort=2181
server.1=192.168.1.101:2888:3888
server.2=192.168.1.102:2888:3888
server.3=192.168.1.103:2888:3888

2. 设置每个节点的 myid

echo 1 > /tmp/zookeeper/myid  # 节点1
echo 2 > /tmp/zookeeper/myid  # 节点2
echo 3 > /tmp/zookeeper/myid  # 节点3

3. 启动 ZooKeeper(每台执行)

bin/zookeeper-server-start.sh config/zookeeper.properties

四、配置 Kafka Broker

每台机器修改 config/server.properties,示例:

broker.id=1                           # 每个节点唯一(如 1、2、3)
listeners=PLAINTEXT://192.168.1.101:9092
log.dirs=/tmp/kafka-logs-1
zookeeper.connect=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181

启动 Kafka:

bin/kafka-server-start.sh config/server.properties

五、验证集群

1. 创建主题

bin/kafka-topics.sh --create \
--bootstrap-server 192.168.1.101:9092 \
--replication-factor 3 --partitions 3 \
--topic test-topic

2. 查看主题分布

bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server 192.168.1.101:9092

六、发送与消费消息

生产者:

bin/kafka-console-producer.sh --topic test-topic --bootstrap-server 192.168.1.101:9092

消费者:

bin/kafka-console-consumer.sh --topic test-topic --bootstrap-server 192.168.1.102:9092 --from-beginning

七、常见问题

问题原因
Kafka 启动失败broker.id 重复或端口冲突
消息无法消费ZooKeeper 未正常连接,主题未正确创建
节点日志混乱或冲突log.dirs 配置重复,broker.id 没有区分
ZooKeeper 单点故障节点不足,推荐部署奇数个节点(3/5)

Kafka Java 快速入门指南


一、准备工作

1. 添加 Maven 依赖

在你的项目的 pom.xml 中添加以下依赖:

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version></dependency>
</dependencies>

二、Kafka Producer 示例(生产者)

1. 编写 KafkaProducerDemo.java

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;public class KafkaProducerDemo {public static void main(String[] args) {// 配置Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092"); // Kafka 地址props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 发送消息for (int i = 0; i < 5; i++) {String message = "Hello Kafka " + i;producer.send(new ProducerRecord<>("test-topic", message));System.out.println("Sent: " + message);}producer.close();}
}

三、Kafka Consumer 示例(消费者)

1. 编写 KafkaConsumerDemo.java

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerDemo {public static void main(String[] args) {// 配置Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test-group"); // 消费组props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "earliest"); // 从头开始消费// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("test-topic"));// 消费消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record : records) {System.out.printf("Received message: key=%s, value=%s, offset=%d%n",record.key(), record.value(), record.offset());}}}
}

四、运行顺序建议

  1. 启动 Kafka 服务(本地或远程)
  2. 先运行消费者 KafkaConsumerDemo(等待监听)
  3. 再运行生产者 KafkaProducerDemo(发送消息)

五、调试小技巧

  • 确保 Kafka 服务正常运行,端口默认为 9092
  • 主题 test-topic 必须提前创建,或在 Kafka 开启 auto.create.topics.enable=true 的情况下自动创建。
  • 消费者默认是“只消费一次”,再次运行需更改 group.id 或开启重复读取逻辑。

相关文章:

  • kafka的安装及简单使用
  • [sklearn机器学习概述]机器学习-part3
  • 运算符与表达式 -《Go语言实战指南》
  • Scala与Go的异同教程
  • 【计算机视觉】OpenCV项目实战:基于OpenCV的图像分割技术深度解析与实践指南
  • 5.1 神经网络: 层和块
  • 电子电器架构 --- 车载以太网拓扑
  • k8s删除pv和pvc后,vg存储没释放分析
  • word换行符和段落标记
  • 2024年AI发展趋势全面解析:从多模态到AGI的突破
  • Python 从 SQLite 数据库中批量提取图像数据
  • 深拷贝与浅拷贝:理解 Python 中的对象复制机制
  • 数据格式(Data Format)设计
  • python3环境安装
  • redis八股--1
  • Redis 主从同步与对象模型(四)
  • JavaScript中对象和数组的常用方法
  • rust-candle学习笔记13-实现多头注意力
  • 嵌入式STM32学习——继电器
  • 大模型微调算法原理:从通用到专用的桥梁
  • 让“五颜六色”面孔讲述上海故事,2025年上海城市推荐官开启选拔
  • 印方称所有敌对行动均得到反击和回应,不会升级冲突
  • 绿城约13.93亿元竞得西安浐灞国际港港务片区地块,区内土地楼面单价首次冲破万元
  • 中国驻俄大使张汉晖人民日报撰文:共襄和平伟业,续谱友谊新篇
  • 短剧剧组在贵州拍戏突遇极端天气,演员背部、手臂被冰雹砸伤
  • 商务部:自5月7日起对原产于印度的进口氯氰菊酯征收反倾销税