7.自定义协议

自定义协议

1. 网络传输设计

  • 网络传输设‍计的目标是:选择一‍个能够高性能通信的⁠网络协议和传输方式؜

2. 消息结构设计

  • 消息结构设计的目标是:用 最少的 空间传递 需要的 信息

2.1 RPC 消息所需内容:

  • 魔数:作用是安全校验,防止服务器处理了非框架发来的乱七八糟的消息(类似 HTTPS 的安全证书)
  • 版本号:保证请求和响应的一致性(类似 HTTP 协议有 1.0/2.0 等版本)
  • 序列化方式:来告诉服务端和客户端如何解析数据(类似 HTTP 的 Content-Type 内容类型)
  • 类型:标识是请求还是响应?或者是心跳检测等其他用途。(类似 HTTP 有请求头和响应头)
  • 状态:如果是响应,记录响应的结果(类似 HTTP 的 200 状态代码)
  • 请求ID:唯‍一标识某个请求,因⁠为 TCP 是双向؜通信的,需要有个唯‌一标识来追踪每个请求
  • 请求体:要发送的 body 内容数据
  • 请求数据长度:保证能够完整地获取 body 内容信息
flowchart TD
    subgraph header["请求头"]
        A["魔数(8bit) | 版本(8bit) | 序列化方式(8bit) | 类型(8bit) | 状态(8bit)"]
        B["请求id (64 bit)"]
        C["请求体数据长度 (32 bit)"]
    end
    
    D["请求体(内容)"]
    
    header --> D
    
    style A fill:#e1f5fe
    style B fill:#f3e5f5
    style C fill:#e8f5e8
    style D fill:#fff3e0

2.2 写入和读取

请求头信息总长 17 个字节,也就是说,上述消息结构,本质上就是拼接在一起的一个字节数组。我们后续实现时,需要有 消息编码器消息解码器,编码器先 new 一个空的 Buffer 缓冲区,然后按照顺序向缓冲区依次写入这些数据;解码器在读取时也按照顺序依次读取,就能还原出编码前的数据,通过这种约定的‍方式,我们就不用记录头信息‍了

3.代码实现-消息结构

新建protocol包,将所有和自定义协议有关的代码放到该包下

3.1 新建协议消息类 ProtocolMessage

  • 将消息头单‍独封装为一个内部类‍,消息体可以使用泛⁠型类型
/**
 * 协议消息结构
 * @param <T>
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ProtocolMessage <T>{

    /**
     * 消息头
     */
    private Header header;

    /**
     * 消息体
     */
    private T body;

    /**
     * 协议消息头
     */
    @Data
    public static class Header{
        /**
         * 魔数,保证安全性
         */
        private byte magic;

        /**
         * 版本号
         */
        private byte version;

        /**
         * 序列化器
         */
        private byte serializer;

        /**
         * 消息类型(请求 / 响应)
         */
        private byte type;

        /**
         * 状态
         */
        private byte status;

        /**
         * 请求 id
         */
        private long requestId;

        /**
         * 消息体长度
         */
        private int bodyLength;
    }
}

3.2 新建协议常量类 ProtocolConstant

  • 记录了和自‍定义协议有关的关键‍信息,比如消息头长⁠度、魔数、版本号。
/**
 * 协议常量
 */
public interface ProtocolConstant {
    /**
     * 消息头长度
     */
    int MESSAGE_HEADER_LENGTH = 17;

    /**
     * 协议魔数
     */
    byte PROTOCOL_MAGIC = 0X1;

    /**
     * 协议版本号
     */
    byte PROTOCOL_VERSION = 0X1;
}

3.3 新建消息字段的状态枚举类 ProtocolMessageStatusEnum

  • 协议状态枚‍举,暂时只定义成功‍、请求失败、响应失⁠败三种枚举值
/**
 * 协议消息的状态枚举
 */
@Getter
public enum ProtocolMessageStatusEnum {
    SUCCESS("success", 20),
    BAD_REQUEST("badRequest", 40),
    BAD_RESPONSE("badResponse", 50);

    private final String text;
    private final int value;

    ProtocolMessageStatusEnum(String text, int value) {
        this.text = text;
        this.value = value;
    }

