当前位置: 首页 > news >正文

Kafka 从入门到精通完整指南

📑 目录

  • 第一章:Kafka 基础概念
  • 第二章:环境搭建
  • 第三章:生产者开发
  • 第四章:消费者开发
  • 第五章:高级特性
  • 第六章:性能优化
  • 第七章:监控与运维
  • 第八章:Spring Boot 集成
  • 第九章:面试要点
  • 第十章:常见问题与解决方案

第一章:Kafka 基础概念

1.1 什么是 Kafka

Apache Kafka 是一个分布式流处理平台,主要用于构建实时数据管道和流应用程序。

核心特点:

  • 高吞吐量:每秒可处理数百万条消息
  • 持久化:消息持久化到磁盘
  • 分布式:天然支持集群部署
  • 容错性:通过副本机制保证高可用

1.2 核心组件

Broker(代理服务器)
  • Kafka 集群中的服务器节点
  • 负责存储和转发消息
  • 一个集群通常由多个 Broker 组成
Topic(主题)
  • 消息的逻辑分类
  • 类似数据库中的表
  • 生产者发送消息到特定 Topic
Partition(分区)
  • Topic 的物理分组
  • 每个 Partition 是一个有序的消息队列
  • 提供并行处理能力和数据冗余
Producer(生产者)
  • 向 Kafka Topic 发送消息的客户端应用
Consumer(消费者)
  • 从 Kafka Topic 订阅并消费消息的客户端应用
Consumer Group(消费者组)
  • 多个消费者的逻辑组合
  • 同一组内的消费者共同消费 Topic 的所有分区
  • 实现负载均衡和故障转移
Offset(偏移量)
  • 消息在分区中的唯一标识(递增的整数)
  • 消费者通过 Offset 追踪消费进度

1.3 架构图

┌─────────────┐
│  Producer   │
└──────┬──────┘│ 发送消息▼
┌─────────────────────────────────┐
│         Kafka Cluster           │
│  ┌─────────┐  ┌─────────┐      │
│  │ Broker1 │  │ Broker2 │ ...  │
│  └─────────┘  └─────────┘      │
│                                 │
│  Topic: my-topic                │
│  ├─ Partition 0 (Leader)        │
│  ├─ Partition 1 (Leader)        │
│  └─ Partition 2 (Leader)        │
└─────────────────────────────────┘│ 拉取消息▼
┌──────────────────┐
│ Consumer Group   │
│ ┌──────────────┐ │
│ │ Consumer 1   │ │
│ │ Consumer 2   │ │
│ └──────────────┘ │
└──────────────────┘

第二章:环境搭建

2.1 Maven 依赖

<dependencies><!-- Kafka 客户端 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version></dependency><!-- JSON 序列化(可选) --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.15.2</version></dependency><!-- 日志 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>2.0.7</version></dependency>
</dependencies>

2.2 Docker Compose 快速启动

创建 docker-compose.yml 文件:

version: '3'
services:zookeeper:image: confluentinc/cp-zookeeper:7.5.0container_name: zookeeperenvironment:ZOOKEEPER_CLIENT_PORT: 2181ZOOKEEPER_TICK_TIME: 2000ports:- "2181:2181"kafka:image: confluentinc/cp-kafka:7.5.0container_name: kafkadepends_on:- zookeeperports:- "9092:9092"environment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1

启动命令:

docker-compose up -d

2.3 验证安装

# 进入 Kafka 容器
docker exec -it kafka bash# 创建测试 Topic
kafka-topics --create --topic test-topic \--bootstrap-server localhost:9092 \--partitions 3 --replication-factor 1# 查看 Topic 列表
kafka-topics --list --bootstrap-server localhost:9092

第三章:生产者开发

