当前位置: 首页 > news >正文

Spring for Apache Pulsar->Reactive Support->Message Consumption

1. @ReactivePulsarListener

对于Pulsar消费者,我们建议最终用户应用程序使用ReactivePulsarListener注释。要使用ReactivePulsarListener,您需要使用@EnableReactivePulse注释。当您使用Spring Boot支持时,它会自动启用此注释并配置所有必要的组件,例如消息侦听器基础设施(负责创建底层Pulsar消费者)。

让我们重新审视我们在快速浏览部分看到的ReactivePulse Listener代码片段:

@ReactivePulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
Mono<Void> listen(String message) {System.out.println(message);return Mono.empty();
}

监听器方法返回一个Mono<Void>,以表示消息是否已成功处理。Mono.empty()表示成功(确认),Mono.error()表示失败(否定确认)。

您还可以进一步简化此方法:

@ReactivePulsarListener
Mono<Void> listen(String message) {System.out.println(message);return Mono.empty();
}

在这种最基本的形式中,当没有直接提供主题时,使用主题解析过程来确定目标主题。同样,当@ReactivePulsarListener注释上未提供subscriptionName时,将使用自动生成的订阅名称。

在前面显示的ReactivePulse Listener方法中,我们以String形式接收数据,但我们没有指定任何模式类型。在内部,该框架依赖Pulsar的模式机制将数据转换为所需的类型。

框架检测到您期望的String类型,然后根据该信息推断模式类型,并将该模式提供给消费者。该框架对所有原始类型都进行了这种推理。对于所有非基元类型,默认模式假定为JSON。如果复杂类型使用JSON以外的任何内容(如AVRO或KEY_VALUE),则必须使用schemaType属性在注释上提供模式类型。

此示例显示了如何从主题中使用复杂类型:

@ReactivePulsarListener(topics = "my-topic-2", schemaType = SchemaType.JSON)
Mono<Void> listen(Foo message) {System.out.println(message);return Mono.empty();
}

让我们再看看我们可以消费的几种方式。

此示例直接使用Pulsar消息:

@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.apache.pulsar.client.api.Message<String> message) {System.out.println(message.getValue());return Mono.empty();
}

此示例使用包装在Spring消息信封中的记录:

@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.springframework.messaging.Message<String> message) {System.out.println(message.getPayload());return Mono.empty();
}

1.1. Streaming

以上所有内容都是逐一消费单个记录的示例。然而,使用Reactive的一个令人信服的原因是具有背压支持的流媒体功能。

以下示例使用ReactivePulsarListener来消耗POJO流:

@ReactivePulsarListener(topics = "streaming-1", stream = true)
Flux<MessageResult<Void>> listen(Flux<org.apache.pulsar.client.api.Message<String>> messages) {return messages.doOnNext((msg) -> System.out.println("Received: " + msg.getValue())).map(MessageResult::acknowledge);
}

在这里,我们以脉冲星消息流的形式接收记录。此外,要在ReactivePulse Listener级别启用流消费,您需要将注释上的流属性设置为true。

监听器方法返回一个Flux<MessageResult<Void>>,其中每个元素表示一个已处理的消息,并保存消息id、值以及是否被确认。MessageResult有一组静态工厂方法,可用于创建相应的MessageResult实例。

根据Flux中消息的实际类型,框架试图推断要使用的模式。如果它包含复杂类型,您仍然需要在ReactivePulsarListener上提供schemaType。

以下侦听器使用具有复杂类型的Spring消息传递消息信封:

@ReactivePulsarListener(topics = "streaming-2", stream = true, schemaType = SchemaType.JSON)
Flux<MessageResult<Void>> listen2(Flux<org.springframework.messaging.Message<Foo>> messages) {return messages.doOnNext((msg) -> System.out.println("Received: " + msg.getPayload())).map(MessageUtils::acknowledge);
}

