当前位置: 首页 > news >正文

Spring Retry机制详解

介绍

Spring框架提供了Spring Retry能让在项目工程中很方便的使用重试。

使用

1、引入pom

        <dependency>
            <groupId>org.springframework.retry</groupId>
            <artifactId>spring-retry</artifactId>
            <version>1.3.2</version>
        </dependency>

2、启用retry

Configuration上加 @EnableRetry

3、示例


@Service
public class RetryService {

    private int times = 0;

    private Instant begin = null;


    @Retryable(value = BizException.class, maxAttempts = 3,
            backoff= @Backoff(value = 1500, maxDelay = 10000, multiplier = 2))
    public void service() {

        Instant instant = Instant.now();
        if(begin == null)
        {
            begin = instant;
        }
        times++;
        System.out.println(StrUtil.format(" call times: {} at {}. ", times, begin.until(instant, ChronoUnit.MILLIS) ));
        if (times < 5) {
            throw new BizException(StrUtil.format(" call times: {}  error. ", times));
        }
        
    }

    @Recover
    public void recover(BizException e){
        System.out.println("service retry after Recover => " + e.getMessage());
    }

}
    @PostMapping(value = "/retry")
    public R retry() {
        try {
            retryService.service();
        } catch (Exception ex) {
            ex.printStackTrace();
            return R.fail(ex.getMessage());
        }

        return R.ok();
    }

输出:

 call times: 1 at 0.                  #第1次调用
 call times: 2 at 1504.               #重试第1次 ,间隔 1.5 * 2 
 call times: 3 at 4506.               #重试第2次 ,间隔 
service retry after Recover =>  call times: 3  error.   # recover

重试次数改为7 ,输出:

 call times: 1 at 0. 
 call times: 2 at 1506.       # value = 1.5  ,第1 次间隔
 call times: 3 at 4507.       # 第2次间隔 1.5 * 2 = 3 
 call times: 4 at 10508.   # 第3次,1.5 * 2 * 2 = 6   
 call times: 5 at 20509.   #最大延迟就是10
 call times: 6 at 30511. 
 call times: 7 at 40512. 

4、注解说明

