注册中心基本实现
1.注册中心基本实现
1.1 理解 Etcd
- 定义:Etcd 是一个开源的、分布式的键值存储系统,常用于服务发现、配置管理和分布式锁。,它是 Go 语言实现的,性能高,并且是云原生领域(如 Kubernetes)的核心组件。
- 数据模型:层次化的键值对,类似于文件系统路径,支持按前缀查询,Key 是唯一标识,Value 可以是任意数据(通常是字符串,如 JSON)。
- 核心特性:
- **Lease (租约)**:用于给键值对设置 TTL (Time-To-Live,存活时间),租约到期后相关键值对会自动删除,这对于实现服务提供者的自动下线非常有用。
- **Watch (监听)**:可以监视特定键或前缀下键的变化,当数据发生变化时,客户端会收到通知,这对于消费者动态更新服务列表很有用。
- 强一致性:使用 Raft 一致性算法保证数据在集群中的一致性和可靠性。
- 端口:默认服务端口
2379(供客户端API交互),集群间通信端口2380。
注:服务提供者(Provider)启动后,将自己能提供的服务信息(如服务名、版本、IP、端口)注册到 Etcd;服务消费者(Consumer)在需要调用某个服务时,从 Etcd 查询该服务的可用实例地址,然后选择一个进行远程调用。

1.2 相关文档
- etcd地址:https://github.com/etcd-io/etcd
- etcd下载地址:https://github.com/etcd-io/etcd/releases
- etcd官方文档:https://etcd.io/docs/v3.4/dev-guide/interacting_v3/
- etcd官方文档中文版:https://doczhcn.gitbook.io/etcd/
- etcdkeeper:️https://github.com/evildecay/etcdkeeper/
1.3 Etcd下载
1.3.1 验证是否下载成功

1.3.2 配置系统环境变量
设置 -> 编辑系统环境变量 -> 环境变量 -> 系统变量 -> Path(编辑) -> 新建 -> 复制安装路径(D:\Etcd\etcd-v3.5.20-windows-amd64)-> 确定
1.4 EtcdKeeper下载
下载后打开文件夹,双击exe文件,会显示本地的打开地址,复制,然后在浏览器输入框中输入http://localhost:8080/etcdkeeper/


