4.序列化器与SPI机制

序列化器与SPI机制

定义一个 序列化器名称 => 序列化器实现类对象 的 Map,然后根据名称从 Map 中获取对象即可

  • 1.加载全局配置并初始化RPC框架
  • 2.注册服务,通过SPI机制并获取序列化器
  • 3.启动HTTP服务器,等待请求
  • 4.通过加载全局配置并初始化rpc框架获取代理对象并调用代理对象的方法,mock为true执行mockserviceproxy代理,mock为false,执行正常代理
  • 5.通过SPI机制并获取序列化器,构造RPC Request
  • 6.将请求序列化为字节数组,发送HTTP请求
  • 7.等待接收HTTP响应
  • 8.通过web服务器监听并接收consumer的请求
  • 9.经过请求处理器将请求反序列化RPC Request
  • 10.使用本地服务注册器查询 provider 中的对应服务并进行反射调用
  • 11.将 provider 返回的结果构造为RPC Response,并将RPC Response序列化为字节数组
  • 12.rpc发送HTTP响应
  • 13.consumer接收响应
  • 14.consumer将响应反序列化为RPC Request
  • 15.consumer获取响应中的结果,mock为true返回默认结果,为false返回实际结果

序列化器SPI机制

SPI机制

  • Service
    • 是一个公开的接口或抽象类,定义了抽象的功能模块
  • Service Provider
    • 是Service接口的实现类
  • ServiceLoader
    • 负责在运行时发现并加载Service Provider

SPI(Se‍rvice Provid‍er Interface⁠)服务提供接口是 Java 的机制,主要用于实现‌模块化开发和插件化扩展,SPI 机制允许‍服务提供者通过特定的配置文件将自‍己的实现注册到系统中,然后系统通⁠过反射机制动态加载这些实现,而不需要修改原始框架的代码,从而实现‌了系统的解耦、提高了可扩展性

  • 直接加载在META-INF/services下的文件中的完整类路径,不用管怎么实现的,只需要通过反射调用这个接口实现类就行

image-20250602100911758

image-20250602101047812image-20250602101050058

具体实现

系统实现

1. 创建目录

  • resources 资源目录下创建 META-INF/services 目录,并且创建一个名称为要实现的接口的空文件

2. 创建文件,填写接口实现类的完整类路径

com.todaysaturday.serializer.JdkSerializer

3. 使‍用系统内置的 Se‍rviceLoad⁠er 动态加载指定接口的实现类

// 指定序列化器
Serializer serializer = null;
ServiceLoader<Serializer> serviceLoader = ServiceLoader.load(Serializer.class);
for (Serializer service : serviceLoader) {
    serializer = service;
}

自定义SPI实现

根据配置加载到类,读取配置文件,得到 序列化器名称 => 序列化器实现类对象 的映射

序列化器名称1 = 序列化器实现类对象的映射1
序列化器名称2 = 序列化器实现类对象的映射2
序列化器名称3 = 序列化器实现类对象的映射3

1.引入依赖

<!-- 序列化 -->
<!-- https://mvnrepository.com/artifact/com.caucho/hessian -->
<dependency>
    <groupId>com.caucho</groupId>
    <artifactId>hessian</artifactId>
    <version>4.0.66</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.esotericsoftware/kryo -->
<dependency>
    <groupId>com.esotericsoftware</groupId>
    <artifactId>kryo</artifactId>
    <version>5.6.0</version>
</dependency>

2. JSON序列化器

/**
 * JSON 序列化器
 */
public class JsonSerializer implements Serializer{
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    @Override
    public <T> byte[] serialize(T object) throws IOException {
        return OBJECT_MAPPER.writeValueAsBytes(object);
    }


    @Override
    public <T> T deserialize(byte[] bytes, Class<T> classType) throws IOException {
        T obj = OBJECT_MAPPER.readValue(bytes, classType);
        if (obj instanceof RpcRequest) {
            return handleRequest((RpcRequest) obj, classType);
        }
        if (obj instanceof RpcResponse) {
            return handleResponse((RpcResponse) obj, classType);
        }
        return obj;
    }