@Target({ ElementType.METHOD, ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Retryable {

	/**  recover 方法名 */
	String recover() default "";

	/** 自定义 interceptor  bean name */
	String interceptor() default "";

	Class<? extends Throwable>[] value() default {};

	Class<? extends Throwable>[] include() default {};

	Class<? extends Throwable>[] exclude() default {};

	/** 统计报表 唯一 label  */
	String label() default "";
	boolean stateful() default false;
	int maxAttempts() default 3;

	/** maxAttempts 表达式,spel 表达式,例如: ${retry.attempts:5}  */
	String maxAttemptsExpression() default "";

	Backoff backoff() default @Backoff();

    /**
    1、直接使用 异常的属性。
    message.contains('you can retry this')
    2、其他方法判断:
    格式#{@bean.methodName(#root)}。methodName的返回值为boolean类型。#root是异常类,即用户可以在代码中判断是否进行重试
    */
	String exceptionExpression() default "";
	String[] listeners() default {};

}

  • @EnableRetry:启用重试,proxyTargetClass属性为true时(默认false),使用CGLIB代理。

  • @Retryable:标记当前方法会使用重试机制。

    • value:指定抛出那些异常才会触发重试(可以配置多个异常类型) 默认为空。
    • include:就是value,默认为空,当exclude也为空时,默认所有异常都可以触发重试
    • exclude:指定哪些异常不触发重试(可以配置多个异常类型),默认为空
    • maxAttempts:最大重试次数,默认3次(包括第一次调用)
    • backoff:重试等待策略 默认使用@Backoff注解
  • @Backoff:重试回退策略(立即重试还是等待一会再重试)

    • value: 重试的间隔时间(毫秒),默认为1000L
    • delayvalue的别名
    • maxDelay:重试次数之间的最大时间间隔,默认为0,如果小于delay的设置,则默认为30000L
    • multiplierdelay时间的间隔倍数,默认为0,表示固定暂停1秒后进行重试,如果把multiplier设置为1.5,则第一次重试为2秒,第二次为3秒,第三次为4.5秒。
  • 不设置参数时,默认使用FixedBackOffPolicy(固定时间等待策略),重试等待1000ms
  • 只设置delay时,使用FixedBackOffPolicy,重试等待指定的毫秒数
  • 当设置delaymaxDealy时,重试等待在这两个值之间均态分布
  • 设置delaymaxDealymultiplier时,使用ExponentialBackOffPolicy(倍数等待策略)
  • 当设置multiplier不等于0时,同时也设置了random时,使用ExponentialRandomBackOffPolicy(随机倍数等待策略),从 [1, multiplier-1] 中的均匀分布中为每个延迟选择乘数
  • @Recover标记方法为@Retryable失败时的“兜底”处理方法
  • 传参与@Retryable的配置的value必须一样。
  • @Recover的标记方法的参数必须要与@Retryable注解value “形参”保持一致,第一入参为要重试的异常(一定要是@Retryable方法里抛出的异常或者异常父类),其他参数与@Retryable保持一致,返回值也要一样,否则无法执行!
  • @CircuitBreaker:用于标记方法,实现熔断模式
    • include 指定处理的异常类。默认为空
    • exclude指定不需要处理的异常。默认为空
    • vaue指定要重试的异常。默认为空
    • maxAttempts 最大重试次数。默认3次
    • openTimeout 配置熔断器打开的超时时间,默认5s,当超过openTimeout之后熔断器电路变成半打开状态(只要有一次重试成功,则闭合电路)
    • resetTimeout 配置熔断器重新闭合的超时时间,默认20s,超过这个时间断路器关闭

注意事项

  • 使用了@Retryable注解的方法直接实例化调用不会触发重试,要先将实现类实例化到Spring容器中,然后通过注入等方式使用

  • Spring-Retry是通过捕获异常的方式来触发重试的,@Retryable标注方法产生的异常不能使用try-catch捕获,要在方法上抛出异常,不然不会触发重试

  • 查询可以进行重试,写操作要慎重,除非业务方支持重入

原理

引入

@EnableRetry注解,引入Retry能力,导入了RetryConfiguration类。

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@EnableAspectJAutoProxy(proxyTargetClass = false)
@Import(RetryConfiguration.class)
@Documented
public @interface EnableRetry {
	boolean proxyTargetClass() default false;

}
@Component
public class RetryConfiguration extends AbstractPointcutAdvisor
		implements IntroductionAdvisor, BeanFactoryAware, InitializingBean {

	private Advice advice;					//

	private Pointcut pointcut;

	private RetryContextCache retryContextCache;

	private List<RetryListener> retryListeners;

	private MethodArgumentsKeyGenerator methodArgumentsKeyGenerator;

	private NewMethodArgumentsIdentifier newMethodArgumentsIdentifier;

	private Sleeper sleeper;

	private BeanFactory beanFactory;


 
 

	@Override
	public ClassFilter getClassFilter() {
		return this.pointcut.getClassFilter();
	}

	@Override
	public Class<?>[] getInterfaces() {
		return new Class[] { org.springframework.retry.interceptor.Retryable.class };
	}

	@Override
	public void validateInterfaces() throws IllegalArgumentException {
	}
 


 




}

RetryConfiguration 继承 AbstractPointcutAdvisor,实现了 IntroductionAdvisor,它有一个pointcut和一个advice,在IOC过程中会根据PointcutAdvisor类来对Bean进行Pointcut的过滤,然后生成对应的AOP代理类,用advice来加强处理。

初始化

afterPropertiesSet方法进行初始化。

	@Override
	public void afterPropertiesSet() throws Exception {
        //RetryContextCache
		this.retryContextCache = findBean(RetryContextCache.class);
		this.methodArgumentsKeyGenerator = findBean(MethodArgumentsKeyGenerator.class);
		this.newMethodArgumentsIdentifier = findBean(NewMethodArgumentsIdentifier.class);
        //RetryListener
		this.retryListeners = findBeans(RetryListener.class);
		this.sleeper = findBean(Sleeper.class);
		Set<Class<? extends Annotation>> retryableAnnotationTypes = new LinkedHashSet<Class<? extends Annotation>>(1);
        //注解为  Retryable
		retryableAnnotationTypes.add(Retryable.class);
        //构造 pointcut
		this.pointcut = buildPointcut(retryableAnnotationTypes);
        //构造 advice
		this.advice = buildAdvice();
		if (this.advice instanceof BeanFactoryAware) {
			((BeanFactoryAware) this.advice).setBeanFactory(this.beanFactory);
		}
	}

buildPointcut 和 buildAdvice

	protected Pointcut buildPointcut(Set<Class<? extends Annotation>> retryAnnotationTypes) {
		ComposablePointcut result = null;
		for (Class<? extends Annotation> retryAnnotationType : retryAnnotationTypes) {
            //根据 注解,构造 Pointcut
			Pointcut filter = new AnnotationClassOrMethodPointcut(retryAnnotationType);
			if (result == null) {
				result = new ComposablePointcut(filter);
			}
			else {
				result.union(filter);
			}
		}
		return result;
	}

	protected Advice buildAdvice() {
        //构造一个  AnnotationAwareRetryOperationsInterceptor
		AnnotationAwareRetryOperationsInterceptor interceptor = new AnnotationAwareRetryOperationsInterceptor();
		if (this.retryContextCache != null) {
			interceptor.setRetryContextCache(this.retryContextCache);
		}
		if (this.retryListeners != null) {
			interceptor.setListeners(this.retryListeners);
		}
		if (this.methodArgumentsKeyGenerator != null) {
			interceptor.setKeyGenerator(this.methodArgumentsKeyGenerator);
		}
		if (this.newMethodArgumentsIdentifier != null) {
			interceptor.setNewItemIdentifier(this.newMethodArgumentsIdentifier);
		}
		if (this.sleeper != null) {
			interceptor.setSleeper(this.sleeper);
		}
		return interceptor;
	}

pointcut



	private final class AnnotationClassOrMethodPointcut extends StaticMethodMatcherPointcut {

		private final MethodMatcher methodResolver;

		AnnotationClassOrMethodPointcut(Class<? extends Annotation> annotationType) {
			this.methodResolver = new AnnotationMethodMatcher(annotationType);
			setClassFilter(new AnnotationClassOrMethodFilter(annotationType));
		}

		@Override
		public boolean matches(Method method, Class<?> targetClass) {
			return getClassFilter().matches(targetClass) || this.methodResolver.matches(method, targetClass);
		}

		@Override
		public boolean equals(Object other) {
			if (this == other) {
				return true;
			}
			if (!(other instanceof AnnotationClassOrMethodPointcut)) {
				return false;
			}
			AnnotationClassOrMethodPointcut otherAdvisor = (AnnotationClassOrMethodPointcut) other;
			return ObjectUtils.nullSafeEquals(this.methodResolver, otherAdvisor.methodResolver);
		}

	}


private final class AnnotationClassOrMethodFilter extends AnnotationClassFilter {

		private final AnnotationMethodsResolver methodResolver;

		AnnotationClassOrMethodFilter(Class<? extends Annotation> annotationType) {
			super(annotationType, true);
			this.methodResolver = new AnnotationMethodsResolver(annotationType);
		}

		@Override
		public boolean matches(Class<?> clazz) {
            // 类的方法上 标记了指定注解。
			return super.matches(clazz) || this.methodResolver.hasAnnotatedMethods(clazz);
		}

	}


	private static class AnnotationMethodsResolver {

		private Class<? extends Annotation> annotationType;

		public AnnotationMethodsResolver(Class<? extends Annotation> annotationType) {
			this.annotationType = annotationType;
		}

		public boolean hasAnnotatedMethods(Class<?> clazz) {
			final AtomicBoolean found = new AtomicBoolean(false);
            //遍历所有的方法,
			ReflectionUtils.doWithMethods(clazz, new MethodCallback() {
				@Override
				public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException {
					if (found.get()) {
						return;
					}
                    //method 有注解。
					Annotation annotation = AnnotationUtils.findAnnotation(method,
							AnnotationMethodsResolver.this.annotationType);
					if (annotation != null) {
						found.set(true);
					}
				}
			});
			return found.get();
		}

	}

AnnotationAwareRetryOperationsInterceptor

buildAdvice()方法会构造一个AnnotationAwareRetryOperationsInterceptor 实例。用于做增强操作。

public class AnnotationAwareRetryOperationsInterceptor implements IntroductionInterceptor, BeanFactoryAware {

	private static final TemplateParserContext PARSER_CONTEXT = new TemplateParserContext();

	private static final SpelExpressionParser PARSER = new SpelExpressionParser();

	private static final MethodInterceptor NULL_INTERCEPTOR = new MethodInterceptor() {
		@Override
		public Object invoke(MethodInvocation methodInvocation) throws Throwable {
			throw new OperationNotSupportedException("Not supported");
		}
	};

	private final StandardEvaluationContext evaluationContext = new StandardEvaluationContext();

    //用于缓存每个object的用于 增强的方法。
	private final ConcurrentReferenceHashMap<Object, ConcurrentMap<Method, MethodInterceptor>> delegates = new ConcurrentReferenceHashMap<Object, ConcurrentMap<Method, MethodInterceptor>>();

	private RetryContextCache retryContextCache = new MapRetryContextCache();

	private MethodArgumentsKeyGenerator methodArgumentsKeyGenerator;

	private NewMethodArgumentsIdentifier newMethodArgumentsIdentifier;

	private Sleeper sleeper;

	private BeanFactory beanFactory;

	private RetryListener[] globalListeners;
    
}

invoke
	@Override
	public Object invoke(MethodInvocation invocation) throws Throwable {
        //构造代理
		MethodInterceptor delegate = getDelegate(invocation.getThis(), invocation.getMethod());
		if (delegate != null) {
			return delegate.invoke(invocation);
		}
		else {
			return invocation.proceed();
		}
	}
	
    private MethodInterceptor getDelegate(Object target, Method method) {
        //缓存
		ConcurrentMap<Method, MethodInterceptor> cachedMethods = this.delegates.get(target);
		if (cachedMethods == null) {
			cachedMethods = new ConcurrentHashMap<Method, MethodInterceptor>();
		}
		MethodInterceptor delegate = cachedMethods.get(method);
		if (delegate == null) {
			MethodInterceptor interceptor = NULL_INTERCEPTOR;
            //获取方法的 Retryable 注解。
			Retryable retryable = AnnotatedElementUtils.findMergedAnnotation(method, Retryable.class);
			if (retryable == null) {
                //父类?
				retryable = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Retryable.class);
			}
			if (retryable == null) {
                //在类上查找
				retryable = findAnnotationOnTarget(target, method, Retryable.class);
			}
			if (retryable != null) {
                //如果有 interceptor,则直接使用
				if (StringUtils.hasText(retryable.interceptor())) {
					interceptor = this.beanFactory.getBean(retryable.interceptor(), MethodInterceptor.class);
				}
				else if (retryable.stateful()) {
					interceptor = getStatefulInterceptor(target, method, retryable);
				}
				else {
					interceptor = getStatelessInterceptor(target, method, retryable);
				}
			}
			cachedMethods.putIfAbsent(method, interceptor);
			delegate = cachedMethods.get(method);
		}
		this.delegates.putIfAbsent(target, cachedMethods);
		return delegate == NULL_INTERCEPTOR ? null : delegate;
	}

getStatefulInterceptor 和 getStatelessInterceptor
	private MethodInterceptor getStatelessInterceptor(Object target, Method method, Retryable retryable) {
       //生成一个RetryTemplate
		RetryTemplate template = createTemplate(retryable.listeners());
        //生成retryPolicy  
		template.setRetryPolicy(getRetryPolicy(retryable));
        //生成backoffPolicy 
		template.setBackOffPolicy(getBackoffPolicy(retryable.backoff()));
        //RetryOperationsInterceptor
		return RetryInterceptorBuilder.stateless().retryOperations(template).label(retryable.label())
				.recoverer(getRecoverer(target, method)).build();
	}

private MethodInterceptor getStatefulInterceptor(Object target, Method method, Retryable retryable) {
		RetryTemplate template = createTemplate(retryable.listeners());
		template.setRetryContextCache(this.retryContextCache);

    	//
		CircuitBreaker circuit = AnnotatedElementUtils.findMergedAnnotation(method, CircuitBreaker.class);
		if (circuit == null) {
			circuit = findAnnotationOnTarget(target, method, CircuitBreaker.class);
		}
		if (circuit != null) {
            
			RetryPolicy policy = getRetryPolicy(circuit);
			CircuitBreakerRetryPolicy breaker = new CircuitBreakerRetryPolicy(policy);
			breaker.setOpenTimeout(getOpenTimeout(circuit));
			breaker.setResetTimeout(getResetTimeout(circuit));
			template.setRetryPolicy(breaker);
			template.setBackOffPolicy(new NoBackOffPolicy());
			String label = circuit.label();
			if (!StringUtils.hasText(label)) {
				label = method.toGenericString();
			}
			return RetryInterceptorBuilder.circuitBreaker().keyGenerator(new FixedKeyGenerator("circuit"))
					.retryOperations(template).recoverer(getRecoverer(target, method)).label(label).build();
		}
		RetryPolicy policy = getRetryPolicy(retryable);
		template.setRetryPolicy(policy);
		template.setBackOffPolicy(getBackoffPolicy(retryable.backoff()));
		String label = retryable.label();
		return RetryInterceptorBuilder.stateful().keyGenerator(this.methodArgumentsKeyGenerator)
				.newMethodArgumentsIdentifier(this.newMethodArgumentsIdentifier).retryOperations(template).label(label)
				.recoverer(getRecoverer(target, method)).build();
	}

RetryOperationsInterceptor

invoke

	@Override
	public Object invoke(final MethodInvocation invocation) throws Throwable {

		String name;
		if (StringUtils.hasText(this.label)) {
			name = this.label;
		}
		else {
			name = invocation.getMethod().toGenericString();
		}
		final String label = name;
		// RetryCallback,主要调用了invocation的proceed()方法
		RetryCallback<Object, Throwable> retryCallback = new MethodInvocationRetryCallback<Object, Throwable>(
				invocation, label) {

			@Override
			public Object doWithRetry(RetryContext context) throws Exception {

				context.setAttribute(RetryContext.NAME, this.label);

				/*
				 * If we don't copy the invocation carefully it won't keep a reference to
				 * the other interceptors in the chain. We don't have a choice here but to
				 * specialise to ReflectiveMethodInvocation (but how often would another
				 * implementation come along?).
				 */
				if (this.invocation instanceof ProxyMethodInvocation) {
					context.setAttribute("___proxy___", ((ProxyMethodInvocation) this.invocation).getProxy());
					try {
						return ((ProxyMethodInvocation) this.invocation).invocableClone().proceed();
					}
					catch (Exception e) {
						throw e;
					}
					catch (Error e) {
						throw e;
					}
					catch (Throwable e) {
						throw new IllegalStateException(e);
					}
				}
				else {
					throw new IllegalStateException(
							"MethodInvocation of the wrong type detected - this should not happen with Spring AOP, "
									+ "so please raise an issue if you see this exception");
				}
			}

		};

        // recoverer
		if (this.recoverer != null) {
			ItemRecovererCallback recoveryCallback = new ItemRecovererCallback(invocation.getArguments(),  // 真实调用的 参数
					this.recoverer);
			try {
				Object recovered = this.retryOperations.execute(retryCallback, recoveryCallback);
				return recovered;
			}
			finally {
				RetryContext context = RetrySynchronizationManager.getContext();
				if (context != null) {
					context.removeAttribute("__proxy__");
				}
			}
		}

        //最终还是进入到retryOperations的execute方法,这个retryOperations就是在之前的builder set进来的RetryTemplate。 
		return this.retryOperations.execute(retryCallback);

	}



private static final class ItemRecovererCallback implements RecoveryCallback<Object> {

		private final Object[] args;

		private final MethodInvocationRecoverer<?> recoverer;

		/**
		 * @param args the item that failed.
		 */
		private ItemRecovererCallback(Object[] args, MethodInvocationRecoverer<?> recoverer) {
			this.args = Arrays.asList(args).toArray();
			this.recoverer = recoverer;
		}

		@Override
		public Object recover(RetryContext context) {
            // this.args
			return this.recoverer.recover(this.args, context.getLastThrowable());
		}

	}

无论是RetryOperationsInterceptor还是StatefulRetryOperationsInterceptor,最终的拦截处理逻辑还是调用到RetryTemplateexecute方法,从名字也看出来,RetryTemplate作为一个模板类,里面包含了重试统一逻辑。

RetryTemplate

RetryTemplateexecute方法主要就是参数的不同。核心就是3个参数:RetryCallbackRecoveryCallbackRetryState

	@Override
	public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback) throws E {
		return doExecute(retryCallback, null, null);
	}

	@Override
	public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback,
			RecoveryCallback<T> recoveryCallback) throws E {
		return doExecute(retryCallback, recoveryCallback, null);
	}
	@Override
	public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback, RetryState retryState)
			throws E, ExhaustedRetryException {
		return doExecute(retryCallback, null, retryState);
	}

	@Override
	public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback,
			RecoveryCallback<T> recoveryCallback, RetryState retryState) throws E, ExhaustedRetryException {
		return doExecute(retryCallback, recoveryCallback, retryState);
	}

protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
			RecoveryCallback<T> recoveryCallback, RetryState state) throws E, ExhaustedRetryException {

		RetryPolicy retryPolicy = this.retryPolicy;
		BackOffPolicy backOffPolicy = this.backOffPolicy;

		//新建一个RetryContext来保存本轮重试的上下文 
		RetryContext context = open(retryPolicy, state);
		if (this.logger.isTraceEnabled()) {
			this.logger.trace("RetryContext retrieved: " + context);
		}

		// Make sure the context is available globally for clients who need
		// it...
		RetrySynchronizationManager.register(context);

		Throwable lastException = null;

		boolean exhausted = false;
		try {

			 //如果有注册RetryListener,则会调用它的open方法,给调用者一个通知。 
			boolean running = doOpenInterceptors(retryCallback, context);

			if (!running) {
				throw new TerminatedRetryException("Retry terminated abnormally by interceptor before first attempt");
			}

			// Get or Start the backoff context...
			BackOffContext backOffContext = null;
			Object resource = context.getAttribute("backOffContext");

			if (resource instanceof BackOffContext) {
				backOffContext = (BackOffContext) resource;
			}

			if (backOffContext == null) {
				backOffContext = backOffPolicy.start(context);
				if (backOffContext != null) {
					context.setAttribute("backOffContext", backOffContext);
				}
			}

  			//判断能否重试,就是调用RetryPolicy的canRetry方法来判断。  
   			//这个循环会直到原方法不抛出异常,或不需要再重试 
			while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {

				try {
					if (this.logger.isDebugEnabled()) {
						this.logger.debug("Retry: count=" + context.getRetryCount());
					}
					//清除上次记录的异常
					// the close interceptors will not think we failed...
					lastException = null;
                      //doWithRetry方法,一般来说就是原方法 
					return retryCallback.doWithRetry(context);
				}
				catch (Throwable e) {
					//记录异常
					lastException = e;

					try {
                         //记录异常信息 
						registerThrowable(retryPolicy, state, context, e);
					}
					catch (Exception ex) {
						throw new TerminatedRetryException("Could not register throwable", ex);
					}
					finally {
                        //调用RetryListener的onError方法 
						doOnErrorInterceptors(retryCallback, context, e);
					}
					//再次判断能否重试 
					if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
						try {
                            //如果可以重试则走退避策略 
							backOffPolicy.backOff(backOffContext);
						}
						catch (BackOffInterruptedException ex) {
							lastException = e;
							// back off was prevented by another thread - fail the retry
							if (this.logger.isDebugEnabled()) {
								this.logger.debug("Abort retry because interrupted: count=" + context.getRetryCount());
							}
							throw ex;
						}
					}

					if (this.logger.isDebugEnabled()) {
						this.logger.debug("Checking for rethrow: count=" + context.getRetryCount());
					}

					if (shouldRethrow(retryPolicy, context, state)) {
						if (this.logger.isDebugEnabled()) {
							this.logger.debug("Rethrow in retry for policy: count=" + context.getRetryCount());
						}
						throw RetryTemplate.<E>wrapIfNecessary(e);
					}

				}

				/*
				 * A stateful attempt that can retry may rethrow the exception before now,
				 * but if we get this far in a stateful retry there's a reason for it,
				 * like a circuit breaker or a rollback classifier.
				 */
				if (state != null && context.hasAttribute(GLOBAL_STATE)) {
					break;
				}
			}   // END WHILE

			if (state == null && this.logger.isDebugEnabled()) {
				this.logger.debug("Retry failed last attempt: count=" + context.getRetryCount());
			}

			exhausted = true;
             //重试结束后如果有兜底Recovery方法则执行,否则抛异常 
			return handleRetryExhausted(recoveryCallback, context, state);

		}  //END FIRST TRY
		catch (Throwable e) {
			throw RetryTemplate.<E>wrapIfNecessary(e);
		}
		finally {
             //处理一些关闭逻辑
			close(retryPolicy, context, state, lastException == null || exhausted);
            //调用RetryListener的close方法 
			doCloseInterceptors(retryCallback, context, lastException);
			RetrySynchronizationManager.clear();
		}

	}

