The Complete Guide to Kafka: From Beginner to Expert
📑 Table of Contents
- Chapter 1: Kafka Fundamentals
- Chapter 2: Environment Setup
- Chapter 3: Producer Development
- Chapter 4: Consumer Development
- Chapter 5: Advanced Features
- Chapter 6: Performance Tuning
- Chapter 7: Monitoring and Operations
- Chapter 8: Spring Boot Integration
- Chapter 9: Interview Essentials
- Chapter 10: Common Problems and Solutions
Chapter 1: Kafka Fundamentals
1.1 What is Kafka
Apache Kafka is a distributed streaming platform, used primarily to build real-time data pipelines and streaming applications.
Core characteristics:
- High throughput: can handle millions of messages per second
- Durability: messages are persisted to disk
- Distributed: designed for cluster deployment
- Fault tolerance: high availability through replication
1.2 Core Components
Broker
- A server node in the Kafka cluster
- Stores and serves messages
- A cluster typically consists of multiple Brokers
Topic
- A logical category of messages
- Comparable to a table in a database
- Producers send messages to a specific Topic
Partition
- A physical subdivision of a Topic
- Each Partition is an ordered message log
- Provides parallelism and, together with replication, data redundancy
Producer
- A client application that sends messages to a Kafka Topic
Consumer
- A client application that subscribes to and consumes messages from a Kafka Topic
Consumer Group
- A logical grouping of multiple consumers
- Consumers in the same group jointly consume all partitions of a Topic
- Provides load balancing and failover
Offset
- The unique identifier of a message within a partition (a monotonically increasing integer)
- Consumers track their progress through the Offset
1.3 Architecture Diagram
┌─────────────┐
│  Producer   │
└──────┬──────┘
       │ send messages
       ▼
┌─────────────────────────────────┐
│          Kafka Cluster          │
│  ┌─────────┐  ┌─────────┐       │
│  │ Broker1 │  │ Broker2 │ ...   │
│  └─────────┘  └─────────┘       │
│                                 │
│  Topic: my-topic                │
│   ├─ Partition 0 (Leader)       │
│   ├─ Partition 1 (Leader)       │
│   └─ Partition 2 (Leader)       │
└────────────────┬────────────────┘
                 │ pull messages
                 ▼
┌──────────────────┐
│  Consumer Group  │
│ ┌──────────────┐ │
│ │  Consumer 1  │ │
│ │  Consumer 2  │ │
│ └──────────────┘ │
└──────────────────┘
Chapter 2: Environment Setup
2.1 Maven Dependencies
<dependencies>
    <!-- Kafka client -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.6.0</version>
    </dependency>
    <!-- JSON serialization (optional) -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.15.2</version>
    </dependency>
    <!-- Logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>2.0.7</version>
    </dependency>
</dependencies>
2.2 Quick Start with Docker Compose
Create a docker-compose.yml file:
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
Start it:
docker-compose up -d
2.3 Verifying the Installation
# Enter the Kafka container
docker exec -it kafka bash

# Create a test Topic
kafka-topics --create --topic test-topic \
  --bootstrap-server localhost:9092 \
  --partitions 3 --replication-factor 1

# List Topics
kafka-topics --list --bootstrap-server localhost:9092
Chapter 3: Producer Development
3.1 Basic Producer Example
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class BasicProducer {
    public static void main(String[] args) {
        // 1. Configure the producer
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 2. Create the producer instance
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            // 3. Send messages
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("test-topic", "key-" + i, "value-" + i);
                // Send asynchronously and handle the callback
                producer.send(record, (metadata, exception) -> {
                    if (exception == null) {
                        System.out.printf("Send succeeded - Topic: %s, Partition: %d, Offset: %d%n",
                                metadata.topic(), metadata.partition(), metadata.offset());
                    } else {
                        System.err.println("Send failed: " + exception.getMessage());
                    }
                });
            }
        } finally {
            // 4. Close the producer
            producer.close();
        }
    }
}
3.2 Synchronous vs. Asynchronous Sending
Synchronous sending
public class SyncProducer {
    public void sendSync(KafkaProducer<String, String> producer) {
        ProducerRecord<String, String> record =
                new ProducerRecord<>("test-topic", "key", "value");
        try {
            // Send synchronously and wait for the result
            RecordMetadata metadata = producer.send(record).get();
            System.out.println("Send succeeded, Offset: " + metadata.offset());
        } catch (Exception e) {
            System.err.println("Send failed: " + e.getMessage());
        }
    }
}
Asynchronous sending
public class AsyncProducer {
    public void sendAsync(KafkaProducer<String, String> producer) {
        ProducerRecord<String, String> record =
                new ProducerRecord<>("test-topic", "key", "value");
        // Send asynchronously; returns immediately
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    System.out.println("Send succeeded");
                } else {
                    System.err.println("Send failed: " + exception.getMessage());
                }
            }
        });
    }
}
3.3 Key Producer Configuration
public class ProducerConfiguration {
    public static Properties getReliableConfig() {
        Properties props = new Properties();
        // Basic settings
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Reliability settings
        props.put(ProducerConfig.ACKS_CONFIG, "all");                // wait for all in-sync replicas
        props.put(ProducerConfig.RETRIES_CONFIG, 3);                 // retry count
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);   // enable idempotence

        // Performance settings
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);          // 16KB batch size
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);              // wait up to 10ms for batching
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // compression
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);    // 32MB buffer
        return props;
    }
}
Configuration reference:
| Setting | Description | Recommended value |
|---|---|---|
| acks | Acknowledgment mode: 0 = don't wait, 1 = wait for the Leader, all = wait for all ISR replicas | all |
| retries | Number of retries | 3 or Integer.MAX_VALUE |
| batch.size | Batch size in bytes | 16384 (16KB) |
| linger.ms | Batching delay in milliseconds | 10-100 |
| compression.type | Compression codec: none/gzip/snappy/lz4/zstd | lz4 or snappy |
3.4 Custom Partitioner
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;

public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionCountForTopic(topic);
        if (key == null) {
            return 0; // null keys go to partition 0
        }
        // Choose the partition from the key's hash
        // (note: Math.abs overflows for Integer.MIN_VALUE; production code should guard)
        return Math.abs(key.hashCode()) % numPartitions;
    }

    @Override
    public void close() {
        // release resources
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // read configuration
    }
}

// Register the custom partitioner
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
3.5 Transactional Producer
public class TransactionalProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Transaction settings
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // Initialize transactions
        producer.initTransactions();
        try {
            // Begin a transaction
            producer.beginTransaction();
            // Send messages
            producer.send(new ProducerRecord<>("topic1", "key1", "value1"));
            producer.send(new ProducerRecord<>("topic2", "key2", "value2"));
            // Commit the transaction
            producer.commitTransaction();
            System.out.println("Transaction committed");
        } catch (Exception e) {
            // Abort the transaction
            producer.abortTransaction();
            System.err.println("Transaction aborted: " + e.getMessage());
        } finally {
            producer.close();
        }
    }
}
⚠️ Producer Caveats
- Always close the producer: use try-finally or try-with-resources to guarantee resource release (see the sketch below)
- Handle the async callback: without a callback, asynchronous send failures go unnoticed
- Idempotence: enabling it prevents duplicate messages at a slight performance cost
- Batch size: too small a batch.size hurts throughput; too large increases latency
- Serializer choice: pick a serializer appropriate to your data type
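A minimal sketch combining the first two points; it reuses getReliableConfig() from the ProducerConfiguration class in section 3.3:
// try-with-resources guarantees close() runs, and close() flushes any
// buffered records before releasing resources; the callback covers failures.
Properties props = ProducerConfiguration.getReliableConfig();
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "value");
    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            System.err.println("Send failed: " + exception.getMessage());
        }
    });
}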
Chapter 4: Consumer Development
4.1 Basic Consumer Example
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;

public class BasicConsumer {
    public static void main(String[] args) {
        // 1. Configure the consumer
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 2. Create the consumer instance
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 3. Subscribe to the topic
        consumer.subscribe(Collections.singletonList("test-topic"));
        try {
            // 4. Poll for messages in a loop
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received - Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s%n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
            }
        } finally {
            // 5. Close the consumer
            consumer.close();
        }
    }
}
4.2 Manual Offset Commits
Synchronous commit
public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-commit-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Disable auto-commit
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    // Process the message
                    processMessage(record);
                }
                // Commit offsets synchronously
                consumer.commitSync();
            }
        } catch (CommitFailedException e) {
            System.err.println("Commit failed: " + e.getMessage());
        } finally {
            consumer.close();
        }
    }

    private static void processMessage(ConsumerRecord<String, String> record) {
        System.out.println("Processing: " + record.value());
    }
}
Asynchronous commit
public class AsyncCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "async-commit-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    processMessage(record);
                }
                // Commit asynchronously
                consumer.commitAsync((offsets, exception) -> {
                    if (exception != null) {
                        System.err.println("Commit failed: " + exception.getMessage());
                    } else {
                        System.out.println("Committed: " + offsets);
                    }
                });
            }
        } finally {
            consumer.close();
        }
    }

    private static void processMessage(ConsumerRecord<String, String> record) {
        System.out.println("Processing: " + record.value());
    }
}
4.3 Consuming from Specific Partitions and Offsets
import org.apache.kafka.common.TopicPartition;

public class PartitionConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "partition-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Assign partitions manually
        TopicPartition partition0 = new TopicPartition("test-topic", 0);
        TopicPartition partition1 = new TopicPartition("test-topic", 1);
        consumer.assign(Arrays.asList(partition0, partition1));

        // Start consuming from a specific offset
        consumer.seek(partition0, 100);
        // From the beginning:
        // consumer.seekToBeginning(Collections.singletonList(partition0));
        // From the end:
        // consumer.seekToEnd(Collections.singletonList(partition0));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Consumed: " + record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
4.4 Key Consumer Configuration
public class ConsumerConfiguration {
    public static Properties getOptimizedConfig() {
        Properties props = new Properties();
        // Basic settings
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Offset settings
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);     // manual commit
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest/latest/none

        // Performance settings
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);         // fetch at least 1KB
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);        // wait at most 500ms
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);         // up to 500 records per poll

        // Session management
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);     // 30s session timeout
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);   // 3s heartbeat interval
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);  // 5min max processing time
        return props;
    }
}
Configuration reference:
| Setting | Description | Recommended value |
|---|---|---|
| group.id | Consumer group ID | required |
| enable.auto.commit | Whether to auto-commit offsets | false (commit manually) |
| auto.offset.reset | Policy when no committed offset exists | earliest |
| max.poll.records | Maximum records returned per poll | 500 |
| session.timeout.ms | Session timeout | 30000 (30s) |
| heartbeat.interval.ms | Heartbeat interval | 3000 (3s) |
4.5 Multi-Threaded Consumption
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;

public class MultiThreadConsumer {
    private static final int THREAD_COUNT = 3;

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);

        // One consumer per thread (a KafkaConsumer must not be shared across threads)
        for (int i = 0; i < THREAD_COUNT; i++) {
            executorService.submit(new ConsumerWorker(i));
        }

        // Graceful shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Shutting down...");
            executorService.shutdown();
            try {
                executorService.awaitTermination(5000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));
    }

    static class ConsumerWorker implements Runnable {
        private final int workerId;

        public ConsumerWorker(int workerId) {
            this.workerId = workerId;
        }

        @Override
        public void run() {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "multi-thread-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("test-topic"));
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("Worker %d consumed: %s%n", workerId, record.value());
                    }
                }
            } finally {
                consumer.close();
            }
        }
    }
}
⚠️ Consumer Caveats
- A Consumer is not thread-safe: never share one Consumer instance across threads
- Heartbeat timeouts: call poll frequently enough, or the consumer is evicted from the group (a graceful-shutdown sketch follows this list)
- Message loss: auto-commit can lose messages; prefer manual commits
- Duplicate consumption: a failed manual commit causes reprocessing, so make your processing idempotent
- Rebalance: frequent rebalances hurt performance; tune session.timeout.ms sensibly
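A common companion to these caveats is a clean shutdown path. The sketch below (assuming props as in 4.1) uses consumer.wakeup() — the one KafkaConsumer method that is safe to call from another thread — to make a blocked poll() throw WakeupException so the loop can exit and close the consumer:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic"));
// wakeup() is thread-safe; the shutdown hook runs on a different thread
Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        records.forEach(r -> System.out.println(r.value()));
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // expected during shutdown
} finally {
    consumer.close(); // leaves the group cleanly
}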
Chapter 5: Advanced Features
5.1 Custom Serializer (JSON)
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.*;
import java.util.Map;

// JSON serializer
public class JsonSerializer<T> implements Serializer<T> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null;
        }
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new RuntimeException("JSON serialization failed", e);
        }
    }

    @Override
    public void close() {
        // release resources
    }
}

// JSON deserializer
public class JsonDeserializer<T> implements Deserializer<T> {
    private final ObjectMapper objectMapper = new ObjectMapper();
    private Class<T> type;

    public JsonDeserializer(Class<T> type) {
        this.type = type;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // the target type could also be read from configs here
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            return objectMapper.readValue(data, type);
        } catch (Exception e) {
            throw new RuntimeException("JSON deserialization failed", e);
        }
    }

    @Override
    public void close() {
        // release resources
    }
}

// Example payload type
public class User {
    private String id;
    private String name;
    private int age;

    public User(String id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }
    // getters and setters
}

// Producer usage
Properties props = new Properties();
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());

KafkaProducer<String, User> producer = new KafkaProducer<>(props);
User user = new User("1", "Zhang San", 25);
producer.send(new ProducerRecord<>("user-topic", user.getId(), user));
5.2 Message Interceptors
import org.apache.kafka.clients.producer.*;
import java.util.Map;

public class MessageInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Intercept before sending; the record may be modified here
        String timestamp = String.valueOf(System.currentTimeMillis());
        String newValue = record.value() + " [timestamp: " + timestamp + "]";
        return new ProducerRecord<>(record.topic(), record.partition(),
                record.key(), newValue, record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Invoked when a response is received
        if (exception == null) {
            System.out.println("Send succeeded - Offset: " + metadata.offset());
        } else {
            System.err.println("Send failed: " + exception.getMessage());
        }
    }

    @Override
    public void close() {
        // release resources
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // read configuration
    }
}

// Register the interceptor
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MessageInterceptor.class.getName());
5.3 Exactly-Once Semantics
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.*;

public class ExactlyOnceExample {
    public static void main(String[] args) {
        // Producer settings
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "exactly-once-producer");
        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // Consumer settings
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "exactly-once-group");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // important!

        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);

        producer.initTransactions();
        consumer.subscribe(Collections.singletonList("input-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                if (!records.isEmpty()) {
                    producer.beginTransaction();
                    try {
                        // Process messages and send to the output topic
                        for (ConsumerRecord<String, String> record : records) {
                            String processedValue = processMessage(record.value());
                            producer.send(new ProducerRecord<>("output-topic", record.key(), processedValue));
                        }
                        // Commit the consumer offsets as part of the same transaction
                        Map<TopicPartition, OffsetAndMetadata> offsets = getOffsets(records);
                        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                        producer.commitTransaction();
                        System.out.println("Transaction committed");
                    } catch (Exception e) {
                        producer.abortTransaction();
                        System.err.println("Transaction aborted: " + e.getMessage());
                    }
                }
            }
        } finally {
            producer.close();
            consumer.close();
        }
    }

    private static String processMessage(String value) {
        // business logic
        return value.toUpperCase();
    }

    private static Map<TopicPartition, OffsetAndMetadata> getOffsets(ConsumerRecords<String, String> records) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            long offset = partitionRecords.get(partitionRecords.size() - 1).offset();
            offsets.put(partition, new OffsetAndMetadata(offset + 1));
        }
        return offsets;
    }
}
5.4 Kafka Streams Processing
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import java.util.Properties;

public class KafkaStreamsExample {
    public static void main(String[] args) {
        // Configuration
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        StreamsBuilder builder = new StreamsBuilder();

        // Read the input stream
        KStream<String, String> source = builder.stream("input-topic");

        // Transform the stream
        KStream<String, String> processed = source
                .filter((key, value) -> value != null && value.length() > 5)       // filter
                .mapValues(value -> value.toUpperCase())                           // transform
                .peek((key, value) -> System.out.println("Processing: " + value)); // inspect

        // Write to the target topic
        processed.to("output-topic");

        // Build and start the topology
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Graceful shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
5.5 Aggregations and Windows
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import java.time.Duration;
import java.util.Properties;

public class AggregationExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregation-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("input-topic");

        // 1. Group by key and count
        KTable<String, Long> counts = stream.groupByKey().count();

        // 2. Tumbling time window (5 minutes)
        TimeWindowedKStream<String, String> windowedStream =
                stream.groupByKey().windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)));
        KTable<Windowed<String>, Long> windowedCounts = windowedStream.count();

        // 3. Session window (a gap of more than 10 minutes starts a new session)
        SessionWindowedKStream<String, String> sessionStream =
                stream.groupByKey().windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(10)));
        KTable<Windowed<String>, Long> sessionCounts = sessionStream.count();

        // Emit results (counts are Long, so an explicit value serde is needed)
        counts.toStream().to("count-output", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
Chapter 6: Performance Tuning
6.1 Producer Performance Tuning
public class OptimizedProducer {
    public static Properties getHighThroughputConfig() {
        Properties props = new Properties();
        // Basic settings
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Throughput settings
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);       // 32KB batches
        props.put(ProducerConfig.LINGER_MS_CONFIG, 20);           // wait 20ms to fill batches
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // LZ4 compression
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB buffer

        // Reliability settings (adjust to your needs; note that
        // enable.idempotence=true requires acks=all -- to trade durability
        // for latency with acks=1, idempotence must be disabled)
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // Timeouts
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
        return props;
    }
}
6.2 Consumer Performance Tuning
public class OptimizedConsumer {
    public static Properties getHighThroughputConfig() {
        Properties props = new Properties();
        // Basic settings
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "optimized-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Performance settings
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);  // fetch at least 1KB
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); // wait at most 500ms
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000); // up to 1000 records per poll

        // Commit strategy
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // manual commit

        // Session management
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);    // 30s
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);  // 3s
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 5min
        return props;
    }
}
6.3 Batch Processing
import java.util.*;

public class BatchProcessingConsumer {
    private static final int BATCH_SIZE = 100;

    public static void main(String[] args) {
        Properties props = OptimizedConsumer.getHighThroughputConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"));

        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    buffer.add(record);
                    // Flush when the batch is full
                    if (buffer.size() >= BATCH_SIZE) {
                        processBatch(buffer);
                        consumer.commitSync();
                        buffer.clear();
                    }
                }
                // Flush whatever is left after this poll
                if (!buffer.isEmpty()) {
                    processBatch(buffer);
                    consumer.commitSync();
                    buffer.clear();
                }
            }
        } finally {
            consumer.close();
        }
    }

    private static void processBatch(List<ConsumerRecord<String, String>> batch) {
        System.out.println("Processing a batch of " + batch.size() + " messages");
        // Bulk-insert into a database
        // batchInsertToDatabase(batch);
        // Or run business logic per record
        for (ConsumerRecord<String, String> record : batch) {
            // handle a single message
        }
    }
}
6.4 Tuning Recommendations
Producer tuning:
- Increase the batch size: batch.size=32768 (32KB)
- Increase the batching delay: linger.ms=20 (20ms)
- Enable compression: compression.type=lz4
- Increase the buffer: buffer.memory=67108864 (64MB)
- Lower acks: acks=1 (only if your durability requirements allow it)

Consumer tuning:
- Fetch more per poll: max.poll.records=1000
- Add partitions: increases parallelism
- Add consumers: up to the number of partitions
- Process in batches: accumulate a batch before handling it
- Process asynchronously: hand messages to a thread pool
Broker tuning (server.properties):
# More network and IO threads
num.network.threads=8
num.io.threads=8

# Larger socket buffers
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400

# Log segment size
log.segment.bytes=1073741824

# Flush policy (favors performance)
log.flush.interval.messages=10000
log.flush.interval.ms=1000
Chapter 7: Monitoring and Operations
7.1 JMX Metrics
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import java.util.Map;

public class MetricsMonitoring {
    public static void printProducerMetrics(KafkaProducer<String, String> producer) {
        Map<MetricName, ? extends Metric> metrics = producer.metrics();
        System.out.println("=== Producer metrics ===");
        for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
            MetricName name = entry.getKey();
            Metric metric = entry.getValue();
            // Key metrics
            if (name.name().equals("record-send-rate") ||
                name.name().equals("record-error-rate") ||
                name.name().equals("request-latency-avg") ||
                name.name().equals("batch-size-avg")) {
                System.out.printf("%s: %.2f%n", name.name(), metric.metricValue());
            }
        }
    }

    public static void printConsumerMetrics(KafkaConsumer<String, String> consumer) {
        Map<MetricName, ? extends Metric> metrics = consumer.metrics();
        System.out.println("=== Consumer metrics ===");
        for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
            MetricName name = entry.getKey();
            Metric metric = entry.getValue();
            // Key metrics
            if (name.name().equals("records-consumed-rate") ||
                name.name().equals("fetch-latency-avg") ||
                name.name().equals("records-lag-max")) {
                System.out.printf("%s: %.2f%n", name.name(), metric.metricValue());
            }
        }
    }
}
Key metrics to watch:
Producer metrics:
- record-send-rate: messages sent per second
- record-error-rate: send error rate
- request-latency-avg: average request latency (ms)
- batch-size-avg: average batch size (bytes)
- compression-rate-avg: compression ratio

Consumer metrics:
- records-consumed-rate: messages consumed per second
- fetch-latency-avg: fetch latency (ms)
- records-lag-max: maximum consumer lag (records)
- commit-latency-avg: commit latency (ms)

Broker metrics (JMX):
- UnderReplicatedPartitions: number of under-replicated partitions
- OfflinePartitionsCount: number of offline partitions
- ActiveControllerCount: number of active controllers (should be 1)
- BytesInPerSec: inbound byte rate
- BytesOutPerSec: outbound byte rate
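Consumer lag can also be computed externally. A sketch (the group name "my-group" is a placeholder) that derives per-partition lag with AdminClient by comparing the group's committed offsets against the log-end offsets:
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
import java.util.stream.Collectors;

public class LagChecker {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Offsets the group has committed
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets("my-group")
                         .partitionsToOffsetAndMetadata().get();
            // Log-end offsets for the same partitions
            Map<TopicPartition, OffsetSpec> request = committed.keySet().stream()
                    .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> ends =
                    admin.listOffsets(request).all().get();
            // lag = log-end offset - committed offset
            committed.forEach((tp, om) -> System.out.printf(
                    "%s lag=%d%n", tp, ends.get(tp).offset() - om.offset()));
        }
    }
}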
7.2 Administrative Operations
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartitionInfo;
import java.util.*;
import java.util.concurrent.ExecutionException;

public class KafkaAdminOperations {
    private final AdminClient adminClient;

    public KafkaAdminOperations(String bootstrapServers) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        this.adminClient = AdminClient.create(props);
    }

    // Create a Topic
    public void createTopic(String topicName, int partitions, short replicationFactor)
            throws ExecutionException, InterruptedException {
        NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
        CreateTopicsResult result = adminClient.createTopics(Collections.singleton(newTopic));
        result.all().get();
        System.out.println("Topic created: " + topicName);
    }

    // Delete a Topic
    public void deleteTopic(String topicName) throws ExecutionException, InterruptedException {
        DeleteTopicsResult result = adminClient.deleteTopics(Collections.singleton(topicName));
        result.all().get();
        System.out.println("Topic deleted: " + topicName);
    }

    // List all Topics
    public void listTopics() throws ExecutionException, InterruptedException {
        ListTopicsResult result = adminClient.listTopics();
        Set<String> topics = result.names().get();
        System.out.println("All topics: " + topics);
    }

    // Describe Topics
    public void describeTopics(String... topicNames) throws ExecutionException, InterruptedException {
        DescribeTopicsResult result = adminClient.describeTopics(Arrays.asList(topicNames));
        Map<String, TopicDescription> descriptions = result.all().get();
        for (Map.Entry<String, TopicDescription> entry : descriptions.entrySet()) {
            TopicDescription desc = entry.getValue();
            System.out.printf("Topic: %s, Partitions: %d%n", desc.name(), desc.partitions().size());
            for (TopicPartitionInfo partition : desc.partitions()) {
                System.out.printf("  Partition %d: Leader=%s, Replicas=%s%n",
                        partition.partition(), partition.leader().id(), partition.replicas());
            }
        }
    }

    // List consumer groups
    public void listConsumerGroups() throws ExecutionException, InterruptedException {
        ListConsumerGroupsResult result = adminClient.listConsumerGroups();
        Collection<ConsumerGroupListing> groups = result.all().get();
        System.out.println("=== Consumer groups ===");
        for (ConsumerGroupListing group : groups) {
            System.out.println("Group: " + group.groupId());
        }
    }

    // Describe a consumer group
    public void describeConsumerGroup(String groupId) throws ExecutionException, InterruptedException {
        DescribeConsumerGroupsResult result =
                adminClient.describeConsumerGroups(Collections.singleton(groupId));
        ConsumerGroupDescription description = result.all().get().get(groupId);
        System.out.println("=== Consumer group details ===");
        System.out.println("Group: " + description.groupId());
        System.out.println("State: " + description.state());
        System.out.println("Members: " + description.members().size());
    }

    public void close() {
        adminClient.close();
    }

    public static void main(String[] args) throws Exception {
        KafkaAdminOperations admin = new KafkaAdminOperations("localhost:9092");
        // Create a Topic
        admin.createTopic("admin-test-topic", 3, (short) 1);
        // List all Topics
        admin.listTopics();
        // Describe a Topic
        admin.describeTopics("admin-test-topic");
        // List consumer groups
        admin.listConsumerGroups();
        admin.close();
    }
}
7.3 Common Operations Commands
# ========== Topic management ==========

# Create a Topic
kafka-topics.sh --create --topic my-topic \
  --bootstrap-server localhost:9092 \
  --partitions 3 --replication-factor 2

# List Topics
kafka-topics.sh --list --bootstrap-server localhost:9092

# Describe a Topic
kafka-topics.sh --describe --topic my-topic \
  --bootstrap-server localhost:9092

# Increase the partition count (can only grow, never shrink)
kafka-topics.sh --alter --topic my-topic \
  --partitions 5 --bootstrap-server localhost:9092

# Delete a Topic
kafka-topics.sh --delete --topic my-topic \
  --bootstrap-server localhost:9092

# ========== Consumer group management ==========

# List consumer groups
kafka-consumer-groups.sh --list \
  --bootstrap-server localhost:9092

# Describe a consumer group
kafka-consumer-groups.sh --describe --group my-group \
  --bootstrap-server localhost:9092

# Reset offsets to earliest
kafka-consumer-groups.sh --reset-offsets \
  --group my-group --topic my-topic \
  --to-earliest --execute \
  --bootstrap-server localhost:9092

# Reset offsets to latest
kafka-consumer-groups.sh --reset-offsets \
  --group my-group --topic my-topic \
  --to-latest --execute \
  --bootstrap-server localhost:9092

# Reset offsets to a specific position
kafka-consumer-groups.sh --reset-offsets \
  --group my-group --topic my-topic:0 \
  --to-offset 100 --execute \
  --bootstrap-server localhost:9092

# ========== Produce and consume tests ==========

# Produce messages (console)
kafka-console-producer.sh --topic my-topic \
  --bootstrap-server localhost:9092

# Consume messages (from the beginning)
kafka-console-consumer.sh --topic my-topic \
  --from-beginning --bootstrap-server localhost:9092

# Consume messages (show keys)
kafka-console-consumer.sh --topic my-topic \
  --property print.key=true \
  --property key.separator=: \
  --from-beginning --bootstrap-server localhost:9092

# ========== Performance tests ==========

# Producer performance test
kafka-producer-perf-test.sh \
  --topic perf-test \
  --num-records 1000000 \
  --record-size 1024 \
  --throughput 100000 \
  --producer-props bootstrap.servers=localhost:9092

# Consumer performance test
kafka-consumer-perf-test.sh \
  --topic perf-test \
  --messages 1000000 \
  --bootstrap-server localhost:9092
Chapter 8: Spring Boot Integration
8.1 Maven Dependency
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.1.0</version>
</dependency>
8.2 Configuration File
spring:
  kafka:
    bootstrap-servers: localhost:9092
    # Producer settings
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
      retries: 3
      batch-size: 16384
      buffer-memory: 33554432
      compression-type: lz4
      properties:
        linger.ms: 10   # linger has no dedicated Spring Boot key; set it via properties
    # Consumer settings
    consumer:
      group-id: my-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: false
      max-poll-records: 500
    # Listener settings
    listener:
      ack-mode: manual_immediate  # manual, immediate acknowledgment
      concurrency: 3              # concurrency level
8.3 Using Spring Kafka
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.List;

@Service
public class KafkaService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // ========== Producer ==========

    // Simple send
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }

    // Send with a key
    public void sendMessageWithKey(String topic, String key, String message) {
        kafkaTemplate.send(topic, key, message);
    }

    // Async send with a callback (spring-kafka 3.x returns a CompletableFuture)
    public void sendMessageAsync(String topic, String message) {
        kafkaTemplate.send(topic, message).whenComplete((result, ex) -> {
            if (ex == null) {
                System.out.println("Send succeeded: " + result.getRecordMetadata().offset());
            } else {
                System.err.println("Send failed: " + ex.getMessage());
            }
        });
    }

    // ========== Consumer ==========

    // Simple consumption
    @KafkaListener(topics = "test-topic", groupId = "my-group")
    public void consume(String message) {
        System.out.println("Received: " + message);
    }

    // Manual acknowledgment
    @KafkaListener(topics = "test-topic", groupId = "manual-group")
    public void consumeWithAck(String message, Acknowledgment acknowledgment) {
        System.out.println("Received: " + message);
        // Process the message
        processMessage(message);
        // Acknowledge manually
        acknowledgment.acknowledge();
    }

    // Access message metadata
    @KafkaListener(topics = "test-topic", groupId = "meta-group")
    public void consumeWithMetadata(
            @Payload String message,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset) {
        System.out.printf("Topic: %s, Partition: %d, Offset: %d, Message: %s%n",
                topic, partition, offset, message);
    }

    // Batch consumption (requires a batch-enabled listener container factory)
    @KafkaListener(topics = "batch-topic", groupId = "batch-group")
    public void consumeBatch(List<String> messages, Acknowledgment acknowledgment) {
        System.out.println("Received a batch of " + messages.size() + " messages");
        for (String message : messages) {
            processMessage(message);
        }
        acknowledgment.acknowledge();
    }

    // Consume specific partitions
    @KafkaListener(
            topicPartitions = @TopicPartition(
                    topic = "partition-topic",
                    partitions = {"0", "1"}),
            groupId = "partition-group")
    public void consumePartition(String message) {
        System.out.println("Partition message: " + message);
    }

    private void processMessage(String message) {
        // business logic
        System.out.println("Processing: " + message);
    }
}
8.4 Configuration Class
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {

    // ========== Producer configuration ==========
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        config.put(ProducerConfig.RETRIES_CONFIG, 3);
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        config.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // ========== Consumer configuration ==========
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3); // concurrency level
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
Chapter 9: Interview Essentials
9.1 Fundamentals
Q1: What are the core components of Kafka's architecture?
Answer: Kafka is built from the following components:
- Broker: a server node in the cluster that stores and serves messages
- Topic: a logical category of messages, comparable to a database table
- Partition: a physical subdivision of a Topic; each Partition is an ordered message log
- Producer: sends messages to Kafka
- Consumer: consumes messages from Kafka
- Consumer Group: a group of consumers providing load balancing and failover
- ZooKeeper/KRaft: metadata management and coordination
Q2: How does Kafka achieve high throughput?
Answer:
- Sequential disk writes: avoids random IO; performance approaches memory speed
- Zero-copy: uses the sendfile() system call to cut the number of data copies
- Batching: reduces network round trips
- Message compression: reduces network transfer and disk usage
- Partition parallelism: multiple partitions read and write in parallel, using all cores
- Page Cache: leverages the operating system's page cache
Q3: What do Partitions provide?
Answer:
- Load balancing: data is spread across Brokers, avoiding a single bottleneck
- Parallelism: multiple consumers can consume different partitions concurrently
- Fault tolerance: high availability through the replica mechanism
- Ordering: messages are strictly ordered within a single partition
9.2 Reliability Guarantees
Q4: How does Kafka prevent message loss?
Answer:
On the producer side:
- Set acks=all: wait for all ISR replicas to acknowledge
- Enable idempotence: enable.idempotence=true
- Configure retries: retries=Integer.MAX_VALUE
- Send synchronously, or handle the async callback correctly

On the broker side:
- At least two replicas: replication.factor>=2
- A minimum ISR size: min.insync.replicas>=2
- Disable unclean elections: unclean.leader.election.enable=false

On the consumer side:
- Commit offsets manually: enable.auto.commit=false
- Process the message first, then commit the offset
- Use transactional consumption (Exactly-Once)
Q5: What are Kafka's three message delivery semantics?
Answer:
| Semantics | Description | How to get it | Typical use |
|---|---|---|---|
| At Most Once | Messages may be lost but never duplicated | Commit the offset first, then process | Log collection |
| At Least Once | Messages are never lost but may be duplicated | Process first, then commit the offset | The common default |
| Exactly Once | Messages are neither lost nor duplicated | Transactions API + idempotence | Financial transactions |
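The first two rows come down to where the commit sits relative to processing. A sketch, assuming a manually-committing consumer as in 4.2 and a process() handler of your own:
// At most once: commit before processing; a crash mid-processing skips messages
ConsumerRecords<String, String> batch1 = consumer.poll(Duration.ofMillis(1000));
consumer.commitSync();                 // offset already advanced
batch1.forEach(r -> process(r));       // a crash here loses this batch

// At least once: process before committing; a crash before the commit replays messages
ConsumerRecords<String, String> batch2 = consumer.poll(Duration.ofMillis(1000));
batch2.forEach(r -> process(r));       // a crash here means reprocessing
consumer.commitSync();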
Q6: How do you implement Exactly-Once?
Answer:
// 1. Enable transactions and idempotence on the producer
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "trans-id");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

// 2. Set the consumer isolation level
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

// 3. Process messages inside a transaction
producer.beginTransaction();
// process and send messages
producer.sendOffsetsToTransaction(offsets, groupMetadata);
producer.commitTransaction();

Core principles:
- Idempotence: deduplication via PID + Sequence Number
- Transactions: message sends and offset commits are atomic
- Isolation level: consumers read only committed messages
9.3 Performance Tuning
Q7: How do you improve producer performance?
Answer:
- Batch sends: increase batch.size and linger.ms
- Compression: use compression.type=lz4
- Async sends: handle results in a callback
- Bigger buffer: raise buffer.memory
- Weaker acks: use acks=1 (if durability requirements allow)
- More threads: create multiple Producer instances
Q8: How do you improve consumer performance?
Answer:
- Add partitions: increases parallelism
- Add consumers: but no more than the partition count
- Batch processing: accumulate a batch before handling it
- Async processing: hand messages to a thread pool
- Tune fetch parameters: fetch.min.bytes, fetch.max.wait.ms
- Manual commits: commit offsets in batches
Q9: How does Kafka's zero-copy work?
Answer:
Traditional path (4 copies): disk → kernel buffer → user buffer → socket buffer → NIC
Zero-copy path (2 copies): disk → kernel buffer → NIC
- Uses the sendfile() system call
- Data flows straight from the file to the socket
- Exposed in Java NIO as FileChannel.transferTo()
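The same primitive is available directly in Java NIO. A standalone sketch (the file name "data.log" and port 9000 are placeholders; a server must be listening for it to run): transferTo() asks the kernel to move file bytes to the socket without routing them through user space, which maps to sendfile() on Linux:
import java.io.FileInputStream;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;

public class ZeroCopyDemo {
    public static void main(String[] args) throws Exception {
        try (FileChannel file = new FileInputStream("data.log").getChannel();
             SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", 9000))) {
            long position = 0, remaining = file.size();
            // transferTo may move fewer bytes than requested, so loop
            while (remaining > 0) {
                long sent = file.transferTo(position, remaining, socket);
                position += sent;
                remaining -= sent;
            }
        }
    }
}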
9.4 Clusters and Operations
Q10: How does Kafka's replication work?
Answer:
- Leader replica: handles all reads and writes
- Follower replicas: only replicate data; they serve no client requests
- ISR (In-Sync Replicas): the set of replicas in sync with the Leader
- OSR (Out-of-Sync Replicas): replicas that have fallen too far behind
- When the Leader fails, a new Leader is elected from the ISR
Q11: What is a Rebalance, and how do you avoid it?
Answer:
Rebalance triggers:
- Group membership changes (a consumer joins or leaves)
- The partition count of a subscribed Topic changes
- A consumer's heartbeat times out

Impact:
- All consumers stop working
- Partitions are reassigned
- Duplicate consumption is possible

Avoidance (a static-membership sketch follows this list):
- Increase session.timeout.ms and max.poll.interval.ms
- Reduce the amount fetched per poll: max.poll.records
- Speed up processing to avoid poll-interval timeouts
- Set a sensible heartbeat interval: heartbeat.interval.ms
- Use static membership (Kafka 2.3+)
Q12: How does Kafka keep messages ordered?
Answer:
- Per-partition ordering: messages within one partition are strictly ordered
- Global ordering: only possible with a single-partition Topic
- Per-key ordering: messages with the same key go to the same partition

Implementation:
// Specify a key; records with the same key land in the same partition
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "userID", "message");
Q13: What does the Controller do?
Answer:
Responsibilities:
- Manages partition state
- Runs Leader elections
- Handles Brokers joining and leaving
- Assigns partition replicas

Election:
- The first Broker to create the ephemeral node in ZooKeeper becomes the Controller
- If the Controller fails, a new one is elected automatically
9.5 Practical Questions
Q14: What do you do when consumers are too slow?
Answer:
- Add consumers: provided there are enough partitions
- Add partitions: increases parallelism
- Process asynchronously: the consumer only receives; a thread pool does the work
- Batch processing: fewer round trips to databases and other external systems
- Optimize the business logic: reduce per-message processing time
- Scale out: add Brokers
Q15: How do you handle a message backlog?
Answer:
- Temporary scale-out: quickly add consumers
- Temporary Topics: fan messages out to several temporary Topics and consume them in parallel
- Degrade: skip unimportant messages
- Throttle: rate-limit producers to relieve pressure
- Expand partitions: add partitions and redistribute the load
Q16: Why is Kafka fast?
Answer:
- Sequential IO: sequential disk reads and writes are fast
- Page Cache: leverages the OS cache
- Zero-copy: fewer data copies
- Batching and compression: less network IO
- Partition parallelism: uses all CPU cores
- Efficient serialization: a compact binary format
Q17: How does Kafka differ from RabbitMQ?
Answer:
| Aspect | Kafka | RabbitMQ |
|---|---|---|
| Architecture | Distributed log | Message queue |
| Throughput | Very high (millions/sec) | High (tens of thousands/sec) |
| Ordering | Per-partition | Per-queue |
| Persistence | Always persisted | Optional |
| Backlog | Handles large backlogs well | Backlogs degrade performance |
| Delivery model | Pull | Push/Pull |
| Typical use | Big data, log collection, stream processing | Task queues, RPC |
Q18: How do you monitor Kafka?
Answer:
Key metrics:
- Broker: CPU, memory, disk, network IO
- Topic: message rate, byte rate
- Consumer: consumer lag, consumption rate
- Producer: send rate, error rate
- System: under-replicated partitions, offline partitions

Monitoring tools:
- Kafka Manager (CMAK)
- Prometheus + Grafana
- JMX metrics
- Burrow (consumer lag monitoring)
Q19: How do Kafka transactions work?
Answer:
1. Transaction Coordinator
   - Manages transaction state
   - State is stored in the internal topic __transaction_state
2. Two-phase commit
   - Prepare phase: write to all participating partitions
   - Commit phase: write transaction markers
3. Transactional ID
   - Uniquely identifies the producer
   - Provides idempotence across sessions
4. Isolation levels
   - read_uncommitted: read all messages
   - read_committed: read only committed messages
Q20: How does Kafka implement idempotence?
Answer:
- Producer ID (PID): uniquely identifies the producer
- Sequence Number: a monotonically increasing per-partition counter
- Deduplication: the Broker detects and drops duplicates
- Limits: idempotence holds per session and per partition
- Configuration: enable.idempotence=true

How it works:
Message 1: PID=100, Seq=0
Message 2: PID=100, Seq=1
Message 2 (retry): PID=100, Seq=1 → deduplicated by the Broker
Chapter 10: Common Problems and Solutions
10.1 Common Errors
Error 1: "Not enough replicas"
Cause: too few ISR replicas
Fixes:
- Check whether all Brokers are online
- Check network connectivity
- Lower min.insync.replicas
- Add replicas
Error 2: "Offset out of range"
Cause: the consumer's offset no longer exists or has expired
Fixes:
// Configure a reset policy
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // or "latest"

// Or reset manually
consumer.seekToBeginning(consumer.assignment());
consumer.seekToEnd(consumer.assignment());
Error 3: "Rebalance timeout"
Cause: processing takes longer than max.poll.interval.ms
Fixes (a pause/resume sketch follows this list):
- Increase max.poll.interval.ms
- Reduce max.poll.records
- Speed up the business logic
- Process asynchronously
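One way to process asynchronously without breaching max.poll.interval.ms is the pause/resume pattern. A sketch, assuming a single-thread executor and a process() handler supplied by the caller:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
if (!records.isEmpty()) {
    consumer.pause(consumer.assignment());      // poll() keeps membership alive but returns no data
    Future<?> done = executor.submit(() -> records.forEach(r -> process(r)));
    while (!done.isDone()) {
        consumer.poll(Duration.ofMillis(100));  // empty polls reset the poll-interval timer
    }
    consumer.resume(consumer.assignment());
    consumer.commitSync();
}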
Error 4: Producer send timeouts
Cause: network problems or overloaded Brokers
Fixes:
// Increase the timeouts
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);

// Also check Broker health
// and network connectivity
10.2 Best Practices
1. Topic Design
- Plan partition counts sensibly: 2-3x the CPU core count
- Use 3 replicas: balances performance and reliability
- Use meaningful names: business.environment.topic
- Set a sensible retention period: retention.ms=604800000 (7 days) — see the AdminClient sketch below
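The retention recommendation can be applied programmatically as well as at topic creation. A sketch using AdminClient (the topic name is a placeholder); incrementalAlterConfigs changes only the entries passed in, leaving the topic's other configs untouched:
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;
import java.util.*;

public class RetentionSetter {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "orders.prod.events");
            AlterConfigOp setRetention = new AlterConfigOp(
                    new ConfigEntry("retention.ms", "604800000"), // 7 days
                    AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(topic, List.of(setRetention))).all().get();
        }
    }
}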
2. Producer Practices
// Recommended settings
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);

// Use try-with-resources
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    producer.send(record, callback);
}
3. Consumer Practices
// Recommended settings
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);

// Manual commits + idempotent processing
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        if (!isDuplicate(record.key())) {
            processMessage(record);
        }
    }
    consumer.commitSync();
}
10.3 Capacity Planning
Disk capacity
Disk capacity = daily message volume × message size × retention days × replicas / compression ratio

Example: 100 million msgs/day × 1KB × 7 days × 3 replicas / 0.5 = 4.2TB

Partition count
Partitions = max(target throughput / per-partition throughput, number of consumers)

Example: 100MB/s target / 10MB/s per partition = 10 partitions

Broker count
Brokers = total partitions × replicas / partitions per Broker

Guideline: keep a single Broker under 2000-4000 partitions
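The formulas reduce to simple arithmetic; a tiny calculator reproducing the worked examples above (decimal units, 1TB = 10^9 KB; the consumer count of 8 is an assumed input):
public class CapacityPlan {
    public static void main(String[] args) {
        // Disk: 100M msgs/day x 1KB x 7 days x 3 replicas / 0.5 compression
        double msgsPerDay = 1e8, msgSizeKB = 1, days = 7, replicas = 3, compression = 0.5;
        double diskTB = msgsPerDay * msgSizeKB * days * replicas / compression / 1e9;
        System.out.printf("Disk: %.1f TB%n", diskTB); // 4.2 TB

        // Partitions: max(target / per-partition throughput, consumer count)
        double targetMBs = 100, perPartitionMBs = 10;
        int consumers = 8; // assumed consumer count
        long partitions = Math.max((long) Math.ceil(targetMBs / perPartitionMBs), consumers);
        System.out.println("Partitions: " + partitions); // 10
    }
}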
Appendix: Quick Reference
Command cheat sheet
# Topic operations
kafka-topics.sh --create --topic <topic-name> --partitions <num> --replication-factor <num> --bootstrap-server localhost:9092
kafka-topics.sh --list --bootstrap-server localhost:9092
kafka-topics.sh --describe --topic <topic-name> --bootstrap-server localhost:9092
kafka-topics.sh --delete --topic <topic-name> --bootstrap-server localhost:9092

# Consumer group operations
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
kafka-consumer-groups.sh --describe --group <group-name> --bootstrap-server localhost:9092
kafka-consumer-groups.sh --reset-offsets --group <group-name> --topic <topic-name> --to-earliest --execute --bootstrap-server localhost:9092

# Produce/consume tests
kafka-console-producer.sh --topic <topic-name> --bootstrap-server localhost:9092
kafka-console-consumer.sh --topic <topic-name> --from-beginning --bootstrap-server localhost:9092
Configuration cheat sheet
| Setting | Producer | Consumer | Recommended value |
|---|---|---|---|
| acks | ✓ | | all |
| batch.size | ✓ | | 32768 |
| linger.ms | ✓ | | 10-20 |
| compression.type | ✓ | | lz4 |
| enable.idempotence | ✓ | | true |
| enable.auto.commit | | ✓ | false |
| max.poll.records | | ✓ | 500 |
| session.timeout.ms | | ✓ | 30000 |
Document version: v2.0
Last updated: 2024
Applicable Kafka versions: 2.8+ / 3.x
Author's advice: understand the fundamentals first, then work through the code examples, and only then dive into the advanced features
