注册中心优化
1. 心跳检测和续期机制
实现心跳检测一般需要 2 个关键:定时、网络请求
但是使用 Etcd 实现心跳检测会更简单一些,因为 Etcd 自带了 key 过期机制,我们不妨换个思路:给节点注册信息一个 “生命倒计时”,让节点定期 续期,重置 自己的 倒计时。如果节点已宕机,一直不续期,Etcd 就会对 key 进行过期删除
1.1 提供心跳检测方法
/**
* 心跳检测(服务端)
*/
void heartBeat();
1.2 修改EtcdRegistry
1.2.1 添加本地缓存
/**
* 本机注册的节点Key集合(用于维护续期)
*/
private final Set<String> localRegisterNodeKeySet = new HashSet<>();
1.2.2 修改注册方法
@Override
public void register(ServiceMetaInfo serviceMetaInfo) throws Exception {
// 创建Lease 和 KV 客户端
Lease leaseClient = client.getLeaseClient();
// 创建 30秒 的租约
long leaseId = leaseClient.grant(30).get().getID();
// 设置存储的键值对
// 根路径拼接serviceKey
String registerKey = ETCD_ROOT_PATH + serviceMetaInfo.getServiceNodeKey();
ByteSequence key = ByteSequence.from(registerKey, StandardCharsets.UTF_8);
ByteSequence value = ByteSequence.from(JSONUtil.toJsonStr(serviceMetaInfo), StandardCharsets.UTF_8);
// 将键值对与租约关联起来,并设置过期时间
PutOption putOption = PutOption.builder()
.withLeaseId(leaseId)
.build();
kvClient.put(key,value,putOption).get();
// 添加节点信息到本地缓存
localRegisterNodeKeySet.add(registerKey);
}
1.2.2 修改注销方法
@Override
public void unRegister(ServiceMetaInfo serviceMetaInfo) {
String registerKey = ETCD_ROOT_PATH + serviceMetaInfo.getServiceNodeKey();
kvClient.delete(ByteSequence.from(registerKey,StandardCharsets.UTF_8));
// 移除本地缓存中的节点信息
localRegisterNodeKeySet.remove(registerKey);
}
1.2.3 实现heartBeat方法
@Override
public void heartBeat() {
// 10秒续签一次
CronUtil.schedule("*/10 * * * * *", new Task() {
@Override
public void execute() {
// 遍历本节点所有的key
for (String key : localRegisterNodeKeySet) {
try {
// 获取节点值来进行判断
List<KeyValue> keyValues = kvClient.get(ByteSequence.from(key, StandardCharsets.UTF_8))
.get()
.getKvs();
// 该节点已过期(需要重启节点才能重新注册)
if(CollUtil.isEmpty(keyValues)){
continue;
}
// 节点未过期,重新注册(相当于续签)
KeyValue keyValue = keyValues.get(0);
String value = keyValue.getValue().toString();
ServiceMetaInfo serviceMetaInfo = JSONUtil.toBean(value, ServiceMetaInfo.class);
// 重新注册节点
register(serviceMetaInfo);
} catch (Exception e) {
throw new RuntimeException(key + "续签失败", e);
}
}
}
});
// 支持秒级别的定时任务
CronUtil.setMatchSecond(true);
CronUtil.start();
}
1.2.4 修改init方法
@Override
public void init(RegistryConfig registryConfig) {
client = Client.builder()
.endpoints(registryConfig.getAddress())
.connectTimeout(Duration.ofMillis(registryConfig.getTimeout()))
.build();
kvClient = client.getKVClient();
// 添加心跳检测
heartBeat();
}
1.2.5 测试代码
@Test
public void heartBeat() throws Exception {
// init 方法中已经执行心跳检测了
register();
// 阻塞 1 分钟
Thread.sleep(60 * 1000L);
}
2. 服务节点下线机制
服务节点下线又分为:
- 主动下线:服务提供者项目正常退出时,主动从注册中心移除注册信息。
- 被动下线:服务提供者项目异常推出时,利用 Etcd 的 key 过期机制自动移除。
2.1 修改EtcdRegistry代码完善下线节点
@Override
public void destroy() {
System.out.println("当前节点下线");
// 下线节点
// 遍历本节点所有的key
for (String key : localRegisterNodeKeySet) {
try {
kvClient.delete(ByteSequence.from(key,StandardCharsets.UTF_8)).get();
} catch (Exception e) {
throw new RuntimeException(key + "节点下线失败");
}
}
// 释放资源
if(kvClient != null){
kvClient.close();
}
if(client != null){
client.close();
}
}
2.2 修改RpcApplication的init方法
/**
* 框架初始化,支持传入自定义配置
* @param newRpcConfig
*/
public static void init(RpcConfig newRpcConfig){
rpcConfig = newRpcConfig;
log.info("rpc init, config = {}", newRpcConfig.toString());
// 注册中心初始化
RegistryConfig registryConfig = rpcConfig.getRegistryConfig();
Registry registry = RegistryFactory.getInstance(registryConfig.getRegistry());
registry.init(registryConfig);
log.info("registry init,config = {}",registryConfig);
// 创建并注册 Shutdown Hook,JVM 退出时执行操作
Runtime.getRuntime().addShutdownHook(new Thread(registry::destroy));
}
3. 消费端服务缓存
3.1 增加注册中心服务本地缓存
/**
* 注册中心服务本地缓存
*/
public class RegistryServiceCache {
/**
* 服务缓存
*/
List<ServiceMetaInfo> serviceCache;
/**
* 写缓存
*/
void writeCache(List<ServiceMetaInfo> newServiceCache){
this.serviceCache = newServiceCache;
}
/**
* 读缓存
*/
List<ServiceMetaInfo> readCache(){
return this.serviceCache;
}
/**
* 清空缓存
*/
void clearCache(){
this.serviceCache = null;
}
}
3.2 使用本地缓存
3.2.1 新增注册中心服务缓存
/**
* 注册中心服务缓存
*/
private final RegistryServiceCache registryServiceCache = new RegistryServiceCache();
3.2.2 修改服务发现
@Override
public List<ServiceMetaInfo> serviceDiscovery(String serviceKey) {
// 优先从注册中心缓存中获取服务
List<ServiceMetaInfo> cachedServiceMetaInfoList = registryServiceCache.readCache();
if(CollUtil.isNotEmpty(cachedServiceMetaInfoList)){
return cachedServiceMetaInfoList;
}
// 前缀搜索,结尾一定要加 '/'
String searchPrefix = ETCD_ROOT_PATH + serviceKey + "/";
try {
// 前缀查询
GetOption getOption = GetOption.builder()
.isPrefix(true)
.build();
List<KeyValue> keyValues = kvClient.get(
ByteSequence.from(searchPrefix, StandardCharsets.UTF_8), getOption)
.get()
.getKvs();
// 解析服务信息
List<ServiceMetaInfo> serviceMetaInfoList = keyValues.stream()
.map(keyValue -> {
// 第一次访问该服务时才进行前缀监听
if (!registryServiceCache.contains(serviceKey)) {
// 监听key 的变化
watch(serviceKey);
}
String value = keyValue.getValue().toString(StandardCharsets.UTF_8);
return JSONUtil.toBean(value, ServiceMetaInfo.class);
})
.collect(Collectors.toList());
// 写入注册中心服务缓存
registryServiceCache.writeCache(serviceMetaInfoList);
return serviceMetaInfoList;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
3.2.3 注册中心服务新增contains方法
/**
* 校验缓存中是否包含重复的key
*/
boolean contains(String key){
if(this.serviceCache.contains(key)){
return true;
}
return false;
}
3.3 服务缓存更新-监听机制
3.3.1 注册中心新增监听方法
/**
* 监听(消费端)
*
* @param serviceNodeKey
*/
void watch(String serviceNodeKey);
3.3.2 创建监听集合
3.3.3 实现监听方法
@Override
public void watch(String serviceNodeKey) {
Watch watchClient = client.getWatchClient();
// 之前未被监听,开启监听
boolean newWatch = watchingKeySet.add(serviceNodeKey);
if(newWatch){
watchClient.watch(ByteSequence.from(serviceNodeKey,StandardCharsets.UTF_8),watchResponse -> {
for (WatchEvent event : watchResponse.getEvents()) {
switch (event.getEventType()){
// key 删除时触发
case DELETE:
// 清除注册服务缓存
registryServiceCache.clearCache();
break;
case PUT:
default:
break;
}
}
});
}
}
4. ZooKeeper 注册中心实现
- 官网:https://zookeeper.apache.org/releases.html
- 下载:https://zookeeper.apache.org/releases.html#download
- https://dlcdn.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz
- 参考文档:
4.1 引入依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
<version>5.6.0</version>
</dependency>
4.2 ZooKeeperRegistry实现
package com.yupi.yurpc.registry;
import cn.hutool.core.collection.ConcurrentHashSet;
import com.yupi.yurpc.config.RegistryConfig;
import com.yupi.yurpc.model.ServiceMetaInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* zookeeper 注册中心
* 操作文档:<a href="https://curator.apache.org/docs/getting-started">Apache Curator</a>
* 代码示例:<a href="https://github.com/apache/curator/blob/master/curator-examples/src/main/java/discovery/DiscoveryExample.java">DiscoveryExample.java</a>
* 监听 key 示例:<a href="https://github.com/apache/curator/blob/master/curator-examples/src/main/java/cache/CuratorCacheExample.java">CuratorCacheExample.java</a>
*
* @author <a href="https://github.com/liyupi">coder_yupi</a>
* @from <a href="https://yupi.icu">编程导航学习圈</a>
* @learn <a href="https://codefather.cn">yupi 的编程宝典</a>
*/
@Slf4j
public class ZooKeeperRegistry implements Registry {
private CuratorFramework client;
private ServiceDiscovery<ServiceMetaInfo> serviceDiscovery;
/**
* 本机注册的节点 key 集合(用于维护续期)
*/
private final Set<String> localRegisterNodeKeySet = new HashSet<>();
/**
* 注册中心服务缓存
*/
private final RegistryServiceCache registryServiceCache = new RegistryServiceCache();
/**
* 正在监听的 key 集合
*/
private final Set<String> watchingKeySet = new ConcurrentHashSet<>();
/**
* 根节点
*/
private static final String ZK_ROOT_PATH = "/rpc/zk";
@Override
public void init(RegistryConfig registryConfig) {
// 构建 client 实例
client = CuratorFrameworkFactory
.builder()
.connectString(registryConfig.getAddress())
.retryPolicy(new ExponentialBackoffRetry(Math.toIntExact(registryConfig.getTimeout()), 3))
.build();
// 构建 serviceDiscovery 实例
serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceMetaInfo.class)
.client(client)
.basePath(ZK_ROOT_PATH)
.serializer(new JsonInstanceSerializer<>(ServiceMetaInfo.class))
.build();
try {
// 启动 client 和 serviceDiscovery
client.start();
serviceDiscovery.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void register(ServiceMetaInfo serviceMetaInfo) throws Exception {
// 注册到 zk 里
serviceDiscovery.registerService(buildServiceInstance(serviceMetaInfo));
// 添加节点信息到本地缓存
String registerKey = ZK_ROOT_PATH + "/" + serviceMetaInfo.getServiceNodeKey();
localRegisterNodeKeySet.add(registerKey);
}
@Override
public void unRegister(ServiceMetaInfo serviceMetaInfo) {
try {
serviceDiscovery.unregisterService(buildServiceInstance(serviceMetaInfo));
} catch (Exception e) {
throw new RuntimeException(e);
}
// 从本地缓存移除
String registerKey = ZK_ROOT_PATH + "/" + serviceMetaInfo.getServiceNodeKey();
localRegisterNodeKeySet.remove(registerKey);
}
@Override
public List<ServiceMetaInfo> serviceDiscovery(String serviceKey) {
// 优先从缓存获取服务
List<ServiceMetaInfo> cachedServiceMetaInfoList = registryServiceCache.readCache();
if (cachedServiceMetaInfoList != null) {
return cachedServiceMetaInfoList;
}
try {
// 查询服务信息
Collection<ServiceInstance<ServiceMetaInfo>> serviceInstanceList = serviceDiscovery.queryForInstances(serviceKey);
// 解析服务信息
List<ServiceMetaInfo> serviceMetaInfoList = serviceInstanceList.stream()
.map(ServiceInstance::getPayload)
.collect(Collectors.toList());
// 写入服务缓存
registryServiceCache.writeCache(serviceMetaInfoList);
return serviceMetaInfoList;
} catch (Exception e) {
throw new RuntimeException("获取服务列表失败", e);
}
}
@Override
public void heartBeat() {
// 不需要心跳机制,建立了临时节点,如果服务器故障,则临时节点直接丢失
}
/**
* 监听(消费端)
*
* @param serviceNodeKey 服务节点 key
*/
@Override
public void watch(String serviceNodeKey) {
String watchKey = ZK_ROOT_PATH + "/" + serviceNodeKey;
boolean newWatch = watchingKeySet.add(watchKey);
if (newWatch) {
CuratorCache curatorCache = CuratorCache.build(client, watchKey);
curatorCache.start();
curatorCache.listenable().addListener(
CuratorCacheListener
.builder()
.forDeletes(childData -> registryServiceCache.clearCache())
.forChanges(((oldNode, node) -> registryServiceCache.clearCache()))
.build()
);
}
}
@Override
public void destroy() {
log.info("当前节点下线");
// 下线节点(这一步可以不做,因为都是临时节点,服务下线,自然就被删掉了)
for (String key : localRegisterNodeKeySet) {
try {
client.delete().guaranteed().forPath(key);
} catch (Exception e) {
throw new RuntimeException(key + "节点下线失败");
}
}
// 释放资源
if (client != null) {
client.close();
}
}
private ServiceInstance<ServiceMetaInfo> buildServiceInstance(ServiceMetaInfo serviceMetaInfo) {
String serviceAddress = serviceMetaInfo.getServiceHost() + ":" + serviceMetaInfo.getServicePort();
try {
return ServiceInstance
.<ServiceMetaInfo>builder()
.id(serviceAddress)
.name(serviceMetaInfo.getServiceKey())
.address(serviceAddress)
.payload(serviceMetaInfo)
.build();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
4.3添加spi的支持
zookeeper=com.todaysaturday.registry.ZooKeeperRegistry
4.4 修改Properties文件
rpc.registryConfig.registry=zookeeper
rpc.registryConfig.address=localhost:2181
4.5 修改CGlibServiceProxy
/**
* @param o
* @param method
* @param objects
* @param methodProxy
* @return
*/
@Override
public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) {
// 本地处理 Object 类的方法,避免 RPC 分派
try {
if (method.getDeclaringClass() == Object.class) {
switch (method.getName()) {
case "toString":
// 为代理对象本身提供一个有意义的字符串表示
return o.getClass().getName() + "@" +
Integer.toHexString(System.identityHashCode(o)) +
", with InvocationHandler " + this;
case "hashCode":
// 委托给处理器的 hashCode 或提供一个代理特定的 hashCode
return System.identityHashCode(o);
case "equals":
// 委托给处理器的 equals 或提供代理特定的相等性判断
return o == objects[0]; // 代理对象的默认引用相等性
default:
// 对于其他 Object 方法,可以考虑在处理器上调用它们
return method.invoke(this, objects);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());
String serviceName = method.getDeclaringClass().getName();
// 2.发出请求(全部使用动态的参数)
RpcRequest rpcRequest = RpcRequest.builder()
.serviceName(serviceName)
.methodName(method.getName())
.parameterTypes(method.getParameterTypes())
.args(objects)
.build();
try {
// 3.序列化对象
byte[] bodyBytes = serializer.serialize(rpcRequest);
// start
RpcConfig rpcConfig = RpcApplication.getRpcConfig();
// 实例化注册中心对象
Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());
ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
serviceMetaInfo.setServiceName(serviceName);
serviceMetaInfo.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION);
// 服务发现
List<ServiceMetaInfo> serviceMetaInfoList = registry.serviceDiscovery(serviceMetaInfo.getServiceKey());
if(CollUtil.isEmpty(serviceMetaInfoList)){
throw new RuntimeException("暂无服务地址");
}
ServiceMetaInfo selectedServiceMetaInfo = serviceMetaInfoList.get(0);
// 4.发出HTTP请求
try(HttpResponse httpResponse = HttpRequest.post(selectedServiceMetaInfo.getServiceAddress())
.body(bodyBytes)
.execute()){
byte[] result = httpResponse.bodyBytes();
// 5.反序列化对象
RpcResponse rpcResponse = serializer.deserialize(result, RpcResponse.class);
return (User) rpcResponse.getData();
}
// end
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
}
4.6 使用ZooKeeper 后出现报错
暂无服务地址的情况,在代理类中的反射方法中添加如下代码
// 本地处理 Object 类的方法,避免 RPC 分派
if (method.getDeclaringClass() == Object.class) {
switch (method.getName()) {
case "toString":
// 为代理对象本身提供一个有意义的字符串表示
return proxy.getClass().getName() + "@" +
Integer.toHexString(System.identityHashCode(proxy)) +
", with InvocationHandler " + this;
case "hashCode":
// 委托给处理器的 hashCode 或提供一个代理特定的 hashCode
return System.identityHashCode(proxy);
case "equals":
// 委托给处理器的 equals 或提供代理特定的相等性判断
return proxy == args[0]; // 代理对象的默认引用相等性
default:
// 对于其他 Object 方法,可以考虑在处理器上调用它们
return method.invoke(this, args);
}
}