6.注册中心优化

注册中心优化

1. 心跳检测和续期机制

  • 实‍现心跳检测一般需要⁠ 2 个关键:定时、网络请求

    但是使用 Etcd 实现心跳检测会更简单一些,因为 Etcd 自带了 key 过期机制,我们不妨换个思路:给节点注册信息一个 “生命倒计时”,让节点定期 续期,重置 自己的 倒计时。如果节点已宕机,一直不续期,Etcd 就会对 key 进行过期删除

1.1 提供心跳检测方法

/**
 * 心跳检测(服务端)
 */
void heartBeat();

1.2 修改EtcdRegistry

1.2.1 添加本地缓存

/**
 * 本机注册的节点Key集合(用于维护续期)
 */
private final Set<String> localRegisterNodeKeySet = new HashSet<>();

1.2.2 修改注册方法

@Override
public void register(ServiceMetaInfo serviceMetaInfo) throws Exception {

    // 创建Lease 和 KV 客户端
    Lease leaseClient = client.getLeaseClient();

    // 创建 30秒 的租约
    long leaseId = leaseClient.grant(30).get().getID();

    // 设置存储的键值对
    // 根路径拼接serviceKey
    String registerKey = ETCD_ROOT_PATH + serviceMetaInfo.getServiceNodeKey();

    ByteSequence key = ByteSequence.from(registerKey, StandardCharsets.UTF_8);
    ByteSequence value = ByteSequence.from(JSONUtil.toJsonStr(serviceMetaInfo), StandardCharsets.UTF_8);

    // 将键值对与租约关联起来,并设置过期时间
    PutOption putOption = PutOption.builder()
            .withLeaseId(leaseId)
            .build();
    kvClient.put(key,value,putOption).get();

	// 添加节点信息到本地缓存
    localRegisterNodeKeySet.add(registerKey);
}

1.2.2 修改注销方法

@Override
public void unRegister(ServiceMetaInfo serviceMetaInfo) {

    String registerKey = ETCD_ROOT_PATH + serviceMetaInfo.getServiceNodeKey();
    kvClient.delete(ByteSequence.from(registerKey,StandardCharsets.UTF_8));

    // 移除本地缓存中的节点信息
    localRegisterNodeKeySet.remove(registerKey);
}

1.2.3 实现heartBeat方法

@Override
public void heartBeat() {
    // 10秒续签一次
    CronUtil.schedule("*/10 * * * * *", new Task() {
        @Override
        public void execute() {
            // 遍历本节点所有的key
            for (String key : localRegisterNodeKeySet) {


                try {
                    // 获取节点值来进行判断
                    List<KeyValue> keyValues = kvClient.get(ByteSequence.from(key, StandardCharsets.UTF_8))
                            .get()
                            .getKvs();

                    // 该节点已过期(需要重启节点才能重新注册)
                    if(CollUtil.isEmpty(keyValues)){
                        continue;
                    }

                    // 节点未过期,重新注册(相当于续签)
                    KeyValue keyValue = keyValues.get(0);
                    String value = keyValue.getValue().toString();
                    ServiceMetaInfo serviceMetaInfo = JSONUtil.toBean(value, ServiceMetaInfo.class);

                    // 重新注册节点
                    register(serviceMetaInfo);
                } catch (Exception e) {
                    throw new RuntimeException(key + "续签失败", e);
                }
            }
        }
    });

    // 支持秒级别的定时任务
    CronUtil.setMatchSecond(true);
    CronUtil.start();
}

1.2.4 修改init方法

@Override
public void init(RegistryConfig registryConfig) {
    client = Client.builder()
            .endpoints(registryConfig.getAddress())
            .connectTimeout(Duration.ofMillis(registryConfig.getTimeout()))
            .build();

    kvClient = client.getKVClient();

    // 添加心跳检测
    heartBeat();
}

1.2.5 测试代码

@Test
public void heartBeat() throws Exception {
    // init 方法中已经执行心跳检测了
    register();
    // 阻塞 1 分钟
    Thread.sleep(60 * 1000L);
}

2. 服务节点下线机制

服务节点下线又分为:

  • 主动下线:服务提供者项目正常退出时,主动从注册中心移除注册信息。
  • 被动下线:服务提供者项目异常推出时,利用 Etcd 的 key 过期机制自动移除。

2.1 修改EtcdRegistry代码完善下线节点

@Override
public void destroy() {
    System.out.println("当前节点下线");

    // 下线节点
    // 遍历本节点所有的key
    for (String key : localRegisterNodeKeySet) {
        try {
            kvClient.delete(ByteSequence.from(key,StandardCharsets.UTF_8)).get();
        } catch (Exception e) {
            throw new RuntimeException(key + "节点下线失败");
        }
    }

    // 释放资源
    if(kvClient != null){
        kvClient.close();
    }

    if(client != null){
        client.close();
    }
}

