当前位置: 首页 > news >正文

Using Spring for Apache Pulsar:Transactions

本节介绍Spring for Apache Pulsar如何支持事务。

Overview

Spring for Apache Pulsar事务支持是基于Spring Framework提供的事务支持构建的。在高层,事务资源向事务管理器注册,事务管理器反过来处理注册资源的事务状态(提交、回滚等)。

Apache Pulsar的Spring提供了以下功能:

PulsaTransactionManager-用于正常的Spring事务支持(@transactional,transactionTemplate等)

交易脉冲星模板

交易@pulsaListener

与其他事务管理器的事务同步

事务支持尚未添加到响应式组件中

默认情况下,事务支持已禁用。要在使用Spring Boot时启用支持,只需设置Spring.pulsar.transaction.enabled属性。下面每个组件部分都概述了进一步的配置选项。

Transactional Publishing with PulsarTemplate

事务性PulsarTemplate上的所有发送操作都会查找活动事务,并在事务中登记每个发送操作(如果找到)。

Non-transactional use

默认情况下,事务性PulsarTemplate也可用于非事务性操作。当未找到现有事务时,它将以非事务方式继续发送操作。但是,如果模板配置为需要事务,则任何在事务范围之外使用模板的尝试都会导致异常。

事务可以由TransactionTemplate、@Transactional方法、调用executeInTransaction或事务侦听器容器启动。

Local Transactions

我们使用术语“本地”事务来表示不受Spring事务管理工具(即PulsarTransactionManager)管理或与之关联的Pulsar本地事务。相反,“同步”事务是由PulsarTransactionManager管理或与之关联的事务。

您可以使用PulsarTemplate在本地事务中执行一系列操作。以下示例显示了如何执行此操作:

var results = pulsarTemplate.executeInTransaction((template) -> {var rv = new HashMap<String, MessageId>();rv.put("msg1", template.send(topic, "msg1"));rv.put("msg2", template.send(topic, "msg2"));return rv;
});

回调中的参数是调用executeInTransaction方法的模板实例。模板上的所有操作都登记在当前事务中。如果回调正常退出,则事务被提交。如果抛出异常,则事务将回滚。

若有一个同步的事务正在处理中,它将被忽略,并使用一个新的“嵌套”事务。

Configuration

以下交易设置可直接在PulsarTemplate上使用(通过交易字段):

enabled-模板是否支持事务(默认为false)

required-模板是否需要交易(默认为false)

timeout-事务超时的持续时间(默认为空)

不使用Spring Boot时,您可以在提供的模板上调整这些设置。但是,使用Spring Boot时,模板是自动配置的,没有影响属性的机制。在这种情况下,您可以注册一个可用于调整设置的PulsarTemplateCustomizer bean。以下示例显示了如何在自动配置的模板上设置超时:

@Bean
PulsarTemplateCustomizer<?> templateCustomizer() {return (template) -> template.transactions().setTimeout(Duration.ofSeconds(45));
}

Transactional Receiving with @PulsarListener

当启用侦听器事务时,在同步事务的范围内调用@PulsarListener注释的侦听器方法。

DefaultPulsarMessageListenerContainer使用配置了PulsarTransactionManager的Spring TransactionTemplate在方法调用之前启动事务。

每个接收到的消息的确认都登记在作用域事务中。

Consume-Process-Produce Scenario

一种常见的事务模式是,消费者从Pulsar主题读取消息,转换消息,最后生产者将生成的消息写入另一个Pulsar主题。当启用事务并且您的侦听器方法使用事务性PulsarTemplate来生成转换后的消息时,该框架支持此用例。

给定以下侦听器方法:

@PulsarListener(topics = "my-input-topic")
void listen(String msg) {var transformedMsg = msg.toUpperCase(Locale.ROOT);this.transactionalTemplate.send("my-output-topic", transformedMsg);
}

启用侦听器事务时会发生以下交互:

侦听器容器启动新事务并在事务范围内调用侦听器方法
侦听器方法接收消息
侦听器方法转换消息
监听器方法使用事务模板发送转换后的消息,该模板在活动事务中注册发送操作
侦听器容器自动确认消息,并在活动事务中注册确认操作
侦听器容器(通过TransactionTemplate)提交事务

如果您没有使用@PulsarListener,而是直接使用监听器容器,则会提供与上述相同的事务支持。记住,@PulsarListener只是为了方便将Java方法注册为侦听器容器消息侦听器。

Transactions with Record Listeners

上面的例子使用了一个记录监听器。使用记录侦听器时,每次侦听器方法调用时都会创建一个新事务,相当于每条消息一个事务。

由于事务边界是针对每条消息的,并且每条消息的确认都登记在每个事务中,因此批处理确认模式不能用于事务记录侦听器。

Transactions with Batch Listeners

使用批侦听器时,每次侦听器方法调用时都会创建一个新事务,相当于每批消息创建一个事务。

事务性批处理侦听器当前不支持自定义错误处理程序。

Configuration

Listener container factory

以下事务设置可以直接在ConcurrentPulsarListenerContainerFactory在创建侦听器容器时使用的PulsarContainerProperties上使用。这些设置会影响所有侦听器容器,包括@PulsarListener使用的容器。

enabled-容器是否支持事务(默认为false)

required-容器是否需要事务(默认为false)

timeout-事务超时的持续时间(默认为空)

transactionDefinition-一个蓝图事务定义,其属性将被复制到容器的事务模板中(默认为null)

transactionManager-用于启动事务的事务管理器

