聊聊 Pulsar:Consumer 源码解析
一、前言
Apache Pulsar 是一款开源的分布式消息系统,以其多租户、高性能、存储与计算分离等特性在现代消息队列领域占据一席之地。作为消息队列的重要组成部分,Consumer(消费者)承担了从 Broker 获取消息并处理的关键角色,其设计直接关系到系统的吞吐量、延迟以及可靠性。理解 Pulsar Consumer 的源码实现,不仅能帮助我们更好地掌握消息消费的流程和机制,还能为性能优化、故障排查及二次开发提供指导。
在本文中,我们将深入分析 Pulsar Consumer 的源码结构与实现细节,从创建流程、消息拉取、消息确认到负载均衡等方面全面剖析其工作原理,揭示其背后的设计思想与工程实践。无论是初学者还是有经验的开发者,相信都能通过这篇文章获得关于 Pulsar Consumer 的新认知。
二、Consumer测试类
老规矩,我们先以Consumer消费消息,来跟进Consumer的相关源码流程。
/*** @author: 微信公众号【老周聊架构】*/
public class PulsarClientTest {public static void main(String[] args) throws PulsarClientException {PulsarClient client = PulsarClient.builder().listenerThreads(1).ioThreads(1).serviceUrl("pulsar://127.0.0.1:6650").build();Consumer<byte[]> consumer = client.newConsumer().topic("my-topic").subscriptionName("my-sub").subscribe();Message<byte[]> message = consumer.receive();System.out.println("receive message" + message.getData());}
}
从上面的代码可以看出 Pulsar 为用户提供了非常简洁方便的 API,在使用时,只需要如下两步:
- 创建 Pulsar Consumer 实例
- 调用 receive 接口消费数据
三、Pulsar Consumer 实例化与消费数据
3.1 消费者的创建
创建 PulsarClient 时实际上是创建了 PulsarClientImpl 对象,其中 newConsumer 方法是创建一个 builder 用于链式调用:
其中 builder 的方法就不仔细阅读了,主要是对参数进行必要的验证后设置相应字段,比如必要的是主题名和订阅名,都保存在 ClientConfigurationData conf 中。
主要跟到 org.apache.pulsar.client.impl.ConsumerBuilderImpl#subscribeAsync
上述代码小结:
这段代码的主要功能是通过一系列配置验证和处理步骤来确保消费者配置的正确性,并根据配置动态生成重试和死信队列的主题名称。最终,它会创建并返回一个 Consumer 实例。具体来说:
- 配置验证:确保主题名称或模式、订阅名称等必要参数已正确设置。
- 重试和死信队列处理:根据配置生成重试和死信队列的主题名称,并进行兼容性检查。
- 创建消费者:根据是否有拦截器列表选择不同的创建方式,并返回创建好的消费者实例。
继续跟进 org.apache.pulsar.client.impl.PulsarClientImpl#subscribeAsync(org.apache.pulsar.client.impl.conf.ConsumerConfigurationData, org.apache.pulsar.client.api.Schema, org.apache.pulsar.client.impl.ConsumerInterceptors)
简单起见,我们这只看单主题订阅的情况:
private <T> CompletableFuture<Consumer<T>> singleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {// 调用 preProcessSchemaBeforeSubscribe 方法预处理模式(schema)return preProcessSchemaBeforeSubscribe(this, schema, conf.getSingleTopic())// 使用 thenCompose 将返回的 CompletableFuture 进行链式调用.thenCompose(schemaClone -> doSingleTopicSubscribeAsync(conf, schemaClone, interceptors));
}
preProcessSchemaBeforeSubscribe方法:
doSingleTopicSubscribeAsync方法:
多分区订阅和多主题订阅本质上是一样的,都是使用 MultiTopicsConsumerImpl 管理多个主题(因为 Pulsar 中分区只不过是一个包含后缀 -partition- 的主题)。
这里我们还是来关注单分区订阅,ConsumerImpl.newConsumerImpl(…) 只是将参数原封不动传给其构造方法,构造方法包括了 consumer 内部一些字段的初始化。
我们还是只关注重点,那就是 Consumer 和 Broker 如何交互的?实际上这部分逻辑在构造方法的最后面,调用的 grabCnx 方法:
void grabCnx() {this.connectionHandler.grabCnx();
}
3.2 Consumer 与 Broker 连接的建立
我们先来看下 connectionHandler 的构造:
// 创建一个新的 ConnectionHandler 实例,并将其赋值给当前类的 connectionHandler 属性
this.connectionHandler = new ConnectionHandler(this, // 当前类的实例,作为上下文或回调传递给 ConnectionHandler// 使用 BackoffBuilder 构建一个回退策略对象new BackoffBuilder().setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(), // 从客户端配置中获取初始回退时间间隔(纳秒)TimeUnit.NANOSECONDS // 指定时间单位为纳秒).setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), // 从客户端配置中获取最大回退时间间隔(纳秒)TimeUnit.NANOSECONDS // 指定时间单位为纳秒).setMandatoryStop(0, TimeUnit.MILLISECONDS) // 设置强制停止时间为 0 毫秒,意味着没有强制停止时间.create(), // 根据上述配置创建具体的回退策略对象this // 再次传递当前类的实例,可能是为了在 ConnectionHandler 中使用当前类的某些方法或属性
);
可以看出,当前代码片段,只体现了固定时间间隔的退避策略,我发现其它很多中间件喜欢用指数退避逻辑。那么问题来了,如果这里要实现指数退避该怎么操作呢?
可以考虑修改 BackoffBuilder 的配置或使用支持指数退避的库。例如,可以添加类似 .setMultiplier(2) 的方法来指定每次重试时等待时间的增长因子。
可以看出 ConsumerImpl 与 ProducerImpl 一样去构造connectionHandler的实例,意思就是connectionHandler连接的组件在生产者和消费者中都持有,connectionHandler相当于两者的通信门面。
这段grabCnx()代码同样也适用于生产者以及多主题消费者,它们对应的类都实现了 Connection 接口,只需要实现各自的回调即可。
3.3 连接成功的回调
可以看到连接成功后,主要是发送 CommandSubscribe(订阅命令),Broker 处理成功后,consumer 就处于 Ready 状态,并且会发送 Flow 请求携带 permits 为接收队列大小的一半再发送给Broker,从这里看到Flow指令消息也只有consumer_id、messagePermits,消费者只给了Broker消费者id和消息许可的最大消息数量,从这里也可以看出,Pulsar的Consumer是push模式,消费者只告诉Broker许可的最大消息数量,剩下的交给Broker去推。
3.4 Broker 处理 Consumer 指令消息
Broker 对 TCP 协议的处理代码位于 org.apache.pulsar.broker.service.ServerCnx
对于 Consumer 而言,比较独有的就是 CommandSubscribe 和 CommandFlow。
3.4.1 CommandSubscribe 处理
@Override
protected void handleSubscribe(final CommandSubscribe subscribe) {/*** 1.参数验证与初始化* 2.权限检查* 3.提取订阅参数* 这三步的代码省略*/// 检查是否允许操作主题CompletableFuture<Boolean> isAuthorizedFuture = isTopicOperationAllowed(topicName,subscriptionName,TopicOperation.CONSUME);isAuthorizedFuture.thenApply(isAuthorized -> {if (isAuthorized) {if (log.isDebugEnabled()) {log.debug("[{}] Client is authorized to subscribe with role {}",remoteAddress, getPrincipal());}log.info("[{}] Subscribing on topic {} / {}", remoteAddress, topicName, subscriptionName);try {// 元数据验证Metadata.validateMetadata(metadata);} catch (IllegalArgumentException iae) {final String msg = iae.getMessage();commandSender.sendErrorResponse(requestId, ServerError.MetadataError, msg);return null;}CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();// 检查是否已有相同ID的消费者CompletableFuture<Consumer> existingConsumerFuture = consumers.putIfAbsent(consumerId,consumerFuture);// 已经创建过 broker consumerif (existingConsumerFuture != null) {if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {Consumer consumer = existingConsumerFuture.getNow(null);log.info("[{}] Consumer with the same id is already created:"+ " consumerId={}, consumer={}",remoteAddress, consumerId, consumer);// 创建完成则直接发送成功的响应commandSender.sendSuccessResponse(requestId);return null;} else {// There was an early request to create a consumer with same consumerId. This can happen// when// client timeout is lower the broker timeouts. We need to wait until the previous// consumer// creation request either complete or fails.log.warn("[{}][{}][{}] Consumer with id is already present on the connection,"+ " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);// 之前已经有相同 consumerId 的 subscribe 请求,这是因为 client timeout 小于 broker timeout,// 因此 client 发生重试,此时需要等待之前的 consumer future 完成。ServerError error = null;if (!existingConsumerFuture.isDone()) {// 前一个 subscribe 请求还未完成,直接返回 ServiceNotReady。error = ServerError.ServiceNotReady;} else {// 前一个 subscribe 请求异常完成,则返回同样的错误码并将其移除 cache 避免下次重新进入此分支。error = getErrorCode(existingConsumerFuture);consumers.remove(consumerId, existingConsumerFuture);}commandSender.sendErrorResponse(requestId, error,"Consumer is already present on the connection");return null;}}// 判断是否自动创建 topic,forceTopicCreation 为 client 填充的字段(proto字段定义默认为true)// service 对象则是通过配置或者 system topic 的配置来判断是否允许自动创建 topicboolean createTopicIfDoesNotExist = forceTopicCreation&& service.isAllowAutoTopicCreation(topicName.toString());// 当前 broker 获取或创建 Topic 对象service.getTopic(topicName.toString(), createTopicIfDoesNotExist).thenCompose(optTopic -> {if (!optTopic.isPresent()) {return FutureUtil.failedFuture(new TopicNotFoundException("Topic " + topicName + " does not exist"));}Topic topic = optTopic.get();// 对于 durable cursor 而言,如果该订阅不存在且不允许订阅自动创建,subscribe 会失败boolean rejectSubscriptionIfDoesNotExist = isDurable&& !service.isAllowAutoSubscriptionCreation(topicName.toString())&& !topic.getSubscriptions().containsKey(subscriptionName);if (rejectSubscriptionIfDoesNotExist) {return FutureUtil.failedFuture(new SubscriptionNotFoundException("Subscription does not exist"));}// 若带有 schema 则先检查 schema 兼容性,最后都会调用 Topic#subscribeif (schema != null) {return topic.addSchemaIfIdleOrCheckCompatible(schema).thenCompose(v -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId,subType, priorityLevel, consumerName, isDurable,startMessageId, metadata,readCompacted, initialPosition, startMessageRollbackDurationSec,isReplicated, keySharedMeta));} else {return topic.subscribe(ServerCnx.this, subscriptionName, consumerId,subType, priorityLevel, consumerName, isDurable,startMessageId, metadata, readCompacted, initialPosition,startMessageRollbackDurationSec, isReplicated, keySharedMeta);}}).thenAccept(consumer -> {if (consumerFuture.complete(consumer)) {log.info("[{}] Created subscription on topic {} / {}",remoteAddress, topicName, subscriptionName);commandSender.sendSuccessResponse(requestId);} else {// The consumer future was completed before by a close command// 如果 consumerFuture 已经完成,则当前 consumer 是 client timeout 重新创建的 consumer// 此时需要关闭 consumer 并移除这个 futuretry {consumer.close();log.info("[{}] Cleared consumer created after timeout on client side {}",remoteAddress, consumer);} catch (BrokerServiceException e) {log.warn("[{}] Error closing consumer created"+ " after timeout on client side {}: {}",remoteAddress, consumer, e.getMessage());}consumers.remove(consumerId, consumerFuture);}}).exceptionally(exception -> {if (exception.getCause() instanceof ConsumerBusyException) {if (log.isDebugEnabled()) {log.debug("[{}][{}][{}] Failed to create consumer because exclusive consumer"+ " is already connected: {}",remoteAddress, topicName, subscriptionName,exception.getCause().getMessage());}} else if (exception.getCause() instanceof BrokerServiceException) {log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}",remoteAddress, topicName, subscriptionName,consumerId, exception.getCause().getMessage());} else {log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}",remoteAddress, topicName, subscriptionName,consumerId, exception.getCause().getMessage(), exception);}// If client timed out, the future would have been completed by subsequent close.// Send error// back to client, only if not completed already.if (consumerFuture.completeExceptionally(exception)) {commandSender.sendErrorResponse(requestId,BrokerServiceException.getClientErrorCode(exception),exception.getCause().getMessage());}consumers.remove(consumerId, consumerFuture);return null;});} else {String msg = "Client is not authorized to subscribe";log.warn("[{}] {} with role {}", remoteAddress, msg, getPrincipal());ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));}return null;}).exceptionally(ex -> {logAuthException(remoteAddress, "subscribe", getPrincipal(), Optional.of(topicName), ex);commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, ex.getMessage());return null;});
}
小结:
处理 subscribe 请求核心调用是:
- BrokerService#getTopic:获取当前 Broker 所拥有(own)的 Topic 对象
- Topic#subscribe:在 Topic 对象中创建对应的订阅,并得到 Consumer 对象
其中 Topic 和 Consumer 是 Broker 端对主题和消费者的抽象,负责管理对应的资源。均位于 org.apache.pulsar.broker.service
包下。
可以看出,这里有持久化主题和非持久化主题,这里我们重点看 PersistentTopic#subscribe
。
/*** 订阅主题并创建消费者** @param cnx 客户端连接对象* @param subscriptionName 订阅名称* @param consumerId 消费者ID* @param subType 订阅类型(如 Failover, Exclusive 等)* @param priorityLevel 消费者的优先级* @param consumerName 消费者名称* @param isDurable 是否为持久化订阅* @param startMessageId 开始消费的消息ID* @param metadata 元数据* @param readCompacted 是否读取压缩后的消息* @param initialPosition 初始位置(最早或最晚)* @param startMessageRollbackDurationSec 消息回滚时间* @param replicatedSubscriptionStateArg 是否启用复制订阅状态* @param keySharedMeta Key-Shared 元数据* @return 返回一个 CompletableFuture<Consumer>,表示订阅操作的结果*/
@Override
public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subscriptionName, long consumerId,SubType subType, int priorityLevel, String consumerName,boolean isDurable, MessageId startMessageId,Map<String, String> metadata, boolean readCompacted,InitialPosition initialPosition,long startMessageRollbackDurationSec,boolean replicatedSubscriptionStateArg,KeySharedMeta keySharedMeta) {// 如果启用了读取压缩消息,但订阅类型不是 Failover 或 Exclusive,则返回失败if (readCompacted && !(subType == SubType.Failover || subType == SubType.Exclusive)) {return FutureUtil.failedFuture(new NotAllowedException("readCompacted only allowed on failover or exclusive subscriptions"));}// 检查主题的命名空间所有权return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> {/*** 1.如果启用了复制订阅状态,但代理配置中未启用,则禁用复制订阅状态* 2.如果订阅类型是 Key_Shared,但代理配置中未启用,则返回失败* 3.检查主题是否支持指定的订阅类型* 4.检查订阅名称是否为空* 5.检查消费者是否支持批量消息* 6.检查订阅名称是否为保留名称* 以上检查代码省略*/// 检查订阅速率限制// 用连接地址作为 key,检查 consumer 对应的 RateLimiter 是否存在,限制重连次数,因为重连的连接地址是相同的。// 参考 https://github.com/apache/pulsar/pull/2977if (cnx.clientAddress() != null && cnx.clientAddress().toString().contains(":")) {SubscribeRateLimiter.ConsumerIdentifier consumer = new SubscribeRateLimiter.ConsumerIdentifier(cnx.clientAddress().toString().split(":")[0], consumerName, consumerId);if (subscribeRateLimiter.isPresent() && (!subscribeRateLimiter.get().subscribeAvailable(consumer)|| !subscribeRateLimiter.get().tryAcquire(consumer))) {log.warn("[{}] Failed to create subscription for {} {} limited by {}, available {}",topic, subscriptionName, consumer, subscribeRateLimiter.get().getSubscribeRate(),subscribeRateLimiter.get().getAvailableSubscribeRateLimit(consumer));return FutureUtil.failedFuture(new NotAllowedException("Subscribe limited by subscribe rate limit per consumer."));}}lock.readLock().lock();try {// 当 topic 被删除或关闭,或者 producer 写消息失败时都会标记为 fence 状态,此时禁止订阅。if (isFenced) {log.warn("[{}] Attempting to subscribe to a fenced topic", topic);return FutureUtil.failedFuture(new TopicFencedException("Topic is temporarily unavailable"));}// 实际上是增加 usageCount(连接的 producer 和 consumer 总数),传参数是为了打印 debug 日志handleConsumerAdded(subscriptionName, consumerName);} finally {lock.readLock().unlock();}// 根据是否持久化订阅,获取相应的订阅对象CompletableFuture<? extends Subscription> subscriptionFuture = isDurable ? //getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec,replicatedSubscriptionState): getNonDurableSubscription(subscriptionName, startMessageId, initialPosition,startMessageRollbackDurationSec);// 对于持久化订阅,可以获取最大的未确认的消息int maxUnackedMessages = isDurable? getMaxUnackedMessagesOnConsumer(): 0;CompletableFuture<Consumer> future = subscriptionFuture.thenCompose(subscription -> {// 创建 Consumer 对象并加入到对应的订阅中Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel,consumerName, maxUnackedMessages, cnx, cnx.getAuthRole(), metadata,readCompacted, initialPosition, keySharedMeta, startMessageId);return addConsumerToSubscription(subscription, consumer).thenCompose(v -> {checkBackloggedCursors();if (!cnx.isActive()) { // 连接已经断开,则需要关闭 consumertry {consumer.close();} catch (BrokerServiceException e) {if (e instanceof ConsumerBusyException) {log.warn("[{}][{}] Consumer {} {} already connected",topic, subscriptionName, consumerId, consumerName);} else if (e instanceof SubscriptionBusyException) {log.warn("[{}][{}] {}", topic, subscriptionName, e.getMessage());}// 减少使用计数decrementUsageCount();return FutureUtil.failedFuture(e);}if (log.isDebugEnabled()) {log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", topic, subscriptionName,consumer.consumerName(), currentUsageCount());}decrementUsageCount();return FutureUtil.failedFuture(new BrokerServiceException("Connection was closed while the opening the cursor "));} else {// 连接仍存活,则继续检查复制订阅的状态,至此完成整个订阅。checkReplicatedSubscriptionControllerState();log.info("[{}][{}] Created new subscription for {}", topic, subscriptionName, consumerId);return CompletableFuture.completedFuture(consumer);}});});future.exceptionally(ex -> {decrementUsageCount();if (ex.getCause() instanceof ConsumerBusyException) {log.warn("[{}][{}] Consumer {} {} already connected", topic, subscriptionName, consumerId,consumerName);} else if (ex.getCause() instanceof SubscriptionBusyException) {log.warn("[{}][{}] {}", topic, subscriptionName, ex.getMessage());} else {log.error("[{}] Failed to create subscription: {}", topic, subscriptionName, ex);}return null;});return future;});
}
核心流程:
- 获取 Subscription(Broker 对订阅的抽象),若不存在则创建。
- 创建 Consumer(Broker 对消费者的抽象)后加入订阅。
小结:
1. 持久化与非持久化订阅的区别
- 持久化订阅(PersistentSubscription):
- Cursor 管理:持久化订阅会打开一个 cursor,该 cursor 对应于 PersistentTopic 内部的 ledger。如果 cursor 不存在,则会创建一个新的 cursor。
- 消息 ID 处理:在创建持久化订阅时,如果 cursor 已存在,则直接使用现有的 cursor,而忽略掉 CommandSubscribe 中提供的初始消息 ID (MessageId)。这意味着持久化订阅依赖于 cursor 的状态来确定从哪里开始消费消息。
- 非持久化订阅(NonPersistentSubscription):
- 内存管理:非持久化订阅不会创建或打开 cursor,而是直接在内存中维护消息 ID (MessageIdImpl)。
- 消息 ID 处理:非持久化订阅会根据 CommandSubscribe 中提供的初始消息 ID 来确定从哪里开始消费消息。由于没有持久化的 cursor,所有状态都保存在内存中。
2.消费者加入订阅的过程
当消费者加入订阅时,系统会根据订阅类型创建相应的分发器(Dispatcher)。以默认的 Exclusive 订阅为例:
- 创建 Dispatcher:
- 根据订阅类型(如 Exclusive、Failover、Shared 等),创建对应的 Dispatcher 实例。Dispatcher 负责管理和分发消息给消费者。
- 添加 Consumer:
- 将新创建的 Consumer 添加到订阅的 Dispatcher 中。这一步确保了消费者能够接收到订阅中的消息。
- 如果订阅已经存在其他消费者,并且订阅类型不允许多个消费者(如 Exclusive),则会拒绝新的消费者加入,并返回相应的错误信息。
注:https://github.com/apache/pulsar/pull/9056 引入了 streaming dispatcher。
将 consumer 加入 dispatcher:
订阅里实际负责消息分发的就是 dispatcher。
3.4.2 CommandFlow 处理
直接定位到Netty读取消息的位置: org.apache.pulsar.common.protocol.PulsarDecoder#channelRead
指定handleFlow处理器处理:org.apache.pulsar.broker.service.ServerCnx#handleFlow
然后递交给 Consumer:
可见,更新完 Consumer 内部的 messagePermits 后,交由对应的 Subscription 对象处理。这里仅看 PersistentSubscription 的实现:
仅仅是传递给 dispatcher,因此实际的处理是由 dispatcher 完成的。
四、Consumer整体流程UML图
五、总结
5.1 Pulsar 消费者订阅 Topic 的流程
5.1.1 Client 端
- 建立连接
- 客户端(Client)首先与 Broker 建立连接。这一部分和生产者的连接过程是通用的。
- 发送 Subscribe 命令
- 连接成功后,在回调中客户端会发送一个
subscribe
命令给 Broker,注册自己为该 Topic 的消费者。
- 连接成功后,在回调中客户端会发送一个
- 发送 Flow 请求
- 注册成功后,客户端会发送一个
Flow
请求,携带permits
参数。permits
的值通常是内部缓冲区大小的一半(对于无缓冲区的零队列消费者例外)。这个请求告知 Broker 客户端可以接收的消息数量。
- 注册成功后,客户端会发送一个
5.1.2 Broker 端
- 处理 Subscribe 命令
- Broker 接收到
subscribe
命令后,在对应的 Topic 中创建或查找已有的 Subscription。 - 每个 Topic 可以有多个订阅(Subscription),每个订阅可以对应多个消费者(Consumer)。
- Broker 接收到
- 创建 Dispatcher
- 根据消费者的订阅类型(如
Exclusive
、Failover
、Shared
等),Broker 会创建相应的Dispatcher
实例。 Dispatcher
负责管理和分发消息给消费者。
- 根据消费者的订阅类型(如
- 维护 Cursor
- 订阅(Subscription)主要负责维护消费进度(Cursor)。对于持久化订阅(PersistentSubscription),会创建一个持久化的
cursor
;对于非持久化订阅(NonPersistentSubscription),则在内存中管理消费进度。
- 订阅(Subscription)主要负责维护消费进度(Cursor)。对于持久化订阅(PersistentSubscription),会创建一个持久化的
- 处理 Flow 请求
- 当 Broker 收到
Flow
请求时,它会根据permits
的值调整Dispatcher
的行为,确保按照客户端的需求发送消息。
- 当 Broker 收到
5.2 Push vs Pull 模型
5.2.1 Kafka 的 Pull 模型
- FETCH 请求:
- Kafka 的消费者通过发送
FETCH
请求给 Broker 来拉取消息。 - Broker 在
FETCH
响应中返回读取的消息。 - 流量控制由客户端(consumer)发起,即客户端决定何时拉取消息以及拉取多少消息。
- Kafka 的消费者通过发送
5.2.2 Pulsar 的 Push 模型
- Flow 请求:
- Pulsar 的消费者发送
Flow
请求,告知 Broker 自己可以缓存多少条消息。 - Broker 根据
Flow
请求中的permits
参数灵活定制Dispatcher
的行为,并主动推送消息给客户端。 - 流量控制由服务端(Broker)进行,即 Broker 决定何时推送消息以及推送多少消息。
- Pulsar 的消费者发送
5.2.3 Kafka与Pulsar消费者模型对比
- Kafka:采用 Pull 模型,流量控制由客户端发起,客户端通过
FETCH
请求拉取消息。 - Pulsar:采用 Push 模型,流量控制由服务端(Broker)进行,客户端通过
Flow
请求告知 Broker 缓存能力,Broker 主动推送消息。
这种设计使得 Pulsar 的 Push 模型能够更好地适应高吞吐量和低延迟的场景,因为服务端可以根据客户端的反馈灵活调整消息推送策略。