1.简易版RPC框架开发

简易消费者

  • 1.注册服务
  • 2.启动HTTP服务器,等待请求
  • 3.通过rpc框架获取代理对象并调用代理对象的方法
  • 4.指定序列化器,构造RPC Request
  • 5.将请求序列化为字节数组,发送HTTP请求
  • 6.等待接收HTTP响应
  • 7.通过web服务器监听并接收consumer的请求
  • 8.经过请求处理器将请求反序列化RPC Request
  • 9.使用本地服务注册器查询 provider 中的对应服务并进行反射调用
  • 10.将 provider 返回的结果构造为RPC Response,并将RPC Response序列化为字节数组
  • 11.rpc发送HTTP响应
  • 12.consumer接收响应
  • 13.consumer将响应反序列化为RPC Request
  • 14.consumer获取响应中的结果

简易RPC实现

1.1 模块划分与介绍

首先需要初始化四个模块,分别是:

  • example-common:公共模块,提供公共的对象和方法
  • example-provider:提供者模块,提供服务给消费者调用
  • example-consumer:消费者模块,向提供者请求服务调用
  • todaysaturday-rpc-easy:提供 rpc 框架的核心方法

公共模块提供公共的对象及属性,提供者提供服务,消费者请求服务,rpc 作为中介平衡提供者和消费者之间的请求与调用

简易RPC

2.1 模块创建流程

新建项目文件夹(todaysaturday-rpc) –> 打开IDEA –> File -> New -> Module -> Maven Archetype -> Name:输入模块、Location:选择项目位置、JDK:选择11、Archetype:选择QuickStart -> 点击Create

3.1 基本模块

3.1.1 公共模块

用户实体类(User)

/**
 * 用户
 */
public class User implements Serializable {

    private String name;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

用户功能接口(UserService)

/**
 * 用户服务
 */
public interface UserService {

    /**
     * 获取用户
     *
     * @param user
     * @return
     */
    User getUser(User user);
}

3.1.2 提供者模块

引入依赖

<dependencies>
    <dependency>
      <!-- RPC 框架 -->
      <groupId>com.todaysaturday</groupId>
      <artifactId>todaysaturday-rpc-easy</artifactId>
      <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
      <!-- 公共模块 -->
      <groupId>com.todaysaturday</groupId>
      <artifactId>example-common</artifactId>
      <version>1.0-SNAPSHOT</version>
    </dependency>
    <!-- https://doc.hutool.cn/ -->
    <dependency>
      <groupId>cn.hutool</groupId>
      <artifactId>hutool-all</artifactId>
      <version>5.8.16</version>
    </dependency>
    <!-- https://projectlombok.org/ -->
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.30</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
</dependencies>

服务实现类(UserServiceImpl)

/**
 * 用户服务实现类
 */
public class UserServiceImpl implements UserService {
    @Override
    public User getUser(User user) {
        System.out.println("用户名:"+user.getName());
        return null;
    }
}

服务提供者启动类(EasyProviderExample)

/**
 *  简易服务提供者示例
 */
public class EasyProviderExample {
    public static void main( String[] args ){
        // 注册服务
        LocalRegistry.register(UserService.class.getName(), UserServiceImpl.class);
        // 启动 web 服务
        VertxHttpServer httpServer = new VertxHttpServer();
        httpServer.doStart(8080);
    }
}

3.1.3 消费者模块

引入依赖

<dependencies>
    <dependency>
      <!-- RPC 框架 -->
      <groupId>com.todaysaturday</groupId>
      <artifactId>todaysaturday-rpc-easy</artifactId>
      <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
      <!-- 公共模块 -->
      <groupId>com.todaysaturday</groupId>
      <artifactId>example-common</artifactId>
      <version>1.0-SNAPSHOT</version>
    </dependency>
    <!-- https://doc.hutool.cn/ -->
    <dependency>
      <groupId>cn.hutool</groupId>
      <artifactId>hutool-all</artifactId>
      <version>5.8.16</version>
    </dependency>
    <!-- https://projectlombok.org/ -->
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.30</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
</dependencies>

服务消费者启动类(EasyConsumerExample)

/**
 * 服务消费者启动类
 */
public class EasyConsumerExample {
    public static void main( String[] args ){
        // 创建静态代理
//        UserService userService = new UserServiceProxy();
        // 创建JDK动态代理
//        UserService userService = JdkServiceProxyFactory.getProxy(UserService.class);
        // 创建CGlib动态代理
        UserService userService = CGlibServiceProxyFactory.getProxy(UserService.class);
        User user = new User();
        user.setName("todaysaturday");

        User newUser = userService.getUser(user);
        if(newUser == null){
            System.out.println("user == null");
        }else{
            System.out.println("user == "+ newUser.getName());
        }
    }
}

4.1 RPC核心模块

创建Web服务器(Vert.x)

如果需要让服务提供者提供可远程访问的服务,那么就需要创建web服务器,能够接受处理请求、并返回响应

定义HTTP服务接口(HttpServer)

/**
 * HTTP服务器接口
 */
public interface HttpServer {
    /**
     * 启动服务器
     * @param port
     */
    void doStart(int port);
}

创建Vert.x服务实现类(VertxHttpServer)

/**
 * vertx 服务器
 */
public class VertxHttpServer implements HttpServer{

