Spring Retry机制详解
介绍
Spring框架提供了Spring Retry
能让在项目工程中很方便的使用重试。
使用
1、引入pom
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
<version>1.3.2</version>
</dependency>
2、启用retry
在Configuration
上加 @EnableRetry
。
3、示例
@Service
public class RetryService {
private int times = 0;
private Instant begin = null;
@Retryable(value = BizException.class, maxAttempts = 3,
backoff= @Backoff(value = 1500, maxDelay = 10000, multiplier = 2))
public void service() {
Instant instant = Instant.now();
if(begin == null)
{
begin = instant;
}
times++;
System.out.println(StrUtil.format(" call times: {} at {}. ", times, begin.until(instant, ChronoUnit.MILLIS) ));
if (times < 5) {
throw new BizException(StrUtil.format(" call times: {} error. ", times));
}
}
@Recover
public void recover(BizException e){
System.out.println("service retry after Recover => " + e.getMessage());
}
}
@PostMapping(value = "/retry")
public R retry() {
try {
retryService.service();
} catch (Exception ex) {
ex.printStackTrace();
return R.fail(ex.getMessage());
}
return R.ok();
}
输出:
call times: 1 at 0. #第1次调用
call times: 2 at 1504. #重试第1次 ,间隔 1.5 * 2
call times: 3 at 4506. #重试第2次 ,间隔
service retry after Recover => call times: 3 error. # recover
重试次数改为7 ,输出:
call times: 1 at 0.
call times: 2 at 1506. # value = 1.5 ,第1 次间隔
call times: 3 at 4507. # 第2次间隔 1.5 * 2 = 3
call times: 4 at 10508. # 第3次,1.5 * 2 * 2 = 6
call times: 5 at 20509. #最大延迟就是10
call times: 6 at 30511.
call times: 7 at 40512.
4、注解说明
@Target({ ElementType.METHOD, ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Retryable {
/** recover 方法名 */
String recover() default "";
/** 自定义 interceptor bean name */
String interceptor() default "";
Class<? extends Throwable>[] value() default {};
Class<? extends Throwable>[] include() default {};
Class<? extends Throwable>[] exclude() default {};
/** 统计报表 唯一 label */
String label() default "";
boolean stateful() default false;
int maxAttempts() default 3;
/** maxAttempts 表达式,spel 表达式,例如: ${retry.attempts:5} */
String maxAttemptsExpression() default "";
Backoff backoff() default @Backoff();
/**
1、直接使用 异常的属性。
message.contains('you can retry this')
2、其他方法判断:
格式#{@bean.methodName(#root)}。methodName的返回值为boolean类型。#root是异常类,即用户可以在代码中判断是否进行重试
*/
String exceptionExpression() default "";
String[] listeners() default {};
}
-
@EnableRetry
:启用重试,proxyTargetClass
属性为true
时(默认false
),使用CGLIB
代理。 -
@Retryable
:标记当前方法会使用重试机制。value
:指定抛出那些异常才会触发重试(可以配置多个异常类型) 默认为空。include
:就是value
,默认为空,当exclude
也为空时,默认所有异常都可以触发重试exclude
:指定哪些异常不触发重试(可以配置多个异常类型),默认为空maxAttempts
:最大重试次数,默认3次(包括第一次调用)backoff
:重试等待策略 默认使用@Backoff
注解
-
@Backoff
:重试回退策略(立即重试还是等待一会再重试)value
: 重试的间隔时间(毫秒),默认为1000L
。delay
:value
的别名maxDelay
:重试次数之间的最大时间间隔,默认为0,如果小于delay
的设置,则默认为30000L
multiplier
:delay
时间的间隔倍数,默认为0
,表示固定暂停1
秒后进行重试,如果把multiplier
设置为1.5
,则第一次重试为2
秒,第二次为3
秒,第三次为4.5
秒。
- 不设置参数时,默认使用
FixedBackOffPolicy
(固定时间等待策略),重试等待1000ms
- 只设置
delay
时,使用FixedBackOffPolicy
,重试等待指定的毫秒数- 当设置
delay
和maxDealy
时,重试等待在这两个值之间均态分布- 设置
delay
,maxDealy
和multiplier
时,使用ExponentialBackOffPolicy
(倍数等待策略)- 当设置
multiplier
不等于0时,同时也设置了random
时,使用ExponentialRandomBackOffPolicy
(随机倍数等待策略),从 [1, multiplier-1] 中的均匀分布中为每个延迟选择乘数
@Recover
标记方法为@Retryable
失败时的“兜底”处理方法
- 传参与
@Retryable
的配置的value
必须一样。@Recover
的标记方法的参数必须要与@Retryable
注解value
“形参”保持一致,第一入参为要重试的异常(一定要是@Retryable
方法里抛出的异常或者异常父类),其他参数与@Retryable
保持一致,返回值也要一样,否则无法执行!
@CircuitBreaker
:用于标记方法,实现熔断模式。include
指定处理的异常类。默认为空exclude
指定不需要处理的异常。默认为空vaue
指定要重试的异常。默认为空maxAttempts
最大重试次数。默认3次openTimeout
配置熔断器打开的超时时间,默认5s
,当超过openTimeout
之后熔断器电路变成半打开状态(只要有一次重试成功,则闭合电路)resetTimeout
配置熔断器重新闭合的超时时间,默认20s
,超过这个时间断路器关闭
注意事项
-
使用了
@Retryable
注解的方法直接实例化调用不会触发重试,要先将实现类实例化到Spring
容器中,然后通过注入等方式使用 -
Spring-Retry
是通过捕获异常的方式来触发重试的,@Retryable
标注方法产生的异常不能使用try-catch
捕获,要在方法上抛出异常,不然不会触发重试 -
查询可以进行重试,写操作要慎重,除非业务方支持重入
原理
引入
@EnableRetry
注解,引入Retry能力,导入了RetryConfiguration
类。
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@EnableAspectJAutoProxy(proxyTargetClass = false)
@Import(RetryConfiguration.class)
@Documented
public @interface EnableRetry {
boolean proxyTargetClass() default false;
}
@Component
public class RetryConfiguration extends AbstractPointcutAdvisor
implements IntroductionAdvisor, BeanFactoryAware, InitializingBean {
private Advice advice; //
private Pointcut pointcut;
private RetryContextCache retryContextCache;
private List<RetryListener> retryListeners;
private MethodArgumentsKeyGenerator methodArgumentsKeyGenerator;
private NewMethodArgumentsIdentifier newMethodArgumentsIdentifier;
private Sleeper sleeper;
private BeanFactory beanFactory;
@Override
public ClassFilter getClassFilter() {
return this.pointcut.getClassFilter();
}
@Override
public Class<?>[] getInterfaces() {
return new Class[] { org.springframework.retry.interceptor.Retryable.class };
}
@Override
public void validateInterfaces() throws IllegalArgumentException {
}
}
RetryConfiguration
继承 AbstractPointcutAdvisor
,实现了 IntroductionAdvisor
,它有一个pointcut
和一个advice
,在IOC
过程中会根据PointcutAdvisor
类来对Bean进行Pointcut
的过滤,然后生成对应的AOP
代理类,用advice来加强处理。
初始化
afterPropertiesSet
方法进行初始化。
@Override
public void afterPropertiesSet() throws Exception {
//RetryContextCache
this.retryContextCache = findBean(RetryContextCache.class);
this.methodArgumentsKeyGenerator = findBean(MethodArgumentsKeyGenerator.class);
this.newMethodArgumentsIdentifier = findBean(NewMethodArgumentsIdentifier.class);
//RetryListener
this.retryListeners = findBeans(RetryListener.class);
this.sleeper = findBean(Sleeper.class);
Set<Class<? extends Annotation>> retryableAnnotationTypes = new LinkedHashSet<Class<? extends Annotation>>(1);
//注解为 Retryable
retryableAnnotationTypes.add(Retryable.class);
//构造 pointcut
this.pointcut = buildPointcut(retryableAnnotationTypes);
//构造 advice
this.advice = buildAdvice();
if (this.advice instanceof BeanFactoryAware) {
((BeanFactoryAware) this.advice).setBeanFactory(this.beanFactory);
}
}
buildPointcut 和 buildAdvice
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> retryAnnotationTypes) {
ComposablePointcut result = null;
for (Class<? extends Annotation> retryAnnotationType : retryAnnotationTypes) {
//根据 注解,构造 Pointcut
Pointcut filter = new AnnotationClassOrMethodPointcut(retryAnnotationType);
if (result == null) {
result = new ComposablePointcut(filter);
}
else {
result.union(filter);
}
}
return result;
}
protected Advice buildAdvice() {
//构造一个 AnnotationAwareRetryOperationsInterceptor
AnnotationAwareRetryOperationsInterceptor interceptor = new AnnotationAwareRetryOperationsInterceptor();
if (this.retryContextCache != null) {
interceptor.setRetryContextCache(this.retryContextCache);
}
if (this.retryListeners != null) {
interceptor.setListeners(this.retryListeners);
}
if (this.methodArgumentsKeyGenerator != null) {
interceptor.setKeyGenerator(this.methodArgumentsKeyGenerator);
}
if (this.newMethodArgumentsIdentifier != null) {
interceptor.setNewItemIdentifier(this.newMethodArgumentsIdentifier);
}
if (this.sleeper != null) {
interceptor.setSleeper(this.sleeper);
}
return interceptor;
}
pointcut
private final class AnnotationClassOrMethodPointcut extends StaticMethodMatcherPointcut {
private final MethodMatcher methodResolver;
AnnotationClassOrMethodPointcut(Class<? extends Annotation> annotationType) {
this.methodResolver = new AnnotationMethodMatcher(annotationType);
setClassFilter(new AnnotationClassOrMethodFilter(annotationType));
}
@Override
public boolean matches(Method method, Class<?> targetClass) {
return getClassFilter().matches(targetClass) || this.methodResolver.matches(method, targetClass);
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (!(other instanceof AnnotationClassOrMethodPointcut)) {
return false;
}
AnnotationClassOrMethodPointcut otherAdvisor = (AnnotationClassOrMethodPointcut) other;
return ObjectUtils.nullSafeEquals(this.methodResolver, otherAdvisor.methodResolver);
}
}
private final class AnnotationClassOrMethodFilter extends AnnotationClassFilter {
private final AnnotationMethodsResolver methodResolver;
AnnotationClassOrMethodFilter(Class<? extends Annotation> annotationType) {
super(annotationType, true);
this.methodResolver = new AnnotationMethodsResolver(annotationType);
}
@Override
public boolean matches(Class<?> clazz) {
// 类的方法上 标记了指定注解。
return super.matches(clazz) || this.methodResolver.hasAnnotatedMethods(clazz);
}
}
private static class AnnotationMethodsResolver {
private Class<? extends Annotation> annotationType;
public AnnotationMethodsResolver(Class<? extends Annotation> annotationType) {
this.annotationType = annotationType;
}
public boolean hasAnnotatedMethods(Class<?> clazz) {
final AtomicBoolean found = new AtomicBoolean(false);
//遍历所有的方法,
ReflectionUtils.doWithMethods(clazz, new MethodCallback() {
@Override
public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException {
if (found.get()) {
return;
}
//method 有注解。
Annotation annotation = AnnotationUtils.findAnnotation(method,
AnnotationMethodsResolver.this.annotationType);
if (annotation != null) {
found.set(true);
}
}
});
return found.get();
}
}
AnnotationAwareRetryOperationsInterceptor
buildAdvice()
方法会构造一个AnnotationAwareRetryOperationsInterceptor
实例。用于做增强操作。
public class AnnotationAwareRetryOperationsInterceptor implements IntroductionInterceptor, BeanFactoryAware {
private static final TemplateParserContext PARSER_CONTEXT = new TemplateParserContext();
private static final SpelExpressionParser PARSER = new SpelExpressionParser();
private static final MethodInterceptor NULL_INTERCEPTOR = new MethodInterceptor() {
@Override
public Object invoke(MethodInvocation methodInvocation) throws Throwable {
throw new OperationNotSupportedException("Not supported");
}
};
private final StandardEvaluationContext evaluationContext = new StandardEvaluationContext();
//用于缓存每个object的用于 增强的方法。
private final ConcurrentReferenceHashMap<Object, ConcurrentMap<Method, MethodInterceptor>> delegates = new ConcurrentReferenceHashMap<Object, ConcurrentMap<Method, MethodInterceptor>>();
private RetryContextCache retryContextCache = new MapRetryContextCache();
private MethodArgumentsKeyGenerator methodArgumentsKeyGenerator;
private NewMethodArgumentsIdentifier newMethodArgumentsIdentifier;
private Sleeper sleeper;
private BeanFactory beanFactory;
private RetryListener[] globalListeners;
}
invoke
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
//构造代理
MethodInterceptor delegate = getDelegate(invocation.getThis(), invocation.getMethod());
if (delegate != null) {
return delegate.invoke(invocation);
}
else {
return invocation.proceed();
}
}
private MethodInterceptor getDelegate(Object target, Method method) {
//缓存
ConcurrentMap<Method, MethodInterceptor> cachedMethods = this.delegates.get(target);
if (cachedMethods == null) {
cachedMethods = new ConcurrentHashMap<Method, MethodInterceptor>();
}
MethodInterceptor delegate = cachedMethods.get(method);
if (delegate == null) {
MethodInterceptor interceptor = NULL_INTERCEPTOR;
//获取方法的 Retryable 注解。
Retryable retryable = AnnotatedElementUtils.findMergedAnnotation(method, Retryable.class);
if (retryable == null) {
//父类?
retryable = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Retryable.class);
}
if (retryable == null) {
//在类上查找
retryable = findAnnotationOnTarget(target, method, Retryable.class);
}
if (retryable != null) {
//如果有 interceptor,则直接使用
if (StringUtils.hasText(retryable.interceptor())) {
interceptor = this.beanFactory.getBean(retryable.interceptor(), MethodInterceptor.class);
}
else if (retryable.stateful()) {
interceptor = getStatefulInterceptor(target, method, retryable);
}
else {
interceptor = getStatelessInterceptor(target, method, retryable);
}
}
cachedMethods.putIfAbsent(method, interceptor);
delegate = cachedMethods.get(method);
}
this.delegates.putIfAbsent(target, cachedMethods);
return delegate == NULL_INTERCEPTOR ? null : delegate;
}
getStatefulInterceptor 和 getStatelessInterceptor
private MethodInterceptor getStatelessInterceptor(Object target, Method method, Retryable retryable) {
//生成一个RetryTemplate
RetryTemplate template = createTemplate(retryable.listeners());
//生成retryPolicy
template.setRetryPolicy(getRetryPolicy(retryable));
//生成backoffPolicy
template.setBackOffPolicy(getBackoffPolicy(retryable.backoff()));
//RetryOperationsInterceptor
return RetryInterceptorBuilder.stateless().retryOperations(template).label(retryable.label())
.recoverer(getRecoverer(target, method)).build();
}
private MethodInterceptor getStatefulInterceptor(Object target, Method method, Retryable retryable) {
RetryTemplate template = createTemplate(retryable.listeners());
template.setRetryContextCache(this.retryContextCache);
//
CircuitBreaker circuit = AnnotatedElementUtils.findMergedAnnotation(method, CircuitBreaker.class);
if (circuit == null) {
circuit = findAnnotationOnTarget(target, method, CircuitBreaker.class);
}
if (circuit != null) {
RetryPolicy policy = getRetryPolicy(circuit);
CircuitBreakerRetryPolicy breaker = new CircuitBreakerRetryPolicy(policy);
breaker.setOpenTimeout(getOpenTimeout(circuit));
breaker.setResetTimeout(getResetTimeout(circuit));
template.setRetryPolicy(breaker);
template.setBackOffPolicy(new NoBackOffPolicy());
String label = circuit.label();
if (!StringUtils.hasText(label)) {
label = method.toGenericString();
}
return RetryInterceptorBuilder.circuitBreaker().keyGenerator(new FixedKeyGenerator("circuit"))
.retryOperations(template).recoverer(getRecoverer(target, method)).label(label).build();
}
RetryPolicy policy = getRetryPolicy(retryable);
template.setRetryPolicy(policy);
template.setBackOffPolicy(getBackoffPolicy(retryable.backoff()));
String label = retryable.label();
return RetryInterceptorBuilder.stateful().keyGenerator(this.methodArgumentsKeyGenerator)
.newMethodArgumentsIdentifier(this.newMethodArgumentsIdentifier).retryOperations(template).label(label)
.recoverer(getRecoverer(target, method)).build();
}
RetryOperationsInterceptor
invoke
@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
String name;
if (StringUtils.hasText(this.label)) {
name = this.label;
}
else {
name = invocation.getMethod().toGenericString();
}
final String label = name;
// RetryCallback,主要调用了invocation的proceed()方法
RetryCallback<Object, Throwable> retryCallback = new MethodInvocationRetryCallback<Object, Throwable>(
invocation, label) {
@Override
public Object doWithRetry(RetryContext context) throws Exception {
context.setAttribute(RetryContext.NAME, this.label);
/*
* If we don't copy the invocation carefully it won't keep a reference to
* the other interceptors in the chain. We don't have a choice here but to
* specialise to ReflectiveMethodInvocation (but how often would another
* implementation come along?).
*/
if (this.invocation instanceof ProxyMethodInvocation) {
context.setAttribute("___proxy___", ((ProxyMethodInvocation) this.invocation).getProxy());
try {
return ((ProxyMethodInvocation) this.invocation).invocableClone().proceed();
}
catch (Exception e) {
throw e;
}
catch (Error e) {
throw e;
}
catch (Throwable e) {
throw new IllegalStateException(e);
}
}
else {
throw new IllegalStateException(
"MethodInvocation of the wrong type detected - this should not happen with Spring AOP, "
+ "so please raise an issue if you see this exception");
}
}
};
// recoverer
if (this.recoverer != null) {
ItemRecovererCallback recoveryCallback = new ItemRecovererCallback(invocation.getArguments(), // 真实调用的 参数
this.recoverer);
try {
Object recovered = this.retryOperations.execute(retryCallback, recoveryCallback);
return recovered;
}
finally {
RetryContext context = RetrySynchronizationManager.getContext();
if (context != null) {
context.removeAttribute("__proxy__");
}
}
}
//最终还是进入到retryOperations的execute方法,这个retryOperations就是在之前的builder set进来的RetryTemplate。
return this.retryOperations.execute(retryCallback);
}
private static final class ItemRecovererCallback implements RecoveryCallback<Object> {
private final Object[] args;
private final MethodInvocationRecoverer<?> recoverer;
/**
* @param args the item that failed.
*/
private ItemRecovererCallback(Object[] args, MethodInvocationRecoverer<?> recoverer) {
this.args = Arrays.asList(args).toArray();
this.recoverer = recoverer;
}
@Override
public Object recover(RetryContext context) {
// this.args
return this.recoverer.recover(this.args, context.getLastThrowable());
}
}
无论是RetryOperationsInterceptor
还是StatefulRetryOperationsInterceptor
,最终的拦截处理逻辑还是调用到RetryTemplate
的execute
方法,从名字也看出来,RetryTemplate
作为一个模板类,里面包含了重试统一逻辑。
RetryTemplate
RetryTemplate
的 execute
方法主要就是参数的不同。核心就是3个参数:RetryCallback
,RecoveryCallback
,RetryState
@Override
public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback) throws E {
return doExecute(retryCallback, null, null);
}
@Override
public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback,
RecoveryCallback<T> recoveryCallback) throws E {
return doExecute(retryCallback, recoveryCallback, null);
}
@Override
public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback, RetryState retryState)
throws E, ExhaustedRetryException {
return doExecute(retryCallback, null, retryState);
}
@Override
public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback,
RecoveryCallback<T> recoveryCallback, RetryState retryState) throws E, ExhaustedRetryException {
return doExecute(retryCallback, recoveryCallback, retryState);
}
protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
RecoveryCallback<T> recoveryCallback, RetryState state) throws E, ExhaustedRetryException {
RetryPolicy retryPolicy = this.retryPolicy;
BackOffPolicy backOffPolicy = this.backOffPolicy;
//新建一个RetryContext来保存本轮重试的上下文
RetryContext context = open(retryPolicy, state);
if (this.logger.isTraceEnabled()) {
this.logger.trace("RetryContext retrieved: " + context);
}
// Make sure the context is available globally for clients who need
// it...
RetrySynchronizationManager.register(context);
Throwable lastException = null;
boolean exhausted = false;
try {
//如果有注册RetryListener,则会调用它的open方法,给调用者一个通知。
boolean running = doOpenInterceptors(retryCallback, context);
if (!running) {
throw new TerminatedRetryException("Retry terminated abnormally by interceptor before first attempt");
}
// Get or Start the backoff context...
BackOffContext backOffContext = null;
Object resource = context.getAttribute("backOffContext");
if (resource instanceof BackOffContext) {
backOffContext = (BackOffContext) resource;
}
if (backOffContext == null) {
backOffContext = backOffPolicy.start(context);
if (backOffContext != null) {
context.setAttribute("backOffContext", backOffContext);
}
}
//判断能否重试,就是调用RetryPolicy的canRetry方法来判断。
//这个循环会直到原方法不抛出异常,或不需要再重试
while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
try {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Retry: count=" + context.getRetryCount());
}
//清除上次记录的异常
// the close interceptors will not think we failed...
lastException = null;
//doWithRetry方法,一般来说就是原方法
return retryCallback.doWithRetry(context);
}
catch (Throwable e) {
//记录异常
lastException = e;
try {
//记录异常信息
registerThrowable(retryPolicy, state, context, e);
}
catch (Exception ex) {
throw new TerminatedRetryException("Could not register throwable", ex);
}
finally {
//调用RetryListener的onError方法
doOnErrorInterceptors(retryCallback, context, e);
}
//再次判断能否重试
if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
try {
//如果可以重试则走退避策略
backOffPolicy.backOff(backOffContext);
}
catch (BackOffInterruptedException ex) {
lastException = e;
// back off was prevented by another thread - fail the retry
if (this.logger.isDebugEnabled()) {
this.logger.debug("Abort retry because interrupted: count=" + context.getRetryCount());
}
throw ex;
}
}
if (this.logger.isDebugEnabled()) {
this.logger.debug("Checking for rethrow: count=" + context.getRetryCount());
}
if (shouldRethrow(retryPolicy, context, state)) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Rethrow in retry for policy: count=" + context.getRetryCount());
}
throw RetryTemplate.<E>wrapIfNecessary(e);
}
}
/*
* A stateful attempt that can retry may rethrow the exception before now,
* but if we get this far in a stateful retry there's a reason for it,
* like a circuit breaker or a rollback classifier.
*/
if (state != null && context.hasAttribute(GLOBAL_STATE)) {
break;
}
} // END WHILE
if (state == null && this.logger.isDebugEnabled()) {
this.logger.debug("Retry failed last attempt: count=" + context.getRetryCount());
}
exhausted = true;
//重试结束后如果有兜底Recovery方法则执行,否则抛异常
return handleRetryExhausted(recoveryCallback, context, state);
} //END FIRST TRY
catch (Throwable e) {
throw RetryTemplate.<E>wrapIfNecessary(e);
}
finally {
//处理一些关闭逻辑
close(retryPolicy, context, state, lastException == null || exhausted);
//调用RetryListener的close方法
doCloseInterceptors(retryCallback, context, lastException);
RetrySynchronizationManager.clear();
}
}
重试策略
用来判断当方法调用异常时是否需要重试。常用策略有:
SimpleRetryPolicy
:默认最多重试3次TimeoutRetryPolicy
:默认在1秒内失败都会重试ExpressionRetryPolicy
:符合表达式就会重试CircuitBreakerRetryPolicy
:增加了熔断的机制,如果不在熔断状态,则允许重试CompositeRetryPolicy
:可以组合多个重试策略NeverRetryPolicy
:从不重试(也是一种重试策略哈)AlwaysRetryPolicy
:总是重试
重试策略最重要的方法就是 canRetry
public interface RetryPolicy extends Serializable {
boolean canRetry(RetryContext context);
RetryContext open(RetryContext parent);
void close(RetryContext context);
void registerThrowable(RetryContext context, Throwable throwable);
}
//SimpleRetryPolicy
@Override
public boolean canRetry(RetryContext context) {
Throwable t = context.getLastThrowable();
//判断抛出的异常是否符合重试的异常
return (t == null || retryForException(t)) && context.getRetryCount() < this.maxAttempts;
}
//ExpressionRetryPolicy extends SimpleRetryPolicy
public ExpressionRetryPolicy(int maxAttempts, Map<Class<? extends Throwable>, Boolean> retryableExceptions,
boolean traverseCauses, String expressionString, boolean defaultValue) {
super(maxAttempts, retryableExceptions, traverseCauses, defaultValue);
Assert.notNull(expressionString, "'expressionString' cannot be null");
this.expression = getExpression(expressionString);
}
退避策略
控制下一次的间隔时间。常用策略有:
FixedBackOffPolicy
默认固定延迟1秒后执行下一次重试ExponentialBackOffPolicy
指数递增延迟执行重试,默认初始0.1秒,系数是2,那么下次延迟0.2秒,再下次就是延迟0.4秒,如此类推,最大30秒。ExponentialRandomBackOffPolicy
在上面那个策略上增加随机性UniformRandomBackOffPolicy
这个跟上面的区别就是,上面的延迟会不停递增,这个只会在固定的区间随机StatelessBackOffPolicy
这个说明是无状态的,所谓无状态就是对上次的退避无感知,从它下面的子类也能看出来
退避策略主要方法是 backOff
public interface BackOffPolicy {
BackOffContext start(RetryContext context);
void backOff(BackOffContext backOffContext) throws BackOffInterruptedException;
}
FixedBackOffPolicy
public class FixedBackOffPolicy extends StatelessBackOffPolicy implements SleepingBackOffPolicy<FixedBackOffPolicy> {
private static final long DEFAULT_BACK_OFF_PERIOD = 1000L;
private volatile long backOffPeriod = DEFAULT_BACK_OFF_PERIOD;
private Sleeper sleeper = new ThreadWaitSleeper();
public FixedBackOffPolicy withSleeper(Sleeper sleeper) {
FixedBackOffPolicy res = new FixedBackOffPolicy();
res.setBackOffPeriod(backOffPeriod);
res.setSleeper(sleeper);
return res;
}
protected void doBackOff() throws BackOffInterruptedException {
try {
//sleep 指定时间
// 内部:Thread.sleep(backOffPeriod);
sleeper.sleep(backOffPeriod);
}
catch (InterruptedException e) {
throw new BackOffInterruptedException("Thread interrupted while sleeping", e);
}
}
}
//ExponentialBackOffPolicy
@Override
public void backOff(BackOffContext backOffContext) throws BackOffInterruptedException {
ExponentialBackOffContext context = (ExponentialBackOffContext) backOffContext;
try {
// ExponentialBackOffContext
long sleepTime = context.getSleepAndIncrement();
if (this.logger.isDebugEnabled()) {
this.logger.debug("Sleeping for " + sleepTime);
}
this.sleeper.sleep(sleepTime);
}
catch (InterruptedException e) {
throw new BackOffInterruptedException("Thread interrupted while sleeping", e);
}
}
//ExponentialBackOffContext
public synchronized long getSleepAndIncrement() {
// this.interval:本次间隔时间。在上一次结束时计算。
long sleep = this.interval;
if (sleep > this.maxInterval) {
sleep = this.maxInterval;
}
else {
this.interval = getNextInterval();
}
return sleep;
}
protected long getNextInterval() {
return (long) (this.interval * this.multiplier);
}
RetryContext
RetryContext
主要用于记录一些状态。
public interface RetryContext extends AttributeAccessor {
String NAME = "context.name";
String STATE_KEY = "context.state";
String CLOSED = "context.closed";
String RECOVERED = "context.recovered";
String EXHAUSTED = "context.exhausted";
void setExhaustedOnly();
boolean isExhaustedOnly();
RetryContext getParent();
int getRetryCount();
Throwable getLastThrowable();
}
每一个策略都有对应的Context
。在Spring Retry
里,其实每一个策略都是单例来的。单例则会导致重试策略之间才产生冲突,不是单例,则多出了很多策略对象出来,增加了使用者的负担,这不是一个好的设计。
Spring Retry
采用了一个更加轻量级的做法,就是针对每一个需要重试的方法只new一个上下文Context
对象,然后在重试时,把这个Context
传到策略里,策略再根据这个Context
做重试,而且Spring Retry
还对这个Context
做了cache
。这样就相当于对重试的上下文做了优化。
private RetryContext doOpenInternal(RetryPolicy retryPolicy, RetryState state) {
RetryContext context = retryPolicy.open(RetrySynchronizationManager.getContext());
if (state != null) {
context.setAttribute(RetryContext.STATE_KEY, state.getKey());
}
if (context.hasAttribute(GLOBAL_STATE)) {
registerContext(context, state);
}
return context;
}
附录
参考
Spring retry
Guava retry
参考:https://java.jverson.com/tools/guava-retryer.html
添加依赖
<dependency>
<groupId>com.github.rholder</groupId>
<artifactId>guava-retrying</artifactId>
<version>2.0.0</version>
</dependency>
示例
public boolean guavaTestTask(String param) {
// 构建重试实例 可以设置重试源且可以支持多个重试源 可以配置重试次数或重试超时时间,以及可以配置等待时间间隔
Retryer<Boolean> retriever = RetryerBuilder.<Boolean>newBuilder()
// 重试的异常类以及子类
.retryIfExceptionOfType(ServiceException.class)
// 根据返回值进行重试
.retryIfResult(result -> !result)
// 设置等待间隔时间,每次请求间隔1s
.withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS))
// 设置最大重试次数,尝试请求3次
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
.build();
try {
//调用 真实的服务。
return retriever.call(() -> randomResult(param));
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
原理
Retryer 接口
定义了执行方法重试的方法,并提供了多个配置方法来设置重试条件、等待策略、停止策略等。它包含一个call
方法,将需要重试的操作以Callable
形式传递给它。
public V call(Callable<V> callable) throws ExecutionException, RetryException {
long startTime = System.nanoTime();
//根据attemptNumber进行循环次数
for (int attemptNumber = 1; ; attemptNumber++) {
// 进入方法不等待,立即执行一次
Attempt<V> attempt;
try {
// 执行callable中的具体业务
// attemptTimeLimiter限制了每次尝试等待的时长
V result = attemptTimeLimiter.call(callable);
// 利用调用结果构造新的attempt
attempt = new ResultAttempt<V>(result, attemptNumber, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
} catch (Throwable t) {
attempt = new ExceptionAttempt<V>(t, attemptNumber, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
}
// 遍历自定义的监听器
for (RetryListener listener : listeners) {
listener.onRetry(attempt);
}
// 判断是否满足重试条件,来决定是否继续等待并进行重试
if (!rejectionPredicate.apply(attempt)) {
return attempt.get();
}
// 此时满足停止策略,因为还没有得到想要的结果,因此抛出异常
if (stopStrategy.shouldStop(attempt)) {
throw new RetryException(attemptNumber, attempt);
} else {
// 行默认的停止策略——线程休眠
long sleepTime = waitStrategy.computeSleepTime(attempt);
try {
// 也可以执行定义的停止策略
blockStrategy.block(sleepTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RetryException(attemptNumber, attempt);
}
}
}
}
RetryerBuilder 类
创建Retryer
实例的构建器类。通过RetryerBuilder
配置重试策略、条件和其他参数,最终构建出一个Retryer
实例。
Guava-Retry和Spring Retry 比较
- 框架来源:
Guava-Retry
:Guava-Retry是Google Guava
库的一部分,它提供了一种用于重试操作的机制。Spring Retry
:Spring Retry是Spring
框架的一个模块,专门用于在Spring应用程序中实现重试逻辑。
- 库依赖:
Guava-Retry
:需要添加Guava库的依赖。Spring Retry
:需要添加spring-retry
模块的依赖。
- 配置和注解:
Guava-Retry
:重试逻辑通过构建Retryer
实例并定义重试条件、等待策略等来配置。Spring Retry
:Spring Retry提供了注解(如@Retryable
、@Recover
等)和编程式配置来实现重试逻辑。
- 重试策略:
Guava-Retry
:Guava-Retry
基于结果和异常类型来定义重试条件。使用RetryerBuilder
来自定义重试策略。Spring Retry
:Spring Retry
使用注解来定义重试条件和相关属性,如最大重试次数、重试间隔等。
- 等待策略:
Guava-Retry
:Guava-Retry
提供了不同的等待策略(如固定等待、指数等待等),自己组合。Spring Retry
:Spring Retry
通过注解或编程式配置来指定等待时间。
- 适用范围:
Guava-Retry
:可以用于任何Java应用程序,不仅限于Spring框架。Spring Retry
:专门设计用于Spring应用程序中,可以与其他Spring功能(如Spring AOP
)集成。
- 依赖性:
Guava-Retry
:相对较轻量级,如果只需要重试功能,可以考虑使用Guava库的一部分。Spring Retry
:如果已使用Spring框架,可以方便地集成Spring Retry,但可能需要更多的Spring依赖。