5.注册中心基本实现

注册中心基本实现

1.注册中心基本实现

1.1 理解 Etcd

  • 定义:Etcd 是一个开源的、分布式的键值存储系统,常用于服务发现、配置管理和分布式锁。,它是 Go 语言实现的,性能高,并且是云原生领域(如 Kubernetes)的核心组件。
  • 数据模型:层次化的键值对,类似于文件系统路径,支持按前缀查询,Key 是唯一标识,Value 可以是任意数据(通常是字符串,如 JSON)。
  • 核心特性
    • **Lease (租约)**:用于给键值对设置 TTL (Time-To-Live,存活时间),租约到期后相关键值对会自动删除,这对于实现服务提供者的自动下线非常有用。
    • **Watch (监听)**:可以监视特定键或前缀下键的变化,当数据发生变化时,客户端会收到通知,这对于消费者动态更新服务列表很有用。
    • 强一致性:使用 Raft 一致性算法保证数据在集群中的一致性和可靠性。
  • 端口:默认服务端口 2379 (供客户端API交互),集群间通信端口 2380

注:服务提供者(Provider)启动后,将自己能提供的服务信息(如服务名、版本、IP、端口)注册到 Etcd;服务消费者(Consumer)在需要调用某个服务时,从 Etcd 查询该服务的可用实例地址,然后选择一个进行远程调用。

image-20250525221502829

1.2 相关文档

1.3 Etcd下载

1.3.1 验证是否下载成功

image-20250525170158216

1.3.2 配置系统环境变量

设置 -> 编辑系统环境变量 -> 环境变量 -> 系统变量 -> Path(编辑) -> 新建 -> 复制安装路径(D:\Etcd\etcd-v3.5.20-windows-amd64)-> 确定

1.4 EtcdKeeper下载

下载后打开文件夹,双击exe文件,会显示本地的打开地址,复制,然后在浏览器输入框中输入http://localhost:8080/etcdkeeper/

image-20250525171347790

image-20250525171433007

1.5 处理Etcd

1.5.1 Etcd 相关操作

  1. kvClient:用于对 etcd 中的键值对进行操作。通过 kvClient 可以进行设置值、获取值、删除值、列出目录等操作。
  2. leaseClient:用于管理 etcd 的租约机制。租约是 etcd 中的一种时间片,用于为键值对分配生存时间,并在租约到期时自动删除相关的键值对。通过 leaseClient 可以创建、获取、续约和撤销租约。
  3. watchClient:用于监视 etcd 中键的变化,并在键的值发生变化时接收通知。
  4. clusterClient:用于与 etcd 集群进行交互,包括添加、移除、列出成员、设置选举、获取集群的健康状态、获取成员列表信息等操作。
  5. authClient:用于管理 etcd 的身份验证和授权。通过 authClient 可以添加、删除、列出用户、角色等身份信息,以及授予或撤销用户或角色的权限。
  6. maintenanceClient:用于执行 etcd 的维护操作,如健康检查、数据库备份、成员维护、数据库快照、数据库压缩等。
  7. lockClient:用于实现分布式锁功能,通过 lockClient 可以在 etcd 上创建、获取、释放锁,能够轻松实现并发控制。
  8. electionClient:用于实现分布式选举功能,可以在 etcd 上创建选举、提交选票、监视选举结果等。

1.5.2 存储结构设计

1. 层级结构‍

  • 将服务理解为文件夹、将服务‍对应的多个节点理解为文件夹⁠下的文件,那么可以通过服务؜名称,用前缀查询的方式查询‌到某个服务的所有节点,键名的规则可以是 /业务前缀/服务名/服务节点地址
  • 键(Key)的设计
    • 每个服务提供者节点的键(Key)都遵循一种类似路径的模式。
    • 键名的规则是:``
    • **/业务前缀/ (例如 /rpc/)**:这是一个顶层命名空间,用于在Etcd中组织所有与RPC相关的数据,避免当Etcd也用于其他目的时发生冲突。
    • **服务名 (例如 com.today.example.common.service.UserService)**:服务的唯一名称。
    • **服务节点地址 (例如 127.0.0.1:8080)**:服务提供者运行实例的具体网络地址。
  • 值(Value)的设计
    • 与每个服务节点键关联的值(Value)通常是该特定节点的 ServiceMetaInfo 对象的序列化表示(例如,一个JSON字符串)。 这个JSON字符串会包含服务名称、版本、主机、端口以及任何其他元数据等详细信息。
  • 优点
    • 细粒度更新:每个服务节点都是一个独立的键。添加、移除节点或更新其元数据只会影响该特定键。
    • 发挥Etcd/ZooKeeper的优势:该模型与Etcd和ZooKeeper的设计理念非常契合,特别是它们对层级键和高效前缀查询的支持。
    • **精细化监听 (Watch)**:消费者可以监听特定的前缀以获取变更,如果只有某些服务或节点发生变化,可以实现更有针对性的更新。

