当前位置: 首页 > news >正文

kafka学习笔记(三、消费者Consumer使用教程——使用实例及及核心流程源码讲解)

在这里插入图片描述


1.核心概念与架构

1.1.消费者与消费者组

Kafka消费者是订阅主题(Topic)并拉取消息的客户端实例,其核心逻辑通过KafkaConsumer类实现。消费者组(Consumer Group)是由多个逻辑关联的消费者组成的集合

  1. 核心规则
  • 同一分区独占性: 一个分区(Partition)只能被同一消费者组内的一个消费者消费,但不同消费者组可同时消费同一分区,实现广播模式。

  • 负载均衡与容错: 通过动态分区分配(Rebalance)实现消费者增减时的负载均衡,支持水平扩展和容错恢复。

  1. 消息投递模式
  • 点对点模式: 所有消费者属于同一组,消息被均匀分配给组内消费者,每条消息仅被消费一次。

  • 发布/订阅模式: 消费者属于不同组,消息广播到所有组,每个组独立消费全量数据。

消费者与消费者组的这种模式可以让整体的消费能力具备横向伸缩性,可以增加或减少消费者的个数来提高或降低整体的消费能力。注:消费者个数不能大于分区数。

1.2.核心类与组件

类名作用
KafkaConsumer消费者入口类,封装所有消费逻辑(线程不安全,需单线程操作)
ConsumerNetworkClient网络通信层,管理与 Broker 的 TCP 连接和请求发送/接收
Fetcher消息拉取核心逻辑,处理消息批次、反序列化、异常重试
SubscriptionState维护订阅状态(主题、分区分配、消费偏移量等)
ConsumerCoordinator消费者协调器,负责消费者组管理、心跳发送、分区再平衡(Rebalance)
Deserializer反序列化器接口,将字节数组转换为 Java 对象