重试策略

用来判断当方法调用异常时是否需要重试。常用策略有:

  • SimpleRetryPolicy :默认最多重试3次
  • TimeoutRetryPolicy :默认在1秒内失败都会重试
  • ExpressionRetryPolicy :符合表达式就会重试
  • CircuitBreakerRetryPolicy :增加了熔断的机制,如果不在熔断状态,则允许重试
  • CompositeRetryPolicy :可以组合多个重试策略
  • NeverRetryPolicy :从不重试(也是一种重试策略哈)
  • AlwaysRetryPolicy :总是重试

重试策略最重要的方法就是 canRetry

public interface RetryPolicy extends Serializable {
	boolean canRetry(RetryContext context);

	RetryContext open(RetryContext parent);
	void close(RetryContext context);
	void registerThrowable(RetryContext context, Throwable throwable);
}
//SimpleRetryPolicy
	@Override
	public boolean canRetry(RetryContext context) {
		Throwable t = context.getLastThrowable();
        //判断抛出的异常是否符合重试的异常 
		return (t == null || retryForException(t)) && context.getRetryCount() < this.maxAttempts;
	}

//ExpressionRetryPolicy extends SimpleRetryPolicy 
	public ExpressionRetryPolicy(int maxAttempts, Map<Class<? extends Throwable>, Boolean> retryableExceptions,
			boolean traverseCauses, String expressionString, boolean defaultValue) {
		super(maxAttempts, retryableExceptions, traverseCauses, defaultValue);
		Assert.notNull(expressionString, "'expressionString' cannot be null");
		this.expression = getExpression(expressionString);
	}

