重试机制
1.重试机制
- 什么时候、什么条件下重试?
- 确定下一次的重试时间是什么时候?
- 什么时候、什么条件下停止重试?
- 重试后要做什么?
1.1 重试时间
1.1.1 固定重试间隔
- 在每次重试之间使用固定的时间间隔。
1.1.2 指数退避重试
- 在每次失败后,重试的时间间隔会以指数级增加,以避免请求过于密集。
1.1.3 随机延迟重试
- 在每次重试之间使用随机的时间间隔,以避免请求的同时发生。
1.1.4 可变延迟重试
- 根据先前重试的成功或失败情况,动态调整下一次重试的延迟时间。比如,根据前一次的响应时间调整下一次重试的等待时间。
1.2 停止重试
- 重试次数是有上限的,否则随着报错的增多,系统同时发生的重试也会越来越多,造成雪崩
1.2.1 最大尝试次数
- 一般重试当达到最大次数时不再重试。
1.2.2 超时停止
- 重试达到最大时间的时候,停止重试。
1.3 重试工作
- 当重试次数超过上限时,往往还要进行其他的操作
1.3.1 通知告警
- 让开发者人工介入
1.3.2 降级容错
- 改为调用其他接口、或者执行其他操作
2.代码实现
2.1 多种重试策略实现
2.1.0 重试策略接口
/**
* 重试策略
*/
public interface RetryStrategy {
/**
* 重试
* @param callable
* @return
* @throws Exception
*/
RpcResponse doRetry(Callable<RpcResponse> callable) throws Exception;
}
2.1.1 引入Guava-Retrying 重试库
<!-- https://github.com/rholder/guava-retrying -->
<dependency>
<groupId>com.github.rholder</groupId>
<artifactId>guava-retrying</artifactId>
<version>2.0.0</version>
</dependency>
2.1.2 不重试
/**
* 不重试-重试策略
*/
public class NoRetryStrategy implements RetryStrategy{
@Override
public RpcResponse doRetry(Callable<RpcResponse> callable) throws Exception {
return callable.call();
}
}
2.1.2 固定重试间隔
/**
* 固定时间间隔-重试策略
*/
@Slf4j
public class FixedIntervalRetryStrategy implements RetryStrategy{
/**
* 重试条件:使用 retryIfExceptionOfType 方法指定当出现 Exception 异常时重试。
* 重试等待策略:使用 withWaitStrategy 方法指定策略,选择 fixedWait 固定时间间隔策略。
* 重试停止策略:使用 withStopStrategy 方法指定策略,选择 stopAfterAttempt 超过最大重试次数停止。
* 重试工作:使用 withRetryListener 监听重试,每次重试时,除了再次执行任务外,还能够打印当前的重试次数。
* @param callable
* @return
* @throws Exception
*/
@Override
public RpcResponse doRetry(Callable<RpcResponse> callable) throws Exception {
Retryer<RpcResponse> retryer = RetryerBuilder.<RpcResponse>newBuilder()
.retryIfExceptionOfType(Exception.class)
.withWaitStrategy(WaitStrategies.fixedWait(3L, TimeUnit.SECONDS))
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
.withRetryListener(new RetryListener() {
@Override
public <V> void onRetry(Attempt<V> attempt) {
log.info("重试次数 {}", attempt.getAttemptNumber());
}
})
.build();
return retryer.call(callable);
}
}
2.1.3 指数间隔等待时间
// 指数
WaitStrategies.exponentialWait(100, 30, TimeUnit.SECONDS)
2.1.4 随机间隔等待时间
// 随机等待时间
WaitStrategies.randomWait(1, TimeUnit.SECONDS, 5, TimeUnit.SECONDS)
2.1.5 斐波那契等待时间
// 斐波那契退避
WaitStrategies.fibonacciWait(100, 2, TimeUnit.MINUTES)
2.2 重试策略键名常量
/**
* 重试策略键名常量
*/
public interface RetryStrategyKeys {
/**
* 不重试
*/
String NO = "no";
/**
* 固定时间间隔
*/
String FIXED_INTERVAL = "fixedInterval";
}
2.3 重试策略工厂
/**
* 重试策略工厂(用于获取重试器对象)
*/
public class RetryStrategyFactory {
static{
SpiLoader.load(RetryStrategy.class);
}
/**
* 默认重试器
*/
private static final RetryStrategy DEFAULT_RETRY_STRATEGY = new NoRetryStrategy();
/**
* 获取实例
* @param key
* @return
*/
public static RetryStrategy getInstance(String key){
return SpiLoader.getInstance(RetryStrategy.class,key)
}
}
2.4 创建SPI配置文件
no=com.todaysaturday.fault.retry.NoRetryStrategy
fixedInterval=com.todaysaturday.fault.retry.FixedIntervalRetryStrategy
2.5 修改RpcConfig
/**
* 重试策略
*/
private String retryStrategy = RetryStrategyKeys.NO;
2.6 修改服务代理对象
/**
* JDK 动态代理
*/
public class JdkServiceProxy implements InvocationHandler {
/**
* 调用代理
*
* @return
* @throws Throwable
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 本地处理 Object 类的方法,避免 RPC 分派
if (method.getDeclaringClass() == Object.class) {
switch (method.getName()) {
case "toString":
// 为代理对象本身提供一个有意义的字符串表示
return proxy.getClass().getName() + "@" +
Integer.toHexString(System.identityHashCode(proxy)) +
", with InvocationHandler " + this;
case "hashCode":
// 委托给处理器的 hashCode 或提供一个代理特定的 hashCode
return System.identityHashCode(proxy);
case "equals":
// 委托给处理器的 equals 或提供代理特定的相等性判断
return proxy == args[0]; // 代理对象的默认引用相等性
default:
// 对于其他 Object 方法,可以考虑在处理器上调用它们
return method.invoke(this, args);
}
}
// 1.指定序列化器
// JdkSerializer serializer = new JdkSerializer();
final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());
String serviceName = method.getDeclaringClass().getName();
// 2.发出请求(全部使用动态的参数)
RpcRequest rpcRequest = RpcRequest.builder()
.serviceName(serviceName)
.methodName(method.getName())
.parameterTypes(method.getParameterTypes())
.args(args)
.build();
try {
// 从注册中心获取服务提供者请求地址
RpcConfig rpcConfig = RpcApplication.getRpcConfig();
// 实例化注册中心对象
Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());
ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
serviceMetaInfo.setServiceName(serviceName);
serviceMetaInfo.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION);
// 服务发现
List<ServiceMetaInfo> serviceMetaInfoList = registry.serviceDiscovery(serviceMetaInfo.getServiceKey());
if(CollUtil.isEmpty(serviceMetaInfoList)){
throw new RuntimeException("暂无服务地址");
}
// 负载均衡
LoadBalancer loadBalancer = LoadBalancerFactory.getInstance(rpcConfig.getLoadBalancer());
// 将调用方法名(请求路径)作为负载均衡参数
Map<String, Object> requestParams = new HashMap<>();
requestParams.put("methodName",rpcRequest.getMethodName());
ServiceMetaInfo selectedServiceMetaInfo = loadBalancer.select(requestParams, serviceMetaInfoList);
// 不取默认的
// ServiceMetaInfo selectedServiceMetaInfo = serviceMetaInfoList.get(0);
// rpc 请求 使用重试机制
RetryStrategy retryStrategy = RetryStrategyFactory.getInstance(rpcConfig.getRetryStrategy());
RpcResponse rpcResponse = retryStrategy.doRetry(() ->
VertxTcpClient.doRequest(rpcRequest, selectedServiceMetaInfo)
);
// 发出TCP请求
// RpcResponse rpcResponse = VertxTcpClient.doRequest(rpcRequest,selectedServiceMetaInfo);
return rpcResponse.getData();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}