2.2 修改RpcApplication的init方法

/**
 * 框架初始化,支持传入自定义配置
 * @param newRpcConfig
 */
public static void init(RpcConfig newRpcConfig){
    rpcConfig = newRpcConfig;
    log.info("rpc init, config = {}", newRpcConfig.toString());
    // 注册中心初始化
    RegistryConfig registryConfig = rpcConfig.getRegistryConfig();
    Registry registry = RegistryFactory.getInstance(registryConfig.getRegistry());
    registry.init(registryConfig);
    log.info("registry init,config = {}",registryConfig);

    // 创建并注册 Shutdown Hook,JVM 退出时执行操作
    Runtime.getRuntime().addShutdownHook(new Thread(registry::destroy));
}

3. 消费端服务缓存

3.1 增加注册中心服务本地缓存


/**
 * 注册中心服务本地缓存
 */
public class RegistryServiceCache {

    /**
     * 服务缓存
     */
    List<ServiceMetaInfo> serviceCache;

    /**
     * 写缓存
     */
    void writeCache(List<ServiceMetaInfo> newServiceCache){
        this.serviceCache = newServiceCache;
    }

    /**
     * 读缓存
     */
    List<ServiceMetaInfo> readCache(){
        return this.serviceCache;
    }

    /**
     * 清空缓存
     */
    
    void clearCache(){
        this.serviceCache = null;
    }
    
}

3.2 使用本地缓存

3.2.1 新增注册中心服务缓存

/**
 * 注册中心服务缓存
 */
private final RegistryServiceCache registryServiceCache = new RegistryServiceCache();

3.2.2 修改服务发现