退避策略

控制下一次的间隔时间。常用策略有:

  • FixedBackOffPolicy 默认固定延迟1秒后执行下一次重试
  • ExponentialBackOffPolicy 指数递增延迟执行重试,默认初始0.1秒,系数是2,那么下次延迟0.2秒,再下次就是延迟0.4秒,如此类推,最大30秒。
  • ExponentialRandomBackOffPolicy 在上面那个策略上增加随机性
  • UniformRandomBackOffPolicy 这个跟上面的区别就是,上面的延迟会不停递增,这个只会在固定的区间随机
  • StatelessBackOffPolicy 这个说明是无状态的,所谓无状态就是对上次的退避无感知,从它下面的子类也能看出来

退避策略主要方法是 backOff

public interface BackOffPolicy {
	BackOffContext start(RetryContext context);

	void backOff(BackOffContext backOffContext) throws BackOffInterruptedException;

}

FixedBackOffPolicy

public class FixedBackOffPolicy extends StatelessBackOffPolicy implements SleepingBackOffPolicy<FixedBackOffPolicy> {
	private static final long DEFAULT_BACK_OFF_PERIOD = 1000L;
	private volatile long backOffPeriod = DEFAULT_BACK_OFF_PERIOD;

	private Sleeper sleeper = new ThreadWaitSleeper();