监听器方法返回一个Flux<MessageResult<Void>>,其中每个元素表示一个已处理的消息,并保存消息id、值以及是否被确认。SpringMessageUtils有一组静态工厂方法,可用于从Spring消息创建相应的MessageResult实例。MessageUtils为Spring消息提供了与MessagResult上的工厂方法集为Pulsar消息提供的功能相同的功能。

不支持使用org.apache.pulser.client.api。@ReactivePulsarListener中的消息<T>

1.2. Configuration - Application Properties

监听器依赖ReactivePulsarConsumerFactory来创建和管理它用来消费消息的底层Pulsar消费者。Spring Boot提供了这个消费者工厂,您可以通过指定Spring.pulser.consumer.*应用程序属性来进一步配置它。

1.3. Generic records with AUTO_CONSUME

如果没有机会提前知道Pulsar主题的模式类型,您可以使用AUTO_CONSUME模式类型来使用通用记录。在这种情况下,主题使用与主题关联的模式信息将消息反序列化为GenericRecord对象。

要使用通用记录,请设置schemaType=schemaType。在@ReactivePulsarListener上设置AUTO_CONSUME,并使用GenericRecord类型的Pulsar消息作为消息参数,如下所示。

@ReactivePulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
Mono<Void> listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {GenericRecord record = message.getValue();record.getFields().forEach((f) ->System.out.printf("%s = %s%n", f.getName(), record.getField(f)));return Mono.empty();
}

GenericRecord API允许访问字段及其关联值

1.4. Consumer Customization

您可以指定ReactivePulsarListenerMessageConsumerBuilderCustomizer来配置底层Pulsar消费者构建器,该构建器最终构建了监听器用于接收消息的消费者。

请谨慎使用,因为这可以完全访问消费者构建器,调用其某些方法(如create)可能会产生意想不到的副作用。

例如,以下代码显示了如何将订阅的初始位置设置为主题上最早的消息。

@ReactivePulsarListener(topics = "hello-pulsar-topic", consumerCustomizer = "myConsumerCustomizer")
Mono<Void> listen(String message) {System.out.println(message);return Mono.empty();
}@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> myConsumerCustomizer() {return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
}

如果您的应用程序只注册了一个@ReactivePulsarListener和一个ReactivePulse ListenerMessageConsumerBuilderCustomizer bean,则会自动应用自定义程序。

您还可以使用定制器向消费者构建器提供Pulsar消费者的直接属性。如果您不想使用前面提到的Boot配置属性,或者有多个配置不同的ReactivePulse Listener方法,这很方便。

以下自定义程序示例使用Pulsar直接消费者属性:

@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> directConsumerPropsCustomizer() {return b -> b.property("subscriptionName", "subscription-1").property("topicNames", "foo-1");
}
The properties used are direct Pulsar consumer properties, not the spring.pulsar.consumer Spring Boot configuration properties

2. Specifying Schema Information

如前所述,对于Java原语,Spring for Apache Pulsar框架可以推断出在ReactivePulsarListener上使用的正确模式。对于非基元类型,如果没有在注释上明确指定Schema,Spring For Apache Pulsar框架将尝试构建Schema。JSON类型。

目前支持的复杂模式类型有JSON、AVRO、PROTOBUF、AUTO_CONSUME、KEY_VALUE和INLINE编码。

2.1. Custom Schema Mapping

作为在ReactivePulse Listener上为复杂类型指定模式的替代方案,可以使用类型的映射配置模式解析器。这消除了在侦听器上设置模式的需要,因为框架使用传入消息类型咨询解析器。

2.1.1. Configuration properties

模式映射可以使用spring.pulsar.defaults.type-mappings属性进行配置。以下示例使用application.yml分别使用AVRO和JSON模式为User和Address复杂对象添加映射:

spring:pulsar:defaults:type-mappings:- message-type: com.acme.Userschema-info:schema-type: AVRO- message-type: com.acme.Addressschema-info:schema-type: JSON

消息类型是消息类的完全限定名。

2.1.2. Schema resolver customizer

添加映射的首选方法是通过上述属性。但是,如果需要更多的控制,您可以提供一个模式解析器定制器来添加映射。