    /**
     * 根据value获取枚举
     * @param value
     * @return
     */
    public static ProtocolMessageStatusEnum getEnumByValue(int value){
        for (ProtocolMessageStatusEnum anEnum : ProtocolMessageStatusEnum.values()) {
            if(anEnum.getValue() == value){
                return anEnum;
            }
        }
        return null;
    }
}

3.4 新建消息字段的类型枚举类 ProtocolMessageTypeEnum

  • 协议消息类型枚举,包括请求、响应、心跳、其他
/**
 * 协议消息的类型枚举
 */
@Getter
public enum ProtocolMessageTypeEnum {
    REQUEST(0),
    RESPONSE(1),
    HEART_BEAT(2),
    OTHERS(3);

    private final int key;

    ProtocolMessageTypeEnum(int key) {
        this.key = key;
    }

    /**
     * 根据 key 获取枚举
     * @param key
     * @return
     */
    public static ProtocolMessageTypeEnum getEnumByValue(int key){
        for (ProtocolMessageTypeEnum anEnum : ProtocolMessageTypeEnum.values()) {
            if(anEnum.key == key){
                return anEnum;
            }
        }
        return null;
    }
}

3.5 新建消息字段的序列化器枚举类 ProtocolMessageSerializerEnum

  • 协议消息的‍序列化器枚举,和 RPC 框架已⁠支持的序列化器对应
/**
 * 协议消息的序列化器枚举
 */
@Getter
public enum ProtocolMessageSerializerEnum {
    JDK(0,"jdk"),
    JSON(1,"json"),
    KRYO(2,"kryo"),
    HESSIAN(3,"hessian"),
    PROTOSTUFF(4, "protostuff"); 

    private final int key;
    private final String value;

    ProtocolMessageSerializerEnum(int key,String value) {
        this.key = key;
        this.value = value;
    }

    /**
     * 获取值列表
     */
    public static List<String> getValues(){
        return Arrays.stream(values()).map(item->item.value).collect(Collectors.toList());
    }

    /**
     * 根据 key 获取枚举
     * @param key
     * @return
     */
    public static ProtocolMessageSerializerEnum getEnumByKey(int key){
        for (ProtocolMessageSerializerEnum anEnum : ProtocolMessageSerializerEnum.values()) {
            if(anEnum.key == key){
                return anEnum;
            }
        }
        return null;
    }

    /**
     * 根据value获取枚举
     * @param value
     * @return
     */
    public static ProtocolMessageSerializerEnum getEnumByValue(String value){
        if (value == null) {
            return null;
        }
        for (ProtocolMessageSerializerEnum anEnum : ProtocolMessageSerializerEnum.values()) {
            // 重点:使用 .equals() 来正确比较字符串内容
            if(anEnum.getValue().equals(value)){
                return anEnum;
            }
        }
        return null;
    }
}

4.代码实现-网络传输

4.1 TCP服务器实现

/**
 * Vertx Tcp 服务器
 */
public class VertxTcpServer implements HttpServer {

    private byte[] handleRequest(byte[] requestData){
        return "Hello Client".getBytes();
    }
    @Override
    public void doStart(int port) {
        // 1.创建 Vert.x 实例
        Vertx vertx = Vertx.vertx();

        // 2.创建TCP服务器
        NetServer server = vertx.createNetServer();

        // 3.处理请求
        server.connectHandler(socket ->{
           // 处理连接
           socket.handler(buffer -> {
               // 处理接收到的字节数组
               byte[] requestData = buffer.getBytes();

               // 进行自定义的字节数组处理逻辑,比如解析请求、调用服务、构造响应
               byte[] responseData = handleRequest(requestData);

               // 发送响应
               socket.write(Buffer.buffer(responseData));
           });
        });

        // 4.启动TCP服务器并监听端口
        server.listen(port,result->{
            if(result.succeeded()){
                System.out.println("Server is now listening on port " + port);
            }else{
                System.err.println("Failed to start server: " + result.cause());
            }
        });
    }

    public static void main(String[] args) {
        new VertxTcpServer().doStart(8888);
    }
}

4.2 TCP客户端实现

/**
 * Vertx Tcp 客户端
 */
public class VertxTcpClient {