3.1 基础生产者示例

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;public class BasicProducer {public static void main(String[] args) {// 1. 配置生产者参数Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 2. 创建生产者实例KafkaProducer<String, String> producer = new KafkaProducer<>(props);try {// 3. 发送消息for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key-" + i, "value-" + i);// 异步发送并处理回调producer.send(record, (metadata, exception) -> {if (exception == null) {System.out.printf("消息发送成功 - Topic: %s, Partition: %d, Offset: %d%n",metadata.topic(), metadata.partition(), metadata.offset());} else {System.err.println("消息发送失败: " + exception.getMessage());}});}} finally {// 4. 关闭生产者producer.close();}}
}

3.2 同步发送 vs 异步发送

同步发送
public class SyncProducer {public void sendSync(KafkaProducer<String, String> producer) {ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "value");try {// 同步发送,等待结果RecordMetadata metadata = producer.send(record).get();System.out.println("发送成功,Offset: " + metadata.offset());} catch (Exception e) {System.err.println("发送失败: " + e.getMessage());}}
}
异步发送
public class AsyncProducer {public void sendAsync(KafkaProducer<String, String> producer) {ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "value");// 异步发送,立即返回producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("发送成功");} else {System.err.println("发送失败: " + exception.getMessage());}}});}
}

3.3 生产者重要配置

public class ProducerConfiguration {public static Properties getReliableConfig() {Properties props = new Properties();// 基础配置props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 可靠性配置props.put(ProducerConfig.ACKS_CONFIG, "all"); // 等待所有副本确认props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试次数props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 启用幂等性// 性能配置props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 批量大小 16KBprops.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 延迟 10msprops.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // 压缩props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 缓冲区 32MBreturn props;}
}

配置说明:

配置项说明推荐值
acks确认机制:0-不等待,1-等待Leader,all-等待所有ISRall
retries重试次数3 或 Integer.MAX_VALUE
batch.size批量大小(字节)16384 (16KB)
linger.ms等待时间(毫秒)10-100
compression.type压缩类型:none/gzip/snappy/lz4/zstdlz4 或 snappy

3.4 自定义分区器

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;public class CustomPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {int numPartitions = cluster.partitionCountForTopic(topic);if (key == null) {return 0; // key 为空发送到分区 0}// 根据 key 的哈希值选择分区return Math.abs(key.hashCode()) % numPartitions;}@Overridepublic void close() {// 清理资源}@Overridepublic void configure(Map<String, ?> configs) {// 初始化配置}
}// 使用自定义分区器
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());

3.5 事务生产者

public class TransactionalProducer {public static void main(String[] args) {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 事务配置props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 初始化事务producer.initTransactions();try {// 开始事务producer.beginTransaction();// 发送消息producer.send(new ProducerRecord<>("topic1", "key1", "value1"));producer.send(new ProducerRecord<>("topic2", "key2", "value2"));// 提交事务producer.commitTransaction();System.out.println("事务提交成功");} catch (Exception e) {// 回滚事务producer.abortTransaction();System.err.println("事务回滚: " + e.getMessage());} finally {producer.close();}}
}

⚠️ 生产者注意事项

  1. 必须关闭生产者:使用 try-finally 或 try-with-resources 确保资源释放
  2. 异步发送需要回调:异步发送时必须处理回调,否则无法感知失败
  3. 幂等性配置:开启幂等性可防止重复消息,但会略微降低性能
  4. 批量大小:batch.size 过小影响吞吐量,过大增加延迟
  5. 序列化器选择:根据数据类型选择合适的序列化器

第四章:消费者开发

4.1 基础消费者示例

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;public class BasicConsumer {public static void main(String[] args) {// 1. 配置消费者参数Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 2. 创建消费者实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 3. 订阅主题consumer.subscribe(Collections.singletonList("test-topic"));try {// 4. 循环拉取消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.printf("收到消息 - Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s%n",record.topic(), record.partition(), record.offset(), record.key(), record.value());}}} finally {// 5. 关闭消费者consumer.close();}}
}

4.2 手动提交 Offset

同步提交
public class ManualCommitConsumer {public static void main(String[] args) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-commit-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 关闭自动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("test-topic"));try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {// 处理消息processMessage(record);}// 同步提交 offsetconsumer.commitSync();}} catch (CommitFailedException e) {System.err.println("提交失败: " + e.getMessage());} finally {consumer.close();}}private static void processMessage(ConsumerRecord<String, String> record) {System.out.println("处理消息: " + record.value());}
}
异步提交
public class AsyncCommitConsumer {public static void main(String[] args) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "async-commit-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("test-topic"));try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {processMessage(record);}// 异步提交consumer.commitAsync((offsets, exception) -> {if (exception != null) {System.err.println("提交失败: " + exception.getMessage());} else {System.out.println("提交成功: " + offsets);}});}} finally {consumer.close();}}private static void processMessage(ConsumerRecord<String, String> record) {System.out.println("处理消息: " + record.value());}
}

4.3 指定分区和 Offset 消费

public class PartitionConsumer {public static void main(String[] args) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "partition-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 手动指定分区TopicPartition partition0 = new TopicPartition("test-topic", 0);TopicPartition partition1 = new TopicPartition("test-topic", 1);consumer.assign(Arrays.asList(partition0, partition1));// 从指定 offset 开始消费consumer.seek(partition0, 100);// 从最早开始消费// consumer.seekToBeginning(Collections.singletonList(partition0));// 从最新开始消费// consumer.seekToEnd(Collections.singletonList(partition0));try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println("消费消息: " + record.value());}}} finally {consumer.close();}}
}

4.4 消费者重要配置

public class ConsumerConfiguration {public static Properties getOptimizedConfig() {Properties props = new Properties();// 基础配置props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// Offset 配置props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 手动提交props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest/latest/none// 性能配置props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024); // 最小拉取 1KBprops.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); // 最多等待 500msprops.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // 每次拉取 500 条// 会话管理props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); // 会话超时 30sprops.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000); // 心跳间隔 3sprops.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 最大处理时间 5minreturn props;}
}

配置说明:

配置项说明推荐值
group.id消费者组 ID必填
enable.auto.commit是否自动提交false(手动提交)
auto.offset.resetOffset 不存在时的策略earliest
max.poll.records每次拉取的最大记录数500
session.timeout.ms会话超时时间30000 (30s)
heartbeat.interval.ms心跳间隔3000 (3s)

4.5 多线程消费

import java.util.concurrent.*;public class MultiThreadConsumer {private static final int THREAD_COUNT = 3;public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);// 创建多个消费者线程for (int i = 0; i < THREAD_COUNT; i++) {executorService.submit(new ConsumerWorker(i));}// 优雅关闭Runtime.getRuntime().addShutdownHook(new Thread(() -> {System.out.println("开始关闭...");executorService.shutdown();try {executorService.awaitTermination(5000, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {e.printStackTrace();}}));}static class ConsumerWorker implements Runnable {private final int workerId;public ConsumerWorker(int workerId) {this.workerId = workerId;}@Overridepublic void run() {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "multi-thread-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("test-topic"));try {while (!Thread.currentThread().isInterrupted()) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.printf("Worker %d 消费: %s%n", workerId, record.value());}}} finally {consumer.close();}}}
}

⚠️ 消费者注意事项

  1. Consumer 非线程安全:不能在多线程中共享同一个 Consumer 实例
  2. 心跳超时:确保 poll 方法调用频率足够高,避免心跳超时被踢出组
  3. 消息丢失:自动提交可能导致消息丢失,建议手动提交
  4. 重复消费:手动提交失败会导致重复消费,需要业务幂等性
  5. Rebalance:频繁 Rebalance 会影响性能,合理配置 session.timeout.ms

第五章:高级特性

5.1 自定义序列化器(JSON)

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.*;// JSON 序列化器
public class JsonSerializer<T> implements Serializer<T> {private final ObjectMapper objectMapper = new ObjectMapper();@Overridepublic byte[] serialize(String topic, T data) {if (data == null) {return null;}try {return objectMapper.writeValueAsBytes(data);} catch (Exception e) {throw new RuntimeException("JSON 序列化失败", e);}}@Overridepublic void close() {// 清理资源}
}// JSON 反序列化器
public class JsonDeserializer<T> implements Deserializer<T> {private final ObjectMapper objectMapper = new ObjectMapper();private Class<T> type;public JsonDeserializer(Class<T> type) {this.type = type;}@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {// 可以从 configs 中读取类型信息}@Overridepublic T deserialize(String topic, byte[] data) {if (data == null) {return null;}try {return objectMapper.readValue(data, type);} catch (Exception e) {throw new RuntimeException("JSON 反序列化失败", e);}}@Overridepublic void close() {// 清理资源}
}// 使用示例
public class User {private String id;private String name;private int age;// getters and setters
}// 生产者使用
Properties props = new Properties();
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());KafkaProducer<String, User> producer = new KafkaProducer<>(props);
User user = new User("1", "张三", 25);
producer.send(new ProducerRecord<>("user-topic", user.getId(), user));

5.2 消息拦截器

import org.apache.kafka.clients.producer.*;public class MessageInterceptor implements ProducerInterceptor<String, String> {@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {// 发送前拦截,可以修改消息String timestamp = String.valueOf(System.currentTimeMillis());String newValue = record.value() + " [时间戳: " + timestamp + "]";return new ProducerRecord<>(record.topic(), record.partition(), record.key(), newValue, record.headers());}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {// 收到响应时触发if (exception == null) {System.out.println("消息发送成功 - Offset: " + metadata.offset());} else {System.err.println("消息发送失败: " + exception.getMessage());}}@Overridepublic void close() {// 清理资源}@Overridepublic void configure(Map<String, ?> configs) {// 初始化配置}
}// 配置拦截器
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MessageInterceptor.class.getName());

5.3 Exactly-Once 语义

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;public class ExactlyOnceExample {public static void main(String[] args) {// 生产者配置Properties producerProps = new Properties();producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "exactly-once-producer");producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);// 消费者配置Properties consumerProps = new Properties();consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "exactly-once-group");consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // 重要!KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);producer.initTransactions();consumer.subscribe(Collections.singletonList("input-topic"));try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));if (!records.isEmpty()) {producer.beginTransaction();try {// 处理消息并发送到输出 topicfor (ConsumerRecord<String, String> record : records) {String processedValue = processMessage(record.value());producer.send(new ProducerRecord<>("output-topic", record.key(), processedValue));}// 将消费者的 offset 也作为事务的一部分提交Map<TopicPartition, OffsetAndMetadata> offsets = getOffsets(records);producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());producer.commitTransaction();System.out.println("事务提交成功");} catch (Exception e) {producer.abortTransaction();System.err.println("事务回滚: " + e.getMessage());}}}} finally {producer.close();consumer.close();}}private static String processMessage(String value) {// 业务处理逻辑return value.toUpperCase();}private static Map<TopicPartition, OffsetAndMetadata> getOffsets(ConsumerRecords<String, String> records) {Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();for (TopicPartition partition : records.partitions()) {List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);long offset = partitionRecords.get(partitionRecords.size() - 1).offset();offsets.put(partition, new OffsetAndMetadata(offset + 1));}return offsets;}
}

5.4 Kafka Streams 流处理

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import java.util.Properties;public class KafkaStreamsExample {public static void main(String[] args) {// 配置Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());StreamsBuilder builder = new StreamsBuilder();// 读取输入流KStream<String, String> source = builder.stream("input-topic");// 处理流数据KStream<String, String> processed = source.filter((key, value) -> value != null && value.length() > 5)  // 过滤.mapValues(value -> value.toUpperCase())                      // 转换.peek((key, value) -> System.out.println("处理: " + value));  // 查看// 输出到目标 topicprocessed.to("output-topic");// 构建并启动流处理应用KafkaStreams streams = new KafkaStreams(builder.build(), props);streams.start();// 优雅关闭Runtime.getRuntime().addShutdownHook(new Thread(streams::close));}
}

5.5 聚合与窗口操作

import org.apache.kafka.streams.kstream.*;
import java.time.Duration;public class AggregationExample {public static void main(String[] args) {Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregation-app");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");StreamsBuilder builder = new StreamsBuilder();KStream<String, String> stream = builder.stream("input-topic");// 1. 按键分组并计数KTable<String, Long> counts = stream.groupByKey().count();// 2. 时间窗口聚合(5 分钟窗口)TimeWindowedKStream<String, String> windowedStream = stream.groupByKey().windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)));KTable<Windowed<String>, Long> windowedCounts = windowedStream.count();// 3. 会话窗口(间隔超过 10 分钟视为新会话)SessionWindowedKStream<String, String> sessionStream = stream.groupByKey().windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(10)));KTable<Windowed<String>, Long> sessionCounts = sessionStream.count();// 输出结果counts.toStream().to("count-output");KafkaStreams streams = new KafkaStreams(builder.build(), props);streams.start();Runtime.getRuntime().addShutdownHook(new Thread(streams::close));}
}

第六章:性能优化

6.1 生产者性能优化

public class OptimizedProducer {public static Properties getHighThroughputConfig() {Properties props = new Properties();// 基础配置props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 吞吐量优化props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);        // 32KB 批量大小props.put(ProducerConfig.LINGER_MS_CONFIG, 20);            // 延迟 20ms 等待批量props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");  // 使用 LZ4 压缩props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);  // 64MB 缓冲区// 可靠性优化(根据需求调整)props.put(ProducerConfig.ACKS_CONFIG, "1");                // Leader 确认即可props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);// 超时配置props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);return props;}
}

6.2 消费者性能优化

public class OptimizedConsumer {public static Properties getHighThroughputConfig() {Properties props = new Properties();// 基础配置props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "optimized-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 性能优化props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);        // 最小 1KBprops.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);       // 最多等待 500msprops.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);       // 每次拉取 1000 条// 提交策略props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);    // 手动提交// 会话管理props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);    // 30sprops.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);  // 3sprops.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 5minreturn props;}
}

6.3 批量处理优化

import java.util.*;public class BatchProcessingConsumer {private static final int BATCH_SIZE = 100;public static void main(String[] args) {Properties props = OptimizedConsumer.getHighThroughputConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("test-topic"));List<ConsumerRecord<String, String>> buffer = new ArrayList<>();try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {buffer.add(record);// 达到批量大小时处理if (buffer.size() >= BATCH_SIZE) {processBatch(buffer);consumer.commitSync();buffer.clear();}}// 处理剩余消息if (!buffer.isEmpty()) {processBatch(buffer);consumer.commitSync();buffer.clear();}}} finally {consumer.close();}}private static void processBatch(List<ConsumerRecord<String, String>> batch) {System.out.println("批量处理 " + batch.size() + " 条消息");// 批量写入数据库// batchInsertToDatabase(batch);// 或批量处理业务逻辑for (ConsumerRecord<String, String> record : batch) {// 处理单条消息}}
}

6.4 性能调优建议

生产者调优:

  1. 增大批量大小batch.size=32768 (32KB)
  2. 增加延迟时间linger.ms=20 (20ms)
  3. 启用压缩compression.type=lz4
  4. 增大缓冲区buffer.memory=67108864 (64MB)
  5. 降低 acksacks=1(根据可靠性需求)

消费者调优:

  1. 增加拉取量max.poll.records=1000
  2. 增加分区数:提高并行度
  3. 增加消费者数:不超过分区数
  4. 批量处理:积累一批消息后统一处理
  5. 异步处理:使用线程池处理消息

Broker 调优(server.properties):

# 增加网络和 IO 线程
num.network.threads=8
num.io.threads=8# 增加缓冲区
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400# 日志段大小
log.segment.bytes=1073741824# 刷盘策略(性能优先)
log.flush.interval.messages=10000
log.flush.interval.ms=1000

第七章:监控与运维

7.1 JMX 监控指标

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import java.util.Map;public class MetricsMonitoring {public static void printProducerMetrics(KafkaProducer<String, String> producer) {Map<MetricName, ? extends Metric> metrics = producer.metrics();System.out.println("=== 生产者指标 ===");for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {MetricName name = entry.getKey();Metric metric = entry.getValue();// 重要指标if (name.name().equals("record-send-rate") ||name.name().equals("record-error-rate") ||name.name().equals("request-latency-avg") ||name.name().equals("batch-size-avg")) {System.out.printf("%s: %.2f%n", name.name(), metric.metricValue());}}}public static void printConsumerMetrics(KafkaConsumer<String, String> consumer) {Map<MetricName, ? extends Metric> metrics = consumer.metrics();System.out.println("=== 消费者指标 ===");for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {MetricName name = entry.getKey();Metric metric = entry.getValue();// 重要指标if (name.name().equals("records-consumed-rate") ||name.name().equals("fetch-latency-avg") ||name.name().equals("records-lag-max")) {System.out.printf("%s: %.2f%n", name.name(), metric.metricValue());}}}
}

关键监控指标:

生产者指标:

  • record-send-rate:消息发送速率(条/秒)
  • record-error-rate:消息错误率
  • request-latency-avg:平均请求延迟(ms)
  • batch-size-avg:平均批量大小(字节)
  • compression-rate-avg:压缩比率

消费者指标:

  • records-consumed-rate:消息消费速率(条/秒)
  • fetch-latency-avg:拉取延迟(ms)
  • records-lag-max:最大消费延迟(条数)
  • commit-latency-avg:提交延迟(ms)

Broker 指标(JMX):

  • UnderReplicatedPartitions:未充分复制的分区数
  • OfflinePartitionsCount:离线分区数
  • ActiveControllerCount:活跃控制器数(应该为 1)
  • BytesInPerSec:入站字节速率
  • BytesOutPerSec:出站字节速率

7.2 管理操作

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartitionInfo;
import java.util.*;
import java.util.concurrent.ExecutionException;public class KafkaAdminOperations {private final AdminClient adminClient;public KafkaAdminOperations(String bootstrapServers) {Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);this.adminClient = AdminClient.create(props);}// 创建 Topicpublic void createTopic(String topicName, int partitions, short replicationFactor) throws ExecutionException, InterruptedException {NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);CreateTopicsResult result = adminClient.createTopics(Collections.singleton(newTopic));result.all().get();System.out.println("Topic 创建成功: " + topicName);}// 删除 Topicpublic void deleteTopic(String topicName) throws ExecutionException, InterruptedException {DeleteTopicsResult result = adminClient.deleteTopics(Collections.singleton(topicName));result.all().get();System.out.println("Topic 删除成功: " + topicName);}// 列出所有 Topicpublic void listTopics() throws ExecutionException, InterruptedException {ListTopicsResult result = adminClient.listTopics();Set<String> topics = result.names().get();System.out.println("所有 Topics: " + topics);}// 查看 Topic 详情public void describeTopics(String... topicNames) throws ExecutionException, InterruptedException {DescribeTopicsResult result = adminClient.describeTopics(Arrays.asList(topicNames));Map<String, TopicDescription> descriptions = result.all().get();for (Map.Entry<String, TopicDescription> entry : descriptions.entrySet()) {TopicDescription desc = entry.getValue();System.out.printf("Topic: %s, Partitions: %d%n", desc.name(), desc.partitions().size());for (TopicPartitionInfo partition : desc.partitions()) {System.out.printf("  Partition %d: Leader=%s, Replicas=%s%n",partition.partition(), partition.leader().id(),partition.replicas());}}}// 列出消费者组public void listConsumerGroups() throws ExecutionException, InterruptedException {ListConsumerGroupsResult result = adminClient.listConsumerGroups();Collection<ConsumerGroupListing> groups = result.all().get();System.out.println("=== 消费者组列表 ===");for (ConsumerGroupListing group : groups) {System.out.println("消费者组: " + group.groupId());}}// 查看消费者组详情public void describeConsumerGroup(String groupId) throws ExecutionException, InterruptedException {DescribeConsumerGroupsResult result = adminClient.describeConsumerGroups(Collections.singleton(groupId));ConsumerGroupDescription description = result.all().get().get(groupId);System.out.println("=== 消费者组详情 ===");System.out.println("消费者组: " + description.groupId());System.out.println("状态: " + description.state());System.out.println("成员数: " + description.members().size());}public void close() {adminClient.close();}public static void main(String[] args) throws Exception {KafkaAdminOperations admin = new KafkaAdminOperations("localhost:9092");// 创建 Topicadmin.createTopic("admin-test-topic", 3, (short) 1);// 列出所有 Topicsadmin.listTopics();// 查看 Topic 详情admin.describeTopics("admin-test-topic");// 列出消费者组admin.listConsumerGroups();admin.close();}
}

7.3 常用运维命令

# ========== Topic 管理 ==========# 创建 Topic
kafka-topics.sh --create --topic my-topic \--bootstrap-server localhost:9092 \--partitions 3 --replication-factor 2# 查看 Topic 列表
kafka-topics.sh --list --bootstrap-server localhost:9092# 查看 Topic 详情
kafka-topics.sh --describe --topic my-topic \--bootstrap-server localhost:9092# 增加分区数(只能增加不能减少)
kafka-topics.sh --alter --topic my-topic \--partitions 5 --bootstrap-server localhost:9092# 删除 Topic
kafka-topics.sh --delete --topic my-topic \--bootstrap-server localhost:9092# ========== 消费者组管理 ==========# 查看消费者组列表
kafka-consumer-groups.sh --list \--bootstrap-server localhost:9092# 查看消费者组详情
kafka-consumer-groups.sh --describe --group my-group \--bootstrap-server localhost:9092# 重置 Offset 到最早
kafka-consumer-groups.sh --reset-offsets \--group my-group --topic my-topic \--to-earliest --execute \--bootstrap-server localhost:9092# 重置 Offset 到最新
kafka-consumer-groups.sh --reset-offsets \--group my-group --topic my-topic \--to-latest --execute \--bootstrap-server localhost:9092# 重置 Offset 到指定位置
kafka-consumer-groups.sh --reset-offsets \--group my-group --topic my-topic:0 \--to-offset 100 --execute \--bootstrap-server localhost:9092# ========== 生产和消费测试 ==========# 生产消息(命令行)
kafka-console-producer.sh --topic my-topic \--bootstrap-server localhost:9092# 消费消息(从最早开始)
kafka-console-consumer.sh --topic my-topic \--from-beginning --bootstrap-server localhost:9092# 消费消息(显示 key)
kafka-console-consumer.sh --topic my-topic \--property print.key=true \--property key.separator=: \--from-beginning --bootstrap-server localhost:9092# ========== 性能测试 ==========# 生产者性能测试
kafka-producer-perf-test.sh \--topic perf-test \--num-records 1000000 \--record-size 1024 \--throughput 100000 \--producer-props bootstrap.servers=localhost:9092# 消费者性能测试
kafka-consumer-perf-test.sh \--topic perf-test \--messages 1000000 \--bootstrap-server localhost:9092

第八章:Spring Boot 集成

8.1 Maven 依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>3.1.0</version>
</dependency>

8.2 配置文件

spring:kafka:bootstrap-servers: localhost:9092# 生产者配置producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializeracks: allretries: 3batch-size: 16384linger-ms: 10buffer-memory: 33554432compression-type: lz4# 消费者配置consumer:group-id: my-groupkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerauto-offset-reset: earliestenable-auto-commit: falsemax-poll-records: 500# 监听器配置listener:ack-mode: manual_immediate  # 手动立即确认concurrency: 3               # 并发度

8.3 Spring Kafka 使用示例

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import org.springframework.beans.factory.annotation.Autowired;import java.util.List;@Service
public class KafkaService {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;// ========== 生产者 ==========// 简单发送public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);}// 发送带 key 的消息public void sendMessageWithKey(String topic, String key, String message) {kafkaTemplate.send(topic, key, message);}// 异步发送带回调public void sendMessageAsync(String topic, String message) {kafkaTemplate.send(topic, message).addCallback(result -> System.out.println("发送成功: " + result.getRecordMetadata().offset()),ex -> System.err.println("发送失败: " + ex.getMessage()));}// ========== 消费者 ==========// 简单消费@KafkaListener(topics = "test-topic", groupId = "my-group")public void consume(String message) {System.out.println("收到消息: " + message);}// 手动确认@KafkaListener(topics = "test-topic", groupId = "manual-group")public void consumeWithAck(String message, Acknowledgment acknowledgment) {System.out.println("收到消息: " + message);// 处理消息processMessage(message);// 手动确认acknowledgment.acknowledge();}// 获取消息元数据@KafkaListener(topics = "test-topic", groupId = "meta-group")public void consumeWithMetadata(@Payload String message,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,@Header(KafkaHeaders.OFFSET) long offset) {System.out.printf("Topic: %s, Partition: %d, Offset: %d, Message: %s%n",topic, partition, offset, message);}// 批量消费@KafkaListener(topics = "batch-topic", groupId = "batch-group")public void consumeBatch(List<String> messages, Acknowledgment acknowledgment) {System.out.println("批量接收 " + messages.size() + " 条消息");for (String message : messages) {processMessage(message);}acknowledgment.acknowledge();}// 消费指定分区@KafkaListener(topicPartitions = @TopicPartition(topic = "partition-topic",partitions = {"0", "1"}),groupId = "partition-group")public void consumePartition(String message) {System.out.println("分区消息: " + message);}private void processMessage(String message) {// 业务逻辑处理System.out.println("处理消息: " + message);}
}

8.4 配置类

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;import java.util.HashMap;
import java.util.Map;@Configuration
public class KafkaConfig {// ========== 生产者配置 ==========@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> config = new HashMap<>();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);config.put(ProducerConfig.ACKS_CONFIG, "all");config.put(ProducerConfig.RETRIES_CONFIG, 3);config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);config.put(ProducerConfig.LINGER_MS_CONFIG, 10);config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");return new DefaultKafkaProducerFactory<>(config);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}// ========== 消费者配置 ==========@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> config = new HashMap<>();config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");config.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);return new DefaultKafkaConsumerFactory<>(config);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);  // 并发度factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}
}

第九章:面试要点

9.1 基础概念

Q1: Kafka 的核心架构是什么?

答案: Kafka 主要由以下组件构成:

  1. Broker:Kafka 集群中的服务器节点,负责存储和转发消息
  2. Topic:消息的逻辑分类,类似数据库中的表
  3. Partition:Topic 的物理分组,每个 Partition 是一个有序的消息队列
  4. Producer:生产者,向 Kafka 发送消息
  5. Consumer:消费者,从 Kafka 消费消息
  6. Consumer Group:消费者组,实现负载均衡和故障转移
  7. ZooKeeper/KRaft:元数据管理和协调服务
Q2: Kafka 如何保证高吞吐量?

答案:

  1. 顺序写磁盘:避免随机 IO,性能接近内存
  2. 零拷贝(Zero-Copy):使用 sendfile() 系统调用,减少数据拷贝次数
  3. 批量发送:减少网络往返次数
  4. 消息压缩:减少网络传输和磁盘占用
  5. 分区并行:多个分区并行读写,充分利用多核 CPU
  6. Page Cache:利用操作系统的页缓存
Q3: Partition 的作用是什么?

答案:

  1. 负载均衡:数据分散到多个 Broker,避免单点瓶颈
  2. 并行处理:多个消费者可以并行消费不同分区
  3. 容错:通过副本机制实现高可用
  4. 有序性:单个分区内消息严格有序

9.2 可靠性保证

Q4: Kafka 如何保证消息不丢失?

答案:

生产者端:

  • 设置 acks=all:等待所有 ISR 副本确认
  • 启用幂等性:enable.idempotence=true
  • 配置重试:retries=Integer.MAX_VALUE
  • 同步发送或正确处理异步回调

Broker 端:

  • 副本数 >= 2:replication.factor>=2
  • ISR 最小数量:min.insync.replicas>=2
  • 禁用 unclean 选举:unclean.leader.election.enable=false

消费者端:

  • 手动提交 offset:enable.auto.commit=false
  • 先处理消息,再提交 offset
  • 使用事务消费(Exactly-Once)
Q5: Kafka 的三种消息语义是什么?

答案:

语义描述实现方式适用场景
At Most Once最多一次,消息可能丢失,不会重复先提交 offset,再处理消息日志收集
At Least Once至少一次,消息不会丢失,可能重复先处理消息,再提交 offset最常用
Exactly Once精确一次,消息不丢失、不重复事务 API + 幂等性金融交易
Q6: 如何实现 Exactly-Once?

答案:

// 1. 生产者启用事务和幂等性
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "trans-id");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);// 2. 消费者设置隔离级别
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");// 3. 在事务中处理消息
producer.beginTransaction();
// 处理并发送消息
producer.sendOffsetsToTransaction(offsets, groupMetadata);
producer.commitTransaction();

核心原理:

  • 幂等性:通过 PID + Sequence Number 去重
  • 事务:保证消息发送和 offset 提交的原子性
  • 隔离级别:消费者只读取已提交的消息

9.3 性能优化

Q7: 如何提高生产者性能?

答案:

  1. 批量发送:增大 batch.sizelinger.ms
  2. 压缩:使用 compression.type=lz4
  3. 异步发送:使用回调处理结果
  4. 增大缓冲区:调大 buffer.memory
  5. 减少 acks:使用 acks=1(根据需求)
  6. 多线程:创建多个 Producer 实例
Q8: 如何提高消费者性能?

答案:

  1. 增加分区数:提高并行度
  2. 增加消费者数:但不超过分区数
  3. 批量处理:积累一批消息后统一处理
  4. 异步处理:使用线程池处理消息
  5. 调整拉取参数fetch.min.bytesfetch.max.wait.ms
  6. 手动提交:批量提交 offset
Q9: Kafka 的零拷贝原理?

答案:

传统方式(4 次拷贝): 磁盘 → 内核缓冲区 → 用户缓冲区 → Socket 缓冲区 → 网卡

零拷贝(2 次拷贝): 磁盘 → 内核缓冲区 → 网卡

  • 使用 sendfile() 系统调用
  • 数据直接从文件传输到 Socket
  • Java NIO 的 FileChannel.transferTo() 实现

9.4 集群与运维

Q10: Kafka 的副本机制?

答案:

  • Leader 副本:处理所有读写请求
  • Follower 副本:只同步数据,不处理请求
  • ISR(In-Sync Replicas):与 Leader 保持同步的副本集合
  • OSR(Out-of-Sync Replicas):落后太多的副本
  • Leader 故障时,从 ISR 中选举新 Leader
Q11: 什么是 Rebalance?如何避免?

答案:

Rebalance 触发条件:

  1. 消费者组成员变化(加入/退出)
  2. 订阅的 Topic 分区数变化
  3. 消费者心跳超时

影响:

  • 所有消费者停止工作
  • 重新分配分区
  • 可能导致重复消费

避免方法:

  1. 增加 session.timeout.msmax.poll.interval.ms
  2. 减少单次 poll 的数据量:max.poll.records
  3. 提高消费速度,避免处理超时
  4. 合理设置心跳间隔:heartbeat.interval.ms
  5. 使用静态成员(Kafka 2.3+)
Q12: Kafka 如何保证消息有序?

答案:

  • 单分区有序:同一分区内消息严格有序
  • 全局有序:只能使用单分区 Topic
  • Key 相对有序:相同 Key 的消息发送到同一分区

实现方式:

// 指定 key,相同 key 进入同一分区
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "userID", "message");
Q13: Controller 的作用?

答案:

职责:

  • 管理分区状态
  • 进行 Leader 选举
  • 处理 Broker 上下线
  • 分区副本分配

选举:

  • 第一个在 ZooKeeper 创建临时节点的 Broker
  • Controller 故障时自动重新选举

9.5 实战问题

Q14: 消费者消费很慢怎么办?

答案:

  1. 增加消费者数量:前提是分区数足够
  2. 增加分区数:提高并行度
  3. 异步处理:消费者只负责接收,业务处理交给线程池
  4. 批量处理:减少数据库等外部调用次数
  5. 优化业务逻辑:减少单条消息处理时间
  6. 扩容:增加 Broker 数量
Q15: 如何处理消息积压?

答案:

  1. 临时扩容:快速增加消费者
  2. 创建临时 Topic:将消息转发到多个临时 Topic 并行消费
  3. 降级处理:跳过不重要的消息
  4. 限流:生产端限流,减轻压力
  5. 扩容分区:增加分区数后重新分配
Q16: Kafka 为什么快?

答案:

  1. 顺序 IO:磁盘顺序读写速度快
  2. Page Cache:利用操作系统缓存
  3. 零拷贝:减少数据拷贝次数
  4. 批量压缩:减少网络 IO
  5. 分区并行:充分利用多核 CPU
  6. 高效序列化:紧凑的二进制格式
Q17: Kafka 与 RabbitMQ 的区别?

答案:

特性KafkaRabbitMQ
架构分布式日志消息队列
吞吐量极高(百万级/秒)较高(万级/秒)
消息顺序分区内有序队列有序
持久化强制持久化可选
消息堆积支持大量堆积堆积影响性能
消费模式PullPush/Pull
适用场景大数据、日志收集、流处理任务队列、RPC
Q18: 如何监控 Kafka?

答案:

关键指标:

  • Broker:CPU、内存、磁盘、网络 IO
  • Topic:消息速率、字节速率
  • Consumer:消费延迟(Lag)、消费速率
  • Producer:发送速率、错误率
  • 系统:Under-replicated partitions、Offline partitions

监控工具:

  • Kafka Manager(CMAK)
  • Prometheus + Grafana
  • JMX Metrics
  • Burrow(Consumer Lag 监控)
Q19: Kafka 事务的实现原理?

答案:

  1. 事务协调器(Transaction Coordinator)

    • 管理事务状态
    • 存储在内部 Topic __transaction_state
  2. 两阶段提交

    • 准备阶段:写入所有分区
    • 提交阶段:写入事务标记
  3. 事务 ID

    • 保证生产者唯一性
    • 实现跨会话幂等性
  4. 隔离级别

    • read_uncommitted:读取所有消息
    • read_committed:只读取已提交消息
Q20: Kafka 如何实现幂等性?

答案:

  • Producer ID(PID):唯一标识生产者
  • Sequence Number:单调递增的序列号
  • 去重:Broker 检测并丢弃重复消息
  • 限制:单会话、单分区幂等
  • 配置enable.idempotence=true

原理:

消息1: PID=100, Seq=0
消息2: PID=100, Seq=1
消息2(重试): PID=100, Seq=1 → 被 Broker 去重

第十章:常见问题与解决方案

10.1 常见错误

错误 1: "Not enough replicas"

原因: ISR 副本数不足

解决方案:

  1. 检查 Broker 是否在线
  2. 检查网络连接
  3. 降低 min.insync.replicas
  4. 增加副本数
错误 2: "Offset out of range"

原因: 消费者的 offset 不存在或已过期

解决方案:

// 配置重置策略
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 或 "latest"// 或手动重置
consumer.seekToBeginning(consumer.assignment());
consumer.seekToEnd(consumer.assignment());
错误 3: "Rebalance timeout"

原因: 消费者处理时间超过 max.poll.interval.ms

解决方案:

  1. 增加 max.poll.interval.ms
  2. 减少 max.poll.records
  3. 优化业务处理速度
  4. 使用异步处理
错误 4: "Producer 发送超时"

原因: 网络问题或 Broker 负载过高

解决方案:

// 增加超时时间
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);// 检查 Broker 是否健康
// 检查网络连接

10.2 最佳实践

1. Topic 设计
  • 合理规划分区数:CPU 核心数的 2-3 倍
  • 副本数设置为 3:平衡性能与可靠性
  • 使用有意义的命名业务.环境.topic
  • 设置合理的保留时间retention.ms=604800000(7天)
2. 生产者实践
// 推荐配置
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);// 使用 try-with-resources
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {producer.send(record, callback);
}
3. 消费者实践
// 推荐配置
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);// 手动提交 + 幂等性处理
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {if (!isDuplicate(record.key())) {processMessage(record);}}consumer.commitSync();
}

10.3 容量规划

磁盘容量计算
磁盘容量 = 日均消息量 × 消息大小 × 保留天数 × 副本数 / 压缩比示例:
1 亿条/天 × 1KB × 7天 × 3副本 / 0.5 = 4.2TB
分区数规划
分区数 = max(目标吞吐量 / 单分区吞吐量, 消费者数量)示例:
100MB/s 目标 / 10MB/s 单分区 = 10 个分区
Broker 数量
Broker 数 = 总分区数 × 副本数 / 单 Broker 承载分区数建议:单 Broker 不超过 2000-4000 个分区

附录:快速参考

常用命令速查

# Topic 操作
kafka-topics.sh --create --topic <topic-name> --partitions <num> --replication-factor <num> --bootstrap-server localhost:9092
kafka-topics.sh --list --bootstrap-server localhost:9092
kafka-topics.sh --describe --topic <topic-name> --bootstrap-server localhost:9092
kafka-topics.sh --delete --topic <topic-name> --bootstrap-server localhost:9092# 消费者组操作
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
kafka-consumer-groups.sh --describe --group <group-name> --bootstrap-server localhost:9092
kafka-consumer-groups.sh --reset-offsets --group <group-name> --topic <topic-name> --to-earliest --execute --bootstrap-server localhost:9092# 生产消费测试
kafka-console-producer.sh --topic <topic-name> --bootstrap-server localhost:9092
kafka-console-consumer.sh --topic <topic-name> --from-beginning --bootstrap-server localhost:9092

核心配置速查

配置项生产者消费者推荐值
acksall
batch.size32768
linger.ms10-20
compression.typelz4
enable.idempotencetrue
enable.auto.commitfalse
max.poll.records500
session.timeout.ms30000

文档版本: v2.0
最后更新: 2024
适用 Kafka 版本: 2.8+ / 3.x
作者建议: 建议先理解基础概念,再实践代码示例,最后深入高级特性

http://www.dtcms.com/a/568547.html

相关文章:

  • 常见二三维GIS数据分类及处理流程图
  • LLM结构化输出:约束解码、CFG和response_format
  • 做网站麻烦不文山网站建设求职简历
  • wordpress网站静态页面外国食品优秀设计网站
  • hybrid
  • C++中malloc、free和new、delete的区别
  • 计算机视觉:python车辆行人检测与跟踪系统 YOLO模型 SORT算法 PyQt5界面 目标检测+目标跟踪 深度学习 计算机✅
  • 提高肠氧饱和度测量精度的新技术评估
  • 【数据集+源码+文章】基于yolov8+streamlit的12种水果品质、成熟度检测系统
  • Camera参数(3A)
  • 【C++:搜索二叉树】二叉搜索树从理论到实战完全解读:原理、两种场景下的实现
  • 高性能网络编程实战:用Tokio构建自定义协议服务器
  • H265 vs AV1 vs H266帧内块拷贝差异
  • CSS 中 `data-status` 的使用详解
  • 舟山企业网站建设公司微信小程序麻将辅助免费
  • VMware替代 | 详解ZStack ZSphere产品化运维六大特性
  • 缓存击穿,缓存穿透,缓存雪崩的原因和解决方案(或者说使用缓存的过程中有没有遇到什么问题,怎么解决的)
  • 关于数据包分片总长度字段的计算和MF标志位的判断
  • 手机网站建站流程网站建设卩金手指科杰
  • BuildingAI 用户信息弹出页面PRD
  • ​Oracle RAC灾备环境UNDO表空间管理终极指南:解决备库修改难题与性能优化实战​
  • 《uni-app跨平台开发完全指南》- 02 - 项目结构与配置文件详解
  • 【数据分析】基于R语言的废水微生物抗性分析与负二项回归模型建模
  • 深圳专业网站公司注册查询网站
  • k8s --- resource 资源
  • 神经网络之反射变换
  • k8s——pod详解2
  • 四层神经网络案例(含反向传播)
  • MySQL初阶学习日记(1)--- 数据库的基本操作
  • 【k8s】k8s的网络底层原理