Kafka入门及实战应用指南
1、Kafka概述
Apache Kafka是由LinkedIn公司于2010年开发的一款分布式消息系统,旨在解决当时传统消息队列(如ActiveMQ、RabbitMQ)在高吞吐量和实时性场景下的性能瓶颈。随着LinkedIn内部对实时日志处理、用户行为追踪等需求的激增,Kafka逐渐演化为一个支持水平扩展、持久化存储的流数据平台。2011年,Kafka成为Apache基金会顶级开源项目,并在全球范围内被广泛应用于大数据、实时计算和微服务架构领域。
Kafka的设计哲学源于发布-订阅模型,但其创新性地引入了分布式存储和分区化处理机制,使得系统能够高效处理每秒百万级的消息吞吐。这一特性使其迅速成为现代数据管道(Data Pipeline)和流式处理(Stream Processing)的核心组件。
Kafka是一个开源的高吞吐量的分布式消息中间件,对比于其他
1、缓冲和削峰:上游数据时有突发流量,下游可能扛不住,或者下游没有足够多的机器来保证冗余,kafka在中间可以起到一个缓冲的作用,把消息暂存在kafka中,下游服务就可以按照自己的节奏进行慢慢处理。
2、解耦和扩展性:项目开始的时候,并不能确定具体需求。消息队列可以作为一个接口层,解耦重要的业务流程。只需要遵守约定,针对数据编程即可获取扩展能力。
3、冗余:可以采用一对多的方式,一个生产者发布消息,可以被多个订阅topic的服务消费到,供多个毫无关联的业务使用。
4、健壮性:消息队列可以堆积请求,所以消费端业务即使短时间死掉,也不会影响主要业务的正常进行。
5、异步通信:很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
2、核心组件
组件 | 说明 |
Producer | 消息生产者,向Kafka发送消息 |
Consumer | 消息消费者,从Kafka读取消息 |
Broker | Kafka服务器节点,组成Kafka集群 |
Topic | 消息类别/主题,生产者发送到特定Topic,消费者订阅特定Topic |
Partition | Topic的分区,实现并行处理和水平扩展 |
Offset | 消息在分区中的唯一标识(位移) |
ZooKeeper | 管理Kafka集群元数据(新版本已逐步移除ZooKeeper依赖) |
3、Kafka的特点与优势
1. 高吞吐量与低延迟
Kafka通过批处理、顺序磁盘I/O和零拷贝技术(Zero-Copy)优化数据传输效率。生产者(Producer)将消息批量发送至Broker,消费者(Consumer)按顺序拉取数据,避免了传统消息系统的频繁网络交互。实测中,单台Broker可轻松支持每秒数十万条消息的读写。
2. 水平扩展与容错性
Kafka集群由多个Broker(服务器节点)组成,支持动态扩容。每个主题(Topic)被划分为多个分区(Partition),分区可分布在不同Broker上,通过多副本(Replica)机制实现数据冗余。若某Broker宕机,其他副本会自动接管服务,确保系统的高可用性。
3. 持久化存储与回溯消费
消息在Kafka中默认保留7天(可配置为永久存储),消费者可随时重置偏移量(Offset)以重新消费历史数据。这一特性在数据重放、故障恢复等场景中至关重要。
4. 生态兼容性
Kafka与主流大数据工具(如Spark、Flink、Hadoop)深度集成,并提供了Connect API和Streams API,支持构建端到端的流处理管道。
4、Kafka 使用场景
- 实时流处理:用户行为追踪、实时推荐
- 日志收集:集中式日志系统
- 事件源:微服务间的事件驱动架构
- 消息队列:系统解耦、削峰填谷
- Metrics收集:监控数据聚合
5、什么是Zookeeper?
Zookeeper是一个高性能、高可靠的分布式协调服务,最初由雅虎开发,是Google Chubby的开源实现。它被广泛应用于分布式系统中,用于解决分布式应用中的协调问题,如配置管理、服务注册与发现、分布式锁等。Zookeeper的设计目标是封装复杂且容易出错的关键服务,为分布式应用提供简单易用的接口。
6、Zookeeper的应用场景
Zookeeper在分布式系统中扮演着重要的角色,其典型应用场景包括:
- 配置管理:Zookeeper可以作为分布式系统的配置中心,集中管理配置信息,确保所有节点能够获取到最新的配置。
- 服务注册与发现:分布式系统中的服务可以通过Zookeeper进行注册,客户端可以通过查询Zookeeper来发现所需服务。
- 分布式锁:Zookeeper提供了一种实现分布式锁的机制,确保多个节点对共享资源的访问是互斥的。
- 集群管理:Zookeeper能够监控集群中节点的状态,及时发现并处理节点故障。
- 消息队列:Zookeeper可以用于实现分布式消息队列中的协调功能。
7、Zookeeper核心特性
Zookeeper具有以下关键特性:
- 顺序一致性:客户端的更新操作按照其发送的顺序被应用到Zookeeper上,确保了操作的顺序性。
- 原子性:所有对Zookeeper的操作都是原子的,要么全部成功,要么全部失败。
- 单一系统映像:无论Zookeeper集群中有多少节点,客户端看到的都是一个单一的、一致的视图。
- 可靠性:Zookeeper通过副本机制和选举算法确保系统的高可用性。
- 实时性:Zookeeper能够实时监控节点的状态变化,并及时通知客户端。
8、ZooKeeper 的作用
ZooKeeper 是一个分布式协调服务,为 Kafka 提供以下关键功能:
功能 | 具体说明 |
集群管理 | 记录 Kafka Broker 的节点状态(存活/下线),维护 Broker 列表。 |
Controller 选举 | Kafka 集群需要一个主控制器(Controller)处理分区和副本管理,ZooKeeper 负责选举。 |
Topic 元数据存储 | 存储 Topic 的分区信息、副本分配、Leader 选举结果等元数据。 |
消费者组管理 | 记录消费者组的 offset(Kafka 2.8+ 已支持脱离 ZooKeeper,默认仍依赖)。 |
分布式锁 | 确保多个 Broker 或客户端操作时的数据一致性(如分区迁移)。 |
9、Kafka的安装
1、安装JAVA环境
yum -y install java-1.8.0-openjdk
2、安装ZooKeeper
mkdir -p /opt/kafka
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.8.1/apache-zookeeper-3.8.1-bin.tar.gz
tar -zxvf apache-zookeeper-3.8.1-bin.tar.gz
cd apache-zookeeper-3.8.1-bin/conf
cp zoo_sample.cfg zoo.cfg
../bin/zkServer.sh start
3、安装Kafka
# 解压
tar xvf kafka_2.13-2.8.2.tgz
cd kafka_2.13-2.8.2
ls
# 启动
bin/kafka-server-start.sh config/server.properties#后台启动
nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &查看日志
tail -f kafka.log
单机启动,并且ZooKeeper也刚好在本机,所以我们默认不需要修改任何配置就可以直接启动。如果不在一起则修改配置文件:config/server.properties文件。
4、IDEA连接Kafka
cd /opt/kafka/kafka_2.13-2.8.2/configvim server.properties# 修改内容
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.142.131:9092
安装Kafka插件进行连接
10、SpringBoot整合Kafka
1、导入jar包
<!--kafka-->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.7.1</version>
</dependency>
2、编写配置
# Kafka 相关配置kafka:# Kafka 服务器地址bootstrap-servers: 192.168.142.131:9092# 生产者配置producer:# 序列化器,用于将键转换为字节key-serializer: org.apache.kafka.common.serialization.StringSerializer# 序列化器,用于将值转换为字节value-serializer: org.apache.kafka.common.serialization.StringSerializer# 消费者配置consumer:# 消费者组ID,用于区分不同的消费者组group-id: my-application-group# 自动偏移量重置策略,当没有初始偏移量时从最早的偏移量开始消费auto-offset-reset: earliest# 禁用自动提交功能enable-auto-commit: false# 反序列化器,用于将字节转换为键key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 反序列化器,用于将字节转换为值value-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 配置监听器的确认模式为手动立即确认listener:ack-mode: manual_immediate# 主题配置topic:default: default-topicmanual: manual-commit-topic
3、Kafka消息生产者
package com.lw.mqdemo.mq.kafka;import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;/*** Kafka消息生产者服务类*/
@Service
public class KafkaProducerService {private final KafkaTemplate<String, String> kafkaTemplate;public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}/*** 发送消息到指定主题* @param topic 主题名称* @param message 消息内容*/public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);}/*** 发送消息到指定主题和分区* @param topic 主题名称* @param partition 分区号* @param key 消息键* @param message 消息内容*/public void sendMessage(String topic, Integer partition, String key, String message) {kafkaTemplate.send(topic, partition, key, message);}
}
4、Kafka消息消费者
package com.lw.mqdemo.mq.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;/*** Kafka 消费者服务*/
@Service
public class KafkaConsumerService {/*** 监听指定主题的消息(自动提交)* @param message 消息内容*/@KafkaListener(topics = "${spring.kafka.topic.default}", groupId = "${spring.kafka.consumer.group-id}")public void consumeAutoCommit(String message) {System.out.println("接收到自动提交消息: " + message);// 业务处理逻辑}/*** 监听指定主题的消息(手动提交)* 偏移量(Offset) 是 Kafka 为分区(Partition)中的每条消息分配的唯一序号(从 0 开始递增),用于标识消息在分区中的位置。* @param record 消息记录(包含元数据)* @param ack 确认对象*/@KafkaListener(topics = "${spring.kafka.topic.manual}", groupId = "${spring.kafka.consumer.group-id}")public void consumeManualCommit(ConsumerRecord<String, String> record, Acknowledgment ack) {try {System.out.println("接收到手动提交消息: key=" + record.key() + ", value=" + record.value() + ", topic=" + record.topic() + ", partition=" + record.partition() + ", offset=" + record.offset());// 业务处理逻辑// 手动提交偏移量ack.acknowledge();} catch (Exception e) {// 处理异常,可以选择不提交偏移量以便重试System.err.println("消息处理失败: " + e.getMessage());}}
}
5、测试类
package com.lw.mqdemo.controller;import com.lw.mqdemo.mq.kafka.KafkaProducerService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;/*** Kafka 控制器*/
@RestController
@RequestMapping("/api/kafka")
public class KafkaController {private final KafkaProducerService producerService;public KafkaController(KafkaProducerService producerService) {this.producerService = producerService;}/*** 发送消息到指定的Kafka主题*/@PostMapping("/send")public String sendMessage(@RequestParam("topic") String topic,@RequestParam("message") String message) {producerService.sendMessage(topic, message);return "消息已发送到主题: " + topic;}
}