    public void start() {
        // 1.创建 Vert.x 实例
        Vertx vertx = Vertx.vertx();

        // 2.监听端口处理请求
        vertx.createNetClient().connect(8888,"localhost",result->{
           if(result.succeeded()){
               System.out.println("connected to TCP server");
               NetSocket socket = result.result();
               // 发送数据
               socket.write("Hello Server");
               // 接收响应
               socket.handler(buffer -> {
                   System.out.println("Received response from server: " + buffer.toString());
               });
           }else{
               System.err.println("Failed to connect to TCP server");
           }
        });
    }

    public static void main(String[] args) {
        new VertxTcpClient().start();
    }
}

5.代码实现-编码/解码器

5.1 消息编码器

/**
 * 编码
 */
public class protocolMessageEncoder {

    /**
     * 编码
     * @param protocolMessage
     * @return
     * @throws IOException
     */
    public static Buffer encode(ProtocolMessage<?> protocolMessage) throws IOException {
        Buffer buffer = Buffer.buffer();

        if(protocolMessage == null || protocolMessage.getHeader() == null){
            return buffer;
        }
        // 获取请求头
        ProtocolMessage.Header header = protocolMessage.getHeader();
        // 依次向缓冲区写入字节
        buffer.appendByte(header.getMagic());
        buffer.appendByte(header.getVersion());
        buffer.appendByte(header.getSerializer());
        buffer.appendByte(header.getType());
        buffer.appendByte(header.getStatus());
        buffer.appendLong(header.getRequestId());
        // 获取序列化器
        ProtocolMessageSerializerEnum serializerEnum = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());
        if(serializerEnum == null){
            throw new RuntimeException("序列化协议不存在");
        }

        // 获取序列化对象
        Serializer serializer = SerializerFactory.getInstance(serializerEnum.getValue());
        // 序列化请求体
        byte[] bodyBytes = serializer.serialize(protocolMessage.getBody());

        // 写入body 长度和数据
        buffer.appendInt(bodyBytes.length);
        buffer.appendBytes(bodyBytes);

        return buffer;
    }
}

5.2 消息解码器

/**
 * 消息解码器
 */
public class protocolMessageDecoder {


    /**
     * 解码
     * @param buffer
     * @return
     */
    public static ProtocolMessage<?> decode(Buffer buffer) throws IOException {
        // 分别从指定位置读出 Buffer
        ProtocolMessage.Header header = new ProtocolMessage.Header();

        // 校验魔数
        byte magic = buffer.getByte(0);
        if(magic != ProtocolConstant.PROTOCOL_MAGIC){
            throw new RuntimeException("消息 magic 非法");
        }

        // 设置响应
        header.setMagic(magic);
        header.setVersion(buffer.getByte(1));
        header.setSerializer(buffer.getByte(2));
        header.setType(buffer.getByte(3));
        header.setStatus(buffer.getByte(4));
        header.setRequestId(buffer.getLong(5));
        header.setBodyLength(buffer.getInt(13));

        // 解决粘包问题,只读指定长度的数据
        byte[] bodyBytes = buffer.getBytes(17, 17 + header.getBodyLength());

        // 解析消息体
        ProtocolMessageSerializerEnum serializerEnum = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());
        if(serializerEnum == null){
            throw new RuntimeException("序列化消息的协议不存在");
        }

        // 实例化序列化对象
        Serializer serializer = SerializerFactory.getInstance(serializerEnum.getValue());
        // 获取序列化类型
        ProtocolMessageTypeEnum messageTypeEnum = ProtocolMessageTypeEnum.getEnumByValue(header.getType());
        if (messageTypeEnum == null) {
            throw new RuntimeException("序列化消息的类型不存在");
        }
        switch (messageTypeEnum) {
            case REQUEST:
                RpcRequest request = serializer.deserialize(bodyBytes, RpcRequest.class);
                return new ProtocolMessage<>(header, request);
            case RESPONSE:
                RpcResponse response = serializer.deserialize(bodyBytes, RpcResponse.class);
                return new ProtocolMessage<>(header, response);
            case HEART_BEAT:
            case OTHERS:
            default:
                throw new RuntimeException("暂不支持该消息类型");
        }
    }
}

5.3 测试编码解码功能

public class ProtocolMessageTest {