	public FixedBackOffPolicy withSleeper(Sleeper sleeper) {
		FixedBackOffPolicy res = new FixedBackOffPolicy();
		res.setBackOffPeriod(backOffPeriod);
		res.setSleeper(sleeper);
		return res;
	}
 
	protected void doBackOff() throws BackOffInterruptedException {
		try {
            //sleep 指定时间  
            // 内部:Thread.sleep(backOffPeriod);
			sleeper.sleep(backOffPeriod);
		}
		catch (InterruptedException e) {
			throw new BackOffInterruptedException("Thread interrupted while sleeping", e);
		}
	}

}
//ExponentialBackOffPolicy
	@Override
	public void backOff(BackOffContext backOffContext) throws BackOffInterruptedException {
		ExponentialBackOffContext context = (ExponentialBackOffContext) backOffContext;
		try {
            // ExponentialBackOffContext
			long sleepTime = context.getSleepAndIncrement();
			if (this.logger.isDebugEnabled()) {
				this.logger.debug("Sleeping for " + sleepTime);
			}
			this.sleeper.sleep(sleepTime);
		}
		catch (InterruptedException e) {
			throw new BackOffInterruptedException("Thread interrupted while sleeping", e);
		}
	}

//ExponentialBackOffContext
		public synchronized long getSleepAndIncrement() {
            // this.interval:本次间隔时间。在上一次结束时计算。
			long sleep = this.interval;
			if (sleep > this.maxInterval) {
				sleep = this.maxInterval;
			}
			else {
				this.interval = getNextInterval();
			}
			return sleep;
		}
		protected long getNextInterval() {
			return (long) (this.interval * this.multiplier);
		}

