Kafka 面试题及详细答案100道(91-95)-- 问题排查与解决方案1
《前后端面试题
》专栏集合了前后端各个知识模块的面试题,包括html,javascript,css,vue,react,java,Openlayers,leaflet,cesium,mapboxGL,threejs,nodejs,mangoDB,SQL,Linux… 。
文章目录
- 一、本文面试题目录
- 91. KafkaProducer发送消息失败的可能原因有哪些?如何排查?
- 可能的失败原因
- 排查方法与示例代码
- 92. 消费者无法消费到消息,可能的原因是什么?
- 可能的原因及排查方向
- 排查示例
- 93. 如何处理Kafka的消息重复消费问题?
- 重复消费的常见原因
- 解决方案
- 1. 可靠的偏移量提交策略
- 2. 业务层面实现幂等性
- 3. 优化重平衡机制
- 4. 监控与告警
- 94. 如何处理Kafka的消息丢失问题?
- 消息丢失的可能环节及原因
- 解决方案
- 1. 生产者配置优化
- 2. Broker配置优化
- 3. 消费者配置优化
- 4. 辅助措施
- 95. 当 Kafka 出现分区副本不同步(ISR 收缩)时,该如何处理?
- 原理说明
- 常见原因
- 处理步骤
- 注意事项
- 二、100道Kafka 面试题目录列表
一、本文面试题目录
91. KafkaProducer发送消息失败的可能原因有哪些?如何排查?
KafkaProducer发送消息失败可能由多种因素导致,涉及网络、Broker配置、消息格式等多个层面。排查时需结合错误日志和系统监控逐步定位问题。
可能的失败原因
-
网络问题
- Broker地址错误或端口未开放
- 网络延迟或中断(超过
request.timeout.ms
) - 防火墙或安全组限制
-
Broker配置与状态
- Broker集群不可用(如全部宕机)
- 主题不存在且
auto.create.topics.enable=false
- 分区leader不存在(如所有副本离线)
- 超出Broker配置的
message.max.bytes
(消息过大)
-
Producer配置问题
acks
设置为all
但ISR中副本不足- 超时配置过短(
request.timeout.ms
、delivery.timeout.ms
) - 缓冲区满(
buffer.memory
不足且block.on.buffer.full=false
)
-
权限问题
- 缺少发送消息的ACL权限
- 安全认证失败(如SASL、SSL配置错误)
-
消息格式问题
- 序列化失败(Serializer抛出异常)
- 消息键或值为
null
但序列化器不支持
排查方法与示例代码
1. 基础排查步骤
# 检查Broker连通性
telnet broker1 9092# 查看主题状态
bin/kafka-topics.sh --describe --bootstrap-server broker1:9092 --topic test-topic# 查看Broker日志(关键错误信息)
tail -f /var/log/kafka/server.log | grep ERROR
2. 增强Producer的错误处理与日志
import org.apache.kafka.clients.producer.;
import org.apache.kafka.common.errors.;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class ErrorHandlingProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, “broker1:9092,broker2:9092”);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, “org.apache.kafka.common.serialization.StringSerializer”);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, “org.apache.kafka.common.serialization.StringSerializer”);
props.put(ProducerConfig.ACKS_CONFIG, “all”);
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
props.put(ProducerConfig.LINGER_MS_CONFIG, 100);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);String topic = "test-topic";String message = "test-message";// 同步发送(便于捕获异常)try {RecordMetadata metadata = producer.send(new ProducerRecord<>(topic, message)).get();System.out.printf("消息发送成功: 主题=%s, 分区=%d, 偏移量=%d%n",metadata.topic(), metadata.partition(), metadata.offset());} catch (ExecutionException e) {Throwable cause = e.getCause();handleSendException(cause, topic, message);} catch (InterruptedException e) {Thread.currentThread().interrupt();System.err.println("发送被中断: " + e.getMessage());} finally {producer.close();}
}private static void handleSendException(Throwable cause, String topic, String message) {if (cause instanceof UnknownTopicOrPartitionException) {System.err.printf("主题不存在: %s,检查主题是否创建或名称是否正确%n", topic);} else if (cause instanceof NotEnoughReplicasException) {System.err.println("副本不足,无法满足acks=all要求,检查ISR状态");} else if (cause instanceof MessageTooLargeException) {System.err.println("消息过大,超过broker的message.max.bytes配置");} else if (cause instanceof TimeoutException) {System.err.println("发送超时,可能网络延迟或broker负载过高");} else if (cause instanceof AuthorizationException) {System.err.println("无发送权限,检查ACL配置");} else if (cause instanceof SerializationException) {System.err.println("序列化失败: " + cause.getMessage());} else {System.err.printf("发送失败: %s%n", cause.getMessage());cause.printStackTrace();}
}
}
3. 高级排查工具
- 使用
kafka-producer-perf-test.sh
测试基础发送能力:bin/kafka-producer-perf-test.sh \--topic test-topic \--num-records 1000 \--record-size 1024 \--throughput 100 \--producer-props bootstrap.servers=broker1:9092 acks=all
- 启用Producer的DEBUG日志(在
log4j.properties
中设置log4j.logger.org.apache.kafka=DEBUG
)
92. 消费者无法消费到消息,可能的原因是什么?
消费者无法消费到消息是Kafka使用中的常见问题,可能涉及主题配置、消费组状态、权限控制等多个方面。需从消息生产、主题状态、消费配置三个维度逐步排查。
可能的原因及排查方向
-
消息未成功写入Kafka
- 生产者发送失败(如网络问题、权限不足)
- 消息被Broker拒绝(如超过大小限制、主题不存在)
- 事务消息未提交(
isolation.level=read_committed
时)
-
消费组配置问题
- 消费组ID错误或重复(导致重平衡异常)
auto.offset.reset
配置不当(如none
且无初始偏移量)- 消费者未订阅正确的主题(名称错误或正则匹配失败)
-
偏移量问题
- 偏移量已过期(被日志清理删除)
- 偏移量超出分区当前最大偏移量(如手动设置错误)
- 消费组无初始偏移量且
auto.offset.reset=none
-
分区与副本问题
- 分区leader不可用(无可用副本)
- 消费者分配到的分区无消息(如数据分布不均)
- ISR集合为空导致消息无法被消费
-
权限与认证
- 消费者无
READ
主题的权限 - 安全认证失败(如SSL证书过期、SASL配置错误)
- 消费者无
-
代码逻辑问题
- 消费者未调用
poll()
方法或调用频率过低 poll()
超时时间过短导致消息未处理- 消费逻辑中存在死循环或异常未捕获
- 消费者未调用
排查示例
1. 验证消息是否存在
# 查看主题分区的消息数量
bin/kafka-run-class.sh kafka.tools.GetOffsetShell \--bootstrap-server broker1:9092 \--topic test-topic \--time -1 # -1表示最大偏移量,-2表示最小偏移量# 直接消费消息验证
bin/kafka-console-consumer.sh \--bootstrap-server broker1:9092 \--topic test-topic \--from-beginning \--max-messages 10
2. 检查消费组状态
# 查看消费组详情
bin/kafka-consumer-groups.sh \--bootstrap-server broker1:9092 \--describe \--group test-group# 输出说明:
# TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
# test-topic 0 100 100 0 consumer-test-group-1-xxx-xxx-xxx-xxx /192.168.1.100 consumer-test-group-1
3. 代码层面排查示例
import org.apache.kafka.clients.consumer.;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.;
public class TroubleshootConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, “broker1:9092”);
props.put(ConsumerConfig.GROUP_ID_CONFIG, “test-group”);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, “org.apache.kafka.common.serialization.StringDeserializer”);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, “org.apache.kafka.common.serialization.StringDeserializer”);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, “earliest”);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, “false”);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("test-topic"), new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {System.out.println("分区被撤销: " + partitions);}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {System.out.println("分配到的分区: " + partitions);// 打印初始偏移量Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(partitions);Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);for (TopicPartition tp : partitions) {System.out.printf("分区 %s: 起始偏移量=%d, 结束偏移量=%d%n",tp, beginningOffsets.get(tp), endOffsets.get(tp));}}});try {while (true) {System.out.println("开始调用poll()...");ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));System.out.printf("收到 %d 条消息%n", records.count());if (records.isEmpty()) {// 检查当前偏移量和日志结束偏移量Set<TopicPartition> assignedPartitions = consumer.assignment();Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignedPartitions);Map<TopicPartition, OffsetAndMetadata> committedOffsets = consumer.committed(assignedPartitions);for (TopicPartition tp : assignedPartitions) {Long committed = committedOffsets.get(tp) != null ? committedOffsets.get(tp).offset() : -1;Long end = endOffsets.get(tp);System.out.printf("分区 %s: 已提交偏移量=%d, 结束偏移量=%d, 滞后=%d%n",tp, committed, end, end - committed);}}for (ConsumerRecord<String, String> record : records) {System.out.printf("消费消息: 分区=%d, 偏移量=%d, 内容=%s%n",record.partition(), record.offset(), record.value());}consumer.commitSync();Thread.sleep(1000);}} catch (Exception e) {System.err.println("消费过程异常: " + e.getMessage());e.printStackTrace();} finally {consumer.close();}
}
}
4. 常见解决方案
- 若偏移量过期:重置消费组偏移量(
--reset-offsets --to-earliest
) - 若权限不足:通过
kafka-acls.sh
配置正确权限 - 若分区不可用:检查Broker状态和副本配置
- 若代码逻辑问题:确保
poll()
被正确调用并处理异常
93. 如何处理Kafka的消息重复消费问题?
Kafka消息重复消费指同一消息被消费者多次处理,通常由偏移量提交机制与业务处理逻辑不匹配导致。解决核心是实现幂等性处理与可靠的偏移量管理。
重复消费的常见原因
-
偏移量提交时机不当:
- 自动提交时,消息未处理完成但偏移量已提交(如
enable.auto.commit=true
且处理耗时超过提交间隔)。 - 手动提交前消费者崩溃,重启后从上次提交的偏移量重新消费。
- 自动提交时,消息未处理完成但偏移量已提交(如
-
重平衡(Rebalance)影响:
- 消费组成员变化触发重平衡,未提交的偏移量导致分区重新分配后重复消费。
-
网络与故障恢复:
- 偏移量提交请求失败但消费者误以为成功。
- Broker故障切换后,副本数据同步延迟导致偏移量回滚。
解决方案
1. 可靠的偏移量提交策略
- 手动提交偏移量:确保消息处理完成后再提交,避免“先提交后处理”。
- 批量提交优化:减少提交频率但控制批次大小,平衡性能与可靠性。
public class SafeOffsetCommit {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, “broker1:9092”);
props.put(ConsumerConfig.GROUP_ID_CONFIG, “idempotent-group”);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, “org.apache.kafka.common.serialization.StringDeserializer”);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, “org.apache.kafka.common.serialization.StringDeserializer”);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, “false”); // 禁用自动提交
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, “earliest”);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("order-topic"));try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));if (records.isEmpty()) continue;// 记录已处理的偏移量(按分区)Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();for (ConsumerRecord<String, String> record : records) {try {// 处理消息(确保幂等性)processMessage(record);// 记录偏移量(+1表示下一条待消费的位置)TopicPartition tp = new TopicPartition(record.topic(), record.partition());offsetsToCommit.put(tp, new OffsetAndMetadata(record.offset() + 1));} catch (Exception e) {System.err.println("处理消息失败,跳过偏移量提交: " + e.getMessage());}}// 批量提交已处理的偏移量if (!offsetsToCommit.isEmpty()) {consumer.commitSync(offsetsToCommit);System.out.println("提交偏移量: " + offsetsToCommit);}}} finally {consumer.close();}
}// 幂等性处理消息(核心)
private static void processMessage(ConsumerRecord<String, String> record) {String orderId = record.key(); // 假设key为唯一业务ID(如订单ID)// 检查消息是否已处理(如查询数据库或缓存)if (isMessageProcessed(orderId)) {System.out.println("消息已处理,跳过: " + orderId);return;}// 实际业务处理(如写入数据库、调用API)System.out.println("处理消息: " + orderId + ",内容: " + record.value());markMessageAsProcessed(orderId); // 标记为已处理
}// 模拟:检查消息是否已处理
private static boolean isMessageProcessed(String id) {// 实际实现:查询数据库或分布式缓存(如Redis)return false;
}// 模拟:标记消息为已处理
private static void markMessageAsProcessed(String id) {// 实际实现:写入数据库或分布式缓存
}
}
2. 业务层面实现幂等性
即使消息重复,处理结果也应一致。常见方案:
- 唯一标识符:使用消息的
key
(如订单ID、用户ID)作为唯一标识,通过数据库唯一键或缓存去重。 - 状态机设计:业务流程按状态流转(如“待处理→处理中→已完成”),重复消息仅对“待处理”状态生效。
- 分布式锁:处理前通过锁(如Redis锁)确保同一消息同一时间仅被一个消费者处理。
3. 优化重平衡机制
- 减少重平衡频率:
- 合理设置
session.timeout.ms
(默认10秒)和heartbeat.interval.ms
(默认3秒),避免消费者因短暂卡顿被踢出消费组。 - 确保消费者在
max.poll.interval.ms
(默认5分钟)内完成消息处理,避免因超时触发重平衡。
- 合理设置
- 优雅退出:消费者关闭前主动提交偏移量并调用
consumer.unsubscribe()
。
4. 监控与告警
- 监控消费组的
LAG
(消息滞后量),异常增长可能暗示重复消费或处理能力不足。 - 记录重复处理的消息ID,分析重复原因(如网络波动、Broker故障)。
94. 如何处理Kafka的消息丢失问题?
Kafka消息丢失指消息未被正确生产、存储或消费,可能导致数据不完整。需从生产者、Broker、消费者三个环节排查并优化配置。
消息丢失的可能环节及原因
-
生产者环节:
- 消息发送失败未重试(如
retries=0
)。 acks
配置为0
(不等待Broker确认)或1
(仅Leader确认),Leader崩溃后消息丢失。- 生产者缓冲区满(
buffer.memory
不足)且block.on.buffer.full=false
(默认true
,旧版本可能丢弃消息)。
- 消息发送失败未重试(如
-
Broker环节:
- 副本因子不足(
replication-factor=1
),Leader崩溃后无备份。 - ISR(同步副本集)收缩至空,Broker仍允许写入(
min.insync.replicas=1
且Leader离线)。 - 日志清理策略不当(如
retention.ms
过短,消息被提前删除)。
- 副本因子不足(
-
消费者环节:
- 自动提交偏移量(
enable.auto.commit=true
),消息未处理完成但偏移量已提交,消费者崩溃后丢失消息。 - 消费逻辑异常导致消息未处理,但偏移量已提交。
- 自动提交偏移量(
解决方案
1. 生产者配置优化
确保消息可靠发送至Broker:
- 设置
acks=all
:等待所有ISR中的副本确认后才返回成功。 - 启用重试:
retries=N
(如3
)及retry.backoff.ms=1000
,处理临时网络故障。 - 配置足够的缓冲区:
buffer.memory=67108864
(64MB),避免缓冲区溢出。
public class ReliableProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, “broker1:9092,broker2:9092”);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, “org.apache.kafka.common.serialization.StringSerializer”);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, “org.apache.kafka.common.serialization.StringSerializer”);
// 核心可靠性配置props.put(ProducerConfig.ACKS_CONFIG, "all"); // 等待所有ISR副本确认props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试3次props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000); // 重试间隔1秒props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 30000); // 总超时30秒props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB缓冲区props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 启用幂等性,避免重试导致重复KafkaProducer<String, String> producer = new KafkaProducer<>(props);String topic = "critical-data-topic";// 发送消息并处理回调ProducerRecord<String, String> record = new ProducerRecord<>(topic, "key1", "critical-message");producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {System.err.println("消息发送失败,需人工处理: " + exception.getMessage());// 实际应用中可将失败消息写入死信队列(DLQ)} else {System.out.printf("消息发送成功: 主题=%s, 分区=%d, 偏移量=%d%n",metadata.topic(), metadata.partition(), metadata.offset());}}});producer.flush();producer.close();
}
}
2. Broker配置优化
确保消息持久化与副本可靠性:
- 合理设置副本因子:
replication-factor=3
(生产环境推荐,至少2),避免单点故障。 - 配置
min.insync.replicas=2
:要求至少2个副本同步后才确认写入,与acks=all
配合使用。 - 禁用自动创建主题(可选):
auto.create.topics.enable=false
,避免意外创建低可靠性的主题(默认副本因子1)。 - 调整日志保留策略:根据业务需求设置
retention.ms
(如7天),避免消息过早被清理。
# 示例:创建高可靠性主题
bin/kafka-topics.sh --create \--bootstrap-server broker1:9092 \--topic critical-data-topic \--partitions 3 \--replication-factor 3 \--config min.insync.replicas=2 \--config retention.ms=604800000
3. 消费者配置优化
确保消息被正确处理并提交偏移量:
- 禁用自动提交:
enable.auto.commit=false
,手动提交偏移量(消息处理完成后)。 - 控制消费速度:通过
max.poll.records
限制单次拉取量,避免处理超时。
public class ReliableConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, “broker1:9092”);
props.put(ConsumerConfig.GROUP_ID_CONFIG, “reliable-consumer-group”);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, “org.apache.kafka.common.serialization.StringDeserializer”);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, “org.apache.kafka.common.serialization.StringDeserializer”);
// 核心可靠性配置props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 禁用自动提交props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // 单次拉取最多100条props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 5分钟处理超时KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("critical-data-topic"));try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));if (records.isEmpty()) continue;// 处理消息(确保业务逻辑无异常)boolean allProcessed = processRecords(records);// 所有消息处理完成后再提交偏移量if (allProcessed) {consumer.commitSync();System.out.println("偏移量提交成功");} else {System.err.println("部分消息处理失败,不提交偏移量");}}} finally {consumer.close();}
}// 处理消息,返回是否全部成功
private static boolean processRecords(ConsumerRecords<String, String> records) {boolean allSuccess = true;for (ConsumerRecord<String, String> record : records) {try {// 业务处理逻辑(如写入数据库)System.out.printf("处理消息: 分区=%d, 偏移量=%d, 内容=%s%n",record.partition(), record.offset(), record.value());} catch (Exception e) {System.err.println("消息处理失败: " + e.getMessage());allSuccess = false;// 可将失败消息写入死信队列}}return allSuccess;
}
}
4. 辅助措施
- 死信队列(DLQ):将处理失败的消息转发至专用主题(如
topic-dlq
),后期人工处理。 - 监控与告警:
- 监控Broker的
UnderReplicatedPartitions
(副本不同步的分区数)。 - 监控消费组的
LAG
,确保消息被及时消费。 - 定期校验数据完整性(如对比生产者发送量与消费者处理量)。
- 监控Broker的
95. 当 Kafka 出现分区副本不同步(ISR 收缩)时,该如何处理?
原理说明
Kafka 中,每个分区有一个领导者副本(Leader)和多个追随者副本(Follower)。ISR(In-Sync Replicas,同步副本集) 是指与 Leader 保持同步的 Follower 集合(包含 Leader 自身)。当 Follower 因某些原因(如网络延迟、磁盘 IO 瓶颈、负载过高)无法及时从 Leader 同步数据时,会被踢出 ISR,导致 ISR 收缩。若 ISR 持续收缩至仅剩余 Leader,可能会因 Leader 故障导致数据丢失风险升高。
常见原因
- 网络问题:Follower 与 Leader 之间网络延迟过高或不稳定,导致同步超时。
- Follower 负载过高:Follower 节点 CPU、内存、磁盘 IO 占用过高,无法及时处理同步请求。
- 参数配置不合理:如
replica.lag.time.max.ms
配置过小(默认 30 秒),轻微延迟就会触发 Follower 被踢出 ISR。 - 数据写入速度过快:Leader 接收消息速度超过 Follower 同步能力,导致 lag 增大。
处理步骤
-
监控与定位
- 通过 Kafka 监控工具(如 Prometheus + Grafana)查看指标:
kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
:统计未充分复制的分区数(非空即异常)。kafka.cluster:type=Partition,name=Leader,partition=*,topic=*
:查看具体分区的 ISR 状态。
- 检查 Follower 节点的网络状况(
ping
、traceroute
)、磁盘 IO(iostat
)、CPU/内存使用率(top
)。
- 通过 Kafka 监控工具(如 Prometheus + Grafana)查看指标:
-
优化网络环境
- 排查网络链路是否存在丢包或延迟,修复网络故障。
- 若跨机房部署,考虑减少跨机房副本同步,优先在同机房内配置副本。
-
调整 Follower 负载
- 若 Follower 节点资源紧张,迁移部分分区到其他节点,均衡负载。
- 优化 Follower 节点的 JVM 配置(如增大堆内存),避免 GC 停顿影响同步。
-
调整关键参数
replica.lag.time.max.ms
:适当增大该值(如调整为 60000 毫秒),允许 Follower 短暂延迟而不被踢出 ISR。# server.properties replica.lag.time.max.ms=60000 # 从默认 30 秒调整为 60 秒
replica.fetch.min.bytes
:减小 Follower 拉取数据的最小字节数(默认 1 字节),降低同步延迟。replica.fetch.min.bytes=1 # 保持默认或适当减小
replica.fetch.wait.max.ms
:减小 Follower 拉取数据的最大等待时间(默认 500 毫秒),加快同步频率。replica.fetch.wait.max.ms=200 # 从 500 毫秒调整为 200 毫秒
-
重启或重建副本
- 若 Follower 长期无法同步,可重启 Follower 节点尝试恢复。
- 若重启无效,通过
kafka-reassign-partitions.sh
工具重建副本:# 1. 创建分区重分配计划(示例:将 topic1 的分区 0 迁移到节点 2) echo '{"version":1,"partitions":[{"topic":"topic1","partition":0,"replicas":[1,2]}]}' > reassignment.json# 2. 执行重分配 bin/kafka-reassign-partitions.sh --bootstrap-server broker1:9092 --reassignment-json-file reassignment.json --execute
-
长期优化
- 合理规划副本分布,避免将同一分区的多个副本部署在负载过高的节点。
- 定期清理过期日志,避免磁盘空间不足影响 Follower 同步。
注意事项
- 调整
replica.lag.time.max.ms
需平衡数据可靠性与可用性:过大可能导致 ISR 中包含长期落后的副本,过小则易引发 ISR 频繁收缩。 - 若 ISR 频繁收缩且无法通过参数调整解决,需排查硬件故障(如磁盘损坏、网卡故障)。
二、100道Kafka 面试题目录列表
文章序号 | Kafka 100道 |
---|---|
1 | Kafka面试题及详细答案100道(01-10) |
2 | Kafka面试题及详细答案100道(11-22) |
3 | Kafka面试题及详细答案100道(23-35) |
4 | Kafka面试题及详细答案100道(36-50) |
5 | Kafka面试题及详细答案100道(51-65) |
6 | Kafka面试题及详细答案100道(66-80) |
7 | Kafka面试题及详细答案100道(81-90) |
8 | Kafka面试题及详细答案100道(91-95) |
9 | Kafka面试题及详细答案100道(96-100) |