    @Test
    public void testEncodeAndDecode() throws IOException {
        // 构造消息
        ProtocolMessage<RpcRequest> protocolMessage = new ProtocolMessage<>();
        ProtocolMessage.Header header = new ProtocolMessage.Header();
        header.setMagic(ProtocolConstant.PROTOCOL_MAGIC);
        header.setVersion(ProtocolConstant.PROTOCOL_VERSION);
        header.setSerializer((byte) ProtocolMessageSerializerEnum.JDK.getKey());
        header.setType((byte) ProtocolMessageTypeEnum.REQUEST.getKey());
        header.setStatus((byte) ProtocolMessageStatusEnum.OK.getValue());
        header.setRequestId(IdUtil.getSnowflakeNextId());
        header.setBodyLength(0);
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setServiceName("myService");
        rpcRequest.setMethodName("myMethod");
        rpcRequest.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION);
        rpcRequest.setParameterTypes(new Class[]{String.class});
        rpcRequest.setArgs(new Object[]{"aaa", "bbb"});
        protocolMessage.setHeader(header);
        protocolMessage.setBody(rpcRequest);

        Buffer encodeBuffer = ProtocolMessageEncoder.encode(protocolMessage);
        ProtocolMessage<?> message = ProtocolMessageDecoder.decode(encodeBuffer);
        Assert.assertNotNull(message);
    }

}

5.4 请求处理器(服务提供者)

/**
 * 请求处理器
 */
public class TcpServerHandler implements Handler<NetSocket> {

    @Override
    public void handle(NetSocket netSocket) {
        // 处理连接
        netSocket.handler(buffer -> {

            // 接受请求,解码
            ProtocolMessage<RpcRequest> protocolMessage;

            try {
                protocolMessage = (ProtocolMessage<RpcRequest>) ProtocolMessageDecoder.decode(buffer);
            } catch (IOException e) {
                throw new RuntimeException("协议消息解码错误");
            }

            RpcRequest rpcRequest = protocolMessage.getBody();

            // 处理请求,解码
            // 构造响应结果对象
            RpcResponse rpcResponse = new RpcResponse();

            try {
                Class<?> implClass = LocalRegistry.get(rpcRequest.getServiceName());
                Method method = implClass.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
                Object result = method.invoke(implClass.newInstance(), rpcRequest.getArgs());

                // 封装返回结果
                rpcResponse.setData(result);
                rpcResponse.setDataType(method.getReturnType());
                rpcResponse.setMessage("Success");
            } catch (Exception e) {
                e.printStackTrace();
                rpcResponse.setMessage(e.getMessage());
                rpcResponse.setException(e);
            }

            // 发送响应,编码
            ProtocolMessage.Header header = protocolMessage.getHeader();
            header.setType((byte) ProtocolMessageTypeEnum.RESPONSE.getKey());
            ProtocolMessage<RpcResponse> responseProtocolMessage = new ProtocolMessage<>(header, rpcResponse);

            try {
                Buffer encode = ProtocolMessageEncoder.encode(responseProtocolMessage);
                netSocket.write(encode);
            } catch (IOException e) {
                throw new RuntimeException("协议消息编码错误");
            }

        });
    }
}

5.5 请求发送(服务消费者)

/**
 * JDK 动态代理
 */
public class JdkServiceProxy implements InvocationHandler {