2.基础消费者示例

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class BasicKafkaConsumer {public static void main(String[] args) {// 1. 配置消费者属性Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // 消费者组IDprops.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 当无提交offset时,从最早的消息开始props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 自动提交偏移量props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // 每秒自动提交// 2. 创建消费者实例try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {// 3. 订阅主题(支持多个主题和正则表达式)consumer.subscribe(Collections.singletonList("test-topic"));// 4. 持续拉取消息while (true) {// poll()参数为等待时间(毫秒),返回消息集合ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 处理消息System.out.printf("Received message: topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",record.topic(),record.partition(),record.offset(),record.key(),record.value());}}}}
}

3.源码及核心功能解析

3.1.订阅主题和分区

3.1.1. 订阅主题

在上面#2中的代码实例中的注释2中,使用subscribe()方法订阅了一个主题。消费者通过 subscribe() 方法订阅一个或多个主题,Kafka会自动将主题的分区分配给消费者组内的各个消费者。这是最常见的消费方式,适用于动态分区分配场景。

核心特点:

  • 自动负载均衡: 消费者组内的消费者会分摊主题的分区(例如:3个分区被2个消费者分配为2+1)。
  • 支持分区再平衡(Rebalance): 当消费者加入或离开组时,Kafka会自动重新分配分区。
  • 依赖消费者组(Group ID): 通过 group.id 标识消费者组,同一组的消费者共享分区。

代码实例:

// 订阅单个主题
consumer.subscribe(Collections.singletonList("test-topic"));// 订阅多个主题
consumer.subscribe(Arrays.asList("topic1", "topic2"));// 使用正则表达式订阅(匹配所有以 "logs-" 开头的主题)
consumer.subscribe(Pattern.compile("logs-.*"));

适用场景:

  • 需要动态扩展消费者数量(水平扩展)。
  • 分区数量固定或动态变化时(如新增分区)。
  • 消费者需要自动故障恢复(如某个消费者宕机,其他消费者接管分区)。

分区分配策略:
通过 partition.assignment.strategy 配置:

  • RangeAssignor(默认): 按范围分配分区(可能导致不均衡)。

  • RoundRobinAssignor: 轮询分配分区(更均衡)。

  • StickyAssignor: 尽量保持分配粘性,减少Rebalance时的分区变动。

3.1.2.直接订阅分区

消费者通过 assign() 方法直接指定要消费的分区,绕过消费者组管理,需手动管理分区偏移量。

核心特点:

  • 无消费者组协调: 不依赖 group.id,消费者独立运行。
  • 完全手动控制: 需自行指定分区和起始偏移量。
  • 不支持自动再平衡: 分区增减或消费者故障时需手动处理。

代码示例:

// 分配指定主题的特定分区
TopicPartition partition0 = new TopicPartition("test-topic", 0);
TopicPartition partition1 = new TopicPartition("test-topic", 1);
consumer.assign(Arrays.asList(partition0, partition1));// 指定从某个offset开始消费
consumer.seek(partition0, 100); // 从offset=100开始
consumer.seekToBeginning(Collections.singleton(partition1)); // 从最早开始
consumer.seekToEnd(Collections.singleton(partition1)); // 从最新开始

适用场景:

  • 需要精确控制消费特定分区(如按业务逻辑分区)。
  • 消费者组管理不适用时(如单消费者消费所有分区)。
  • 测试或调试场景下手动控制消费位置。

3.1.3.取消订阅

适用KafkaConsumer中的unsubscribe()方法来取消主题和分区的订阅。
代码实例:

consumer.unsubscribe()

如果将subscribe(Collection)assign(Collection)中的集合参数设置为空集合,那么作用与unsubscribe()等同。

3.2.消息拉取

poll() 方法是消息消费的核心入口,其内部实现涉及 网络通信消息批量拉取分区状态管理反序列化位移提交 等多个关键步骤。以下从源码层面逐层解析 poll() 方法的完整流程:

3.2.1.poll()方法源码解析

// KafkaConsumer.poll() 方法定义
public ConsumerRecords<K, V> poll(Duration timeout) {// 1. 检查线程安全性(确保单线程调用)acquireAndEnsureOpen();try {// 2. 记录开始时间(用于超时控制)long startMs = time.milliseconds();long remainingMs = timeout.toMillis();// 3. 主循环:处理消息拉取、心跳、Rebalance 等do {// 3.1 发送心跳(维持消费者组活性)coordinator.poll(timeRemaining(remainingMs, startMs));// 3.2 拉取消息(核心逻辑在 Fetcher 中)Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();if (!records.isEmpty()) {// 3.3 返回拉取到的消息(退出循环)return new ConsumerRecords<>(records);}// 3.4 计算剩余等待时间remainingMs = timeRemaining(remainingMs, startMs);} while (remainingMs > 0);return ConsumerRecords.empty();} finally {release();}
}

3.2.2.核心子流程解析

  1. 发送心跳(coordinator.poll()
    消费者通过 ConsumerCoordinator 维持与 GroupCoordinator 的心跳,确保消费者组活性。

    关键源码路径:
    ConsumerCoordinator.poll()pollHeartbeat()sendHeartbeatRequest()

    // ConsumerCoordinator.poll() 核心逻辑
    public void poll(long timeoutMs) {// 1. 处理未完成的异步请求(如心跳、JoinGroup 等)client.poll(timeoutMs, time.milliseconds(), new PollCondition() {@Overridepublic boolean shouldBlock() {return !coordinatorUnknown(); // 是否等待响应}});// 2. 检查是否需要触发 Rebalanceif (needRejoin()) {joinGroupIfNeeded(); // 触发 Rebalance}// 3. 周期性发送心跳pollHeartbeat(time.milliseconds());
    }
    

    心跳机制:

    • 心跳间隔:heartbeat.interval.ms 控制(默认 3 秒)。

    • 会话超时:session.timeout.ms 控制(默认 10 秒)。若超时未心跳,消费者被踢出组。

  2. 消息拉取(fetcher.fetchedRecords()
    Fetcher 负责从 Broker 拉取消息并解析。其内部维护一个 completedFetches 队列缓存已拉取但未处理的消息批次。

    源码路径:
    Fetcher.fetchedRecords()parseCompletedFetches()parseRecord()deserializeRecord()

    // Fetcher.fetchedRecords() 核心逻辑
    public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {Map<TopicPartition, List<ConsumerRecord<K, V>>> drainedRecords = new HashMap<>();int recordsRemaining = maxPollRecords; // 单次 poll 最大记录数(默认 500)// 1. 遍历已完成的拉取请求(completedFetches)while (recordsRemaining > 0 && !completedFetches.isEmpty()) {CompletedFetch completedFetch = completedFetches.peek();// 2. 解析消息批次(MemoryRecords → RecordBatch)MemoryRecords records = completedFetch.records();for (RecordBatch batch : records.batches()) {// 3. 遍历批次内的每条消息for (Record record : batch) {// 4. 反序列化消息(调用 Deserializer)K key = keyDeserializer.deserialize(record.topic(), record.key());V value = valueDeserializer.deserialize(record.topic(), record.value());// 5. 构建 ConsumerRecordConsumerRecord<K, V> consumerRecord = new ConsumerRecord<>(record.topic(),record.partition(),record.offset(),record.timestamp(),TimestampType.forCode(record.timestampType()),record.serializedKeySize(),record.serializedValueSize(),key,value,record.headers(),record.leaderEpoch());// 6. 按分区缓存消息drainedRecords.computeIfAbsent(partition, p -> new ArrayList<>()).add(consumerRecord);recordsRemaining--;}}completedFetches.poll();}return drainedRecords;
    }
    

    关键设计:

    • 批量拉取: Kafka 按批次(RecordBatch)拉取消息,减少网络开销。
    • 零拷贝优化: MemoryRecords 直接操作 ByteBuffer,避免内存复制。
    • 反序列化延迟: 反序列化在消息实际被消费时进行(而非拉取时)。
  3. 位移提交(maybeAutoCommitOffsetsAsync()
    若启用自动提交(enable.auto.commit=true),消费者会在 poll() 后异步提交位移。

    源码路径:
    KafkaConsumer.maybeAutoCommitOffsetsAsync()commitOffsetsAsync()

    // KafkaConsumer 自动提交逻辑
    private void maybeAutoCommitOffsetsAsync() {if (autoCommitEnabled) {// 1. 计算距离上次提交的时间间隔long now = time.milliseconds();if (now - lastAutoCommitTime >= autoCommitIntervalMs) {// 2. 提交所有分区的位移commitOffsetsAsync(subscriptions.allConsumed());lastAutoCommitTime = now;}}
    }// 异步提交位移
    private void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets) {coordinator.commitOffsetsAsync(offsets, new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {if (exception != null) {log.error("Async auto-commit failed", exception);}}});
    }
    

    自动提交配置:

    • 提交间隔:auto.commit.interval.ms 控制(默认 5 秒)。

    • 提交内容: 提交所有已消费分区的 offset + 1(确保至少一次语义)。

3.2.3.网络通信

所有网络请求(拉取消息、心跳、位移提交)由 ConsumerNetworkClient 管理,其核心机制为 异步请求-响应模型

  1. 请求发送

    // ConsumerNetworkClient.send() 方法
    public RequestFuture<ClientResponse> send(Node node,AbstractRequest.Builder<?> requestBuilder,long timeoutMs
    ) {// 1. 构建请求对象long now = time.milliseconds();ClientRequest clientRequest = client.newClientRequest(node.idString(),requestBuilder,now,true,timeoutMs,null);// 2. 将请求加入队列(非阻塞)unsent.put(node, clientRequest);return clientRequest.future();
    }
    
  2. 响应处理

    // ConsumerNetworkClient.poll() 方法
    public void poll(long timeout, long now, PollCondition pollCondition) {// 1. 发送所有未完成的请求sendAllRequests();// 2. 轮询网络通道(Selector),接收响应client.poll(timeout, now, new PollCondition() {@Overridepublic boolean shouldBlock() {return pollCondition.shouldBlock() || hasPendingRequests();}});// 3. 处理已完成的响应handleCompletedSends();handleCompletedReceives();handleDisconnections();handleConnections();handleTimedOutRequests();
    }
    

    关键机制:

    • 请求队列: unsent 缓存待发送的请求。

    • 响应回调: 每个请求关联一个 RequestFuture ,在响应到达时触发回调。

3.3.反序列化

3.3.1.核心接口

Kafka 的反序列化通过 org.apache.kafka.common.serialization.Deserializer 接口实现,所有反序列化器必须实现该接口。

接口定义:

public interface Deserializer<T> extends Closeable {// 配置反序列化器(从消费者配置中读取参数)void configure(Map<String, ?> configs, boolean isKey);// 核心反序列化方法T deserialize(String topic, byte[] data);// 反序列化方法(带Headers)default T deserialize(String topic, Headers headers, byte[] data) {return deserialize(topic, data);}// 关闭资源@Overridevoid close();
}

3.3.2.源码级执行流程

  1. 消费者初始化阶段
    当创建 KafkaConsumer 时,会通过 ConsumerConfig 加载配置,并初始化 key.deserializervalue.deserializer

    关键源码路径:
    KafkaConsumer#initialize()ConsumerConfig#getConfiguredInstance()

    // 源码片段:ConsumerConfig.java
    public static <T> T getConfiguredInstance(String key, Class<T> t) {String className = getString(key);try {Class<?> c = Class.forName(className, true, Utils.getContextOrKafkaClassLoader());return Utils.newInstance(c, t); // 反射创建实例} catch (ClassNotFoundException e) {// 处理异常...}
    }// KafkaConsumer 构造函数中初始化反序列化器
    this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
    this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);// 调用 configure() 方法
    Map<String, Object> configs = config.originals();
    this.keyDeserializer.configure(configs, true);   // 标记为 key 反序列化器
    this.valueDeserializer.configure(configs, false); // 标记为 value 反序列化器
  2. 消息拉取与反序列化
    当调用 poll() 方法拉取消息后,Kafka 会遍历每条消息,使用反序列化器将字节数组转换为 Java 对象。

    关键源码路径:
    KafkaConsumer#poll()Fetcher#parseRecord()ConsumerRecord#toRecord()

    // 源码片段:ConsumerRecord.java
    public static <K, V> ConsumerRecord<K, V> toRecord(/* 参数省略 */) {// 反序列化 KeyK key = keyDeserializer != null ? keyDeserializer.deserialize(topic, headers, keyBytes) : null;// 反序列化 ValueV value = valueDeserializer != null ? valueDeserializer.deserialize(topic, headers, valueBytes) : null;return new ConsumerRecord<>(topic, partition, offset, timestamp, timestampType,key, value, headers, leaderEpoch);
    }
  3. 线程安全与生命周期

    • 线程安全: 每个 KafkaConsumer 实例持有独立的反序列化器实例,因此反序列化器无需考虑线程安全。

    • 生命周期: 反序列化器的 close() 方法在消费者关闭时被调用。

3.3.3.自定义反序列化器实现

  1. 实现 Deserializer 接口
    假设需要反序列化 JSON 数据:
    public class JsonDeserializer<T> implements Deserializer<T> {private ObjectMapper objectMapper = new ObjectMapper();private Class<T> targetType;@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {// 从配置中获取目标类型(如 User.class)String configKey = isKey ? "key.type" : "value.type";String typeName = (String) configs.get(configKey);try {this.targetType = (Class<T>) Class.forName(typeName);} catch (ClassNotFoundException e) {throw new KafkaException("Class not found: " + typeName, e);}}@Overridepublic T deserialize(String topic, byte[] data) {if (data == null) return null;try {return objectMapper.readValue(data, targetType);} catch (IOException e) {throw new SerializationException("Error deserializing JSON", e);}}@Overridepublic void close() {// 清理资源(可选)}
    }
    
  2. 消费者配置
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());
    props.put("value.type", "com.example.User"); // 自定义参数传递
    

3.4.消费者拦截器

Kafka 支持通过 ConsumerInterceptor 拦截消息处理流程:

public interface ConsumerInterceptor<K, V> extends Configurable {// 在消息返回给用户前拦截ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);// 在位移提交前拦截void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
}
  • KafkaConsumer会在poll()方法返回之前调用拦截器的onConsume()方法来对消息进行定制化处理。
  • KafkaConsumer会在提交完消费者位移之后调用拦截器的onCommit(),可以使用和这个方法来记录跟踪所提交的位移信息。

消费者也有连接链的概念,跟生产者一样也是按照inerceptor.classes参数配置连接链的执行顺序。

配置方式:

props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.example.MyConsumerInterceptor");

相关文章:

  • Docker 部署项目
  • 挡片/测试晶圆(Dummy Wafer)通俗解析
  • UI自动化测试的革新,新一代AI工具MidScene.js实测!
  • K8S集群主机网络端口不通问题排查
  • 滚珠导轨:电子制造“纳米级”精度的运动基石
  • SQLite 中文写入失败问题总结
  • SpringMVC核心原理与前后端数据交互机制详解
  • 如何做好一个决策:基于 Excel的决策树+敏感性分析应用
  • 如何做好一个决策:基于 Excel的决策树+敏感性分析应用(针对多个变量)
  • 罗马-华为
  • #Js篇:两个前端应用通过postMessage传递file对像
  • Vue3实现提示文字组件
  • Wirtinger Flow算法的matlab实现和python实现
  • Cesium 8 ,在 Cesium 上实现雷达动画和车辆动画效果,并控制显示和隐藏
  • 【决策分析】基于Excel的多变量敏感性分析解决方案
  • 【C++】“多态”特性
  • 首发!PPIO派欧云上线DeepSeek-R1-0528-Qwen3-8B蒸馏模型
  • 使用微软最近开源的WSL在Windows上优雅的运行Linux
  • 非线性声学计算与强化学习融合框架:突破复杂环境人机交互的新技术
  • 【计算机网络】第2章:应用层—DNS
  • wordpress黑糖主题/seo站长查询
  • .net开发微信网站/品牌推广与传播方案
  • 图片存放网站做链接/技能培训有哪些
  • 三星手机网上商城/中国网民博客 seo
  • 成都专业手机网站建设服务/seo排名优化推广报价
  • 承德网站建设费用/简述seo