9.重试机制

重试机制

1.重试机制

  • 什么时候、什么条件下重试?
  • 确定下一次的重试时间是什么时候?
  • 什么时候、什么条件下停止重试?
  • 重试后要做什么?

1.1 重试时间

1.1.1 固定重试间隔

  • 在؜每次重试之间使用固定的‌时间间隔。

1.1.2 指数退避重试

  • 在每次失败后؜,重试的时间间隔会以指数‌级增加,以避免请求过于密集。

1.1.3 随机延迟重试

  • 在每次重试之؜间使用随机的时间间隔‌,以避免请求的同时发生。

1.1.4 可变延迟重试

  • 根据先前⁠重试的成功或失败情况,动态调整下一次重؜试的延迟时间。比如,根据前一次的响应时‌间调整下一次重试的等待时间。

1.2 停止重试

  • 重试次数是有上限的‍,否则随着报错的增⁠多,系统同时发生的؜重试也会越来越多,‌造成雪崩

1.2.1 最大尝试次数

  • 一般重试当达到最大次数时不再重试。

1.2.2 超时停止

  • 重试达到最大时间的时候,停止重试。

1.3 重试工作

  • 当重试次数超过‍上限时,往往还要进⁠行其他的操作

1.3.1 通知告警

  • 让开发者人工介入

1.3.2 降级容错

  • 改为调用其他接口、或者执行其他操作

2.代码实现

2.1 多种重试策略实现

2.1.0 重试策略接口

/**
 * 重试策略
 */
public interface RetryStrategy {

    /**
     * 重试
     * @param callable
     * @return
     * @throws Exception
     */
    RpcResponse doRetry(Callable<RpcResponse> callable) throws Exception;
}

2.1.1 引入Guava-Retrying 重试库

<!-- https://github.com/rholder/guava-retrying -->
<dependency>
    <groupId>com.github.rholder</groupId>
    <artifactId>guava-retrying</artifactId>
    <version>2.0.0</version>
</dependency>

2.1.2 不重试

/**
 * 不重试-重试策略
 */
public class NoRetryStrategy implements RetryStrategy{
    @Override
    public RpcResponse doRetry(Callable<RpcResponse> callable) throws Exception {
        return callable.call();
    }
}

2.1.2 固定重试间隔


/**
 * 固定时间间隔-重试策略
 */
@Slf4j
public class FixedIntervalRetryStrategy implements RetryStrategy{

    /**
     * 重试条件:使用 retryIfExceptionOfType 方法指定当出现 Exception 异常时重试。
     * 重试等待策略:使用 withWaitStrategy 方法指定策略,选择 fixedWait 固定时间间隔策略。
     * 重试停止策略:使用 withStopStrategy 方法指定策略,选择 stopAfterAttempt 超过最大重试次数停止。
     * 重试工作:使用 withRetryListener 监听重试,每次重试时,除了再次执行任务外,还能够打印当前的重试次数。
     * @param callable
     * @return
     * @throws Exception
     */
    @Override
    public RpcResponse doRetry(Callable<RpcResponse> callable) throws Exception {

        Retryer<RpcResponse> retryer = RetryerBuilder.<RpcResponse>newBuilder()
                .retryIfExceptionOfType(Exception.class)
                .withWaitStrategy(WaitStrategies.fixedWait(3L, TimeUnit.SECONDS))
                .withStopStrategy(StopStrategies.stopAfterAttempt(3))
                .withRetryListener(new RetryListener() {
                    @Override
                    public <V> void onRetry(Attempt<V> attempt) {
                        log.info("重试次数 {}", attempt.getAttemptNumber());
                    }
                })
                .build();
        return retryer.call(callable);
    }
}

2.1.3 指数间隔等待时间

// 指数
WaitStrategies.exponentialWait(100, 30, TimeUnit.SECONDS)

2.1.4 随机间隔等待时间

// 随机等待时间
WaitStrategies.randomWait(1, TimeUnit.SECONDS, 5, TimeUnit.SECONDS)

2.1.5 斐波那契等待时间

// 斐波那契退避
WaitStrategies.fibonacciWait(100, 2, TimeUnit.MINUTES)

2.2 重试策略键名常量

/**
 * 重试策略键名常量
 */
public interface RetryStrategyKeys {

    /**
     * 不重试
     */
    String NO = "no";