    /**
     * 启动服务器
     * @param port
     */
    @Override
    public void doStart(int port) {
        // 1.创建 Vert.x 实例
        Vertx vertx = Vertx.vertx();

        // 2.创建HTTP服务器
        io.vertx.core.http.HttpServer server = vertx.createHttpServer();

        // 3.监听端口并处理请求
        server.requestHandler(new HttpServerHandler());

        // 4.启动HTTP服务器并监听端口
        server.listen(port,result->{
           if(result.succeeded()){
               System.out.println("Server is now listening on port " + port);
           }else{
               System.err.println("Failed to start server: " + result.cause());
           }
        });
    }
}

创建本地服务注册器(LocalRegistry)

本地服务注册器 服务注册中心
根据服务名获取到对应的实现类
ConcurrentHashMap<String,Class<?>>
管理注册的服务,提供服务信息给消费者
存储注册信息(Map<String,Class<?>>)
注册服务(map.put())
获取服务(map.get())
删除服务(map.remove())
/**
 * 本地服务注册器
 */
public class LocalRegistry {
    /**
     * 使用线程安全的 concurrentHashMap 存‍储服务注册信息,
     * key 为服务名称、value 为服务的实现类
     */

    /**
     * 存储注册信息
     */
    private static final Map<String,Class<?>> map =new ConcurrentHashMap<>();

    /**
     * 注册服务
     * @param serviceName 服务名称
     * @param implClass 服务
     */
    public static void register(String serviceName,Class<?> implClass){
        map.put(serviceName,implClass);
    }

    /**
     * 获取服务
     * @param serviceName
     * @return
     */
    public static Class<?> get(String serviceName){
        return map.get(serviceName);
    }

    /**
     * 删除服务
     * @param serviceName
     */
    public static void remove(String serviceName){
        map.remove(serviceName);
    }
}

创建序列化器

创建序列化器接口,定义序列化 serializer 方法和反序列化 deserialize 方法

序列化器接口(Serializer)

/**
 * 序列化器接口
 */
public interface Serializer {

    /**
     * 序列化
     *
     * @param object
     * @param <T>
     * @return
     * @throws IOException
     */
    <T> byte[] serialize(T object) throws IOException;

    /**
     * 反序列化
     *
     * @param bytes
     * @param type
     * @param <T>
     * @return
     * @throws IOException
     */
    <T> T deserialize(byte[] bytes, Class<T> type) throws IOException;
}

JDK 序列化器实现类(JdkSerializer)

/**
 * JDK 序列化器
 */
public class JdkSerializer implements Serializer{

    /**
     * 将对象序列化为字节数组
     * @param object
     * @return
     * @param <T>
     * @throws IOException
     */
    @Override
    public <T> byte[] serialize(T object) throws IOException {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
        objectOutputStream.writeObject(object);
        objectOutputStream.close();
        return outputStream.toByteArray();
    }