2. 列表结构

  • 将所有的服务节‍点以列表的形式整体⁠作为 value
  • 键(Key)的设计
    • 键通常代表服务本身,示例:`/业务前缀/服务名/服务节点地址
  • 值(Value)的设计
    • 与此键关联的值将是一个包含所有服务提供者实例的列表
    • 这个列表可以是一个JSON数组,其中数组的每个元素都是一个代表特定节点 ServiceMetaInfo 信息的对象
  • 优点
    • 简单的Get操作:服务发现对于特定服务仅涉及一次 get 操作,可能非常快速。
    • 适用于某些键值存储:这种模型很适合那些原生支持列表数据结构(如Redis)或者前缀查询效率较低或不直接支持复杂层级结构的键值存储系统。
  • 缺点
    • 并发和更新问题:当添加、删除或更新单个节点时,需要读取整个列表(键的值),修改它,然后再写回。如果多个提供者同时尝试更新列表,这可能导致竞争条件,需要乐观锁或其他并发控制机制。
    • 值的大小:如果一个服务拥有非常大量的实例,那么这个键的值(所有节点的列表)可能会变得非常大,这可能影响性能,并可能触及Etcd的值大小限制(尽管Etcd的默认限制通常对这种情况来说足够大)。
    • 监听不够精细:监听变更通常意味着监听整个列表。即使只有一个节点发生变化,整个列表也被视为已更改,这可能导致消费者在监听键时产生更多的网络流量。

2. 具体实现

2.1注册中心开发

2.1.1 注册信息定义

/**
 * 服务元信息(注册信息)
 */
public class ServiceMetaInfo {

    /**
     * 服务名称
     */
    private String serviceName;

    /**
     * 服务版本号
     */
    private String serviceVersion = "1.0";

    /**
     * 服务域名
     */
    private String serviceHost;

    /**
     * 服务端口号
     */
    private Integer servicePort;

    /**
     * 服务分组(暂未实现)
     */
    private String serviceGroup = "default";


    /**
     * 获取服务键名
     *
     * @return
     */
    public String getServiceKey() {
        // 后续可扩展服务分组
        // return String.format("%s:%s:%s", serviceName, serviceVersion, serviceGroup);
        return String.format("%s:%s", serviceName, serviceVersion);
    }

    /**
     * 获取服务注册节点键名
     *
     * @return
     */
    public String getServiceNodeKey() {
        return String.format("%s/%s:%s", getServiceKey(), serviceHost, servicePort);
    }
    
    /**
     * 获取完整服务地址
     *
     * @return
     */
    public String getServiceAddress() {
        if (!StrUtil.contains(serviceHost, "http")) {
            return String.format("http://%s:%s", serviceHost, servicePort);
        }
        return String.format("%s:%s", serviceHost, servicePort);
    }
}

2.1.2 新增RPC常量

/**
 * 默认服务版本
 */
public static String DEFAULT_SERVICE_VERSION = "1.0";

2.1.3 RPCRequest 新增变量

/**
 * 服务版本
 */
private String serviceVersion = RpcConstant.DEFAULT_SERVICE_VERSION;

2.2 注册中心配置

2.2.1 RegistryConfig 注册中心配置

/**
 * RPC 框架注册中心配置
 */
@Data
public class RegistryConfig {
    /**
     * 注册中心类别
     */
    private String registry = "etcd";

    /**
     * 注册中心地址
     */
    private String address = "http://localhost:2380";

    /**
     * 用户名
     */
    private String username;

    /**
     * 密码
     */
    private String password;

    /**
     * 超时时间(单位毫秒)
     */
    private Long timeout = 10000L;
}

2.2.2 RpcConfig 配置

/**
 * 注册中心配置
 */
private RegistryConfig registryConfig = new RegistryConfig();

2.2.3 Registry 注册中心接口

/**
 * 注册中心
 */
public interface Registry {
    /**
     * 初始化
     *
     * @param registryConfig
     */
    void init(RegistryConfig registryConfig);

    /**
     * 注册服务(服务端)
     *
     * @param serviceMetaInfo
     */
    void register(ServiceMetaInfo serviceMetaInfo) throws Exception;

    /**
     * 注销服务(服务端)
     *
     * @param serviceMetaInfo
     */
    void unRegister(ServiceMetaInfo serviceMetaInfo);

    /**
     * 服务发现(获取某服务的所有节点,消费端)
     *
     * @param serviceKey 服务键名
     * @return
     */
    List<ServiceMetaInfo> serviceDiscovery(String serviceKey);

    /**
     * 服务销毁
     */
    void destroy();
}

2.2.4 Etcd 注册中心实现

/**
 * Etcd 注册中心
 */
public class EtcdRegistry implements Registry{

    /**
     * etcd 客户端对象
     */
    private Client client;

    /**
     * 用于对 etcd 中的键值对进行操作
     */
    private KV kvClient;

    /**
     * 根节点,区分不同的项目
     */
    private static final String ETCD_ROOT_PATH = "/rpc/";
    @Override
    public void init(RegistryConfig registryConfig) {
        client = Client.builder()
                .endpoints(registryConfig.getAddress())
                .connectTimeout(Duration.ofMillis(registryConfig.getTimeout()))
                .build();

        kvClient = client.getKVClient();
    }

    @Override
    public void register(ServiceMetaInfo serviceMetaInfo) throws Exception {

        // 创建Lease 和 KV 客户端
        Lease leaseClient = client.getLeaseClient();

        // 创建 30秒 的租约
        long leaseId = leaseClient.grant(30).get().getID();

        // 设置存储的键值对
        // 根路径拼接serviceKey
        String registerKey = ETCD_ROOT_PATH + serviceMetaInfo.getServiceNodeKey();

        ByteSequence key = ByteSequence.from(registerKey, StandardCharsets.UTF_8);
        ByteSequence value = ByteSequence.from(JSONUtil.toJsonStr(serviceMetaInfo), StandardCharsets.UTF_8);

        // 将键值对与租约关联起来,并设置过期时间
        PutOption putOption = PutOption.builder()
                .withLeaseId(leaseId)
                .build();
        kvClient.put(key,value,putOption).get();
    }

    @Override
    public void unRegister(ServiceMetaInfo serviceMetaInfo) {
        kvClient.delete(ByteSequence.from(
                ETCD_ROOT_PATH + serviceMetaInfo.getServiceNodeKey(),
                StandardCharsets.UTF_8));
    }

    @Override
    public List<ServiceMetaInfo> serviceDiscovery(String serviceKey) {
        // 前缀搜索,结尾一定要加 '/'
        String searchPrefix = ETCD_ROOT_PATH + serviceKey + "/";

        try {
            // 前缀查询
            GetOption getOption = GetOption.builder()
                    .isPrefix(true)
                    .build();

            List<KeyValue> keyValues = kvClient.get(
                            ByteSequence.from(searchPrefix, StandardCharsets.UTF_8), getOption)
                    .get()
                    .getKvs();

            return keyValues.stream()
                    .map(keyValue -> {
                        String value = keyValue.getValue().toString(StandardCharsets.UTF_8);
                        return JSONUtil.toBean(value,ServiceMetaInfo.class);
                    })
                    .collect(Collectors.toList());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void destroy() {
        System.out.println("当前节点下线");
        // 释放资源
        if(kvClient != null){
            kvClient.close();
        }

        if(client != null){
            client.close();
        }
    }
}

2.3 支持配置和扩展注册中心

2.3.1 注册中心键名常量

/**
 * 注册中心键名常量
 */
public interface RegistryKeys {
    String ETCD = "etcd";
    String ZOOKEEPER = "zookeeper";
}

2.3.2 注册中心工厂

/**
 * 注册中心工厂 (用于获取注册中心对象)
 */
public class RegistryFactory {
    /**
     * 1.添加 volatile 静态私有实例变量
     */
    private static volatile Registry registry;

    /**
     * 2.私有构造函数
     */
    private RegistryFactory() {}


    /**
     * 3.公共静态获取实例的方法
     * @param key
     * @return
     */
    public static Registry getInstance(String key) {
        if(registry == null){ // 第一次检查
            synchronized (RegistryFactory.class){
                if(registry == null){            // 第二次检查
                    SpiLoader.load(Registry.class);
                    if(!StrUtil.isEmpty(key)){
                        registry = SpiLoader.getInstance(Registry.class,key);
                    }else{
                        registry = new EtcdRegistry();
                    }
                }
            }
        }
        return SpiLoader.getInstance(Registry.class, key);
    }
}

2.3.3 修改RpcApplication

/**
 * 框架初始化,支持传入自定义配置
 *
 * @param newRpcConfig
 */
public static void init(RpcConfig newRpcConfig) {
    rpcConfig = newRpcConfig;
    log.info("rpc init, config = {}", newRpcConfig.toString());
    // 注册中心初始化
    RegistryConfig registryConfig = rpcConfig.getRegistryConfig();
    Registry registry = RegistryFactory.getInstance(registryConfig.getRegistry());
    registry.init(registryConfig);
    log.info("registry init, config = {}", registryConfig);
}

3.完整调用流程

3.1 修改服务代理JdkServiceProxy

String serviceName = method.getDeclaringClass().getName();

// start
RpcConfig rpcConfig = RpcApplication.getRpcConfig();
// 实例化注册中心对象
Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());

ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
serviceMetaInfo.setServiceName(serviceName);
serviceMetaInfo.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION);
List<ServiceMetaInfo> serviceMetaInfoList =
        registry.serviceDiscovery(serviceMetaInfo.getServiceKey());

if(CollUtil.isEmpty(serviceMetaInfoList)){
    throw new RuntimeException("暂无服务地址");
}

ServiceMetaInfo selectedServiceMetaInfo = serviceMetaInfoList.get(0);

// 4.发出HTTP请求
try(HttpResponse httpResponse = HttpRequest.post(selectedServiceMetaInfo.getServiceAddress())
        .body(bodyBytes)
        .execute()){
    byte[] result = httpResponse.bodyBytes();
    // 5.反序列化对象
    RpcResponse rpcResponse = serializer.deserialize(result, RpcResponse.class);
    return (User) rpcResponse.getData();
}
// end

3.2 修改EasyProviderExample

// RPC 框架初始化
RpcApplication.init();
// 注册服务
String serviceName = UserService.class.getName();
LocalRegistry.register(serviceName,UserServiceImpl.class);

// 注册服务到注册中心
RpcConfig rpcConfig = RpcApplication.getRpcConfig();
RegistryConfig registryConfig = rpcConfig.getRegistryConfig();
Registry registry = RegistryFactory.getInstance(registryConfig.getRegistry());
ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
serviceMetaInfo.setServiceName(serviceName);
serviceMetaInfo.setServiceHost(rpcConfig.getServerHost());
serviceMetaInfo.setServicePort(rpcConfig.getServerPort());

try {
    registry.register(serviceMetaInfo);
} catch (Exception e) {
    throw new RuntimeException(e);
}

// 启动 web 服务
VertxHttpServer httpServer = new VertxHttpServer();
httpServer.doStart(RpcApplication.getRpcConfig().getServerPort());

3.3 system新增文件

com.todaysaturday.registry.registry

etcd=com.todaysaturday.registry.EtcdRegistry

3.4 注册中心测试

/**
 * 注册中心测试
 */
public class RegistryTest {

    final Registry registry = new EtcdRegistry();

    @Before
    public void init() {
        RegistryConfig registryConfig = new RegistryConfig();
        registryConfig.setAddress("http://localhost:2379");
        registry.init(registryConfig);
    }

    @Test
    public void register() throws Exception {
        ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
        serviceMetaInfo.setServiceName("myService");
        serviceMetaInfo.setServiceVersion("1.0");
        serviceMetaInfo.setServiceHost("localhost");
        serviceMetaInfo.setServicePort(1234);
        registry.register(serviceMetaInfo);
        serviceMetaInfo = new ServiceMetaInfo();
        serviceMetaInfo.setServiceName("myService");
        serviceMetaInfo.setServiceVersion("1.0");
        serviceMetaInfo.setServiceHost("localhost");
        serviceMetaInfo.setServicePort(1235);
        registry.register(serviceMetaInfo);
        serviceMetaInfo = new ServiceMetaInfo();
        serviceMetaInfo.setServiceName("myService");
        serviceMetaInfo.setServiceVersion("2.0");
        serviceMetaInfo.setServiceHost("localhost");
        serviceMetaInfo.setServicePort(1234);
        registry.register(serviceMetaInfo);
    }

    @Test
    public void unRegister() {
        ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
        serviceMetaInfo.setServiceName("myService");
        serviceMetaInfo.setServiceVersion("1.0");
        serviceMetaInfo.setServiceHost("localhost");
        serviceMetaInfo.setServicePort(1234);
        registry.unRegister(serviceMetaInfo);
    }

    @Test
    public void serviceDiscovery() {
        ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
        serviceMetaInfo.setServiceName("myService");
        serviceMetaInfo.setServiceVersion("1.0");
        String serviceKey = serviceMetaInfo.getServiceKey();
        List<ServiceMetaInfo> serviceMetaInfoList = registry.serviceDiscovery(serviceKey);
        Assert.assertNotNull(serviceMetaInfoList);
    }
}