    /**
     * 调用代理
     *
     * @return
     * @throws Throwable
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        // 本地处理 Object 类的方法,避免 RPC 分派
        if (method.getDeclaringClass() == Object.class) {
            switch (method.getName()) {
                case "toString":
                    // 为代理对象本身提供一个有意义的字符串表示
                    return proxy.getClass().getName() + "@" +
                            Integer.toHexString(System.identityHashCode(proxy)) +
                            ", with InvocationHandler " + this;
                case "hashCode":
                    // 委托给处理器的 hashCode 或提供一个代理特定的 hashCode
                    return System.identityHashCode(proxy);
                case "equals":
                    // 委托给处理器的 equals 或提供代理特定的相等性判断
                    return proxy == args[0]; // 代理对象的默认引用相等性
                default:
                    // 对于其他 Object 方法,可以考虑在处理器上调用它们
                    return method.invoke(this, args);
            }
        }
        // 1.指定序列化器
//        JdkSerializer serializer = new JdkSerializer();
        final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());


        String serviceName = method.getDeclaringClass().getName();

        // 2.发出请求(全部使用动态的参数)
        RpcRequest rpcRequest = RpcRequest.builder()
                .serviceName(serviceName)
                .methodName(method.getName())
                .parameterTypes(method.getParameterTypes())
                .args(args)
                .build();

        try {
            // 3.序列化对象
            byte[] bodyBytes = serializer.serialize(rpcRequest);


            // 从注册中心获取服务提供者请求地址
            RpcConfig rpcConfig = RpcApplication.getRpcConfig();
            // 实例化注册中心对象
            Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());

            ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
            serviceMetaInfo.setServiceName(serviceName);
            serviceMetaInfo.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION);
            // 服务发现
            List<ServiceMetaInfo> serviceMetaInfoList = registry.serviceDiscovery(serviceMetaInfo.getServiceKey());

            if(CollUtil.isEmpty(serviceMetaInfoList)){
                throw new RuntimeException("暂无服务地址");
            }

            ServiceMetaInfo selectedServiceMetaInfo = serviceMetaInfoList.get(0);

            // 4.发出TCP请求
            Vertx vertx = Vertx.vertx();
            NetClient netClient = vertx.createNetClient();
            CompletableFuture<RpcResponse> responseFuture = new CompletableFuture<>();
            netClient.connect(selectedServiceMetaInfo.getServicePort(),selectedServiceMetaInfo.getServiceAddress(),
                    result->{
                        if(result.succeeded()){
                            System.out.println("Connected to TCP server");

                            NetSocket socket = result.result();
                            // 发送数据
                            // 构造消息
                            ProtocolMessage<Object> protocolMessage = new ProtocolMessage<>();
                            ProtocolMessage.Header header = new ProtocolMessage.Header();
                            header.setMagic(ProtocolConstant.PROTOCOL_MAGIC);
                            header.setVersion(ProtocolConstant.PROTOCOL_VERSION);
                            header.setSerializer((byte) ProtocolMessageSerializerEnum.getEnumByValue(
                                    RpcApplication.getRpcConfig().getSerializer()).getKey());
                            header.setType((byte) ProtocolMessageTypeEnum.REQUEST.getKey());
                            header.setRequestId(IdUtil.getSnowflakeNextId());
                            protocolMessage.setHeader(header);
                            protocolMessage.setBody(rpcRequest);

                            /**
                             * 编码写入请求
                             */
                            try {
                                Buffer encodeBuffer = ProtocolMessageEncoder.encode(protocolMessage);
                                socket.write(encodeBuffer);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }

                            /**
                             * 接收响应解码
                             */

                            socket.handler(buffer -> {
                                try {
                                    ProtocolMessage<RpcResponse> responseProtocolMessage =
                                            (ProtocolMessage<RpcResponse>) ProtocolMessageDecoder.decode(buffer);
                                    responseFuture.complete(responseProtocolMessage.getBody());
                                } catch (IOException e) {
                                    throw new RuntimeException("协议消息解码错误");
                                }
                            });


                        }else{
                            System.err.println("Failed to connect to TCP server");
                        }
                    });

            RpcResponse rpcResponse = responseFuture.get();
            // 关闭连接
            netClient.close();
            return rpcResponse.getData();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
}

5.6 修改服务提供者服务

// 启动 web 服务
VertxTcpServer vertxTcpServer = new VertxTcpServer();
vertxTcpServer.doStart(RpcApplication.getRpcConfig().getServerPort());

6. 粘包和半包

6.1 什么是粘包和半包

6.1.1 什么是粘包

  • 每次收到的数据更多了,这种情况叫做 粘包

6.1.1 什么是半包

  • 每次收到的数据更少了,这种情况叫做 半包

6.2 怎么解决粘包和半包

6.2.1 怎么解决粘包

  • 每次只读取指‍定长度的数据,超过长度的留着下一次接收到消息时再读取

6.2.2 怎么解决半包

  • 在消息头中设置请求‍体的长度,服务端接收时,判断每次消息的长度是否符合预؜期,不完整就不读,留到下一‌次接收到消息时再读取