    /**
     * 将字节数组反序列化为对象
     * @param bytes
     * @param type
     * @return
     * @param <T>
     * @throws IOException
     */
    @Override
    public <T> T deserialize(byte[] bytes, Class<T> type) throws IOException {
        ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
        ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
        try {
            return (T) objectInputStream.readObject();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            objectInputStream.close();
        }
    }
}

创建请求处理器

RPCRequest请求类

/**
 * RPC 请求
 */
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RpcRequest implements Serializable {

    /**
     * 服务名称
     */
    private String serviceName;


    /**
     * 方法名称
     */
    private String methodName;

    /**
     * 参数类型列表
     */
    private Class<?>[] parameterTypes;

    /**
     * 参数列表
     */
    private Object[] args;
}

RPCResponse响应类

/**
 * RPC 响应
 */
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RpcResponse implements Serializable {

    /**
     * 响应数据
     */
    private Object data;

    /**
     * 响应数据类型(预留)
     */
    private Class<?> dataType;

    /**
     * 响应信息
     */
    private String message;

    /**
     * 异常信息
     */
    private Exception exception;

}

请求处理器类(HttpServerHandler)

/**
 * 请求处理器
 */
public class HttpServerHandler implements Handler<HttpServerRequest> {
    /**
     * 反序列化请求为对象,并从请求对象中获取参数。
     * 根据服务名称从本地注册器中获取到对应的服务实现类。
     * 通过反射机制调用方法,得到返回结果。
     * 对返回结果进行封装和序列化,并写入到响应中。
     */

    @Override
    public void handle(HttpServerRequest request) {
        // 1.指定序列化器
        final JdkSerializer serializer = new JdkSerializer();
        // 记录日志
        System.out.println("Received request: " + request.method() + " " + request.uri());

        // 2.异步处理 HTTP 请求
        request.bodyHandler(body->{
            byte[] bytes = body.getBytes();
            RpcRequest rpcRequest = null;

            try {
                // 3.反序列化对象
                rpcRequest = serializer.deserialize(bytes,RpcRequest.class);
            } catch (IOException e) {
                e.printStackTrace();
            }

            // 4.构造响应结果对象
            RpcResponse rpcResponse = new RpcResponse();
            // 如果请求为null,直接返回
            if(rpcRequest == null){
                rpcResponse.setMessage("rpcRequest is null");
                doResponse(request,rpcResponse,serializer);
                return;
            }

            // 5.获取要调用的服务实现类,通过反射调用
            Class<?> implClass = LocalRegistry.get(rpcRequest.getServiceName());
            try {
                // 6.通过方法名找到目标方法
                Method method = implClass.getMethod(rpcRequest.getMethodName(),rpcRequest.getParameterTypes());
                // 7.创建实例并反射调用方法
                Object result = method.invoke(implClass.newInstance(), rpcRequest.getArgs());

                // 8.封装返回结果
                rpcResponse.setData(result);
                rpcResponse.setDataType(method.getReturnType());
                rpcResponse.setMessage("success");
            } catch (Exception e) {
                e.printStackTrace();
                rpcResponse.setMessage(e.getMessage());
                rpcResponse.setException(e);
            }
            // 响应
            doResponse(request, rpcResponse, serializer);
        });


    }