    /**
     * 由于 Object 的原始对象会被擦除,导致反序列化时会被作为 LinkedHashMap 无法转换成原始对象,因此这里做了特殊处理
     *
     * @param rpcRequest rpc 请求
     * @param type       类型
     * @return {@link T}
     * @throws IOException IO异常
     */
    private <T> T handleRequest(RpcRequest rpcRequest, Class<T> type) throws IOException {
        Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
        Object[] args = rpcRequest.getArgs();

        // 循环处理每个参数的类型
        for (int i = 0; i < parameterTypes.length; i++) {
            Class<?> clazz = parameterTypes[i];
            // 如果类型不同,则重新处理一下类型
            if (!clazz.isAssignableFrom(args[i].getClass())) {
                byte[] argBytes = OBJECT_MAPPER.writeValueAsBytes(args[i]);
                args[i] = OBJECT_MAPPER.readValue(argBytes, clazz);
            }
        }
        return type.cast(rpcRequest);
    }

    /**
     * 由于 Object 的原始对象会被擦除,导致反序列化时会被作为 LinkedHashMap 无法转换成原始对象,因此这里做了特殊处理
     *
     * @param rpcResponse rpc 响应
     * @param type        类型
     * @return {@link T}
     * @throws IOException IO异常
     */
    private <T> T handleResponse(RpcResponse rpcResponse, Class<T> type) throws IOException {
        // 处理响应数据
        byte[] dataBytes = OBJECT_MAPPER.writeValueAsBytes(rpcResponse.getData());
        rpcResponse.setData(OBJECT_MAPPER.readValue(dataBytes, rpcResponse.getDataType()));
        return type.cast(rpcResponse);
    }
}

3. Kryo序列化器

/**
 * Kryo 序列化器
 */
public class KryoSerializer implements Serializer{

    /**
     * kryo 线程不安全,使用 ThreadLocal 保证每个线程只有一个 Kryo
     */
    private static final ThreadLocal<Kryo> KRYO_THREAD_LOCAL = ThreadLocal.withInitial(() -> {
        Kryo kryo = new Kryo();
        // 设置动态动态序列化和反序列化类,不提前注册所有类(可能有安全问题)
        kryo.setRegistrationRequired(false);
        return kryo;
    });
    @Override
    public <T> byte[] serialize(T obj) {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        Output output = new Output(byteArrayOutputStream);
        KRYO_THREAD_LOCAL.get().writeObject(output, obj);
        output.close();
        return byteArrayOutputStream.toByteArray();
    }

    @Override
    public <T> T deserialize(byte[] bytes, Class<T> classType) {
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
        Input input = new Input(byteArrayInputStream);
        T result = KRYO_THREAD_LOCAL.get().readObject(input, classType);
        input.close();
        return result;
    }
}

4. Hessian序列化器

/**
 * Hessian 序列化器
 */
public class HessianSerializer implements Serializer{
    @Override
    public <T> byte[] serialize(T object) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        HessianOutput ho = new HessianOutput(bos);
        ho.writeObject(object);
        return bos.toByteArray();
    }

    @Override
    public <T> T deserialize(byte[] bytes, Class<T> tClass) throws IOException {
        ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
        HessianInput hi = new HessianInput(bis);
        return (T) hi.readObject(tClass);
    }
}

5. 序列化器常量类(SerializerKeys)

/**
 * 序列化器键名
 */
public class SerializerKeys {
    public final static String JDK = "jdk";
    public final static String JSON = "json";
    public final static String KRYO = "kryo";
    public final static String HESSIAN = "hessian";
}

6. 序列化器工厂

/**
 * 序列化器工厂(用于获取序列化器对象)
 */
public class SerializerFactory {

    /**
     * 序列化映射(用于实现单例)
     */
    private static final Map<String,Serializer> KEY_SERIALIZER_MAP = new HashMap<String,Serializer>(){{
        put(SerializerKeys.JDK,new JdkSerializer());
        put(SerializerKeys.JSON,new JsonSerializer());
        put(SerializerKeys.KRYO,new KryoSerializer());
        put(SerializerKeys.HESSIAN,new HessianSerializer());
    }};

    /**
     * 默认序列化器
     */
    private static final Serializer DEFAULT_SERIALIZER = KEY_SERIALIZER_MAP.get(SerializerKeys.JDK);

    /**
     * 获取实例,如果没有传key,则将默认序列化器返回
     * @param key
     * @return
     */
    public static Serializer getInstance(String key){
        return KEY_SERIALIZER_MAP.getOrDefault(key,DEFAULT_SERIALIZER);
    }
}

7. 修改RpcConfig

/**
 * 序列化器
 */
private String serializer = SerializerKeys.JDK;

8. 修改JdkServiceProxy

// 1.指定序列化器
// JdkSerializer serializer = new JdkSerializer();
final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());

9. 修改HttpServerHandler