RetryContext

RetryContext主要用于记录一些状态。

public interface RetryContext extends AttributeAccessor {
	String NAME = "context.name";
	String STATE_KEY = "context.state";
	String CLOSED = "context.closed";
	String RECOVERED = "context.recovered";
	String EXHAUSTED = "context.exhausted";

	void setExhaustedOnly();
	boolean isExhaustedOnly();
	RetryContext getParent();
	int getRetryCount();
	Throwable getLastThrowable();

}

每一个策略都有对应的Context。在Spring Retry里,其实每一个策略都是单例来的。单例则会导致重试策略之间才产生冲突,不是单例,则多出了很多策略对象出来,增加了使用者的负担,这不是一个好的设计。

Spring Retry采用了一个更加轻量级的做法,就是针对每一个需要重试的方法只new一个上下文Context对象,然后在重试时,把这个Context传到策略里,策略再根据这个Context做重试,而且Spring Retry还对这个Context做了cache。这样就相当于对重试的上下文做了优化。

	private RetryContext doOpenInternal(RetryPolicy retryPolicy, RetryState state) {
		RetryContext context = retryPolicy.open(RetrySynchronizationManager.getContext());
		if (state != null) {
			context.setAttribute(RetryContext.STATE_KEY, state.getKey());
		}
		if (context.hasAttribute(GLOBAL_STATE)) {
			registerContext(context, state);
		}
		return context;
	}

附录

参考

Spring retry

Guava retry

参考:https://java.jverson.com/tools/guava-retryer.html

添加依赖

    <dependency>
      <groupId>com.github.rholder</groupId>
      <artifactId>guava-retrying</artifactId>
      <version>2.0.0</version>
    </dependency>