    /**
     * 响应
     * @param request
     * @param rpcResponse
     * @param serializer
     */
    private void doResponse(HttpServerRequest request, RpcResponse rpcResponse, JdkSerializer serializer) {
        HttpServerResponse httpServerResponse = request.response()
                .putHeader("content-type","application/json");

        try {
            // 序列化
            byte[] serializered = serializer.serializer(rpcResponse);
            httpServerResponse.end(Buffer.buffer(serializered));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

创建代理

静态代理(UserServiceProxy)

/**
 * 静态代理,只代理某个接口/功能
 */
public class UserServiceProxy implements UserService {
    @Override
    public User getUser(User user) {
        // 1.指定序列化器
        JdkSerializer serializer = new JdkSerializer();

        // 2.发出请求
        RpcRequest rpcRequest = RpcRequest.builder()
                .serviceName(UserService.class.getName())
                .methodName("getUser")
                .parameterTypes(new Class[]{User.class})
                .args(new Object[]{user})
                .build();

        try {
            // 3.序列化对象
            byte[] bodyBytes = serializer.serializer(rpcRequest);
            byte[] result;
            // 4.发出HTTP请求
            try(HttpResponse httpResponse = HttpRequest.post("http://localhost:8080")
                    .body(bodyBytes)
                    .execute()){
                result = httpResponse.bodyBytes();
            }

            // 5.反序列化对象
            RpcResponse rpcResponse = serializer.deserialize(result, RpcResponse.class);
            return (User) rpcResponse.getData();
        } catch (IOException e) {
            e.printStackTrace();
        }

        return null;
    }
}

动态代理

JDK动态代理

/**
 * JDK 动态代理
 */
public class JdkServiceProxy implements InvocationHandler {

    /**
     * 调用代理
     *
     * @return
     * @throws Throwable
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        // 1.指定序列化器
        JdkSerializer serializer = new JdkSerializer();

        // 2.发出请求(全部使用动态的参数)
        RpcRequest rpcRequest = RpcRequest.builder()
                .serviceName(method.getDeclaringClass().getName())
                .methodName(method.getName())
                .parameterTypes(method.getParameterTypes())
                .args(args)
                .build();

        try {
            // 3.序列化对象
            byte[] bodyBytes = serializer.serializer(rpcRequest);
            byte[] result;
            // 4.发出HTTP请求
            try(HttpResponse httpResponse = HttpRequest.post("http://localhost:8080")
                    .body(bodyBytes)
                    .execute()){
                result = httpResponse.bodyBytes();
            }

            // 5.反序列化对象
            RpcResponse rpcResponse = serializer.deserialize(result, RpcResponse.class);
            return (User) rpcResponse.getData();
        } catch (IOException e) {
            e.printStackTrace();
        }

        return null;
    }
}

JDK动态代理工厂

/**
 * 服务代理工厂(用于创建代理对象)
 */
public class JdkServiceProxyFactory {

    /**
     * 根据服务类获取代理对象
     * @param serviceClass
     * @return
     * @param <T>
     */
    public static<T> T getProxy(Class<T> serviceClass){
        T t = (T) Proxy.newProxyInstance(
                serviceClass.getClassLoader(),
                new Class[]{serviceClass},
                new JdkServiceProxy());
        return t;
    }

}

5.1 项目扩展

CGlib代理

引入依赖

<!-- CGlib 动态代理依赖 -->
<dependency>
  <groupId>cglib</groupId>
  <artifactId>cglib</artifactId>
  <version>3.3.0</version>
</dependency>

CGlib代理实现

/**
 * CGlib 动态代理
 */
public class CGlibServiceProxy implements MethodInterceptor {

    /**
     *
     * @param o
     * @param method
     * @param objects
     * @param methodProxy
     * @return
     */
    @Override
    public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy){
        // 指定序列化器
        JdkSerializer serializer = new JdkSerializer();

        // 发请求
        // builder构造链
        RpcRequest rpcRequest = RpcRequest.builder()
                .serviceName(method.getDeclaringClass().getName()) // 调用的远程服务接口名
                .methodName(method.getName()) // 方法名
                .parameterTypes(method.getParameterTypes()) // 获取方法参数类型
                .args(objects) // 实际参数
                .build();

        try {
            // 将请求序列化为字节数组
            byte[] bodyBytes = serializer.serialize(rpcRequest);
            byte[] result;

            // 将序列化的请求作为请求体发送
            try(HttpResponse httpResponse = HttpRequest.post("http://localhost:8080")
                    .body(bodyBytes)
                    .execute()){

                // 接收响应
                result = httpResponse.bodyBytes();
            }

            // 将返回的结果进行反序列化为响应对象
            RpcResponse rpcResponse = serializer.deserialize(result, RpcResponse.class);

            // 返回响应数据
            return rpcResponse.getData();
        } catch (IOException e) {
            e.printStackTrace();
        }

        return null;
    }

CGlib代理工厂实现

/**
 * CGlib代理工厂
 */
public class CGlibServiceProxyFactory {

    /**
     * 根据服务类获取代理对象
     * @param serviceClass
     * @return
     * @param <T>
     */
    public static <T> T getProxy(Class<T> serviceClass) {
        Enhancer enhancer = new Enhancer();
        enhancer.setSuperclass(serviceClass);  // 设置(被代理类)
        enhancer.setCallback(new CGlibServiceProxy()); // 设置回调
        return (T) enhancer.create();  // 创建代理对象
    }
}