以下示例使用模式解析器定制器分别使用AVRO和JSON模式为User和Address复杂对象添加映射:

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {return (schemaResolver) -> {schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));}
}
2.1.3. Type mapping annotation

指定用于特定消息类型的默认模式信息的另一种选择是用@PulsarMessage注释标记消息类。可以通过注释上的schemaType属性指定架构信息。

以下示例将系统配置为在生成或使用Foo类型的消息时使用JSON作为默认模式:

@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}

有了这个配置,就不需要在监听器上设置模式,例如:

@ReactivePulsarListener(topics = "user-topic")
Mono<Void> listen(User user) {System.out.println(user);return Mono.empty();
}

3. Message Listener Container Infrastructure

在大多数情况下,我们建议直接使用ReactivePulsarListener注释从Pulsar主题中消费,因为该模型涵盖了广泛的应用程序用例。然而,了解ReactivePulsarListener的内部工作原理非常重要。

当您使用Spring for Apache Pulsar时,消息监听器容器是消息消费的核心。ReactivePulsarListener在幕后使用消息侦听器容器基础设施来创建和管理底层Pulsar消费者。

3.1. ReactivePulsarMessageListenerContainer

此消息侦听器容器的合约是通过ReactivePulsarMessageListenerContainer提供的,其默认实现创建了一个反应式Pulsar消费者,并连接了一个使用创建的消费者的反应式消息管道。

3.2. ReactiveMessagePipeline

管道是底层Apache Pulsar响应式客户端的一个功能,它以响应式方式接收数据,然后将其移交给提供的消息处理程序。反应式消息侦听器容器实现要简单得多,因为管道处理了大部分工作。

3.3. ReactivePulsarMessageHandler

“监听器”方面由ReactivePulse MessageHandler提供,它提供了两种实现:

ReactivePulsarOneByOneMessageHandler-逐一处理单个消息

ReactivePulsarStreamingHandler-通过Flux处理多条消息

如果在直接使用侦听器容器时未指定主题信息,则使用ReactivePulse listener使用的相同主题解析过程,但省略了“消息类型默认”步骤。

3.4. Handling Startup Failures

消息侦听器容器在刷新应用程序上下文时启动。默认情况下,启动过程中遇到的任何故障都会被重新抛出,应用程序将无法启动。您可以使用相应容器属性上的StartupFailurePolicy调整此行为。

可用选项包括:

Stop(默认)-记录并重新抛出异常,有效地停止应用程序

继续-记录异常,使容器处于非运行状态,但不要停止应用程序

重试-记录异常,重试异步启动容器,但不要停止应用程序。

默认的重试行为是重试3次,每次尝试之间有10秒的延迟。但是,可以在相应的容器属性上指定自定义重试模板。如果容器在重试尝试结束后未能重新启动,它将处于非运行状态。

3.4.1. Configuration
With Spring Boot

使用Spring Boot时,您可以注册PulsarContainerFactoryCustomizer<DefaultReactivePulsarListenerContainerFactory<?>>bean,用于设置容器启动属性。

Without Spring Boot

但是,如果您改为手动配置组件,则在构建消息侦听器容器工厂时,必须相应地更新容器启动属性。

4. Concurrency

当以流模式(stream=true)消费记录时,并发性自然会通过客户端实现中的底层响应式支持来实现。

然而,在逐一处理消息时,可以指定并发性以提高处理吞吐量。只需在@ReactivePulsarListener上设置并发属性。此外,当并发性>1时,您可以通过在注释上设置useKeyOrderedProcessing=“true”来确保消息按键排序,从而发送到同一个处理程序。

同样,ReactiveMessagePipeline完成了繁重的工作,我们只需设置它的属性。

反应式容器中的并发性不同于其命令式对应物。后者创建多个线程(每个线程都有一个Pulsar消费者),而前者在响应式并行调度器上将消息并发分派给多个处理程序实例。

反应式并发模型的一个优点是,它可以与独占订阅一起使用,而命令式并发模型则不能。

5. Pulsar Headers