    /**
     * 固定时间间隔
     */
    String FIXED_INTERVAL = "fixedInterval";
}

2.3 重试策略工厂

/**
 * 重试策略工厂(用于获取重试器对象)
 */
public class RetryStrategyFactory {
    static{
        SpiLoader.load(RetryStrategy.class);
    }

    /**
     * 默认重试器
     */
    private static final RetryStrategy DEFAULT_RETRY_STRATEGY = new NoRetryStrategy();

    /**
     * 获取实例
     * @param key
     * @return
     */
    public static RetryStrategy getInstance(String key){
        return SpiLoader.getInstance(RetryStrategy.class,key)
    }
    
}

2.4 创建SPI配置文件

  • com.todaysaturday.fault.retry.RetryStrategy

no=com.todaysaturday.fault.retry.NoRetryStrategy
fixedInterval=com.todaysaturday.fault.retry.FixedIntervalRetryStrategy

2.5 修改RpcConfig

/**
 * 重试策略
 */
private String retryStrategy = RetryStrategyKeys.NO;

2.6 修改服务代理对象

/**
 * JDK 动态代理
 */
public class JdkServiceProxy implements InvocationHandler {

    /**
     * 调用代理
     *
     * @return
     * @throws Throwable
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        // 本地处理 Object 类的方法,避免 RPC 分派
        if (method.getDeclaringClass() == Object.class) {
            switch (method.getName()) {
                case "toString":
                    // 为代理对象本身提供一个有意义的字符串表示
                    return proxy.getClass().getName() + "@" +
                            Integer.toHexString(System.identityHashCode(proxy)) +
                            ", with InvocationHandler " + this;
                case "hashCode":
                    // 委托给处理器的 hashCode 或提供一个代理特定的 hashCode
                    return System.identityHashCode(proxy);
                case "equals":
                    // 委托给处理器的 equals 或提供代理特定的相等性判断
                    return proxy == args[0]; // 代理对象的默认引用相等性
                default:
                    // 对于其他 Object 方法,可以考虑在处理器上调用它们
                    return method.invoke(this, args);
            }
        }
        // 1.指定序列化器
//        JdkSerializer serializer = new JdkSerializer();
        final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());


        String serviceName = method.getDeclaringClass().getName();

        // 2.发出请求(全部使用动态的参数)
        RpcRequest rpcRequest = RpcRequest.builder()
                .serviceName(serviceName)
                .methodName(method.getName())
                .parameterTypes(method.getParameterTypes())
                .args(args)
                .build();

        try {
            // 从注册中心获取服务提供者请求地址
            RpcConfig rpcConfig = RpcApplication.getRpcConfig();
            // 实例化注册中心对象
            Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());

            ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
            serviceMetaInfo.setServiceName(serviceName);
            serviceMetaInfo.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION);
            // 服务发现
            List<ServiceMetaInfo> serviceMetaInfoList = registry.serviceDiscovery(serviceMetaInfo.getServiceKey());

            if(CollUtil.isEmpty(serviceMetaInfoList)){
                throw new RuntimeException("暂无服务地址");
            }

            // 负载均衡
            LoadBalancer loadBalancer = LoadBalancerFactory.getInstance(rpcConfig.getLoadBalancer());
            // 将调用方法名(请求路径)作为负载均衡参数
            Map<String, Object> requestParams = new HashMap<>();
            requestParams.put("methodName",rpcRequest.getMethodName());
            ServiceMetaInfo selectedServiceMetaInfo = loadBalancer.select(requestParams, serviceMetaInfoList);

            // 不取默认的
//            ServiceMetaInfo selectedServiceMetaInfo = serviceMetaInfoList.get(0);

            // rpc 请求 使用重试机制
            RetryStrategy retryStrategy = RetryStrategyFactory.getInstance(rpcConfig.getRetryStrategy());
            RpcResponse rpcResponse = retryStrategy.doRetry(() ->
                VertxTcpClient.doRequest(rpcRequest, selectedServiceMetaInfo)
            );
            // 发出TCP请求
//            RpcResponse rpcResponse = VertxTcpClient.doRequest(rpcRequest,selectedServiceMetaInfo);
            return rpcResponse.getData();
        } catch (Exception e) {
            e.printStackTrace();
        }

        return null;
    }
}