容错机制
1. 容错机制说明
- 容错是指系统在出现异常情况时,可以通过一定的策略保证系统仍然稳定运行,从而提高系统的可靠性和健壮性。
1.1 容错策略
1.1.1 Fail-Over 故障转移
- 一次调用失败后,切换一个其他节点再次进行调用,也算是一种重试。
1.1.2 Fail-Back 失败自动恢复
- 系统的某个功能出现调用失败或错误时,通过其他的方法,恢复该功能的正常。可以理解为降级,比如重试、调用其他服务等。
1.1.3 Fail-Safe 静默处理
- 系统出现部分非重要功能的异常时,直接忽略掉,不做任何处理,就像错误没有发生过一样。
1.1.4 Fail-Fast 快速失败
- 系统出现调用错误时,立刻报错,交给外层调用方处理。
1.2 容错实现方式
1.2.1 重试
- 重试本质上也是一种容错的降级策略,系统错误后再试一次。
1.2.2 限流
- 当系统压力过大、已经出现部分错误时,通过限制执行操作(接受请求)的频率或数量,对系统进行保护。
1.2.3 降级
- 系统出现错误后,改为执行其他更稳定可用的操作。也可以叫做 “兜底” 或 “有损服务”,这种方式的本质是:即使牺牲一定的服务质量,也要保证系统的部分功能可用,保证基本的功能需求得到满足。
1.2.4 熔断
- 系统出现故障或异常时,暂时中断对该服务的请求,而是执行其他操作,以避免连锁故障
1.2.5 超时控制
- 如果请求或操作长时间没处理完成,就进行中断,防止阻塞和资源占用。
1.3 容错方案设计
1.3.1 先容错再重试
- 当系统发生异常时,首先会触发容错机制,比如记录日志、进行告警等,然后可以选择是否进行重试。
1.3.2 先重试再容错
- 在发生错误后,首先尝试重试操作,如果重试多次仍然失败,则触发容错机制,比如记录日志、进行告警等 、
1.3.3 举例
- 系统调用服务 A 出现网络错误,使用容错策略 - 重试。
- 重试 3 次失败后,使用其他容错策略 - 降级。
- 系统改为调用不依赖网络的服务 B,完成操作。
2.代码实现
2.1 多种容错策略实现
2.1.1 容错策略接口
/**
* 容错策略
*/
public interface TolerantStrategy {
/**
* 容错
* @param context
* @param e
* @return
*/
RpcResponse doTolerant(Map<String,Object> context,Exception e);
}
2.1.2 Fail-Fast 快速失败
- 遇到异常,将异常再次抛出,交给外层处理
/**
* 快速失败-容错策略(立即通知外层调用方)
*/
public class FailFastTolerantStrategy implements TolerantStrategy{
@Override
public RpcResponse doTolerant(Map<String, Object> context, Exception e) {
throw new RuntimeException("服务报错",e);
}
}
2.1.3 Fail-Safe 静默处理
- 遇到异常,记录日志,正常返回响应对象
/**
* 静默处理-容错策略
*/
@Slf4j
public class FailSafeTolerantStrategy implements TolerantStrategy{
@Override
public RpcResponse doTolerant(Map<String, Object> context, Exception e) {
log.info("静默处理异常",e);
return new RpcResponse();
}
}
2.1.4 Fail-Back 故障恢复
/**
* 故障恢复-容错策略(降级到其他服务)
*/
public class FailBackTolerantStrategy implements TolerantStrategy{
@Override
public RpcResponse doTolerant(Map<String, Object> context, Exception e) {
return null;
}
}
2.1.5 Fail-Over 故障转移
/**
* 故障转移-容错策略(转移到其他服务节点)
*/
public class FailOverTolerantStrategy implements TolerantStrategy{
@Override
public RpcResponse doTolerant(Map<String, Object> context, Exception e) {
return null;
}
}
2.2 容错策略键名常量
/**
* 容错策略键名常量
*/
public interface TolerantStrategyKeys {
/**
* 故障恢复
*/
String FAIL_BACK = "failBack";
/**
* 快速失败
*/
String FAIL_FAST = "failFast";
/**
* 故障转移
*/
String FAIL_Over = "failOver";
/**
* 静默处理
*/
String FAIL_SAFE = "failSafe";
}
2.3 容错策略工厂
/**
* 容错策略工厂(工厂模式,用于获取容错策略对象)
*/
public class TolerantStrategyFactory {
static{
SpiLoader.load(TolerantStrategy.class);
}
/**
* 默认容错策略
*/
private static final TolerantStrategy DEFAULT_RETRY_STRATEGY = new FailFastTolerantStrategy();
/**
* 获取实例
* @param key
* @return
*/
public static TolerantStrategy getInstance(String key){
if(key != null){
return SpiLoader.getInstance(TolerantStrategy.class,key);
}else{
return DEFAULT_RETRY_STRATEGY;
}
}
}
2.4 修改SPI配置文件
com.todaysaturday.fault.retry.TolerantStrategy
failBack=com.todaysaturday.fault.tolerant.FailBackTolerantStrategy
failFast=com.todaysaturday.fault.tolerant.FailFastTolerantStrategy
failSafe=com.todaysaturday.fault.tolerant.FailSafeTolerantStrategy
failOver=com.todaysaturday.fault.tolerant.FailOverTolerantStrategy
2.5 修改RpcConfig
/**
* 容错策略
*/
private String tolerantStrategy = TolerantStrategyKeys.FAIL_FAST;
2.6 修改服务代理对象
/**
* JDK 动态代理
*/
public class JdkServiceProxy implements InvocationHandler {
/**
* 调用代理
*
* @return
* @throws Throwable
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 本地处理 Object 类的方法,避免 RPC 分派
if (method.getDeclaringClass() == Object.class) {
switch (method.getName()) {
case "toString":
// 为代理对象本身提供一个有意义的字符串表示
return proxy.getClass().getName() + "@" +
Integer.toHexString(System.identityHashCode(proxy)) +
", with InvocationHandler " + this;
case "hashCode":
// 委托给处理器的 hashCode 或提供一个代理特定的 hashCode
return System.identityHashCode(proxy);
case "equals":
// 委托给处理器的 equals 或提供代理特定的相等性判断
return proxy == args[0]; // 代理对象的默认引用相等性
default:
// 对于其他 Object 方法,可以考虑在处理器上调用它们
return method.invoke(this, args);
}
}
// 1.指定序列化器
// JdkSerializer serializer = new JdkSerializer();
final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());
String serviceName = method.getDeclaringClass().getName();
// 2.发出请求(全部使用动态的参数)
RpcRequest rpcRequest = RpcRequest.builder()
.serviceName(serviceName)
.methodName(method.getName())
.parameterTypes(method.getParameterTypes())
.args(args)
.build();
try {
// 从注册中心获取服务提供者请求地址
RpcConfig rpcConfig = RpcApplication.getRpcConfig();
// 实例化注册中心对象
Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());
ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
serviceMetaInfo.setServiceName(serviceName);
serviceMetaInfo.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION);
// 服务发现
List<ServiceMetaInfo> serviceMetaInfoList = registry.serviceDiscovery(serviceMetaInfo.getServiceKey());
if(CollUtil.isEmpty(serviceMetaInfoList)){
throw new RuntimeException("暂无服务地址");
}
// 负载均衡
LoadBalancer loadBalancer = LoadBalancerFactory.getInstance(rpcConfig.getLoadBalancer());
// 将调用方法名(请求路径)作为负载均衡参数
Map<String, Object> requestParams = new HashMap<>();
requestParams.put("methodName",rpcRequest.getMethodName());
ServiceMetaInfo selectedServiceMetaInfo = loadBalancer.select(requestParams, serviceMetaInfoList);
// 不取默认的
// ServiceMetaInfo selectedServiceMetaInfo = serviceMetaInfoList.get(0);
// rpc 请求 使用重试机制
RpcResponse rpcResponse = null;
try {
RetryStrategy retryStrategy = RetryStrategyFactory.getInstance(rpcConfig.getRetryStrategy());
rpcResponse = retryStrategy.doRetry(() ->
VertxTcpClient.doRequest(rpcRequest, selectedServiceMetaInfo)
);
} catch (Exception e) {
// 使用容错机制
TolerantStrategy tolerantStrategy = TolerantStrategyFactory.getInstance(rpcConfig.getTolerantStrategy());
rpcResponse = tolerantStrategy.doTolerant(null,e);
}
// 发出TCP请求
// RpcResponse rpcResponse = VertxTcpClient.doRequest(rpcRequest,selectedServiceMetaInfo);
return rpcResponse.getData();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}