Pulsar消息元数据可以作为Spring消息头使用。可用标头的列表可以在PulsarHeaders.java中找到。

5.1. Accessing In OneByOne Listener

以下示例显示了使用逐一消息侦听器时如何访问Pulsar Header:

@ReactivePulsarListener(topics = "some-topic")
Mono<Void> listen(String data,@Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,@Header("foo") String foo) {System.out.println("Received " + data + " w/ id=" + messageId + " w/ foo=" + foo);return Mono.empty();
}

在前面的示例中,我们访问messageId消息元数据的值以及名为foo的自定义消息属性。Spring@Header注释用于每个标头字段。

您还可以使用Pulsar的消息作为信封来携带有效载荷。这样做时,用户可以直接调用Pulsar消息上的相应方法来检索元数据。但是,为了方便起见,您还可以使用Header注释来检索它。请注意,您还可以使用Spring消息传递消息信封来携带有效载荷,然后使用@Header检索Pulsar标头。

5.2. Accessing In Streaming Listener

使用流式消息侦听器时,标头支持是有限的。仅当Flux包含Spring org.springframework.messing时。消息元素将填充标头。此外,Spring@Header注释不能用于检索数据。您必须直接调用Spring消息上的相应方法来检索数据。

6. Message Acknowledgment

该框架自动处理消息确认。但是,侦听器方法必须发送一个信号,指示消息是否已成功处理。然后,容器实现使用该信号执行ack或nack操作。这与它的命令式对应略有不同,在命令式对应中,除非方法抛出异常,否则信号被暗示为正。

6.1. OneByOne Listener

单消息(又名OneByOne)消息侦听器方法返回Mono<Void>,以表示消息是否已成功处理。Mono.empty()表示成功(确认),Mono.error()表示失败(否定确认)。

6.2. Streaming Listener

流式监听器方法返回一个Flux<MessageResult<Void>>,其中每个MessageResult元素表示一个已处理的消息,并保存消息id、值以及是否已确认。MessageResult有一组acknowledge和negativeAcknowledge静态工厂方法,可用于创建相应的MessageResult实例。

7. Message Redelivery and Error Handling

Apache Pulsar为消息重新传递和错误处理提供了各种本机策略。我们将看看它们,看看如何在Spring for Apache Pulsar中使用它们。

7.1. Acknowledgment Timeout

默认情况下,Pulsar消费者不会重新传递消息,除非消费者崩溃,但您可以通过在Pulsar消费者上设置ack超时来更改此行为。如果ack timeout属性的值大于零,并且Pulsar消费者在该超时时间内未确认消息,则重新传递消息。

您可以通过消费者定制器直接将此属性指定为Pulsar消费者属性,例如:

@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {return b -> b.property("ackTimeoutMillis", "60000");
}

7.2. Negative Acknowledgment Redelivery Delay

当确认为否定时,Pulsar消费者允许您指定应用程序希望如何重新传递消息。默认情况是在一分钟内重新传递消息,但您可以通过消费者自定义程序进行更改,例如:

@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {return b -> b.property("negativeAckRedeliveryDelay", "10ms");
}

7.3. Dead Letter Topic

Apache Pulsar允许应用程序在共享订阅类型的消费者身上使用死信主题。对于独占和故障转移订阅类型,此功能不可用。基本思想是,如果消息被重试了一定次数(可能是由于ack超时或nack重新传递),一旦重试次数用尽,消息就可以被发送到一个称为死信队列(DLQ)的特殊主题。让我们通过检查一些代码片段来了解此功能的一些细节:

@Configuration(proxyBeanMethods = false)
class DeadLetterPolicyConfig {@ReactivePulsarListener(topics = "topic-with-dlp",subscriptionType = SubscriptionType.Shared,deadLetterPolicy = "myDeadLetterPolicy",consumerCustomizer = "ackTimeoutCustomizer" )void listen(String msg) {throw new RuntimeException("fail " + msg);}@ReactivePulsarListener(topics = "my-dlq-topic")void listenDlq(String msg) {System.out.println("From DLQ: " + msg);}@BeanDeadLetterPolicy myDeadLetterPolicy() {return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();}@BeanReactiveMessageConsumerBuilderCustomizer<String> ackTimeoutCustomizer() {return b -> b.property("ackTimeoutMillis", "1000");}
}

