Using Spring for Apache Pulsar:Transactions
本节介绍Spring for Apache Pulsar如何支持事务。
Overview
Spring for Apache Pulsar事务支持是基于Spring Framework提供的事务支持构建的。在高层,事务资源向事务管理器注册,事务管理器反过来处理注册资源的事务状态(提交、回滚等)。
Apache Pulsar的Spring提供了以下功能:
PulsaTransactionManager-用于正常的Spring事务支持(@transactional,transactionTemplate等)
交易脉冲星模板
交易@pulsaListener
与其他事务管理器的事务同步
事务支持尚未添加到响应式组件中
默认情况下,事务支持已禁用。要在使用Spring Boot时启用支持,只需设置Spring.pulsar.transaction.enabled属性。下面每个组件部分都概述了进一步的配置选项。
Transactional Publishing with PulsarTemplate
事务性PulsarTemplate上的所有发送操作都会查找活动事务,并在事务中登记每个发送操作(如果找到)。
Non-transactional use
默认情况下,事务性PulsarTemplate也可用于非事务性操作。当未找到现有事务时,它将以非事务方式继续发送操作。但是,如果模板配置为需要事务,则任何在事务范围之外使用模板的尝试都会导致异常。
事务可以由TransactionTemplate、@Transactional方法、调用executeInTransaction或事务侦听器容器启动。
Local Transactions
我们使用术语“本地”事务来表示不受Spring事务管理工具(即PulsarTransactionManager)管理或与之关联的Pulsar本地事务。相反,“同步”事务是由PulsarTransactionManager管理或与之关联的事务。
您可以使用PulsarTemplate在本地事务中执行一系列操作。以下示例显示了如何执行此操作:
var results = pulsarTemplate.executeInTransaction((template) -> {var rv = new HashMap<String, MessageId>();rv.put("msg1", template.send(topic, "msg1"));rv.put("msg2", template.send(topic, "msg2"));return rv;
});
回调中的参数是调用executeInTransaction方法的模板实例。模板上的所有操作都登记在当前事务中。如果回调正常退出,则事务被提交。如果抛出异常,则事务将回滚。
若有一个同步的事务正在处理中,它将被忽略,并使用一个新的“嵌套”事务。
Configuration
以下交易设置可直接在PulsarTemplate上使用(通过交易字段):
enabled-模板是否支持事务(默认为false)
required-模板是否需要交易(默认为false)
timeout-事务超时的持续时间(默认为空)
不使用Spring Boot时,您可以在提供的模板上调整这些设置。但是,使用Spring Boot时,模板是自动配置的,没有影响属性的机制。在这种情况下,您可以注册一个可用于调整设置的PulsarTemplateCustomizer bean。以下示例显示了如何在自动配置的模板上设置超时:
@Bean
PulsarTemplateCustomizer<?> templateCustomizer() {return (template) -> template.transactions().setTimeout(Duration.ofSeconds(45));
}
Transactional Receiving with @PulsarListener
当启用侦听器事务时,在同步事务的范围内调用@PulsarListener注释的侦听器方法。
DefaultPulsarMessageListenerContainer使用配置了PulsarTransactionManager的Spring TransactionTemplate在方法调用之前启动事务。
每个接收到的消息的确认都登记在作用域事务中。
Consume-Process-Produce Scenario
一种常见的事务模式是,消费者从Pulsar主题读取消息,转换消息,最后生产者将生成的消息写入另一个Pulsar主题。当启用事务并且您的侦听器方法使用事务性PulsarTemplate来生成转换后的消息时,该框架支持此用例。
给定以下侦听器方法:
@PulsarListener(topics = "my-input-topic")
void listen(String msg) {var transformedMsg = msg.toUpperCase(Locale.ROOT);this.transactionalTemplate.send("my-output-topic", transformedMsg);
}
启用侦听器事务时会发生以下交互:
侦听器容器启动新事务并在事务范围内调用侦听器方法
侦听器方法接收消息
侦听器方法转换消息
监听器方法使用事务模板发送转换后的消息,该模板在活动事务中注册发送操作
侦听器容器自动确认消息,并在活动事务中注册确认操作
侦听器容器(通过TransactionTemplate)提交事务
如果您没有使用@PulsarListener,而是直接使用监听器容器,则会提供与上述相同的事务支持。记住,@PulsarListener只是为了方便将Java方法注册为侦听器容器消息侦听器。
Transactions with Record Listeners
上面的例子使用了一个记录监听器。使用记录侦听器时,每次侦听器方法调用时都会创建一个新事务,相当于每条消息一个事务。
由于事务边界是针对每条消息的,并且每条消息的确认都登记在每个事务中,因此批处理确认模式不能用于事务记录侦听器。
Transactions with Batch Listeners
使用批侦听器时,每次侦听器方法调用时都会创建一个新事务,相当于每批消息创建一个事务。
事务性批处理侦听器当前不支持自定义错误处理程序。
Configuration
Listener container factory
以下事务设置可以直接在ConcurrentPulsarListenerContainerFactory在创建侦听器容器时使用的PulsarContainerProperties上使用。这些设置会影响所有侦听器容器,包括@PulsarListener使用的容器。
enabled-容器是否支持事务(默认为false)
required-容器是否需要事务(默认为false)
timeout-事务超时的持续时间(默认为空)
transactionDefinition-一个蓝图事务定义,其属性将被复制到容器的事务模板中(默认为null)
transactionManager-用于启动事务的事务管理器
不使用Spring Boot时,您可以在提供的容器出厂设置中调整这些设置。但是,使用Spring Boot时,容器工厂是自动配置的。在这种情况下,您可以注册一个org.springframework.boot.pulser.autofigure。PulsarContainerFactory定制器<并发PulsarListenerContainerFactory<?>>bean访问和自定义容器属性。以下示例显示了如何在容器工厂设置超时:
@Bean
PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> containerCustomizer() {return (containerFactory) -> containerFactory.getContainerProperties().transactions().setTimeout(Duration.ofSeconds(45));
}
@PulsarListener
默认情况下,每个侦听器都尊重其相应侦听器容器工厂的事务设置。但是,用户可以在每个@PulsarListener上设置事务属性,以覆盖容器工厂设置,如下所示:
如果容器工厂启用了事务,那么transaction=false将禁用单个侦听器的事务。
如果容器工厂启用了事务并且是必需的,那么尝试设置transaction=false将导致抛出一个异常,说明事务是必需的。
如果容器工厂已禁用事务,则将忽略设置transaction=true的尝试,并记录警告。
Using PulsarTransactionManager
PulsarTransactionManager是Spring框架的PlatformTransactionManager的实现。您可以将PulsarTransactionManager与正常的Spring事务支持(@Transactional、TransactionTemplate等)一起使用。
如果事务处于活动状态,则在事务范围内执行的任何PulsarTemplate操作都会登记并参与正在进行的事务。经理提交或回滚事务,取决于成功或失败。
您可能不需要直接使用PulsarTransactionManager,因为大多数事务用例都包含在PulsarTemplate和@PulsarListener中。
Pulsar Transactions with Other Transaction Managers
Producer-only transaction
如果你想将记录发送到Pulsar并在单个事务中执行一些数据库更新,你可以使用DataSourceTransactionManager进行正常的Spring事务管理。
以下示例假设有一个名为“DataSourceTransactionManager”的DataSourceTransactionManager bean注册
@Transactional("dataSourceTransactionManager")
public void myServiceMethod() {var msg = calculateMessage();this.pulsarTemplate.send("my-topic", msg);this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(msg));
}
@Transactional注释的拦截器启动数据库事务,PulsarTemplate将与DB事务管理器同步事务;每次发送都将参与该交易。当该方法退出时,数据库事务将提交,然后是Pulsar事务。
如果您希望首先提交Pulsar事务,并且仅在Pulsar事务成功时提交DB事务,请使用嵌套的@Transactional方法,其中外部方法配置为使用DataSourceTransactionManager,内部方法配置为用PulsarTransactionManager。
@Transactional("dataSourceTransactionManager")
public void myServiceMethod() {var msg = calculateMessage();this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(msg));this.sendToPulsar(msg);
}@Transactional("pulsarTransactionManager")
public void sendToPulsar(String msg) {this.pulsarTemplate.send("my-topic", msg);
}
Consumer + Producer transaction
如果你想使用Pulsar的记录,将记录发送到Pulsar,并在事务中执行一些数据库更新,你可以将正常的Spring事务管理(使用DataSourceTransactionManager)与容器发起的事务相结合。
在以下示例中,侦听器容器启动Pulsar事务,@Transactional注释启动DB事务。DB事务首先提交;如果Pulsar事务未能提交,记录将被重新传递,因此DB更新应该是幂等的。
@PulsarListener(topics = "my-input-topic")
@Transactional("dataSourceTransactionManager")
void listen(String msg) {var transformedMsg = msg.toUpperCase(Locale.ROOT);this.pulsarTemplate.send("my-output-topic", transformedMsg);this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(transformedMsg));
}