6.3 Vertx解决粘包和半包

  • 先完整读取请求头信息,由于请求头信息长度是固定的,可以使用 RecordParser 保证每次都完整读取。
  • 再根据请求头长度信息更改 RecordParser 的固定长度,保证完整获取到请求体

6.3.1 TcpServer 实现

/**
 * Vertx Tcp 服务器
 */
@Slf4j
public class VertxTcpServer implements HttpServer {

    @Override
    public void doStart(int port) {
        // 1.创建 Vert.x 实例
        Vertx vertx = Vertx.vertx();

        // 2.创建TCP服务器
        NetServer server = vertx.createNetServer();

        // 3.处理请求
        server.connectHandler(socket ->{
            // 构造parser
            RecordParser parser = RecordParser.newFixed(8);
            parser.setOutput(new Handler<Buffer>() {
                // 初始化
                int size = -1;
                // 一次完整的读取(头 + 体)
                Buffer resultBuffer = Buffer.buffer();

                @Override
                public void handle(Buffer buffer) {
                    if(-1 == size){
                        // 更新size为消息头长度
                        size = buffer.getInt(4);
                        parser.fixedSizeMode(size);

                        // 写入头信息到结果
                        resultBuffer.appendBuffer(buffer);
                    }else{
                        // 写入体信息到结果
                        resultBuffer.appendBuffer(buffer);
                        System.out.println(resultBuffer.toString());

                        // 重置一轮
                        parser.fixedSizeMode(8);
                        size = -1;
                        resultBuffer = Buffer.buffer();
                    }
                }
            });
            socket.handler(parser);
        });

        // 4.启动TCP服务器并监听端口
        server.listen(port,result->{
            if(result.succeeded()){
                System.out.println("Server is now listening on port " + port);
            }else{
                System.err.println("Failed to start server: " + result.cause());
            }
        });
    }

    public static void main(String[] args) {
        new VertxTcpServer().doStart(8888);
    }
}

6.3.2 TcpClient 实现

/**
 * Vertx Tcp 客户端
 */
public class VertxTcpClient {

    public void start() {
        // 1.创建 Vert.x 实例
        Vertx vertx = Vertx.vertx();

        // 2.监听端口处理请求
        vertx.createNetClient().connect(8888, "localhost", result -> {
            if (result.succeeded()) {
                System.out.println("Connected to TCP server");
                io.vertx.core.net.NetSocket socket = result.result();
                for (int i = 0; i < 1000; i++) {
                    // 发送数据
                    Buffer buffer = Buffer.buffer();
                    String str = "Hello, server!Hello, server!Hello, server!Hello, server!";
                    buffer.appendInt(0);
                    buffer.appendInt(str.getBytes().length);
                    buffer.appendBytes(str.getBytes());
                    socket.write(buffer);
                }
                // 接收响应
                socket.handler(buffer -> {
                    System.out.println("Received response from server: " + buffer.toString());
                });
            } else {
                System.err.println("Failed to connect to TCP server");
            }
        });
    }

    public static void main(String[] args) {
        new VertxTcpClient().start();
    }
}

6.4 装饰者模式解决

6.4.1 装饰者模式实现

/**
 * 装饰者模式(使用 recordParser 对原有的 buffer 处理能力进行增强)
 */
public class TcpBufferHandlerWrapper implements Handler<Buffer> {

    private final RecordParser recordParser;

    public TcpBufferHandlerWrapper(Handler<Buffer> bufferHandler) {
        this.recordParser = initRecordParser(bufferHandler);
    }

    @Override
    public void handle(Buffer buffer) {
        recordParser.handle(buffer);
    }

    private RecordParser initRecordParser(Handler<Buffer> bufferHandler) {
        RecordParser parser = RecordParser.newFixed(ProtocolConstant.MESSAGE_HEADER_LENGTH);
        parser.setOutput(new Handler<Buffer>() {
            // 初始化
            int size = -1;
            // 一次完整的读取(头 + 体)
            Buffer resultBuffer = Buffer.buffer();

            @Override
            public void handle(Buffer buffer) {
                if(-1 == size){
                    // 更新size为消息头长度
                    size = buffer.getInt(13);
                    parser.fixedSizeMode(size);

                    // 写入头信息到结果
                    resultBuffer.appendBuffer(buffer);
                }else{
                    // 写入体信息到结果
                    resultBuffer.appendBuffer(buffer);
                    // 已拼接为完整 Buffer,执行处理
                    bufferHandler.handle(resultBuffer);

                    // 重置一轮
                    parser.fixedSizeMode(ProtocolConstant.MESSAGE_HEADER_LENGTH);
                    size = -1;
                    resultBuffer = Buffer.buffer();
                }
            }
        });
        return parser;
    }
}