// 1.指定序列化器
// JdkSerializer serializer = new JdkSerializer();
final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());
// 修改参数类型
private void doResponse(HttpServerRequest request, RpcResponse rpcResponse, Serializer serializer)

自定义序列化器

1. 指定SPI配置目录

系统内置SPI机制 自定义序列化器
加载 resources 资源目录下的 META-INF/services 目录 读取 META-INF/rpc 目录
用户自定义 SPI 系统内置 SPI
META-INF/rpc/custom META-INF/rpc/system
  • resources 下新建META-INF/rpc/customMETA-INF/rpc/custom 目录

  • 新建文件:com.todaysaturday.serializer.JdkSerializer

  • 文件内容中创建完整类路径名

    • jdk=com.todaysaturday.serializer.JdkSerializer
      json=com.todaysaturday.serializer.JsonSerializer
      kryo=com.todaysaturday.serializer.KryoSerializer
      hessian=com.todaysaturday.serializer.HessianSerializer
      
      
      ### 2. 编写SpiLoader加载器
      
      - 用 Map 来存储已加载的配置信息 `键名 => 实现类`。
      - 扫描指定路径,读取每个配置文件,获取到 `键名 => 实现类` 信息并存储在 Map 中。
      - 定义获取实例方法,根据用户传入的接口和键名,从 Map 中找到对应的实现类,然后通过反射获取到实现类对象。可以维护一个对象实例缓存,创建过一次的对象从缓存中读取即可
      

/**

  • SPI加载器(支持键值对映射)
    */
    @Slf4j
    public class SpiLoader {

    /**

    • 存储已加载的类:接口名 => (key => 实现类)
      */
      private static Map<String, Map<String,Class<?>>> loaderMap = new ConcurrentHashMap<>();

    /**

    • 对象实例缓存(避免重复 new),类路径 => 对象实例,单例模式
      */
      private static Map<String,Object> instanceCache = new ConcurrentHashMap<>();

    /**

    • 系统SPI目录
      */
      private static final String RPC_SYSTEM_SPI_DIR = “META-INF/rpc/system/“;

    /**

    • 用户自定义SPI目录
      */
      private static final String RPC_CUSTOM_SPI_DIR = “META-INF/rpc/custom/“;

    /**

    • 扫描路径
      */
      private static final String[] SCAN_DIRS = new String[]{
      RPC_CUSTOM_SPI_DIR,
      RPC_SYSTEM_SPI_DIR

    };

    /**

    • 动态加载的类列表
      */
      private static final List<Class<?>> LOAD_CLASS_LIST = Arrays.asList(Serializer.class);

    /**

    • 加载所有类型
      */
      public static void loadAll(){
      log.info(“加载所有的 SPI”);
      for (Class<?> aClass : LOAD_CLASS_LIST) {
      load(aClass);
      }
      }

    /**

    • 获取某个接口的实例对象

    • @param tClass

    • @param key

    • @return

    • @param
      */
      public static T getInstance(Class<?> tClass,String key){
      // 获取全限定名
      String tClassName = tClass.getName();

      Map<String, Class<?>> keyClassMap = loaderMap.get(tClassName);
      if (keyClassMap == null) {
      throw new RuntimeException(String.format(“SpiLoader 未加载 %s 类型”, tClassName));
      }
      if (!keyClassMap.containsKey(key)) {
      throw new RuntimeException(String.format(“SpiLoader 的 %s 不存在 key=%s 的类型”, tClassName, key));
      }

      // 获取到要加载的实现类型
      Class<?> implClass = keyClassMap.get(key);
      // 从实例缓存中加载指定类型的实例
      String implClassName = implClass.getName();
      // 如果缓存中不存在,直接实例化并进行put
      if(!instanceCache.containsKey(implClassName)){
      try {
      instanceCache.put(implClassName,implClass.newInstance());
      } catch (Exception e) {
      String errorMsg = String.format(“%s 类实例化失败”, implClassName);
      throw new RuntimeException(errorMsg, e);
      }
      }

      // 否则就直接返回
      return (T) instanceCache.get(implClassName);

    }

    /**

    • 加载某个类型
    • @param loadClass
    • @return
      */
      public static Map<String, Class> load(Class loadClass) {
      log.info(“加载类型为 {} 的 SPI”, loadClass.getName());
      HashMap<String, Class<?>> keyClassMap = new HashMap<>();
      for (String scanDir : SCAN_DIRS) {
      // 获取当前路径下的文件
      List resources = ResourceUtil.getResources(scanDir + loadClass.getName());
      // 读取每个资源文件
      for (URL resource : resources) {
      try {
      InputStreamReader inputStreamReader = new InputStreamReader(resource.openStream());
      BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
      String line;
      // 读取文件内容
      while ((line = bufferedReader.readLine())!=null){
      String[] strArray = line.split(“=”);
      if(strArray.length > 1){
      String key = strArray[0];
      String className = strArray[1];
      keyClassMap.put(key,Class.forName(className));
      }
      }
      } catch (Exception e) {
      log.error(“spi resource load error”, e);
      }
      }
      }
      loaderMap.put(loadClass.getName(),keyClassMap);
      return keyClassMap;
      }

}