首先,我们为DeadLetterPolicy提供了一个特殊的bean,它被命名为DeadLetterPolicy(它可以是您想要的任何名称)。此bean指定了许多内容,例如最大传递率(在本例中为10)和死信主题的名称 — 在这种情况下,我的dlq主题。如果不指定DLQ主题名称,Pulsar中默认为<topicname>-<subscriptionname>-DLQ。接下来,我们通过设置deadLetterPolicy属性将此bean名称提供给ReactivePulsarListener。请注意,ReactivePulse Listener的订阅类型为共享,因为DLQ功能仅适用于共享订阅。此代码主要用于演示目的,因此我们提供了ackTimeoutMillis值1000。这个想法是,代码抛出异常,如果Pulsar在1秒内没有收到ack,它会重试。如果该循环持续十次(因为这是我们在DeadLetterPolicy中的最大重新交付计数),Pulsar消费者会将消息发布到DLQ主题。我们还有另一个ReactivePulse Listener,它监听DLQ主题,以接收发布到DLQ主题的数据。

如果主主题在幕后被分区,Pulsar会将每个分区视为单独的主题。Pulsar在主主题名称后附加partition-<n>,其中n代表分区号。问题是,如果你不指定DLQ主题(与我们上面所做的相反),Pulsar会发布一个默认的主题名称,其中包含“分区-<n>信息” — 例如:topic-with-dlp-partition-0-deadLetterPolicy订阅-DLQ。解决这个问题的简单方法是始终提供DLQ主题名称。

8. Pulsar Reader Support

该框架支持通过ReactivePulse ReaderFactory以响应式方式使用Pulsar Reader。

Spring Boot提供了这个阅读器工厂,可以配置任何Spring.pulser.reader.*应用程序属性。

http://www.dtcms.com/a/271760.html

相关文章:

  • Socket服务器代理工具及服务端网络转发中枢
  • 【Action帧简要分析】
  • iOS APP混合开发性能测试怎么做?页面卡顿、通信异常的工具组合实战
  • iOS Widget 开发-7:TimelineProvider 机制全解析:构建未来时间线
  • 在 MacOS 上安装和配置 Kafka
  • 深入理解 Linux 中的 stat 函数与文件属性操作
  • 每天一个前端小知识 Day 29 - WebGL / WebGPU 数据可视化引擎设计与实践
  • 在Linux上使用libasan开发QT程序定位内存问题
  • Spring AI 系列之七 - MCP Client
  • 限流式保护器如何筑牢无人驾驶汽车充电站的安全防线
  • linxu内核的signal fault和arm内核的flault
  • 【LeetCode100】--- 2.字母异位词分组【复习回顾】
  • 如何发现 Redis 中的 BigKey?
  • 正向代理服务器Squid:功能、架构、部署与应用深度解析
  • 黄瓜苦多于意外,苦瓜苦来自本源——“瓜苦”探源
  • CloudCanal:一款企业级实时数据同步、迁移工具
  • 浪潮CD1000-移动云电脑-RK3528芯片-2+32G-开启ADB ROOT破解教程
  • tomcat源码02 - 理解Tomcat架构设计
  • MyBatis集成Logback日志全攻略
  • 微软云语音识别ASR示例Demo
  • 激活函数与损失函数:神经网络的动力引擎与导航系统
  • defer学习指南
  • 《C++初阶之内存管理》【内存分布 + operator new/delete + 定位new】
  • 启辰智慧预约团队5周年活动掠影,打造一流预约系统
  • 论文精读(一)| 量子计算系统软件研究综述
  • IoT 小程序:如何破解设备互联的碎片化困局?
  • 一条Redis命令是如何执行的?
  • 两种方式清除已经保存的git账号密码
  • 并发编程第一节
  • 【WEB】Polar靶场 Day7 详细笔记