6.4.2 优化客户端调用


/**
 * 请求处理器
 */
public class TcpServerHandler implements Handler<NetSocket> {

    /**
     * 处理请求
     * @param netSocket
     */
    @Override
    public void handle(NetSocket netSocket) {
        TcpBufferHandlerWrapper bufferHandlerWrapper = new TcpBufferHandlerWrapper(buffer -> {

            // 接受请求,解码
            ProtocolMessage<RpcRequest> protocolMessage;

            try {
                protocolMessage = (ProtocolMessage<RpcRequest>) ProtocolMessageDecoder.decode(buffer);
            } catch (IOException e) {
                throw new RuntimeException("协议消息解码错误");
            }

            RpcRequest rpcRequest = protocolMessage.getBody();

            // 处理请求,解码
            // 构造响应结果对象
            RpcResponse rpcResponse = new RpcResponse();

            try {
                Class<?> implClass = LocalRegistry.get(rpcRequest.getServiceName());
                Method method = implClass.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
                Object result = method.invoke(implClass.newInstance(), rpcRequest.getArgs());

                // 封装返回结果
                rpcResponse.setData(result);
                rpcResponse.setDataType(method.getReturnType());
                rpcResponse.setMessage("Success");
            } catch (Exception e) {
                e.printStackTrace();
                rpcResponse.setMessage(e.getMessage());
                rpcResponse.setException(e);
            }

            // 发送响应,编码
            ProtocolMessage.Header header = protocolMessage.getHeader();
            header.setType((byte) ProtocolMessageTypeEnum.RESPONSE.getKey());
            ProtocolMessage<RpcResponse> responseProtocolMessage = new ProtocolMessage<>(header, rpcResponse);

            try {
                Buffer encode = ProtocolMessageEncoder.encode(responseProtocolMessage);
                netSocket.write(encode);
            } catch (IOException e) {
                throw new RuntimeException("协议消息编码错误");
            }

        });
        // 处理连接
        netSocket.handler(bufferHandlerWrapper);
    }
}

6.4.3 修改TcpClient

/**
 * Vertx Tcp 客户端
 */
public class VertxTcpClient {



    public static RpcResponse doRequest(RpcRequest rpcRequest, ServiceMetaInfo serviceMetaInfo) throws ExecutionException, InterruptedException {
        // 1.创建 Vert.x 实例
        Vertx vertx = Vertx.vertx();
        // 2.监听端口处理请求
        NetClient netClient = vertx.createNetClient();
        CompletableFuture<RpcResponse> responseFuture = new CompletableFuture<>();

        netClient.connect(serviceMetaInfo.getServicePort(), serviceMetaInfo.getServiceHost(),
            result->{
                if(result.succeeded()){
                    System.out.println("Connected to TCP server");

                    NetSocket socket = result.result();
                    // 发送数据
                    // 构造消息
                    ProtocolMessage<Object> protocolMessage = new ProtocolMessage<>();
                    ProtocolMessage.Header header = new ProtocolMessage.Header();
                    header.setMagic(ProtocolConstant.PROTOCOL_MAGIC);
                    header.setVersion(ProtocolConstant.PROTOCOL_VERSION);
                    header.setSerializer((byte) ProtocolMessageSerializerEnum.getEnumByValue(
                            RpcApplication.getRpcConfig().getSerializer()).getKey());
                    header.setType((byte) ProtocolMessageTypeEnum.REQUEST.getKey());
                    // 生成全局请求 ID
                    header.setRequestId(IdUtil.getSnowflakeNextId());
                    protocolMessage.setHeader(header);
                    protocolMessage.setBody(rpcRequest);

                    /**
                     * 编码写入请求
                     */
                    try {
                        Buffer encodeBuffer = ProtocolMessageEncoder.encode(protocolMessage);
                        socket.write(encodeBuffer);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }

                    /**
                     * 接收响应解码
                     * 重点:将创建的 TcpBufferHandlerWrapper 注册到 socket.handler()
                     */
                    socket.handler(new TcpBufferHandlerWrapper(buffer -> {
                        try {
                            ProtocolMessage<RpcResponse> rpcResponseProtocolMessage =
                                    (ProtocolMessage<RpcResponse>) ProtocolMessageDecoder.decode(buffer);
                            responseFuture.complete(rpcResponseProtocolMessage.getBody());
                        } catch (IOException e) {
                            throw new RuntimeException("协议消息解码错误");
                        }
                    }));
                }else{
                    System.err.println("Failed to connect to TCP server");
                }
            });

            RpcResponse rpcResponse = responseFuture.get();
            // 关闭连接
            netClient.close();
            return rpcResponse;
    }
}