示例

    public boolean guavaTestTask(String param) {
        // 构建重试实例 可以设置重试源且可以支持多个重试源 可以配置重试次数或重试超时时间,以及可以配置等待时间间隔
        Retryer<Boolean> retriever = RetryerBuilder.<Boolean>newBuilder()
                // 重试的异常类以及子类
                .retryIfExceptionOfType(ServiceException.class)
                // 根据返回值进行重试
                .retryIfResult(result -> !result)
                // 设置等待间隔时间,每次请求间隔1s
                .withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS))
                // 设置最大重试次数,尝试请求3次
                .withStopStrategy(StopStrategies.stopAfterAttempt(3))
                .build();
        try {
            //调用 真实的服务。
            return retriever.call(() -> randomResult(param));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

原理

Retryer 接口

定义了执行方法重试的方法,并提供了多个配置方法来设置重试条件、等待策略、停止策略等。它包含一个call方法,将需要重试的操作以Callable形式传递给它。

public V call(Callable<V> callable) throws ExecutionException, RetryException {
    long startTime = System.nanoTime();
    //根据attemptNumber进行循环次数
    for (int attemptNumber = 1; ; attemptNumber++) {
        // 进入方法不等待,立即执行一次
        Attempt<V> attempt;
        try {
            // 执行callable中的具体业务
            // attemptTimeLimiter限制了每次尝试等待的时长
            V result = attemptTimeLimiter.call(callable);
            // 利用调用结果构造新的attempt
            attempt = new ResultAttempt<V>(result, attemptNumber, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
        } catch (Throwable t) {
            attempt = new ExceptionAttempt<V>(t, attemptNumber, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
        }
        // 遍历自定义的监听器
        for (RetryListener listener : listeners) {
            listener.onRetry(attempt);
        }
        // 判断是否满足重试条件,来决定是否继续等待并进行重试
        if (!rejectionPredicate.apply(attempt)) {
            return attempt.get();
        }
        // 此时满足停止策略,因为还没有得到想要的结果,因此抛出异常
        if (stopStrategy.shouldStop(attempt)) {
            throw new RetryException(attemptNumber, attempt);
        } else {
            // 行默认的停止策略——线程休眠
            long sleepTime = waitStrategy.computeSleepTime(attempt);
            try {
                // 也可以执行定义的停止策略
                blockStrategy.block(sleepTime);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RetryException(attemptNumber, attempt);
            }
        }
    }
}
RetryerBuilder 类

创建Retryer实例的构建器类。通过RetryerBuilder配置重试策略、条件和其他参数,最终构建出一个Retryer实例。

Guava-Retry和Spring Retry 比较

  • 框架来源:
    • Guava-Retry:Guava-Retry是Google Guava库的一部分,它提供了一种用于重试操作的机制。
    • Spring Retry:Spring Retry是Spring框架的一个模块,专门用于在Spring应用程序中实现重试逻辑。
  • 库依赖:
    • Guava-Retry:需要添加Guava库的依赖。
    • Spring Retry:需要添加spring-retry模块的依赖。
  • 配置和注解:
    • Guava-Retry:重试逻辑通过构建Retryer实例并定义重试条件、等待策略等来配置。
    • Spring Retry:Spring Retry提供了注解(如@Retryable@Recover等)和编程式配置来实现重试逻辑。
  • 重试策略:
    • Guava-RetryGuava-Retry基于结果异常类型来定义重试条件。使用RetryerBuilder来自定义重试策略。
    • Spring RetrySpring Retry使用注解来定义重试条件和相关属性,如最大重试次数、重试间隔等。
  • 等待策略:
    • Guava-RetryGuava-Retry提供了不同的等待策略(如固定等待、指数等待等),自己组合。
    • Spring RetrySpring Retry通过注解或编程式配置来指定等待时间。
  • 适用范围:
    • Guava-Retry:可以用于任何Java应用程序,不仅限于Spring框架。
    • Spring Retry:专门设计用于Spring应用程序中,可以与其他Spring功能(如Spring AOP)集成。
  • 依赖性:
    • Guava-Retry:相对较轻量级,如果只需要重试功能,可以考虑使用Guava库的一部分。
    • Spring Retry:如果已使用Spring框架,可以方便地集成Spring Retry,但可能需要更多的Spring依赖。

相关文章:

  • 鸿鹄工程项目管理系统em Spring Cloud+Spring Boot+前后端分离构建工程项目管理系统
  • vue3封装el-pagination分页组件
  • Python 一些常见的字符串操作
  • 【VS Code+Verilog+Vivado使用】(1)常用插件
  • 【python】爬取百度热搜排行榜Top50+可视化【附源码】【送数据分析书籍】
  • CSAPP shelllab
  • [docker] Docker 网络
  • qemu + vscode图形化调试linux kernel
  • openssl3.2/test/certs - 056 - all DNS-like CNs allowed by CA1, no SANs
  • 项目解决方案:市小区高清视频监控平台联网整合设计方案(上)
  • python flask request教程
  • JS-Window常见对象
  • 精要图示:园区金融数字化服务蓝图,以园区为支点推动信贷业务增长
  • Linux:简单聊聊线程调度
  • 百度搜索智能精选是什么东西、怎么加入?
  • Linux实验记录:使用RAID(独立冗余磁盘阵列)
  • PyInstaller 将 Python 程序生成可直接运行的程序
  • Emergent Abilities of Large Language Models 机翻mark
  • IDEA 构建开发环境
  • 《微信小程序开发从入门到实战》学习九十六
  • 保证断电、碰撞等事故中车门系统能够开启!汽车车门把手将迎来强制性国家标准
  • 75岁亚当·费舍尔坐镇,再现80分钟马勒《第九交响曲》
  • 司法部:持续规范行政执法行为,加快制定行政执法监督条例
  • 太原一高中生指出博物馆多件藏品标识不当,馆方已邀请他和专家共同探讨
  • “子宫内膜异位症”相关论文男性患者样本超六成?福建省人民医院发布情况说明
  • 水中托举救出落水孩童后遇难,42岁退役军人高武被确认为见义勇为