### 3. 重构序列化器工厂

```java
/**
 * 序列化器工厂(用于获取序列化器对象)
 */
public class SerializerFactory {
    static {
        SpiLoader.load(Serializer.class);
    }
    /**
     * 默认序列化器
     */
    private static final Serializer DEFAULT_SERIALIZER = new JdkSerializer();

    /**
     * 获取实例对象
     */
    public static Serializer getInstance(String key){
        if(!StringUtils.isEmpty(key)){
            return SpiLoader.getInstance(Serializer.class,key);
        }else{
            return DEFAULT_SERIALIZER;
        }
    }

//    /**
//     * 序列化映射(用于实现单例)
//     */
//    private static final Map<String,Serializer> KEY_SERIALIZER_MAP = new HashMap<String,Serializer>(){{
//        put(SerializerKeys.JDK,new JdkSerializer());
//        put(SerializerKeys.JSON,new JsonSerializer());
//        put(SerializerKeys.KRYO,new KryoSerializer());
//        put(SerializerKeys.HESSIAN,new HessianSerializer());
//    }};
//
//    /**
//     * 默认序列化器
//     */
//    private static final Serializer DEFAULT_SERIALIZER = KEY_SERIALIZER_MAP.get(SerializerKeys.JDK);
//
//    /**
//     * 获取实例,如果没有传key,则将默认序列化器返回
//     * @param key
//     * @return
//     */
//    public static Serializer getInstance(String key){
//        return KEY_SERIALIZER_MAP.getOrDefault(key,DEFAULT_SERIALIZER);
//    }
}

扩展

1. 新增 protostuff 序列化器

1.1 引入依赖

<!-- Protostuff核心库 -->
<dependency>
    <groupId>io.protostuff</groupId>
    <artifactId>protostuff-core</artifactId>
    <version>1.8.0</version>
</dependency>

<!-- Protostuff运行时库,用于动态生成Schema -->
<dependency>
    <groupId>io.protostuff</groupId>
    <artifactId>protostuff-runtime</artifactId>
    <version>1.8.0</version>
</dependency>

1.2 新增类路径

protostuff=com.todaysaturday.serializer.ProtostuffSerializer

1.3 修改properties

rpc.serializer = protostuff

1.4 修改SerializerKeys

public final static String PROTOSTUFF = "protostuff";

1.5 新增ProtostuffSerializer

package com.todaysaturday.serializer;

import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Protostuff 序列化器
 * 基于Protostuff库实现的高性能序列化器,相比JDK序列化具有更好的性能和更小的序列化体积
 */
public class ProtostuffSerializer implements Serializer {
    
    /**
     * Schema缓存,避免重复创建Schema对象,提升性能
     * 使用ConcurrentHashMap保证线程安全
     */
    private static final Map<Class<?>, Schema<?>> SCHEMA_CACHE = new ConcurrentHashMap<>();
    
    /**
     * 序列化:对象->字节数组
     * 使用Protostuff将Java对象序列化为字节数组
     * 
     * @param object 待序列化的对象
     * @return 序列化后的字节数组
     * @param <T> 对象类型
     * @throws IOException 序列化过程中发生IO异常时抛出
     */
    @Override
    public <T> byte[] serialize(T object) throws IOException {
        if (object == null) {
            throw new IllegalArgumentException("序列化对象不能为null");
        }
        
        // 获取对象的Class类型
        Class<T> clazz = (Class<T>) object.getClass();
        
        // 获取对应的Schema,优先从缓存中获取
        Schema<T> schema = getSchema(clazz);
        
        // 创建LinkedBuffer用于序列化,使用默认大小
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        
        try {
            // 使用ProtostuffIOUtil进行序列化
            return ProtostuffIOUtil.toByteArray(object, schema, buffer);
        } finally {
            // 清理buffer资源
            buffer.clear();
        }
    }

    /**
     * 反序列化:字节数组->对象
     * 使用Protostuff将字节数组反序列化为Java对象
     * 
     * @param bytes 待反序列化的字节数组
     * @param type 目标对象的Class类型
     * @return 反序列化后的对象
     * @param <T> 对象类型
     * @throws IOException 反序列化过程中发生异常时抛出
     */
    @Override
    public <T> T deserialize(byte[] bytes, Class<T> type) throws IOException {
        if (bytes == null || bytes.length == 0) {
            throw new IllegalArgumentException("反序列化字节数组不能为空");
        }
        
        if (type == null) {
            throw new IllegalArgumentException("目标类型不能为null");
        }
        
        try {
            // 获取对应的Schema
            Schema<T> schema = getSchema(type);
            
            // 创建目标类型的实例
            T instance = schema.newMessage();
            
            // 使用ProtostuffIOUtil进行反序列化
            ProtostuffIOUtil.mergeFrom(bytes, instance, schema);
            
            return instance;
        } catch (Exception e) {
            throw new IOException("反序列化失败: " + e.getMessage(), e);
        }
    }
    
    /**
     * 获取指定类型的Schema,优先从缓存中获取以提升性能
     * Schema是Protostuff中描述对象结构的元数据,用于序列化和反序列化过程
     * 
     * @param clazz 目标类型
     * @return 对应的Schema对象
     * @param <T> 类型参数
     */
    @SuppressWarnings("unchecked")
    private static <T> Schema<T> getSchema(Class<T> clazz) {
        // 先从缓存中尝试获取
        Schema<T> schema = (Schema<T>) SCHEMA_CACHE.get(clazz);
        
        if (schema == null) {
            // 缓存中不存在,则创建新的Schema
            schema = RuntimeSchema.createFrom(clazz);
            // 放入缓存中,使用putIfAbsent避免并发情况下的重复创建
            Schema<T> existing = (Schema<T>) SCHEMA_CACHE.putIfAbsent(clazz, schema);
            if (existing != null) {
                schema = existing;
            }
        }
        
        return schema;
    }
}

2.改造序列化器工厂

1. 使用volatile修饰静态变量

/**
 * 1.添加 volatile 静态私有实例变量
 */
private static volatile Serializer serializer;

2. 创建私有构造函数

/**
 * 2.私有构造函数
 */
private SerializerFactory() {}

3. 创建访问入口(进行两次检查)

/**
 * 3.公共静态获取实例的方法
 * @param key
 * @return
 */
public static Serializer getInstance(String key){
    if(serializer == null){ // 第一次检查
        synchronized (SerializerFactory.class){
            if(serializer == null){            // 第二次检查
                SpiLoader.load(Serializer.class);
                if(!StrUtil.isEmpty(key)){
                    serializer = SpiLoader.getInstance(Serializer.class,key);
                }else{
                    serializer = new JdkSerializer();
                }
            }
        }
    }
    return serializer;
}

3.改造SPILoader

/**
 * 对象实例缓存(避免重复 new),类路径 => 对象实例,单例模式
 */
private static volatile Map<String,Object> instanceCache = new ConcurrentHashMap<>();
/**
 * 获取某个接口的实例对象
 * @param tClass
 * @param key
 * @return
 * @param <T>
 */
public static <T> T getInstance(Class<?> tClass,String key){
    // 获取全限定名
    String tClassName = tClass.getName();

    Map<String, Class<?>> keyClassMap = loaderMap.get(tClassName);
    if (keyClassMap == null) {
        throw new RuntimeException(String.format("SpiLoader 未加载 %s 类型", tClassName));
    }
    if (!keyClassMap.containsKey(key)) {
        throw new RuntimeException(String.format("SpiLoader 的 %s 不存在 key=%s 的类型", tClassName, key));
    }

    // 获取到要加载的实现类型
    Class<?> implClass = keyClassMap.get(key);
    // 从实例缓存中加载指定类型的实例
    String implClassName = implClass.getName();

    // 如果缓存中不存在,直接实例化并进行put
    if(!instanceCache.containsKey(implClassName)){ // 第一次检查
        synchronized (instanceCache){
            if(!instanceCache.containsKey(implClassName)){// 第二次检查
                try {
                    instanceCache.put(implClassName,implClass.newInstance());
                } catch (Exception e) {
                    String errorMsg = String.format("%s 类实例化失败", implClassName);
                    throw new RuntimeException(errorMsg, e);
                }
            }
        }
    }

    // 否则就直接返回
    return (T) instanceCache.get(implClassName);
}