当前位置: 首页 > news >正文

Using Spring for Apache Pulsar:Publishing and Consuming Partitioned Topics

在下面的示例中,我们发布了一个名为hello pulser participated的主题。这是一个被分区的主题,对于这个示例,我们假设该主题已经创建了三个分区。

@SpringBootApplication
public class PulsarBootPartitioned {public static void main(String[] args) {SpringApplication.run(PulsarBootPartitioned.class, "--spring.pulsar.producer.message-routing-mode=CustomPartition");}@Beanpublic ApplicationRunner runner(PulsarTemplate<String> pulsarTemplate) {pulsarTemplate.setDefaultTopicName("hello-pulsar-partitioned");return args -> {for (int i = 0; i < 10; i++) {pulsarTemplate.sendAsync("hello john doe 0 ", new FooRouter());pulsarTemplate.sendAsync("hello alice doe 1", new BarRouter());pulsarTemplate.sendAsync("hello buzz doe 2", new BuzzRouter());}};}@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned")public void listen(String message) {System.out.println("Message Received: " + message);}static class FooRouter implements MessageRouter {@Overridepublic int choosePartition(Message<?> msg, TopicMetadata metadata) {return 0;}}static class BarRouter implements MessageRouter {@Overridepublic int choosePartition(Message<?> msg, TopicMetadata metadata) {return 1;}}static class BuzzRouter implements MessageRouter {@Overridepublic int choosePartition(Message<?> msg, TopicMetadata metadata) {return 2;}}}

在前面的示例中,我们发布到一个分区的主题,我们想将一些数据段发布到特定的分区。如果您将其保留为Pulsar的默认值,它将遵循分区分配的轮转模式,我们希望覆盖该模式。为此,我们提供了一个带有send方法的消息路由器对象。考虑实现的三个消息路由器。FooRouter始终将数据发送到分区0,BarRouter发送到分区1,BuzzRouter发送给分区2。还要注意,我们现在使用PulsarTemplate的sendAsync方法,该方法返回CompletableFuture。运行应用程序时,我们还需要将生产者上的messageRoutingMode设置为CustomPartition(spring.pulsinger.producer.message路由模式)。

在消费者端,我们使用具有独占订阅类型的PulsarListener。这意味着来自所有分区的数据最终都在同一个消费者中,并且没有订购保证。

如果我们希望每个分区由一个不同的消费者使用,我们该怎么办?我们可以切换到故障转移订阅模式,并添加三个单独的消费者:

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen1(String foo) {System.out.println("Message Received 1: " + foo);
}@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen2(String foo) {System.out.println("Message Received 2: " + foo);
}@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription",  topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen3(String foo) {System.out.println("Message Received 3: " + foo);
}

当你遵循这种方法时,一个分区总是被一个专用的消费者占用。

同样,如果你想使用Pulsar的共享消费者类型,你可以使用共享订阅类型。但是,当您使用共享模式时,您将失去任何排序保证,因为单个消费者可能会在另一个消费者有机会之前收到来自所有分区的消息。

考虑以下示例:

@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen1(String foo) {System.out.println("Message Received 1: " + foo);
}@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen2(String foo) {System.out.println("Message Received 2: " + foo);
}

http://www.dtcms.com/a/270693.html

相关文章:

  • vue2 echarts中国地图、在地图上标注经纬度及标注点
  • AI应用实践:制作一个支持超长计算公式的计算器,计算内容只包含加减乘除算法,保存在一个HTML文件中
  • 「macOS 系统字体收集器 (C++17 实现)」
  • Oracle存储过程导出数据到Excel:全面实现方案详解
  • Java零基础笔记08(Java编程核心:面向对象编程高级 {继承、多态})
  • 【macOS】【Swift】【RTF】黑色文字在macOS深色外观下看不清的解决方法
  • yolo8实现目标检测
  • springMVC05-异常处理器
  • HashMap源码分析:put与get方法详解
  • 【拓扑空间】示例及详解1
  • sqlplus表结构查询
  • 高效集成-C#全能打印报表设计器诞生记
  • Android-重学kotlin(协程源码第一阶段)新学习总结
  • mongodb: cannot import name ‘_check_name‘ from ‘pymongo.database‘
  • 池化思想-Mysql异步连接池
  • 教育行业可以采用Html5全链路对视频进行加密?有什么优势?
  • 高通 QCS6490PI 集群架构支撑 DeepSeek 模型稳定运行的技术实现
  • upload-labs靶场通关详解:第19关 条件竞争(二)
  • Java-----韩顺平单例设计模式学习笔记
  • java项目maven编译的时候报错:Fatal error compiling: 无效的标记: --release
  • 【计算机组成原理——知识点总结】-(总线与输入输出设备)-学习笔记总结-复习用
  • Caffeine的tokenCache与Spring的CaffeineCacheManager缓存区别
  • uniapp,Anroid10+版本如何保存图片并删除
  • 缓存三大问题详解与工业级解决方案
  • 视频音频转换器V!P版(安卓)安装就解锁V!P!永久免费使用!
  • 【RK3568+PG2L50H开发板实验例程】FPGA部分 | DDR3 读写实验例程
  • 创客匠人:在 IP 变现浪潮中,坚守知识变现的本质
  • 飞算AI-idea强大的AI工具
  • 二分查找篇——在排序数组中查找元素的第一个和最后一个位置【LeetCode】
  • 如何把一个多行的RAS key放到环境变量中?