序列化器与SPI机制
定义一个 序列化器名称 => 序列化器实现类对象
的 Map,然后根据名称从 Map 中获取对象即可
- 1.加载全局配置并初始化RPC框架
- 2.注册服务,通过SPI机制并获取序列化器
- 3.启动HTTP服务器,等待请求
- 4.通过加载全局配置并初始化rpc框架获取代理对象并调用代理对象的方法,mock为true执行mockserviceproxy代理,mock为false,执行正常代理
- 5.通过SPI机制并获取序列化器,构造RPC Request
- 6.将请求序列化为字节数组,发送HTTP请求
- 7.等待接收HTTP响应
- 8.通过web服务器监听并接收consumer的请求
- 9.经过请求处理器将请求反序列化RPC Request
- 10.使用本地服务注册器查询 provider 中的对应服务并进行反射调用
- 11.将 provider 返回的结果构造为RPC Response,并将RPC Response序列化为字节数组
- 12.rpc发送HTTP响应
- 13.consumer接收响应
- 14.consumer将响应反序列化为RPC Request
- 15.consumer获取响应中的结果,mock为true返回默认结果,为false返回实际结果
SPI机制
- Service
- 是一个公开的接口或抽象类,定义了抽象的功能模块
- Service Provider
- 是Service接口的实现类
- ServiceLoader
- 负责在运行时发现并加载Service Provider
SPI(Service Provider Interface)服务提供接口是 Java 的机制,主要用于实现模块化开发和插件化扩展,SPI 机制允许服务提供者通过特定的配置文件将自己的实现注册到系统中,然后系统通过反射机制动态加载这些实现,而不需要修改原始框架的代码,从而实现了系统的解耦、提高了可扩展性
- 直接加载在META-INF/services下的文件中的完整类路径,不用管怎么实现的,只需要通过反射调用这个接口实现类就行
具体实现
系统实现
1. 创建目录
- 在
resources
资源目录下创建META-INF/services
目录,并且创建一个名称为要实现的接口的空文件
2. 创建文件,填写接口实现类的完整类路径
com.todaysaturday.serializer.JdkSerializer
3. 使用系统内置的 ServiceLoader 动态加载指定接口的实现类
// 指定序列化器
Serializer serializer = null;
ServiceLoader<Serializer> serviceLoader = ServiceLoader.load(Serializer.class);
for (Serializer service : serviceLoader) {
serializer = service;
}
自定义SPI实现
根据配置加载到类,读取配置文件,得到 序列化器名称 => 序列化器实现类对象
的映射
序列化器名称1 = 序列化器实现类对象的映射1
序列化器名称2 = 序列化器实现类对象的映射2
序列化器名称3 = 序列化器实现类对象的映射3
1.引入依赖
<!-- 序列化 -->
<!-- https://mvnrepository.com/artifact/com.caucho/hessian -->
<dependency>
<groupId>com.caucho</groupId>
<artifactId>hessian</artifactId>
<version>4.0.66</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.esotericsoftware/kryo -->
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>5.6.0</version>
</dependency>
2. JSON序列化器
/**
* JSON 序列化器
*/
public class JsonSerializer implements Serializer{
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Override
public <T> byte[] serialize(T object) throws IOException {
return OBJECT_MAPPER.writeValueAsBytes(object);
}
@Override
public <T> T deserialize(byte[] bytes, Class<T> classType) throws IOException {
T obj = OBJECT_MAPPER.readValue(bytes, classType);
if (obj instanceof RpcRequest) {
return handleRequest((RpcRequest) obj, classType);
}
if (obj instanceof RpcResponse) {
return handleResponse((RpcResponse) obj, classType);
}
return obj;
}
/**
* 由于 Object 的原始对象会被擦除,导致反序列化时会被作为 LinkedHashMap 无法转换成原始对象,因此这里做了特殊处理
*
* @param rpcRequest rpc 请求
* @param type 类型
* @return {@link T}
* @throws IOException IO异常
*/
private <T> T handleRequest(RpcRequest rpcRequest, Class<T> type) throws IOException {
Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
Object[] args = rpcRequest.getArgs();
// 循环处理每个参数的类型
for (int i = 0; i < parameterTypes.length; i++) {
Class<?> clazz = parameterTypes[i];
// 如果类型不同,则重新处理一下类型
if (!clazz.isAssignableFrom(args[i].getClass())) {
byte[] argBytes = OBJECT_MAPPER.writeValueAsBytes(args[i]);
args[i] = OBJECT_MAPPER.readValue(argBytes, clazz);
}
}
return type.cast(rpcRequest);
}
/**
* 由于 Object 的原始对象会被擦除,导致反序列化时会被作为 LinkedHashMap 无法转换成原始对象,因此这里做了特殊处理
*
* @param rpcResponse rpc 响应
* @param type 类型
* @return {@link T}
* @throws IOException IO异常
*/
private <T> T handleResponse(RpcResponse rpcResponse, Class<T> type) throws IOException {
// 处理响应数据
byte[] dataBytes = OBJECT_MAPPER.writeValueAsBytes(rpcResponse.getData());
rpcResponse.setData(OBJECT_MAPPER.readValue(dataBytes, rpcResponse.getDataType()));
return type.cast(rpcResponse);
}
}
3. Kryo序列化器
/**
* Kryo 序列化器
*/
public class KryoSerializer implements Serializer{
/**
* kryo 线程不安全,使用 ThreadLocal 保证每个线程只有一个 Kryo
*/
private static final ThreadLocal<Kryo> KRYO_THREAD_LOCAL = ThreadLocal.withInitial(() -> {
Kryo kryo = new Kryo();
// 设置动态动态序列化和反序列化类,不提前注册所有类(可能有安全问题)
kryo.setRegistrationRequired(false);
return kryo;
});
@Override
public <T> byte[] serialize(T obj) {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
Output output = new Output(byteArrayOutputStream);
KRYO_THREAD_LOCAL.get().writeObject(output, obj);
output.close();
return byteArrayOutputStream.toByteArray();
}
@Override
public <T> T deserialize(byte[] bytes, Class<T> classType) {
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
Input input = new Input(byteArrayInputStream);
T result = KRYO_THREAD_LOCAL.get().readObject(input, classType);
input.close();
return result;
}
}
4. Hessian序列化器
/**
* Hessian 序列化器
*/
public class HessianSerializer implements Serializer{
@Override
public <T> byte[] serialize(T object) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
HessianOutput ho = new HessianOutput(bos);
ho.writeObject(object);
return bos.toByteArray();
}
@Override
public <T> T deserialize(byte[] bytes, Class<T> tClass) throws IOException {
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
HessianInput hi = new HessianInput(bis);
return (T) hi.readObject(tClass);
}
}
5. 序列化器常量类(SerializerKeys)
/**
* 序列化器键名
*/
public class SerializerKeys {
public final static String JDK = "jdk";
public final static String JSON = "json";
public final static String KRYO = "kryo";
public final static String HESSIAN = "hessian";
}
6. 序列化器工厂
/**
* 序列化器工厂(用于获取序列化器对象)
*/
public class SerializerFactory {
/**
* 序列化映射(用于实现单例)
*/
private static final Map<String,Serializer> KEY_SERIALIZER_MAP = new HashMap<String,Serializer>(){{
put(SerializerKeys.JDK,new JdkSerializer());
put(SerializerKeys.JSON,new JsonSerializer());
put(SerializerKeys.KRYO,new KryoSerializer());
put(SerializerKeys.HESSIAN,new HessianSerializer());
}};
/**
* 默认序列化器
*/
private static final Serializer DEFAULT_SERIALIZER = KEY_SERIALIZER_MAP.get(SerializerKeys.JDK);
/**
* 获取实例,如果没有传key,则将默认序列化器返回
* @param key
* @return
*/
public static Serializer getInstance(String key){
return KEY_SERIALIZER_MAP.getOrDefault(key,DEFAULT_SERIALIZER);
}
}
7. 修改RpcConfig
/**
* 序列化器
*/
private String serializer = SerializerKeys.JDK;
8. 修改JdkServiceProxy
// 1.指定序列化器
// JdkSerializer serializer = new JdkSerializer();
final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());
9. 修改HttpServerHandler
// 1.指定序列化器
// JdkSerializer serializer = new JdkSerializer();
final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());
// 修改参数类型
private void doResponse(HttpServerRequest request, RpcResponse rpcResponse, Serializer serializer)
自定义序列化器
1. 指定SPI配置目录
系统内置SPI机制 | 自定义序列化器 |
---|---|
加载 resources 资源目录下的 META-INF/services 目录 |
读取 META-INF/rpc 目录 |
用户自定义 SPI | 系统内置 SPI |
---|---|
META-INF/rpc/custom | META-INF/rpc/system |
在
resources
下新建META-INF/rpc/custom
和META-INF/rpc/custom
目录新建文件:
com.todaysaturday.serializer.JdkSerializer
文件内容中创建完整类路径名
jdk=com.todaysaturday.serializer.JdkSerializer json=com.todaysaturday.serializer.JsonSerializer kryo=com.todaysaturday.serializer.KryoSerializer hessian=com.todaysaturday.serializer.HessianSerializer
### 2. 编写SpiLoader加载器 - 用 Map 来存储已加载的配置信息 `键名 => 实现类`。 - 扫描指定路径,读取每个配置文件,获取到 `键名 => 实现类` 信息并存储在 Map 中。 - 定义获取实例方法,根据用户传入的接口和键名,从 Map 中找到对应的实现类,然后通过反射获取到实现类对象。可以维护一个对象实例缓存,创建过一次的对象从缓存中读取即可
/**
SPI加载器(支持键值对映射)
*/
@Slf4j
public class SpiLoader {/**
- 存储已加载的类:接口名 => (key => 实现类)
*/
private static Map<String, Map<String,Class<?>>> loaderMap = new ConcurrentHashMap<>();
/**
- 对象实例缓存(避免重复 new),类路径 => 对象实例,单例模式
*/
private static Map<String,Object> instanceCache = new ConcurrentHashMap<>();
/**
- 系统SPI目录
*/
private static final String RPC_SYSTEM_SPI_DIR = “META-INF/rpc/system/“;
/**
- 用户自定义SPI目录
*/
private static final String RPC_CUSTOM_SPI_DIR = “META-INF/rpc/custom/“;
/**
- 扫描路径
*/
private static final String[] SCAN_DIRS = new String[]{
RPC_CUSTOM_SPI_DIR,
RPC_SYSTEM_SPI_DIR
};
/**
- 动态加载的类列表
*/
private static final List<Class<?>> LOAD_CLASS_LIST = Arrays.asList(Serializer.class);
/**
- 加载所有类型
*/
public static void loadAll(){
log.info(“加载所有的 SPI”);
for (Class<?> aClass : LOAD_CLASS_LIST) {
load(aClass);
}
}
/**
获取某个接口的实例对象
@param tClass
@param key
@return
@param
*/
public staticT getInstance(Class<?> tClass,String key){
// 获取全限定名
String tClassName = tClass.getName();Map<String, Class<?>> keyClassMap = loaderMap.get(tClassName);
if (keyClassMap == null) {
throw new RuntimeException(String.format(“SpiLoader 未加载 %s 类型”, tClassName));
}
if (!keyClassMap.containsKey(key)) {
throw new RuntimeException(String.format(“SpiLoader 的 %s 不存在 key=%s 的类型”, tClassName, key));
}// 获取到要加载的实现类型
Class<?> implClass = keyClassMap.get(key);
// 从实例缓存中加载指定类型的实例
String implClassName = implClass.getName();
// 如果缓存中不存在,直接实例化并进行put
if(!instanceCache.containsKey(implClassName)){
try {
instanceCache.put(implClassName,implClass.newInstance());
} catch (Exception e) {
String errorMsg = String.format(“%s 类实例化失败”, implClassName);
throw new RuntimeException(errorMsg, e);
}
}// 否则就直接返回
return (T) instanceCache.get(implClassName);
}
/**
- 加载某个类型
- @param loadClass
- @return
*/
public static Map<String, Class>> load(Class> loadClass) {
log.info(“加载类型为 {} 的 SPI”, loadClass.getName());
HashMap<String, Class<?>> keyClassMap = new HashMap<>();
for (String scanDir : SCAN_DIRS) {
// 获取当前路径下的文件
Listresources = ResourceUtil.getResources(scanDir + loadClass.getName());
// 读取每个资源文件
for (URL resource : resources) {
try {
InputStreamReader inputStreamReader = new InputStreamReader(resource.openStream());
BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
String line;
// 读取文件内容
while ((line = bufferedReader.readLine())!=null){
String[] strArray = line.split(“=”);
if(strArray.length > 1){
String key = strArray[0];
String className = strArray[1];
keyClassMap.put(key,Class.forName(className));
}
}
} catch (Exception e) {
log.error(“spi resource load error”, e);
}
}
}
loaderMap.put(loadClass.getName(),keyClassMap);
return keyClassMap;
}
- 存储已加载的类:接口名 => (key => 实现类)
}
### 3. 重构序列化器工厂
```java
/**
* 序列化器工厂(用于获取序列化器对象)
*/
public class SerializerFactory {
static {
SpiLoader.load(Serializer.class);
}
/**
* 默认序列化器
*/
private static final Serializer DEFAULT_SERIALIZER = new JdkSerializer();
/**
* 获取实例对象
*/
public static Serializer getInstance(String key){
if(!StringUtils.isEmpty(key)){
return SpiLoader.getInstance(Serializer.class,key);
}else{
return DEFAULT_SERIALIZER;
}
}
// /**
// * 序列化映射(用于实现单例)
// */
// private static final Map<String,Serializer> KEY_SERIALIZER_MAP = new HashMap<String,Serializer>(){{
// put(SerializerKeys.JDK,new JdkSerializer());
// put(SerializerKeys.JSON,new JsonSerializer());
// put(SerializerKeys.KRYO,new KryoSerializer());
// put(SerializerKeys.HESSIAN,new HessianSerializer());
// }};
//
// /**
// * 默认序列化器
// */
// private static final Serializer DEFAULT_SERIALIZER = KEY_SERIALIZER_MAP.get(SerializerKeys.JDK);
//
// /**
// * 获取实例,如果没有传key,则将默认序列化器返回
// * @param key
// * @return
// */
// public static Serializer getInstance(String key){
// return KEY_SERIALIZER_MAP.getOrDefault(key,DEFAULT_SERIALIZER);
// }
}
扩展
1. 新增 protostuff 序列化器
1.1 引入依赖
<!-- Protostuff核心库 -->
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>1.8.0</version>
</dependency>
<!-- Protostuff运行时库,用于动态生成Schema -->
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>1.8.0</version>
</dependency>
1.2 新增类路径
protostuff=com.todaysaturday.serializer.ProtostuffSerializer
1.3 修改properties
rpc.serializer = protostuff
1.4 修改SerializerKeys
public final static String PROTOSTUFF = "protostuff";
1.5 新增ProtostuffSerializer
package com.todaysaturday.serializer;
import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Protostuff 序列化器
* 基于Protostuff库实现的高性能序列化器,相比JDK序列化具有更好的性能和更小的序列化体积
*/
public class ProtostuffSerializer implements Serializer {
/**
* Schema缓存,避免重复创建Schema对象,提升性能
* 使用ConcurrentHashMap保证线程安全
*/
private static final Map<Class<?>, Schema<?>> SCHEMA_CACHE = new ConcurrentHashMap<>();
/**
* 序列化:对象->字节数组
* 使用Protostuff将Java对象序列化为字节数组
*
* @param object 待序列化的对象
* @return 序列化后的字节数组
* @param <T> 对象类型
* @throws IOException 序列化过程中发生IO异常时抛出
*/
@Override
public <T> byte[] serialize(T object) throws IOException {
if (object == null) {
throw new IllegalArgumentException("序列化对象不能为null");
}
// 获取对象的Class类型
Class<T> clazz = (Class<T>) object.getClass();
// 获取对应的Schema,优先从缓存中获取
Schema<T> schema = getSchema(clazz);
// 创建LinkedBuffer用于序列化,使用默认大小
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
try {
// 使用ProtostuffIOUtil进行序列化
return ProtostuffIOUtil.toByteArray(object, schema, buffer);
} finally {
// 清理buffer资源
buffer.clear();
}
}
/**
* 反序列化:字节数组->对象
* 使用Protostuff将字节数组反序列化为Java对象
*
* @param bytes 待反序列化的字节数组
* @param type 目标对象的Class类型
* @return 反序列化后的对象
* @param <T> 对象类型
* @throws IOException 反序列化过程中发生异常时抛出
*/
@Override
public <T> T deserialize(byte[] bytes, Class<T> type) throws IOException {
if (bytes == null || bytes.length == 0) {
throw new IllegalArgumentException("反序列化字节数组不能为空");
}
if (type == null) {
throw new IllegalArgumentException("目标类型不能为null");
}
try {
// 获取对应的Schema
Schema<T> schema = getSchema(type);
// 创建目标类型的实例
T instance = schema.newMessage();
// 使用ProtostuffIOUtil进行反序列化
ProtostuffIOUtil.mergeFrom(bytes, instance, schema);
return instance;
} catch (Exception e) {
throw new IOException("反序列化失败: " + e.getMessage(), e);
}
}
/**
* 获取指定类型的Schema,优先从缓存中获取以提升性能
* Schema是Protostuff中描述对象结构的元数据,用于序列化和反序列化过程
*
* @param clazz 目标类型
* @return 对应的Schema对象
* @param <T> 类型参数
*/
@SuppressWarnings("unchecked")
private static <T> Schema<T> getSchema(Class<T> clazz) {
// 先从缓存中尝试获取
Schema<T> schema = (Schema<T>) SCHEMA_CACHE.get(clazz);
if (schema == null) {
// 缓存中不存在,则创建新的Schema
schema = RuntimeSchema.createFrom(clazz);
// 放入缓存中,使用putIfAbsent避免并发情况下的重复创建
Schema<T> existing = (Schema<T>) SCHEMA_CACHE.putIfAbsent(clazz, schema);
if (existing != null) {
schema = existing;
}
}
return schema;
}
}
2.改造序列化器工厂
1. 使用volatile修饰静态变量
/**
* 1.添加 volatile 静态私有实例变量
*/
private static volatile Serializer serializer;
2. 创建私有构造函数
/**
* 2.私有构造函数
*/
private SerializerFactory() {}
3. 创建访问入口(进行两次检查)
/**
* 3.公共静态获取实例的方法
* @param key
* @return
*/
public static Serializer getInstance(String key){
if(serializer == null){ // 第一次检查
synchronized (SerializerFactory.class){
if(serializer == null){ // 第二次检查
SpiLoader.load(Serializer.class);
if(!StrUtil.isEmpty(key)){
serializer = SpiLoader.getInstance(Serializer.class,key);
}else{
serializer = new JdkSerializer();
}
}
}
}
return serializer;
}
3.改造SPILoader
/**
* 对象实例缓存(避免重复 new),类路径 => 对象实例,单例模式
*/
private static volatile Map<String,Object> instanceCache = new ConcurrentHashMap<>();
/**
* 获取某个接口的实例对象
* @param tClass
* @param key
* @return
* @param <T>
*/
public static <T> T getInstance(Class<?> tClass,String key){
// 获取全限定名
String tClassName = tClass.getName();
Map<String, Class<?>> keyClassMap = loaderMap.get(tClassName);
if (keyClassMap == null) {
throw new RuntimeException(String.format("SpiLoader 未加载 %s 类型", tClassName));
}
if (!keyClassMap.containsKey(key)) {
throw new RuntimeException(String.format("SpiLoader 的 %s 不存在 key=%s 的类型", tClassName, key));
}
// 获取到要加载的实现类型
Class<?> implClass = keyClassMap.get(key);
// 从实例缓存中加载指定类型的实例
String implClassName = implClass.getName();
// 如果缓存中不存在,直接实例化并进行put
if(!instanceCache.containsKey(implClassName)){ // 第一次检查
synchronized (instanceCache){
if(!instanceCache.containsKey(implClassName)){// 第二次检查
try {
instanceCache.put(implClassName,implClass.newInstance());
} catch (Exception e) {
String errorMsg = String.format("%s 类实例化失败", implClassName);
throw new RuntimeException(errorMsg, e);
}
}
}
}
// 否则就直接返回
return (T) instanceCache.get(implClassName);
}