负载均衡
1. 负载均衡说明
何为负载?可以把负载理解为要处理的工作和压力,比如网络请求、事务、数据处理任务等。
何为均衡?把工作和压力平均地分配给多个工作者,从而分摊每个工作者的压力,保证大家正常工作。
所以,负载均衡是一种用来分配网络或计算负载到多个资源上的技术,它的目的是确保每个资源都能够有效地处理负载、增加系统的并发量、避免某些资源过载而导致性能下降或服务不可用的情况。
1.1常见负载均衡算法及问题
1.1.1 轮询
- 按照循环的顺序将请求分配给每个服务器,适用于各服务器性能相近的情况。
1.1.2 随机
- 随机选择一个服务器来处理请求,适用于服务器性能相近且负载均匀的情况。
1.1.3 加权轮询
- 根据服务器的性能或权重分配请求,性能更好的服务器会获得更多的请求,适用于服务器性能不均的情况。
1.1.4 加权随机
- 根据服务器的权重随机选择一个服务器处理请求,适用于服务器性能不均的情况。
1.1.5 最小连接数
- 选择当前连接数最少的服务器来处理请求,适用于长连接场景。
1.1.6 IP Hash
- 根据客户端 IP 地址的哈希值选择服务器处理请求,确保同一客户端的请求始终被分配到同一台服务器上,适用于需要保持会话一致性的场景。
1.1.7 节点下线
- 当某个节点下线时,其负载会被平均分摊到其他节点上,而不会影响到整个系统的稳定性,因为只有部分请求会受到影响。
1.1.8 倾斜问题
- 通过虚拟节点的引入,将每个物理节点映射到多个虚拟节点上,使得节点在哈希环上的 分布更加均匀,减少了节点间的负载差异。
2.代码实现
2.1 负载均衡器接口
/**
* 负载均衡器(消费端使用)
*/
public interface LoadBalancer {
/**
* 选择服务调用
* @param requestParams
* @param serviceMetaInfoList
* @return
*/
ServiceMetaInfo select(Map<String, Object> requestParams, List<ServiceMetaInfo> serviceMetaInfoList);
}
2.2 轮询负载均衡器
- 使用 JUC 包的
AtomicInteger
实现原子计数器,防止并发冲突问题
/**
* 轮询负载均衡器
*/
public class RoundRobinLoadBalancer implements LoadBalancer{
/**
* 当前轮询的下标
*/
private final AtomicInteger currentIndex = new AtomicInteger(0);
@Override
public ServiceMetaInfo select(Map<String, Object> requestParams, List<ServiceMetaInfo> serviceMetaInfoList) {
if(serviceMetaInfoList.isEmpty()){
return null;
}
// 只有一个服务,无需轮询
int size = serviceMetaInfoList.size();
if(size == 1){
return serviceMetaInfoList.get(0);
}
// 取模算法轮询
int index = currentIndex.getAndIncrement() % size;
return serviceMetaInfoList.get(index);
}
}
2.3 随机负载均衡器
- 使用 Java 自带的 Random 类实现随机选取即可
/**
* 随机负载均衡器
*/
public class RandomLoadBalancer implements LoadBalancer{
private final Random random = new Random();
@Override
public ServiceMetaInfo select(Map<String, Object> requestParams, List<ServiceMetaInfo> serviceMetaInfoList) {
int size = serviceMetaInfoList.size();
if(size == 0){
return null;
}
if(size == 1){
return serviceMetaInfoList.get(0);
}
return serviceMetaInfoList.get(random.nextInt(size));
}
}
2.4 一致性Hash负载均衡器
- 使用 TreeMap 实现一致性 Hash 环,该数据结构提供了 ceilingEntry 和 firstEntry 两个方法,便于获取符合算法要求的节点
/**
* 一致性Hash负载均衡器
*/
public class ConsistentHashLoadBalancer implements LoadBalancer{
/**
* 一致性 Hash 环,存放虚拟节点
*/
private final TreeMap<Integer,ServiceMetaInfo> virtualNodes = new TreeMap<>();
/**
* 虚拟节点数
*/
private static final int VIRTUAL_NODE_NUM = 100;
@Override
public ServiceMetaInfo select(Map<String, Object> requestParams, List<ServiceMetaInfo> serviceMetaInfoList) {
if(serviceMetaInfoList.isEmpty()){
return null;
}
// 构建虚拟节点环
// 1. 每次调用负载均衡器时,都会重新构造 Hash 环,这是为了能够即时处理节点的变化
for (ServiceMetaInfo serviceMetaInfo : serviceMetaInfoList) {
for (int i=0;i<VIRTUAL_NODE_NUM;i++){
int hash = getHash(serviceMetaInfo.getServiceAddress());
virtualNodes.put(hash,serviceMetaInfo);
}
}
// 获取调用请求的hash值
// 2. 根据 requestParams 对象计算 Hash 值
int hash = getHash(requestParams);
// 选择最接近且大于等于调用请求hash值的虚拟节点
Map.Entry<Integer, ServiceMetaInfo> entry = virtualNodes.ceilingEntry(hash);
if(entry == null){
// 若没有大于等于调用请求 hash 值的虚拟节点,则返回环首部的节点
entry = virtualNodes.firstEntry();
}
return entry.getValue();
}
/**
* Hash算法
* @param key
* @return
*/
private int getHash(Object key){
return key.hashCode();
}
}
2.5 负载均衡器常量
/**
* 负载均衡器键名常量
*/
public interface LoadBalancerKeys {
/**
* 轮询
*/
String ROUND_ROBIN = "roundRobin";
/**
* 随机
*/
String RANDOM = "random";
/**
* 一致性哈希
*/
String CONSISTENT_HASH = "consistentHash";
}
2.6 负载均衡器工厂
/**
* 负载均衡器工厂(工厂模式,用于获取负载均衡器对象)
*/
public class LoadBalancerFactory {
static {
SpiLoader.load(LoadBalancer.class);
}
/**
* 默认负载均衡器
*/
private static final LoadBalancer DEFAULT_LOAD_BALANCER = new RoundRobinLoadBalancer();
/**
* 获取实例
* @param key
* @return
*/
public static LoadBalancer getInstance(String key){
return SpiLoader.getInstance(LoadBalancer.class,key);
}
}
2.7 负载均衡器接口的SPI配置文件
roundRobin=com.todaysaturday.loadbalancer.RoundRobinLoadBalancer
random=com.todaysaturday.loadbalancer.RandomLoadBalancer
consistentHash=com.todaysaturday.loadbalancer.ConsistentHashLoadBalancer
2.8 修改RpcConfig
/**
* 负载均衡器
*/
private String loadBalancer = LoadBalancerKeys.ROUND_ROBIN;
2.9 修改服务代理对象
/**
* JDK 动态代理
*/
public class JdkServiceProxy implements InvocationHandler {
/**
* 调用代理
*
* @return
* @throws Throwable
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 本地处理 Object 类的方法,避免 RPC 分派
if (method.getDeclaringClass() == Object.class) {
switch (method.getName()) {
case "toString":
// 为代理对象本身提供一个有意义的字符串表示
return proxy.getClass().getName() + "@" +
Integer.toHexString(System.identityHashCode(proxy)) +
", with InvocationHandler " + this;
case "hashCode":
// 委托给处理器的 hashCode 或提供一个代理特定的 hashCode
return System.identityHashCode(proxy);
case "equals":
// 委托给处理器的 equals 或提供代理特定的相等性判断
return proxy == args[0]; // 代理对象的默认引用相等性
default:
// 对于其他 Object 方法,可以考虑在处理器上调用它们
return method.invoke(this, args);
}
}
// 1.指定序列化器
// JdkSerializer serializer = new JdkSerializer();
final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());
String serviceName = method.getDeclaringClass().getName();
// 2.发出请求(全部使用动态的参数)
RpcRequest rpcRequest = RpcRequest.builder()
.serviceName(serviceName)
.methodName(method.getName())
.parameterTypes(method.getParameterTypes())
.args(args)
.build();
try {
// 从注册中心获取服务提供者请求地址
RpcConfig rpcConfig = RpcApplication.getRpcConfig();
// 实例化注册中心对象
Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());
ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
serviceMetaInfo.setServiceName(serviceName);
serviceMetaInfo.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION);
// 服务发现
List<ServiceMetaInfo> serviceMetaInfoList = registry.serviceDiscovery(serviceMetaInfo.getServiceKey());
if(CollUtil.isEmpty(serviceMetaInfoList)){
throw new RuntimeException("暂无服务地址");
}
// 负载均衡
LoadBalancer loadBalancer = LoadBalancerFactory.getInstance(rpcConfig.getLoadBalancer());
// 将调用方法名(请求路径)作为负载均衡参数
Map<String, Object> requestParams = new HashMap<>();
requestParams.put("methodName",rpcRequest.getMethodName());
ServiceMetaInfo selectedServiceMetaInfo = loadBalancer.select(requestParams, serviceMetaInfoList);
// 不取默认的
// ServiceMetaInfo selectedServiceMetaInfo = serviceMetaInfoList.get(0);
// 发出TCP请求
RpcResponse rpcResponse = VertxTcpClient.doRequest(rpcRequest,selectedServiceMetaInfo);
return rpcResponse.getData();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}