@Override
public List<ServiceMetaInfo> serviceDiscovery(String serviceKey) {
    // 优先从注册中心缓存中获取服务
    List<ServiceMetaInfo> cachedServiceMetaInfoList = registryServiceCache.readCache();
    if(CollUtil.isNotEmpty(cachedServiceMetaInfoList)){
        return cachedServiceMetaInfoList;
    }

    // 前缀搜索,结尾一定要加 '/'
    String searchPrefix = ETCD_ROOT_PATH + serviceKey + "/";

    try {
        // 前缀查询
        GetOption getOption = GetOption.builder()
                .isPrefix(true)
                .build();

        List<KeyValue> keyValues = kvClient.get(
                        ByteSequence.from(searchPrefix, StandardCharsets.UTF_8), getOption)
                .get()
                .getKvs();

        // 解析服务信息
        List<ServiceMetaInfo> serviceMetaInfoList = keyValues.stream()
                .map(keyValue -> {
                    // 第一次访问该服务时才进行前缀监听
                    if (!registryServiceCache.contains(serviceKey)) {
                        // 监听key 的变化
                        watch(serviceKey);
                    }

                    String value = keyValue.getValue().toString(StandardCharsets.UTF_8);
                    return JSONUtil.toBean(value, ServiceMetaInfo.class);
                })
                .collect(Collectors.toList());

        // 写入注册中心服务缓存
        registryServiceCache.writeCache(serviceMetaInfoList);

        return serviceMetaInfoList;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

3.2.3 注册中心服务新增contains方法

/**
 * 校验缓存中是否包含重复的key
 */
boolean contains(String key){
    if(this.serviceCache.contains(key)){
        return true;
    }
    return false;
}

3.3 服务缓存更新-监听机制

3.3.1 注册中心新增监听方法

/**
 * 监听(消费端)
 *
 * @param serviceNodeKey
 */
void watch(String serviceNodeKey);

3.3.2 创建监听集合

3.3.3 实现监听方法

@Override
public void watch(String serviceNodeKey) {
    Watch watchClient = client.getWatchClient();
    // 之前未被监听,开启监听
    boolean newWatch = watchingKeySet.add(serviceNodeKey);
    if(newWatch){
        watchClient.watch(ByteSequence.from(serviceNodeKey,StandardCharsets.UTF_8),watchResponse -> {
            for (WatchEvent event : watchResponse.getEvents()) {
                switch (event.getEventType()){
                    // key 删除时触发
                    case DELETE:
                        // 清除注册服务缓存
                        registryServiceCache.clearCache();
                        break;
                    case PUT:
                    default:
                        break;
                }
            } 
        });
    }
}

4. ZooKeeper 注册中心实现

4.1 引入依赖

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-x-discovery</artifactId>
    <version>5.6.0</version>
</dependency>

4.2 ZooKeeperRegistry实现

package com.yupi.yurpc.registry;

import cn.hutool.core.collection.ConcurrentHashSet;
import com.yupi.yurpc.config.RegistryConfig;
import com.yupi.yurpc.model.ServiceMetaInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * zookeeper 注册中心
 * 操作文档:<a href="https://curator.apache.org/docs/getting-started">Apache Curator</a>
 * 代码示例:<a href="https://github.com/apache/curator/blob/master/curator-examples/src/main/java/discovery/DiscoveryExample.java">DiscoveryExample.java</a>
 * 监听 key 示例:<a href="https://github.com/apache/curator/blob/master/curator-examples/src/main/java/cache/CuratorCacheExample.java">CuratorCacheExample.java</a>
 *
 * @author <a href="https://github.com/liyupi">coder_yupi</a>
 * @from <a href="https://yupi.icu">编程导航学习圈</a>
 * @learn <a href="https://codefather.cn">yupi 的编程宝典</a>
 */
@Slf4j
public class ZooKeeperRegistry implements Registry {

    private CuratorFramework client;

    private ServiceDiscovery<ServiceMetaInfo> serviceDiscovery;

    /**
     * 本机注册的节点 key 集合(用于维护续期)
     */
    private final Set<String> localRegisterNodeKeySet = new HashSet<>();

    /**
     * 注册中心服务缓存
     */
    private final RegistryServiceCache registryServiceCache = new RegistryServiceCache();

    /**
     * 正在监听的 key 集合
     */
    private final Set<String> watchingKeySet = new ConcurrentHashSet<>();

    /**
     * 根节点
     */
    private static final String ZK_ROOT_PATH = "/rpc/zk";


    @Override
    public void init(RegistryConfig registryConfig) {
        // 构建 client 实例
        client = CuratorFrameworkFactory
                .builder()
                .connectString(registryConfig.getAddress())
                .retryPolicy(new ExponentialBackoffRetry(Math.toIntExact(registryConfig.getTimeout()), 3))
                .build();

        // 构建 serviceDiscovery 实例
        serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceMetaInfo.class)
                .client(client)
                .basePath(ZK_ROOT_PATH)
                .serializer(new JsonInstanceSerializer<>(ServiceMetaInfo.class))
                .build();

        try {
            // 启动 client 和 serviceDiscovery
            client.start();
            serviceDiscovery.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void register(ServiceMetaInfo serviceMetaInfo) throws Exception {
        // 注册到 zk 里
        serviceDiscovery.registerService(buildServiceInstance(serviceMetaInfo));

        // 添加节点信息到本地缓存
        String registerKey = ZK_ROOT_PATH + "/" + serviceMetaInfo.getServiceNodeKey();
        localRegisterNodeKeySet.add(registerKey);
    }

    @Override
    public void unRegister(ServiceMetaInfo serviceMetaInfo) {
        try {
            serviceDiscovery.unregisterService(buildServiceInstance(serviceMetaInfo));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        // 从本地缓存移除
        String registerKey = ZK_ROOT_PATH + "/" + serviceMetaInfo.getServiceNodeKey();
        localRegisterNodeKeySet.remove(registerKey);
    }

    @Override
    public List<ServiceMetaInfo> serviceDiscovery(String serviceKey) {
        // 优先从缓存获取服务
        List<ServiceMetaInfo> cachedServiceMetaInfoList = registryServiceCache.readCache();
        if (cachedServiceMetaInfoList != null) {
            return cachedServiceMetaInfoList;
        }

        try {
            // 查询服务信息
            Collection<ServiceInstance<ServiceMetaInfo>> serviceInstanceList = serviceDiscovery.queryForInstances(serviceKey);

            // 解析服务信息
            List<ServiceMetaInfo> serviceMetaInfoList = serviceInstanceList.stream()
                    .map(ServiceInstance::getPayload)
                    .collect(Collectors.toList());

            // 写入服务缓存
            registryServiceCache.writeCache(serviceMetaInfoList);
            return serviceMetaInfoList;
        } catch (Exception e) {
            throw new RuntimeException("获取服务列表失败", e);
        }
    }

    @Override
    public void heartBeat() {
        // 不需要心跳机制,建立了临时节点,如果服务器故障,则临时节点直接丢失
    }

    /**
     * 监听(消费端)
     *
     * @param serviceNodeKey 服务节点 key
     */
    @Override
    public void watch(String serviceNodeKey) {
        String watchKey = ZK_ROOT_PATH + "/" + serviceNodeKey;
        boolean newWatch = watchingKeySet.add(watchKey);
        if (newWatch) {
            CuratorCache curatorCache = CuratorCache.build(client, watchKey);
            curatorCache.start();
            curatorCache.listenable().addListener(
                    CuratorCacheListener
                            .builder()
                            .forDeletes(childData -> registryServiceCache.clearCache())
                            .forChanges(((oldNode, node) -> registryServiceCache.clearCache()))
                            .build()
            );
        }
    }

    @Override
    public void destroy() {
        log.info("当前节点下线");
        // 下线节点(这一步可以不做,因为都是临时节点,服务下线,自然就被删掉了)
        for (String key : localRegisterNodeKeySet) {
            try {
                client.delete().guaranteed().forPath(key);
            } catch (Exception e) {
                throw new RuntimeException(key + "节点下线失败");
            }
        }

        // 释放资源
        if (client != null) {
            client.close();
        }
    }

    private ServiceInstance<ServiceMetaInfo> buildServiceInstance(ServiceMetaInfo serviceMetaInfo) {
        String serviceAddress = serviceMetaInfo.getServiceHost() + ":" + serviceMetaInfo.getServicePort();
        try {
            return ServiceInstance
                    .<ServiceMetaInfo>builder()
                    .id(serviceAddress)
                    .name(serviceMetaInfo.getServiceKey())
                    .address(serviceAddress)
                    .payload(serviceMetaInfo)
                    .build();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

4.3添加spi的支持

zookeeper=com.todaysaturday.registry.ZooKeeperRegistry

4.4 修改Properties文件

rpc.registryConfig.registry=zookeeper
rpc.registryConfig.address=localhost:2181

4.5 修改CGlibServiceProxy

/**
 * @param o
 * @param method
 * @param objects
 * @param methodProxy
 * @return
 */
@Override
public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) {

    // 本地处理 Object 类的方法,避免 RPC 分派
    try {
        if (method.getDeclaringClass() == Object.class) {
            switch (method.getName()) {
                case "toString":
                    // 为代理对象本身提供一个有意义的字符串表示
                    return o.getClass().getName() + "@" +
                            Integer.toHexString(System.identityHashCode(o)) +
                            ", with InvocationHandler " + this;
                case "hashCode":
                    // 委托给处理器的 hashCode 或提供一个代理特定的 hashCode
                    return System.identityHashCode(o);
                case "equals":
                    // 委托给处理器的 equals 或提供代理特定的相等性判断
                    return o == objects[0]; // 代理对象的默认引用相等性
                default:
                    // 对于其他 Object 方法,可以考虑在处理器上调用它们
                    return method.invoke(this, objects);
            }
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());


    String serviceName = method.getDeclaringClass().getName();

    // 2.发出请求(全部使用动态的参数)
    RpcRequest rpcRequest = RpcRequest.builder()
            .serviceName(serviceName)
            .methodName(method.getName())
            .parameterTypes(method.getParameterTypes())
            .args(objects)
            .build();

    try {
        // 3.序列化对象
        byte[] bodyBytes = serializer.serialize(rpcRequest);


        // start
        RpcConfig rpcConfig = RpcApplication.getRpcConfig();
        // 实例化注册中心对象
        Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());

        ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
        serviceMetaInfo.setServiceName(serviceName);
        serviceMetaInfo.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION);
        // 服务发现
        List<ServiceMetaInfo> serviceMetaInfoList = registry.serviceDiscovery(serviceMetaInfo.getServiceKey());

        if(CollUtil.isEmpty(serviceMetaInfoList)){
            throw new RuntimeException("暂无服务地址");
        }

        ServiceMetaInfo selectedServiceMetaInfo = serviceMetaInfoList.get(0);

        // 4.发出HTTP请求
        try(HttpResponse httpResponse = HttpRequest.post(selectedServiceMetaInfo.getServiceAddress())
                .body(bodyBytes)
                .execute()){
            byte[] result = httpResponse.bodyBytes();
            // 5.反序列化对象
            RpcResponse rpcResponse = serializer.deserialize(result, RpcResponse.class);
            return (User) rpcResponse.getData();
        }
        // end
    } catch (IOException e) {
        e.printStackTrace();
    }

    return null;
}
}

4.6 使用ZooKeeper 后出现报错

暂无服务地址的情况,在代理类中的反射方法中添加如下代码

// 本地处理 Object 类的方法,避免 RPC 分派
if (method.getDeclaringClass() == Object.class) {
    switch (method.getName()) {
        case "toString":
            // 为代理对象本身提供一个有意义的字符串表示
            return proxy.getClass().getName() + "@" +
                   Integer.toHexString(System.identityHashCode(proxy)) +
                   ", with InvocationHandler " + this;
        case "hashCode":
            // 委托给处理器的 hashCode 或提供一个代理特定的 hashCode
            return System.identityHashCode(proxy);
        case "equals":
            // 委托给处理器的 equals 或提供代理特定的相等性判断
            return proxy == args[0]; // 代理对象的默认引用相等性
        default:
            // 对于其他 Object 方法,可以考虑在处理器上调用它们
            return method.invoke(this, args);
    }
}