【Kafka】Kafka从入门到实战:构建高吞吐量分布式消息系统
Kafka从入门到实战:构建高吞吐量分布式消息系统
一、Kafka概述
Apache Kafka是一个分布式流处理平台,最初由LinkedIn开发,后成为Apache顶级项目。它被设计用于高吞吐量、低延迟的消息处理,能够处理来自多个生产者的海量数据,并将这些数据实时传递给消费者。
Kafka核心特性
- 高吞吐量:即使是非常普通的硬件,Kafka也能支持每秒数百万条消息
- 可扩展性:集群可以无缝扩展,无需停机
- 持久性:消息持久化到磁盘,并支持数据备份防止数据丢失
- 分布式:天然支持分布式部署,具有容错能力
- 实时性:消息产生后立即对消费者可见
二、Kafka核心概念
1. 基本组件
- Producer:消息生产者,向Kafka集群发送消息
- Consumer:消息消费者,从Kafka集群读取消息
- Broker:Kafka服务器节点,组成Kafka集群
- Topic:消息类别,生产者发送消息到特定Topic,消费者订阅特定Topic
- Partition:Topic物理上的分组,一个Topic可以分为多个Partition
- Replica:Partition的副本,保证数据可靠性
- Consumer Group:一组Consumer实例共同消费一个Topic
2. 消息存储机制
Kafka采用顺序写入磁盘的方式存储消息,这种设计使得Kafka即使使用普通磁盘也能实现很高的吞吐量。每个Partition是一个有序的、不可变的消息序列,新消息被追加到Partition末尾。
3. 消息传递语义
- 至少一次(At least once):消息不会丢失,但可能被重复消费
- 至多一次(At most once):消息可能丢失,但不会被重复消费
- 精确一次(Exactly once):消息恰好被消费一次
三、Kafka环境搭建
1. 单机版安装
# 下载Kafka
wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
tar -xzf kafka_2.13-3.6.0.tgz
cd kafka_2.13-3.6.0# 启动Zookeeper(新版本Kafka已内置,无需单独安装)
bin/zookeeper-server-start.sh config/zookeeper.properties# 启动Kafka
bin/kafka-server-start.sh config/server.properties
2. 集群部署
修改config/server.properties
文件:
# 每个broker必须有唯一的id
broker.id=1# 监听地址
listeners=PLAINTEXT://:9092# 日志目录
log.dirs=/tmp/kafka-logs# Zookeeper连接地址
zookeeper.connect=localhost:2181# 副本数量
default.replication.factor=3# 分区数量
num.partitions=3
在不同节点上启动多个Broker实例即组成集群。
四、Kafka基础操作
1. Topic管理
# 创建Topic
bin/kafka-topics.sh --create --topic test-topic \
--bootstrap-server localhost:9092 \
--partitions 3 --replication-factor 1# 查看Topic列表
bin/kafka-topics.sh --list --bootstrap-server localhost:9092# 查看Topic详情
bin/kafka-topics.sh --describe --topic test-topic \
--bootstrap-server localhost:9092# 删除Topic
bin/kafka-topics.sh --delete --topic test-topic \
--bootstrap-server localhost:9092
2. 生产消费消息
# 启动生产者
bin/kafka-console-producer.sh --topic test-topic \
--bootstrap-server localhost:9092# 启动消费者
bin/kafka-console-consumer.sh --topic test-topic \
--bootstrap-server localhost:9092 --from-beginning
五、Java客户端开发
1. 添加依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version>
</dependency>
2. 生产者示例
import org.apache.kafka.clients.producer.*;import java.util.Properties;public class SimpleProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 可选配置props.put("acks", "all"); // 确保所有副本都收到消息props.put("retries", 3); // 发送失败重试次数props.put("linger.ms", 1); // 发送延迟Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 100; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key-" + i, "value-" + i);producer.