10.容错机制

容错机制

1. 容错机制说明

  • 容错是指系‍统在出现异常情况时‍,可以通过一定的策⁠略保证系统仍然稳定؜运行,从而提高系统‌的可靠性和健壮性。

1.1 容错策略

1.1.1 Fai‍l-Over 故障‍转移

  • 一次调用失败⁠后,切换一个其他节؜点再次进行调用,也算是‌一种重试。

1.1.2 Fail-‍Back 失败自动恢复

  • 系统‍的某个功能出现调用失败或错误⁠时,通过其他的方法,恢复该功能؜的正常。可以理解为降级,比‌如重试、调用其他服务等。

1.1.3 Fai‍l-Safe 静默处‍理

  • 系统出现部分非重⁠要功能的异常时,直接؜忽略掉,不做任何处理‌,就像错误没有发生过一样。

1.1.4 Fai‍l-Fast 快速‍失败

  • 系统出现调用⁠错误时,立刻报错,؜交给外层调用方处理‌。

1.2 容错实现方式

1.2.1 重试

  • 重试本质上也是一种‍容错的降级策略,系⁠统错误后再试一次。

1.2.2 限流

  • 当系统压力过大、已‍经出现部分错误时,⁠通过限制执行操作(؜接受请求)的频率或‌数量,对系统进行保护。

1.2.3 降级

  • 系统出现‍错误后,改为执行其他更稳定可用的操‍作。也可以叫做 “兜底” 或 “有损服⁠务”,这种方式的本质是:即使牺牲一定؜的服务质量,也要保证系统的部分功能可‌用,保证基本的功能需求得到满足。

1.2.4 熔断

  • 系统‍出现故障或异常时,暂时中断对‍该服务的请求,而是执行其他操⁠作,以避免连锁故障     ؜              ‌

             

1.2.5 超时控‍制

  • 如果请求或操作‍长时间没处理完成,⁠就进行中断,防止阻؜塞和资源占用。

1.3 容错方案设计

1.3.1 先容错再重试

  • 当系统发生‍异常时,首先会触发‍容错机制,比如记录⁠日志、进行告警等,؜然后可以选择是否‌进行重试。

1.3.2 先重试再容错

  • 在发生错误后,首先尝试重试操作,⁠如果重试多次仍然失败,则触发容错؜机制,比如记录日志、进行告警等 ‌、

1.3.3 举例

  1. 系统调用服务 A 出现网络错误,使用容错策略 - 重试。
  2. 重试 3 次失败后,使用其他容错策略 - 降级。
  3. 系统改为调用不依赖网络的服务 B,完成操作。

2.代码实现

2.1 多种容错策略实现

2.1.1 容错策略接口

/**
 * 容错策略
 */
public interface TolerantStrategy {

    /**
     * 容错
     * @param context
     * @param e
     * @return
     */
    RpcResponse doTolerant(Map<String,Object> context,Exception e);
}

2.1.2 Fail-Fast 快速失败

  • 遇到异常,将异常再次抛出,交给外层处理
/**
 * 快速失败-容错策略(立即通知外层调用方)
 */
public class FailFastTolerantStrategy implements TolerantStrategy{

    @Override
    public RpcResponse doTolerant(Map<String, Object> context, Exception e) {
        throw new RuntimeException("服务报错",e);
    }
}

2.1.3 Fail-Safe 静默处理

  • 遇到异常,记录日志,正常返回响应对象
/**
 * 静默处理-容错策略
 */
@Slf4j
public class FailSafeTolerantStrategy implements TolerantStrategy{

    @Override
    public RpcResponse doTolerant(Map<String, Object> context, Exception e) {
        log.info("静默处理异常",e);
        return new RpcResponse();
    }
}

2.1.4 Fail-Back 故障恢复

/**
 * 故障恢复-容错策略(降级到其他服务)
 */
public class FailBackTolerantStrategy implements TolerantStrategy{

    @Override
    public RpcResponse doTolerant(Map<String, Object> context, Exception e) {
        
        return null;
    }
}

2.1.5 Fail-Over 故障转移

/**
 * 故障转移-容错策略(转移到其他服务节点)
 */
public class FailOverTolerantStrategy implements TolerantStrategy{

    @Override
    public RpcResponse doTolerant(Map<String, Object> context, Exception e) {
        return null;
    }
}

2.2 容错策略键名常量

/**
 * 容错策略键名常量
 */
public interface TolerantStrategyKeys {
    /**
     * 故障恢复
     */
    String FAIL_BACK = "failBack";

    /**
     * 快速失败
     */
    String FAIL_FAST = "failFast";

    /**
     * 故障转移
     */
    String FAIL_Over = "failOver";

    /**
     * 静默处理
     */
    String FAIL_SAFE = "failSafe";

}

2.3 容错策略工厂