6.4.4 修改TcpServer

/**
 * Vertx Tcp 服务器
 */
@Slf4j
public class VertxTcpServer implements HttpServer {

    @Override
    public void doStart(int port) {
        // 1.创建 Vert.x 实例
        Vertx vertx = Vertx.vertx();

        // 2.创建TCP服务器
        NetServer server = vertx.createNetServer();

        // 3.处理请求
        server.connectHandler(new TcpServerHandler());

        // 4.启动TCP服务器并监听端口
        server.listen(port,result->{
            if(result.succeeded()){
                System.out.println("Server is now listening on port " + port);
            }else{
                System.err.println("Failed to start server: " + result.cause());
            }
        });
    }

    public static void main(String[] args) {
        new VertxTcpServer().doStart(8888);
    }
}

6.4.4 继续修改代理对象

/**
 * JDK 动态代理
 */
public class JdkServiceProxy implements InvocationHandler {

    /**
     * 调用代理
     *
     * @return
     * @throws Throwable
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        // 本地处理 Object 类的方法,避免 RPC 分派
        if (method.getDeclaringClass() == Object.class) {
            switch (method.getName()) {
                case "toString":
                    // 为代理对象本身提供一个有意义的字符串表示
                    return proxy.getClass().getName() + "@" +
                            Integer.toHexString(System.identityHashCode(proxy)) +
                            ", with InvocationHandler " + this;
                case "hashCode":
                    // 委托给处理器的 hashCode 或提供一个代理特定的 hashCode
                    return System.identityHashCode(proxy);
                case "equals":
                    // 委托给处理器的 equals 或提供代理特定的相等性判断
                    return proxy == args[0]; // 代理对象的默认引用相等性
                default:
                    // 对于其他 Object 方法,可以考虑在处理器上调用它们
                    return method.invoke(this, args);
            }
        }
        // 1.指定序列化器
//        JdkSerializer serializer = new JdkSerializer();
        final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());


        String serviceName = method.getDeclaringClass().getName();

        // 2.发出请求(全部使用动态的参数)
        RpcRequest rpcRequest = RpcRequest.builder()
                .serviceName(serviceName)
                .methodName(method.getName())
                .parameterTypes(method.getParameterTypes())
                .args(args)
                .build();

        try {
            // 3.序列化对象
            byte[] bodyBytes = serializer.serialize(rpcRequest);


            // 从注册中心获取服务提供者请求地址
            RpcConfig rpcConfig = RpcApplication.getRpcConfig();
            // 实例化注册中心对象
            Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());

            ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
            serviceMetaInfo.setServiceName(serviceName);
            serviceMetaInfo.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION);
            // 服务发现
            List<ServiceMetaInfo> serviceMetaInfoList = registry.serviceDiscovery(serviceMetaInfo.getServiceKey());

            if(CollUtil.isEmpty(serviceMetaInfoList)){
                throw new RuntimeException("暂无服务地址");
            }

            ServiceMetaInfo selectedServiceMetaInfo = serviceMetaInfoList.get(0);

            // 4.发出TCP请求
            RpcResponse rpcResponse = VertxTcpClient.doRequest(rpcRequest,selectedServiceMetaInfo);
            return rpcResponse.getData();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
}