不使用Spring Boot时,您可以在提供的容器出厂设置中调整这些设置。但是,使用Spring Boot时,容器工厂是自动配置的。在这种情况下,您可以注册一个org.springframework.boot.pulser.autofigure。PulsarContainerFactory定制器<并发PulsarListenerContainerFactory<?>>bean访问和自定义容器属性。以下示例显示了如何在容器工厂设置超时:

@Bean
PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> containerCustomizer() {return (containerFactory) -> containerFactory.getContainerProperties().transactions().setTimeout(Duration.ofSeconds(45));
}
@PulsarListener

默认情况下,每个侦听器都尊重其相应侦听器容器工厂的事务设置。但是,用户可以在每个@PulsarListener上设置事务属性,以覆盖容器工厂设置,如下所示:

如果容器工厂启用了事务,那么transaction=false将禁用单个侦听器的事务。

如果容器工厂启用了事务并且是必需的,那么尝试设置transaction=false将导致抛出一个异常,说明事务是必需的。

如果容器工厂已禁用事务,则将忽略设置transaction=true的尝试,并记录警告。

Using PulsarTransactionManager

PulsarTransactionManager是Spring框架的PlatformTransactionManager的实现。您可以将PulsarTransactionManager与正常的Spring事务支持(@Transactional、TransactionTemplate等)一起使用。

如果事务处于活动状态,则在事务范围内执行的任何PulsarTemplate操作都会登记并参与正在进行的事务。经理提交或回滚事务,取决于成功或失败。

您可能不需要直接使用PulsarTransactionManager,因为大多数事务用例都包含在PulsarTemplate和@PulsarListener中。

Pulsar Transactions with Other Transaction Managers

Producer-only transaction

如果你想将记录发送到Pulsar并在单个事务中执行一些数据库更新,你可以使用DataSourceTransactionManager进行正常的Spring事务管理。

以下示例假设有一个名为“DataSourceTransactionManager”的DataSourceTransactionManager bean注册

@Transactional("dataSourceTransactionManager")
public void myServiceMethod() {var msg = calculateMessage();this.pulsarTemplate.send("my-topic", msg);this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(msg));
}

@Transactional注释的拦截器启动数据库事务,PulsarTemplate将与DB事务管理器同步事务;每次发送都将参与该交易。当该方法退出时,数据库事务将提交,然后是Pulsar事务。

如果您希望首先提交Pulsar事务,并且仅在Pulsar事务成功时提交DB事务,请使用嵌套的@Transactional方法,其中外部方法配置为使用DataSourceTransactionManager,内部方法配置为用PulsarTransactionManager。

@Transactional("dataSourceTransactionManager")
public void myServiceMethod() {var msg = calculateMessage();this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(msg));this.sendToPulsar(msg);
}@Transactional("pulsarTransactionManager")
public void sendToPulsar(String msg) {this.pulsarTemplate.send("my-topic", msg);
}

Consumer + Producer transaction

如果你想使用Pulsar的记录,将记录发送到Pulsar,并在事务中执行一些数据库更新,你可以将正常的Spring事务管理(使用DataSourceTransactionManager)与容器发起的事务相结合。

在以下示例中,侦听器容器启动Pulsar事务,@Transactional注释启动DB事务。DB事务首先提交;如果Pulsar事务未能提交,记录将被重新传递,因此DB更新应该是幂等的。

@PulsarListener(topics = "my-input-topic")
@Transactional("dataSourceTransactionManager")
void listen(String msg) {var transformedMsg = msg.toUpperCase(Locale.ROOT);this.pulsarTemplate.send("my-output-topic", transformedMsg);this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(transformedMsg));
}

http://www.dtcms.com/a/272069.html

相关文章:

  • MySQL断开连接后无法正常启动解决记录
  • 第一次搭建数据库
  • JavaScript 树形菜单总结
  • 2025社交电商新风口:推客小程序的商业逻辑与技术实现
  • 数据结构与算法之美:广义表
  • 通过vue如何利用 Three 绘制 简单3D模型(源码案例)
  • Redis中BigKey的隐患
  • Mysql分片:一致性哈希算法
  • 服务器内核级故障排查
  • 【计算机网络】HTTP1.0 HTTP1.1 HTTP2.0 QUIC HTTP3 究极总结
  • 【Python办公】使用Python和Tkinter构建Excel数据导入MySQL工具(GUI版)
  • 如何排查处理机械臂算法不精准问题?
  • 在徐州网络中服务器租用与托管的优势
  • LangChain框架 Prompts、Agents 应用
  • 浅克隆 深克隆
  • mvn能只test单独一个文件吗
  • vscode和插件用法
  • 数据分析中的拉链表解析
  • 网络安全初级
  • 从Rust模块化探索到DLB 2.0实践|得物技术
  • Linux进程——进程状态
  • ZW3D 二次开发-创建球体
  • 自动驾驶大模型---聊一聊特斯拉的FSD端到端系统
  • 自动驾驶数据集综述:统计特征、标注质量与未来展望
  • 一句话理解 ——【单点登录】
  • 【性能测试】jmeter+Linux环境部署和分布式压测,一篇打通...
  • 阿里云错题集分享
  • 在IDEA中无缝接入DeepSeek:智能编程助手指南
  • 如何把Arduino IDE中ESP32程序bin文件通过乐鑫flsah_download_tool工具软件下载到ESP32中
  • 探索Alibaba-NLP/WebAgent:迈向智能信息搜索新时代