/**
 * 容错策略工厂(工厂模式,用于获取容错策略对象)
 */
public class TolerantStrategyFactory {
    static{
        SpiLoader.load(TolerantStrategy.class);
    }

    /**
     * 默认容错策略
     */
    private static final TolerantStrategy DEFAULT_RETRY_STRATEGY = new FailFastTolerantStrategy();


    /**
     * 获取实例
     * @param key
     * @return
     */
    public static TolerantStrategy getInstance(String key){
        if(key != null){
            return SpiLoader.getInstance(TolerantStrategy.class,key);
        }else{
            return DEFAULT_RETRY_STRATEGY;
        }
    }
}

2.4 修改SPI配置文件

com.todaysaturday.fault.retry.TolerantStrategy

failBack=com.todaysaturday.fault.tolerant.FailBackTolerantStrategy
failFast=com.todaysaturday.fault.tolerant.FailFastTolerantStrategy
failSafe=com.todaysaturday.fault.tolerant.FailSafeTolerantStrategy
failOver=com.todaysaturday.fault.tolerant.FailOverTolerantStrategy

2.5 修改RpcConfig

/**
 * 容错策略
 */
private String tolerantStrategy = TolerantStrategyKeys.FAIL_FAST;

2.6 修改服务代理对象

/**
 * JDK 动态代理
 */
public class JdkServiceProxy implements InvocationHandler {

    /**
     * 调用代理
     *
     * @return
     * @throws Throwable
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        // 本地处理 Object 类的方法,避免 RPC 分派
        if (method.getDeclaringClass() == Object.class) {
            switch (method.getName()) {
                case "toString":
                    // 为代理对象本身提供一个有意义的字符串表示
                    return proxy.getClass().getName() + "@" +
                            Integer.toHexString(System.identityHashCode(proxy)) +
                            ", with InvocationHandler " + this;
                case "hashCode":
                    // 委托给处理器的 hashCode 或提供一个代理特定的 hashCode
                    return System.identityHashCode(proxy);
                case "equals":
                    // 委托给处理器的 equals 或提供代理特定的相等性判断
                    return proxy == args[0]; // 代理对象的默认引用相等性
                default:
                    // 对于其他 Object 方法,可以考虑在处理器上调用它们
                    return method.invoke(this, args);
            }
        }
        // 1.指定序列化器
//        JdkSerializer serializer = new JdkSerializer();
        final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());


        String serviceName = method.getDeclaringClass().getName();

        // 2.发出请求(全部使用动态的参数)
        RpcRequest rpcRequest = RpcRequest.builder()
                .serviceName(serviceName)
                .methodName(method.getName())
                .parameterTypes(method.getParameterTypes())
                .args(args)
                .build();

        try {
            // 从注册中心获取服务提供者请求地址
            RpcConfig rpcConfig = RpcApplication.getRpcConfig();
            // 实例化注册中心对象
            Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());

            ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
            serviceMetaInfo.setServiceName(serviceName);
            serviceMetaInfo.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION);
            // 服务发现
            List<ServiceMetaInfo> serviceMetaInfoList = registry.serviceDiscovery(serviceMetaInfo.getServiceKey());

            if(CollUtil.isEmpty(serviceMetaInfoList)){
                throw new RuntimeException("暂无服务地址");
            }

            // 负载均衡
            LoadBalancer loadBalancer = LoadBalancerFactory.getInstance(rpcConfig.getLoadBalancer());
            // 将调用方法名(请求路径)作为负载均衡参数
            Map<String, Object> requestParams = new HashMap<>();
            requestParams.put("methodName",rpcRequest.getMethodName());
            ServiceMetaInfo selectedServiceMetaInfo = loadBalancer.select(requestParams, serviceMetaInfoList);

            // 不取默认的
//            ServiceMetaInfo selectedServiceMetaInfo = serviceMetaInfoList.get(0);

            // rpc 请求 使用重试机制
            RpcResponse rpcResponse = null;
            try {
                RetryStrategy retryStrategy = RetryStrategyFactory.getInstance(rpcConfig.getRetryStrategy());
                rpcResponse = retryStrategy.doRetry(() ->
                    VertxTcpClient.doRequest(rpcRequest, selectedServiceMetaInfo)
                );
            } catch (Exception e) {
                // 使用容错机制
                TolerantStrategy tolerantStrategy = TolerantStrategyFactory.getInstance(rpcConfig.getTolerantStrategy());
                rpcResponse = tolerantStrategy.doTolerant(null,e);
            }
            // 发出TCP请求
//            RpcResponse rpcResponse = VertxTcpClient.doRequest(rpcRequest,selectedServiceMetaInfo);
            return rpcResponse.getData();
        } catch (Exception e) {
            e.printStackTrace();
        }

        return null;
    }
}