1.5 处理Etcd
1.5.1 Etcd 相关操作
- kvClient:用于对 etcd 中的键值对进行操作。通过 kvClient 可以进行设置值、获取值、删除值、列出目录等操作。
- leaseClient:用于管理 etcd 的租约机制。租约是 etcd 中的一种时间片,用于为键值对分配生存时间,并在租约到期时自动删除相关的键值对。通过 leaseClient 可以创建、获取、续约和撤销租约。
- watchClient:用于监视 etcd 中键的变化,并在键的值发生变化时接收通知。
- clusterClient:用于与 etcd 集群进行交互,包括添加、移除、列出成员、设置选举、获取集群的健康状态、获取成员列表信息等操作。
- authClient:用于管理 etcd 的身份验证和授权。通过 authClient 可以添加、删除、列出用户、角色等身份信息,以及授予或撤销用户或角色的权限。
- maintenanceClient:用于执行 etcd 的维护操作,如健康检查、数据库备份、成员维护、数据库快照、数据库压缩等。
- lockClient:用于实现分布式锁功能,通过 lockClient 可以在 etcd 上创建、获取、释放锁,能够轻松实现并发控制。
- electionClient:用于实现分布式选举功能,可以在 etcd 上创建选举、提交选票、监视选举结果等。
1.5.2 存储结构设计
1. 层级结构
- 将服务理解为文件夹、将服务对应的多个节点理解为文件夹下的文件,那么可以通过服务名称,用前缀查询的方式查询到某个服务的所有节点,键名的规则可以是
/业务前缀/服务名/服务节点地址 - 键(Key)的设计:
- 每个服务提供者节点的键(Key)都遵循一种类似路径的模式。
- 键名的规则是:``
- **
/业务前缀/(例如/rpc/)**:这是一个顶层命名空间,用于在Etcd中组织所有与RPC相关的数据,避免当Etcd也用于其他目的时发生冲突。 - **
服务名(例如com.today.example.common.service.UserService)**:服务的唯一名称。 - **
服务节点地址(例如127.0.0.1:8080)**:服务提供者运行实例的具体网络地址。
- 值(Value)的设计:
- 与每个服务节点键关联的值(Value)通常是该特定节点的
ServiceMetaInfo对象的序列化表示(例如,一个JSON字符串)。 这个JSON字符串会包含服务名称、版本、主机、端口以及任何其他元数据等详细信息。
- 与每个服务节点键关联的值(Value)通常是该特定节点的
- 优点:
- 细粒度更新:每个服务节点都是一个独立的键。添加、移除节点或更新其元数据只会影响该特定键。
- 发挥Etcd/ZooKeeper的优势:该模型与Etcd和ZooKeeper的设计理念非常契合,特别是它们对层级键和高效前缀查询的支持。
- **精细化监听 (Watch)**:消费者可以监听特定的前缀以获取变更,如果只有某些服务或节点发生变化,可以实现更有针对性的更新。
2. 列表结构
- 将所有的服务节点以列表的形式整体作为 value
- 键(Key)的设计:
- 键通常代表服务本身,示例:`/业务前缀/服务名/服务节点地址
- 值(Value)的设计:
- 与此键关联的值将是一个包含所有服务提供者实例的列表。
- 这个列表可以是一个JSON数组,其中数组的每个元素都是一个代表特定节点
ServiceMetaInfo信息的对象
- 优点:
- 简单的Get操作:服务发现对于特定服务仅涉及一次
get操作,可能非常快速。 - 适用于某些键值存储:这种模型很适合那些原生支持列表数据结构(如Redis)或者前缀查询效率较低或不直接支持复杂层级结构的键值存储系统。
- 简单的Get操作:服务发现对于特定服务仅涉及一次
- 缺点:
- 并发和更新问题:当添加、删除或更新单个节点时,需要读取整个列表(键的值),修改它,然后再写回。如果多个提供者同时尝试更新列表,这可能导致竞争条件,需要乐观锁或其他并发控制机制。
- 值的大小:如果一个服务拥有非常大量的实例,那么这个键的值(所有节点的列表)可能会变得非常大,这可能影响性能,并可能触及Etcd的值大小限制(尽管Etcd的默认限制通常对这种情况来说足够大)。
- 监听不够精细:监听变更通常意味着监听整个列表。即使只有一个节点发生变化,整个列表也被视为已更改,这可能导致消费者在监听键时产生更多的网络流量。
2. 具体实现
2.1注册中心开发
2.1.1 注册信息定义
/**
* 服务元信息(注册信息)
*/
public class ServiceMetaInfo {
/**
* 服务名称
*/
private String serviceName;
/**
* 服务版本号
*/
private String serviceVersion = "1.0";
/**
* 服务域名
*/
private String serviceHost;
/**
* 服务端口号
*/
private Integer servicePort;
/**
* 服务分组(暂未实现)
*/
private String serviceGroup = "default";
/**
* 获取服务键名
*
* @return
*/
public String getServiceKey() {
// 后续可扩展服务分组
// return String.format("%s:%s:%s", serviceName, serviceVersion, serviceGroup);
return String.format("%s:%s", serviceName, serviceVersion);
}
/**
* 获取服务注册节点键名
*
* @return
*/
public String getServiceNodeKey() {
return String.format("%s/%s:%s", getServiceKey(), serviceHost, servicePort);
}
/**
* 获取完整服务地址
*
* @return
*/
public String getServiceAddress() {
if (!StrUtil.contains(serviceHost, "http")) {
return String.format("http://%s:%s", serviceHost, servicePort);
}
return String.format("%s:%s", serviceHost, servicePort);
}
}
2.1.2 新增RPC常量
/**
* 默认服务版本
*/
public static String DEFAULT_SERVICE_VERSION = "1.0";
2.1.3 RPCRequest 新增变量
/**
* 服务版本
*/
private String serviceVersion = RpcConstant.DEFAULT_SERVICE_VERSION;
2.2 注册中心配置
2.2.1 RegistryConfig 注册中心配置
/**
* RPC 框架注册中心配置
*/
@Data
public class RegistryConfig {
/**
* 注册中心类别
*/
private String registry = "etcd";
/**
* 注册中心地址
*/
private String address = "http://localhost:2380";
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 超时时间(单位毫秒)
*/
private Long timeout = 10000L;
}
2.2.2 RpcConfig 配置
/**
* 注册中心配置
*/
private RegistryConfig registryConfig = new RegistryConfig();
2.2.3 Registry 注册中心接口
/**
* 注册中心
*/
public interface Registry {
/**
* 初始化
*
* @param registryConfig
*/
void init(RegistryConfig registryConfig);
/**
* 注册服务(服务端)
*
* @param serviceMetaInfo
*/
void register(ServiceMetaInfo serviceMetaInfo) throws Exception;
/**
* 注销服务(服务端)
*
* @param serviceMetaInfo
*/
void unRegister(ServiceMetaInfo serviceMetaInfo);
/**
* 服务发现(获取某服务的所有节点,消费端)
*
* @param serviceKey 服务键名
* @return
*/
List<ServiceMetaInfo> serviceDiscovery(String serviceKey);
/**
* 服务销毁
*/
void destroy();
}
2.2.4 Etcd 注册中心实现
/**
* Etcd 注册中心
*/
public class EtcdRegistry implements Registry{
/**
* etcd 客户端对象
*/
private Client client;
/**
* 用于对 etcd 中的键值对进行操作
*/
private KV kvClient;
/**
* 根节点,区分不同的项目
*/
private static final String ETCD_ROOT_PATH = "/rpc/";
@Override
public void init(RegistryConfig registryConfig) {
client = Client.builder()
.endpoints(registryConfig.getAddress())
.connectTimeout(Duration.ofMillis(registryConfig.getTimeout()))
.build();
kvClient = client.getKVClient();
}
@Override
public void register(ServiceMetaInfo serviceMetaInfo) throws Exception {
// 创建Lease 和 KV 客户端
Lease leaseClient = client.getLeaseClient();
// 创建 30秒 的租约
long leaseId = leaseClient.grant(30).get().getID();
// 设置存储的键值对
// 根路径拼接serviceKey
String registerKey = ETCD_ROOT_PATH + serviceMetaInfo.getServiceNodeKey();
ByteSequence key = ByteSequence.from(registerKey, StandardCharsets.UTF_8);
ByteSequence value = ByteSequence.from(JSONUtil.toJsonStr(serviceMetaInfo), StandardCharsets.UTF_8);
// 将键值对与租约关联起来,并设置过期时间
PutOption putOption = PutOption.builder()
.withLeaseId(leaseId)
.build();
kvClient.put(key,value,putOption).get();
}
@Override
public void unRegister(ServiceMetaInfo serviceMetaInfo) {
kvClient.delete(ByteSequence.from(
ETCD_ROOT_PATH + serviceMetaInfo.getServiceNodeKey(),
StandardCharsets.UTF_8));
}
@Override
public List<ServiceMetaInfo> serviceDiscovery(String serviceKey) {
// 前缀搜索,结尾一定要加 '/'
String searchPrefix = ETCD_ROOT_PATH + serviceKey + "/";
try {
// 前缀查询
GetOption getOption = GetOption.builder()
.isPrefix(true)
.build();
List<KeyValue> keyValues = kvClient.get(
ByteSequence.from(searchPrefix, StandardCharsets.UTF_8), getOption)
.get()
.getKvs();
return keyValues.stream()
.map(keyValue -> {
String value = keyValue.getValue().toString(StandardCharsets.UTF_8);
return JSONUtil.toBean(value,ServiceMetaInfo.class);
})
.collect(Collectors.toList());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void destroy() {
System.out.println("当前节点下线");
// 释放资源
if(kvClient != null){
kvClient.close();
}
if(client != null){
client.close();
}
}
}
2.3 支持配置和扩展注册中心
2.3.1 注册中心键名常量
/**
* 注册中心键名常量
*/
public interface RegistryKeys {
String ETCD = "etcd";
String ZOOKEEPER = "zookeeper";
}
2.3.2 注册中心工厂
/**
* 注册中心工厂 (用于获取注册中心对象)
*/
public class RegistryFactory {
/**
* 1.添加 volatile 静态私有实例变量
*/
private static volatile Registry registry;
/**
* 2.私有构造函数
*/
private RegistryFactory() {}
/**
* 3.公共静态获取实例的方法
* @param key
* @return
*/
public static Registry getInstance(String key) {
if(registry == null){ // 第一次检查
synchronized (RegistryFactory.class){
if(registry == null){ // 第二次检查
SpiLoader.load(Registry.class);
if(!StrUtil.isEmpty(key)){
registry = SpiLoader.getInstance(Registry.class,key);
}else{
registry = new EtcdRegistry();
}
}
}
}
return SpiLoader.getInstance(Registry.class, key);
}
}
2.3.3 修改RpcApplication
/**
* 框架初始化,支持传入自定义配置
*
* @param newRpcConfig
*/
public static void init(RpcConfig newRpcConfig) {
rpcConfig = newRpcConfig;
log.info("rpc init, config = {}", newRpcConfig.toString());
// 注册中心初始化
RegistryConfig registryConfig = rpcConfig.getRegistryConfig();
Registry registry = RegistryFactory.getInstance(registryConfig.getRegistry());
registry.init(registryConfig);
log.info("registry init, config = {}", registryConfig);
}
3.完整调用流程
3.1 修改服务代理JdkServiceProxy
String serviceName = method.getDeclaringClass().getName();
// start
RpcConfig rpcConfig = RpcApplication.getRpcConfig();
// 实例化注册中心对象
Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());
ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
serviceMetaInfo.setServiceName(serviceName);
serviceMetaInfo.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION);
List<ServiceMetaInfo> serviceMetaInfoList =
registry.serviceDiscovery(serviceMetaInfo.getServiceKey());
if(CollUtil.isEmpty(serviceMetaInfoList)){
throw new RuntimeException("暂无服务地址");
}
ServiceMetaInfo selectedServiceMetaInfo = serviceMetaInfoList.get(0);
// 4.发出HTTP请求
try(HttpResponse httpResponse = HttpRequest.post(selectedServiceMetaInfo.getServiceAddress())
.body(bodyBytes)
.execute()){
byte[] result = httpResponse.bodyBytes();
// 5.反序列化对象
RpcResponse rpcResponse = serializer.deserialize(result, RpcResponse.class);
return (User) rpcResponse.getData();
}
// end
3.2 修改EasyProviderExample
// RPC 框架初始化
RpcApplication.init();
// 注册服务
String serviceName = UserService.class.getName();
LocalRegistry.register(serviceName,UserServiceImpl.class);
// 注册服务到注册中心
RpcConfig rpcConfig = RpcApplication.getRpcConfig();
RegistryConfig registryConfig = rpcConfig.getRegistryConfig();
Registry registry = RegistryFactory.getInstance(registryConfig.getRegistry());
ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
serviceMetaInfo.setServiceName(serviceName);
serviceMetaInfo.setServiceHost(rpcConfig.getServerHost());
serviceMetaInfo.setServicePort(rpcConfig.getServerPort());
try {
registry.register(serviceMetaInfo);
} catch (Exception e) {
throw new RuntimeException(e);
}
// 启动 web 服务
VertxHttpServer httpServer = new VertxHttpServer();
httpServer.doStart(RpcApplication.getRpcConfig().getServerPort());
3.3 system新增文件
com.todaysaturday.registry.registry
etcd=com.todaysaturday.registry.EtcdRegistry
3.4 注册中心测试
/**
* 注册中心测试
*/
public class RegistryTest {
final Registry registry = new EtcdRegistry();
@Before
public void init() {
RegistryConfig registryConfig = new RegistryConfig();
registryConfig.setAddress("http://localhost:2379");
registry.init(registryConfig);
}
@Test
public void register() throws Exception {
ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
serviceMetaInfo.setServiceName("myService");
serviceMetaInfo.setServiceVersion("1.0");
serviceMetaInfo.setServiceHost("localhost");
serviceMetaInfo.setServicePort(1234);
registry.register(serviceMetaInfo);
serviceMetaInfo = new ServiceMetaInfo();
serviceMetaInfo.setServiceName("myService");
serviceMetaInfo.setServiceVersion("1.0");
serviceMetaInfo.setServiceHost("localhost");
serviceMetaInfo.setServicePort(1235);
registry.register(serviceMetaInfo);
serviceMetaInfo = new ServiceMetaInfo();
serviceMetaInfo.setServiceName("myService");
serviceMetaInfo.setServiceVersion("2.0");
serviceMetaInfo.setServiceHost("localhost");
serviceMetaInfo.setServicePort(1234);
registry.register(serviceMetaInfo);
}
@Test
public void unRegister() {
ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
serviceMetaInfo.setServiceName("myService");
serviceMetaInfo.setServiceVersion("1.0");
serviceMetaInfo.setServiceHost("localhost");
serviceMetaInfo.setServicePort(1234);
registry.unRegister(serviceMetaInfo);
}
@Test
public void serviceDiscovery() {
ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
serviceMetaInfo.setServiceName("myService");
serviceMetaInfo.setServiceVersion("1.0");
String serviceKey = serviceMetaInfo.getServiceKey();
List<ServiceMetaInfo> serviceMetaInfoList = registry.serviceDiscovery(serviceKey);
Assert.assertNotNull(serviceMetaInfoList);
}
}