自定义协议
1. 网络传输设计
- 网络传输设计的目标是:选择一个能够高性能通信的网络协议和传输方式
2. 消息结构设计
- 消息结构设计的目标是:用 最少的 空间传递 需要的 信息
2.1 RPC 消息所需内容:
- 魔数:作用是安全校验,防止服务器处理了非框架发来的乱七八糟的消息(类似 HTTPS 的安全证书)
- 版本号:保证请求和响应的一致性(类似 HTTP 协议有 1.0/2.0 等版本)
- 序列化方式:来告诉服务端和客户端如何解析数据(类似 HTTP 的 Content-Type 内容类型)
- 类型:标识是请求还是响应?或者是心跳检测等其他用途。(类似 HTTP 有请求头和响应头)
- 状态:如果是响应,记录响应的结果(类似 HTTP 的 200 状态代码)
- 请求ID:唯一标识某个请求,因为 TCP 是双向通信的,需要有个唯一标识来追踪每个请求
- 请求体:要发送的 body 内容数据
- 请求数据长度:保证能够完整地获取 body 内容信息
flowchart TD subgraph header["请求头"] A["魔数(8bit) | 版本(8bit) | 序列化方式(8bit) | 类型(8bit) | 状态(8bit)"] B["请求id (64 bit)"] C["请求体数据长度 (32 bit)"] end D["请求体(内容)"] header --> D style A fill:#e1f5fe style B fill:#f3e5f5 style C fill:#e8f5e8 style D fill:#fff3e0
2.2 写入和读取
请求头信息总长 17 个字节,也就是说,上述消息结构,本质上就是拼接在一起的一个字节数组。我们后续实现时,需要有 消息编码器 和 消息解码器,编码器先 new 一个空的 Buffer 缓冲区,然后按照顺序向缓冲区依次写入这些数据;解码器在读取时也按照顺序依次读取,就能还原出编码前的数据,通过这种约定的方式,我们就不用记录头信息了
3.代码实现-消息结构
新建protocol包,将所有和自定义协议有关的代码放到该包下
3.1 新建协议消息类 ProtocolMessage
- 将消息头单独封装为一个内部类,消息体可以使用泛型类型
/**
* 协议消息结构
* @param <T>
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ProtocolMessage <T>{
/**
* 消息头
*/
private Header header;
/**
* 消息体
*/
private T body;
/**
* 协议消息头
*/
@Data
public static class Header{
/**
* 魔数,保证安全性
*/
private byte magic;
/**
* 版本号
*/
private byte version;
/**
* 序列化器
*/
private byte serializer;
/**
* 消息类型(请求 / 响应)
*/
private byte type;
/**
* 状态
*/
private byte status;
/**
* 请求 id
*/
private long requestId;
/**
* 消息体长度
*/
private int bodyLength;
}
}
3.2 新建协议常量类 ProtocolConstant
- 记录了和自定义协议有关的关键信息,比如消息头长度、魔数、版本号。
/**
* 协议常量
*/
public interface ProtocolConstant {
/**
* 消息头长度
*/
int MESSAGE_HEADER_LENGTH = 17;
/**
* 协议魔数
*/
byte PROTOCOL_MAGIC = 0X1;
/**
* 协议版本号
*/
byte PROTOCOL_VERSION = 0X1;
}
3.3 新建消息字段的状态枚举类 ProtocolMessageStatusEnum
- 协议状态枚举,暂时只定义成功、请求失败、响应失败三种枚举值
/**
* 协议消息的状态枚举
*/
@Getter
public enum ProtocolMessageStatusEnum {
SUCCESS("success", 20),
BAD_REQUEST("badRequest", 40),
BAD_RESPONSE("badResponse", 50);
private final String text;
private final int value;
ProtocolMessageStatusEnum(String text, int value) {
this.text = text;
this.value = value;
}
/**
* 根据value获取枚举
* @param value
* @return
*/
public static ProtocolMessageStatusEnum getEnumByValue(int value){
for (ProtocolMessageStatusEnum anEnum : ProtocolMessageStatusEnum.values()) {
if(anEnum.getValue() == value){
return anEnum;
}
}
return null;
}
}
3.4 新建消息字段的类型枚举类 ProtocolMessageTypeEnum
- 协议消息类型枚举,包括请求、响应、心跳、其他
/**
* 协议消息的类型枚举
*/
@Getter
public enum ProtocolMessageTypeEnum {
REQUEST(0),
RESPONSE(1),
HEART_BEAT(2),
OTHERS(3);
private final int key;
ProtocolMessageTypeEnum(int key) {
this.key = key;
}
/**
* 根据 key 获取枚举
* @param key
* @return
*/
public static ProtocolMessageTypeEnum getEnumByValue(int key){
for (ProtocolMessageTypeEnum anEnum : ProtocolMessageTypeEnum.values()) {
if(anEnum.key == key){
return anEnum;
}
}
return null;
}
}
3.5 新建消息字段的序列化器枚举类 ProtocolMessageSerializerEnum
- 协议消息的序列化器枚举,和 RPC 框架已支持的序列化器对应
/**
* 协议消息的序列化器枚举
*/
@Getter
public enum ProtocolMessageSerializerEnum {
JDK(0,"jdk"),
JSON(1,"json"),
KRYO(2,"kryo"),
HESSIAN(3,"hessian"),
PROTOSTUFF(4, "protostuff");
private final int key;
private final String value;
ProtocolMessageSerializerEnum(int key,String value) {
this.key = key;
this.value = value;
}
/**
* 获取值列表
*/
public static List<String> getValues(){
return Arrays.stream(values()).map(item->item.value).collect(Collectors.toList());
}
/**
* 根据 key 获取枚举
* @param key
* @return
*/
public static ProtocolMessageSerializerEnum getEnumByKey(int key){
for (ProtocolMessageSerializerEnum anEnum : ProtocolMessageSerializerEnum.values()) {
if(anEnum.key == key){
return anEnum;
}
}
return null;
}
/**
* 根据value获取枚举
* @param value
* @return
*/
public static ProtocolMessageSerializerEnum getEnumByValue(String value){
if (value == null) {
return null;
}
for (ProtocolMessageSerializerEnum anEnum : ProtocolMessageSerializerEnum.values()) {
// 重点:使用 .equals() 来正确比较字符串内容
if(anEnum.getValue().equals(value)){
return anEnum;
}
}
return null;
}
}
4.代码实现-网络传输
4.1 TCP服务器实现
/**
* Vertx Tcp 服务器
*/
public class VertxTcpServer implements HttpServer {
private byte[] handleRequest(byte[] requestData){
return "Hello Client".getBytes();
}
@Override
public void doStart(int port) {
// 1.创建 Vert.x 实例
Vertx vertx = Vertx.vertx();
// 2.创建TCP服务器
NetServer server = vertx.createNetServer();
// 3.处理请求
server.connectHandler(socket ->{
// 处理连接
socket.handler(buffer -> {
// 处理接收到的字节数组
byte[] requestData = buffer.getBytes();
// 进行自定义的字节数组处理逻辑,比如解析请求、调用服务、构造响应
byte[] responseData = handleRequest(requestData);
// 发送响应
socket.write(Buffer.buffer(responseData));
});
});
// 4.启动TCP服务器并监听端口
server.listen(port,result->{
if(result.succeeded()){
System.out.println("Server is now listening on port " + port);
}else{
System.err.println("Failed to start server: " + result.cause());
}
});
}
public static void main(String[] args) {
new VertxTcpServer().doStart(8888);
}
}
4.2 TCP客户端实现
/**
* Vertx Tcp 客户端
*/
public class VertxTcpClient {
public void start() {
// 1.创建 Vert.x 实例
Vertx vertx = Vertx.vertx();
// 2.监听端口处理请求
vertx.createNetClient().connect(8888,"localhost",result->{
if(result.succeeded()){
System.out.println("connected to TCP server");
NetSocket socket = result.result();
// 发送数据
socket.write("Hello Server");
// 接收响应
socket.handler(buffer -> {
System.out.println("Received response from server: " + buffer.toString());
});
}else{
System.err.println("Failed to connect to TCP server");
}
});
}
public static void main(String[] args) {
new VertxTcpClient().start();
}
}
5.代码实现-编码/解码器
5.1 消息编码器
/**
* 编码
*/
public class protocolMessageEncoder {
/**
* 编码
* @param protocolMessage
* @return
* @throws IOException
*/
public static Buffer encode(ProtocolMessage<?> protocolMessage) throws IOException {
Buffer buffer = Buffer.buffer();
if(protocolMessage == null || protocolMessage.getHeader() == null){
return buffer;
}
// 获取请求头
ProtocolMessage.Header header = protocolMessage.getHeader();
// 依次向缓冲区写入字节
buffer.appendByte(header.getMagic());
buffer.appendByte(header.getVersion());
buffer.appendByte(header.getSerializer());
buffer.appendByte(header.getType());
buffer.appendByte(header.getStatus());
buffer.appendLong(header.getRequestId());
// 获取序列化器
ProtocolMessageSerializerEnum serializerEnum = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());
if(serializerEnum == null){
throw new RuntimeException("序列化协议不存在");
}
// 获取序列化对象
Serializer serializer = SerializerFactory.getInstance(serializerEnum.getValue());
// 序列化请求体
byte[] bodyBytes = serializer.serialize(protocolMessage.getBody());
// 写入body 长度和数据
buffer.appendInt(bodyBytes.length);
buffer.appendBytes(bodyBytes);
return buffer;
}
}
5.2 消息解码器
/**
* 消息解码器
*/
public class protocolMessageDecoder {
/**
* 解码
* @param buffer
* @return
*/
public static ProtocolMessage<?> decode(Buffer buffer) throws IOException {
// 分别从指定位置读出 Buffer
ProtocolMessage.Header header = new ProtocolMessage.Header();
// 校验魔数
byte magic = buffer.getByte(0);
if(magic != ProtocolConstant.PROTOCOL_MAGIC){
throw new RuntimeException("消息 magic 非法");
}
// 设置响应
header.setMagic(magic);
header.setVersion(buffer.getByte(1));
header.setSerializer(buffer.getByte(2));
header.setType(buffer.getByte(3));
header.setStatus(buffer.getByte(4));
header.setRequestId(buffer.getLong(5));
header.setBodyLength(buffer.getInt(13));
// 解决粘包问题,只读指定长度的数据
byte[] bodyBytes = buffer.getBytes(17, 17 + header.getBodyLength());
// 解析消息体
ProtocolMessageSerializerEnum serializerEnum = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());
if(serializerEnum == null){
throw new RuntimeException("序列化消息的协议不存在");
}
// 实例化序列化对象
Serializer serializer = SerializerFactory.getInstance(serializerEnum.getValue());
// 获取序列化类型
ProtocolMessageTypeEnum messageTypeEnum = ProtocolMessageTypeEnum.getEnumByValue(header.getType());
if (messageTypeEnum == null) {
throw new RuntimeException("序列化消息的类型不存在");
}
switch (messageTypeEnum) {
case REQUEST:
RpcRequest request = serializer.deserialize(bodyBytes, RpcRequest.class);
return new ProtocolMessage<>(header, request);
case RESPONSE:
RpcResponse response = serializer.deserialize(bodyBytes, RpcResponse.class);
return new ProtocolMessage<>(header, response);
case HEART_BEAT:
case OTHERS:
default:
throw new RuntimeException("暂不支持该消息类型");
}
}
}
5.3 测试编码解码功能
public class ProtocolMessageTest {
@Test
public void testEncodeAndDecode() throws IOException {
// 构造消息
ProtocolMessage<RpcRequest> protocolMessage = new ProtocolMessage<>();
ProtocolMessage.Header header = new ProtocolMessage.Header();
header.setMagic(ProtocolConstant.PROTOCOL_MAGIC);
header.setVersion(ProtocolConstant.PROTOCOL_VERSION);
header.setSerializer((byte) ProtocolMessageSerializerEnum.JDK.getKey());
header.setType((byte) ProtocolMessageTypeEnum.REQUEST.getKey());
header.setStatus((byte) ProtocolMessageStatusEnum.OK.getValue());
header.setRequestId(IdUtil.getSnowflakeNextId());
header.setBodyLength(0);
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setServiceName("myService");
rpcRequest.setMethodName("myMethod");
rpcRequest.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION);
rpcRequest.setParameterTypes(new Class[]{String.class});
rpcRequest.setArgs(new Object[]{"aaa", "bbb"});
protocolMessage.setHeader(header);
protocolMessage.setBody(rpcRequest);
Buffer encodeBuffer = ProtocolMessageEncoder.encode(protocolMessage);
ProtocolMessage<?> message = ProtocolMessageDecoder.decode(encodeBuffer);
Assert.assertNotNull(message);
}
}
5.4 请求处理器(服务提供者)
/**
* 请求处理器
*/
public class TcpServerHandler implements Handler<NetSocket> {
@Override
public void handle(NetSocket netSocket) {
// 处理连接
netSocket.handler(buffer -> {
// 接受请求,解码
ProtocolMessage<RpcRequest> protocolMessage;
try {
protocolMessage = (ProtocolMessage<RpcRequest>) ProtocolMessageDecoder.decode(buffer);
} catch (IOException e) {
throw new RuntimeException("协议消息解码错误");
}
RpcRequest rpcRequest = protocolMessage.getBody();
// 处理请求,解码
// 构造响应结果对象
RpcResponse rpcResponse = new RpcResponse();
try {
Class<?> implClass = LocalRegistry.get(rpcRequest.getServiceName());
Method method = implClass.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
Object result = method.invoke(implClass.newInstance(), rpcRequest.getArgs());
// 封装返回结果
rpcResponse.setData(result);
rpcResponse.setDataType(method.getReturnType());
rpcResponse.setMessage("Success");
} catch (Exception e) {
e.printStackTrace();
rpcResponse.setMessage(e.getMessage());
rpcResponse.setException(e);
}
// 发送响应,编码
ProtocolMessage.Header header = protocolMessage.getHeader();
header.setType((byte) ProtocolMessageTypeEnum.RESPONSE.getKey());
ProtocolMessage<RpcResponse> responseProtocolMessage = new ProtocolMessage<>(header, rpcResponse);
try {
Buffer encode = ProtocolMessageEncoder.encode(responseProtocolMessage);
netSocket.write(encode);
} catch (IOException e) {
throw new RuntimeException("协议消息编码错误");
}
});
}
}
5.5 请求发送(服务消费者)
/**
* JDK 动态代理
*/
public class JdkServiceProxy implements InvocationHandler {
/**
* 调用代理
*
* @return
* @throws Throwable
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 本地处理 Object 类的方法,避免 RPC 分派
if (method.getDeclaringClass() == Object.class) {
switch (method.getName()) {
case "toString":
// 为代理对象本身提供一个有意义的字符串表示
return proxy.getClass().getName() + "@" +
Integer.toHexString(System.identityHashCode(proxy)) +
", with InvocationHandler " + this;
case "hashCode":
// 委托给处理器的 hashCode 或提供一个代理特定的 hashCode
return System.identityHashCode(proxy);
case "equals":
// 委托给处理器的 equals 或提供代理特定的相等性判断
return proxy == args[0]; // 代理对象的默认引用相等性
default:
// 对于其他 Object 方法,可以考虑在处理器上调用它们
return method.invoke(this, args);
}
}
// 1.指定序列化器
// JdkSerializer serializer = new JdkSerializer();
final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());
String serviceName = method.getDeclaringClass().getName();
// 2.发出请求(全部使用动态的参数)
RpcRequest rpcRequest = RpcRequest.builder()
.serviceName(serviceName)
.methodName(method.getName())
.parameterTypes(method.getParameterTypes())
.args(args)
.build();
try {
// 3.序列化对象
byte[] bodyBytes = serializer.serialize(rpcRequest);
// 从注册中心获取服务提供者请求地址
RpcConfig rpcConfig = RpcApplication.getRpcConfig();
// 实例化注册中心对象
Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());
ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
serviceMetaInfo.setServiceName(serviceName);
serviceMetaInfo.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION);
// 服务发现
List<ServiceMetaInfo> serviceMetaInfoList = registry.serviceDiscovery(serviceMetaInfo.getServiceKey());
if(CollUtil.isEmpty(serviceMetaInfoList)){
throw new RuntimeException("暂无服务地址");
}
ServiceMetaInfo selectedServiceMetaInfo = serviceMetaInfoList.get(0);
// 4.发出TCP请求
Vertx vertx = Vertx.vertx();
NetClient netClient = vertx.createNetClient();
CompletableFuture<RpcResponse> responseFuture = new CompletableFuture<>();
netClient.connect(selectedServiceMetaInfo.getServicePort(),selectedServiceMetaInfo.getServiceAddress(),
result->{
if(result.succeeded()){
System.out.println("Connected to TCP server");
NetSocket socket = result.result();
// 发送数据
// 构造消息
ProtocolMessage<Object> protocolMessage = new ProtocolMessage<>();
ProtocolMessage.Header header = new ProtocolMessage.Header();
header.setMagic(ProtocolConstant.PROTOCOL_MAGIC);
header.setVersion(ProtocolConstant.PROTOCOL_VERSION);
header.setSerializer((byte) ProtocolMessageSerializerEnum.getEnumByValue(
RpcApplication.getRpcConfig().getSerializer()).getKey());
header.setType((byte) ProtocolMessageTypeEnum.REQUEST.getKey());
header.setRequestId(IdUtil.getSnowflakeNextId());
protocolMessage.setHeader(header);
protocolMessage.setBody(rpcRequest);
/**
* 编码写入请求
*/
try {
Buffer encodeBuffer = ProtocolMessageEncoder.encode(protocolMessage);
socket.write(encodeBuffer);
} catch (IOException e) {
throw new RuntimeException(e);
}
/**
* 接收响应解码
*/
socket.handler(buffer -> {
try {
ProtocolMessage<RpcResponse> responseProtocolMessage =
(ProtocolMessage<RpcResponse>) ProtocolMessageDecoder.decode(buffer);
responseFuture.complete(responseProtocolMessage.getBody());
} catch (IOException e) {
throw new RuntimeException("协议消息解码错误");
}
});
}else{
System.err.println("Failed to connect to TCP server");
}
});
RpcResponse rpcResponse = responseFuture.get();
// 关闭连接
netClient.close();
return rpcResponse.getData();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
}
5.6 修改服务提供者服务
// 启动 web 服务
VertxTcpServer vertxTcpServer = new VertxTcpServer();
vertxTcpServer.doStart(RpcApplication.getRpcConfig().getServerPort());
6. 粘包和半包
6.1 什么是粘包和半包
6.1.1 什么是粘包
- 每次收到的数据更多了,这种情况叫做 粘包
6.1.1 什么是半包
- 每次收到的数据更少了,这种情况叫做 半包
6.2 怎么解决粘包和半包
6.2.1 怎么解决粘包
- 每次只读取指定长度的数据,超过长度的留着下一次接收到消息时再读取
6.2.2 怎么解决半包
- 在消息头中设置请求体的长度,服务端接收时,判断每次消息的长度是否符合预期,不完整就不读,留到下一次接收到消息时再读取
6.3 Vertx解决粘包和半包
- 先完整读取请求头信息,由于请求头信息长度是固定的,可以使用 RecordParser 保证每次都完整读取。
- 再根据请求头长度信息更改 RecordParser 的固定长度,保证完整获取到请求体
6.3.1 TcpServer 实现
/**
* Vertx Tcp 服务器
*/
@Slf4j
public class VertxTcpServer implements HttpServer {
@Override
public void doStart(int port) {
// 1.创建 Vert.x 实例
Vertx vertx = Vertx.vertx();
// 2.创建TCP服务器
NetServer server = vertx.createNetServer();
// 3.处理请求
server.connectHandler(socket ->{
// 构造parser
RecordParser parser = RecordParser.newFixed(8);
parser.setOutput(new Handler<Buffer>() {
// 初始化
int size = -1;
// 一次完整的读取(头 + 体)
Buffer resultBuffer = Buffer.buffer();
@Override
public void handle(Buffer buffer) {
if(-1 == size){
// 更新size为消息头长度
size = buffer.getInt(4);
parser.fixedSizeMode(size);
// 写入头信息到结果
resultBuffer.appendBuffer(buffer);
}else{
// 写入体信息到结果
resultBuffer.appendBuffer(buffer);
System.out.println(resultBuffer.toString());
// 重置一轮
parser.fixedSizeMode(8);
size = -1;
resultBuffer = Buffer.buffer();
}
}
});
socket.handler(parser);
});
// 4.启动TCP服务器并监听端口
server.listen(port,result->{
if(result.succeeded()){
System.out.println("Server is now listening on port " + port);
}else{
System.err.println("Failed to start server: " + result.cause());
}
});
}
public static void main(String[] args) {
new VertxTcpServer().doStart(8888);
}
}
6.3.2 TcpClient 实现
/**
* Vertx Tcp 客户端
*/
public class VertxTcpClient {
public void start() {
// 1.创建 Vert.x 实例
Vertx vertx = Vertx.vertx();
// 2.监听端口处理请求
vertx.createNetClient().connect(8888, "localhost", result -> {
if (result.succeeded()) {
System.out.println("Connected to TCP server");
io.vertx.core.net.NetSocket socket = result.result();
for (int i = 0; i < 1000; i++) {
// 发送数据
Buffer buffer = Buffer.buffer();
String str = "Hello, server!Hello, server!Hello, server!Hello, server!";
buffer.appendInt(0);
buffer.appendInt(str.getBytes().length);
buffer.appendBytes(str.getBytes());
socket.write(buffer);
}
// 接收响应
socket.handler(buffer -> {
System.out.println("Received response from server: " + buffer.toString());
});
} else {
System.err.println("Failed to connect to TCP server");
}
});
}
public static void main(String[] args) {
new VertxTcpClient().start();
}
}
6.4 装饰者模式解决
6.4.1 装饰者模式实现
/**
* 装饰者模式(使用 recordParser 对原有的 buffer 处理能力进行增强)
*/
public class TcpBufferHandlerWrapper implements Handler<Buffer> {
private final RecordParser recordParser;
public TcpBufferHandlerWrapper(Handler<Buffer> bufferHandler) {
this.recordParser = initRecordParser(bufferHandler);
}
@Override
public void handle(Buffer buffer) {
recordParser.handle(buffer);
}
private RecordParser initRecordParser(Handler<Buffer> bufferHandler) {
RecordParser parser = RecordParser.newFixed(ProtocolConstant.MESSAGE_HEADER_LENGTH);
parser.setOutput(new Handler<Buffer>() {
// 初始化
int size = -1;
// 一次完整的读取(头 + 体)
Buffer resultBuffer = Buffer.buffer();
@Override
public void handle(Buffer buffer) {
if(-1 == size){
// 更新size为消息头长度
size = buffer.getInt(13);
parser.fixedSizeMode(size);
// 写入头信息到结果
resultBuffer.appendBuffer(buffer);
}else{
// 写入体信息到结果
resultBuffer.appendBuffer(buffer);
// 已拼接为完整 Buffer,执行处理
bufferHandler.handle(resultBuffer);
// 重置一轮
parser.fixedSizeMode(ProtocolConstant.MESSAGE_HEADER_LENGTH);
size = -1;
resultBuffer = Buffer.buffer();
}
}
});
return parser;
}
}
6.4.2 优化客户端调用
/**
* 请求处理器
*/
public class TcpServerHandler implements Handler<NetSocket> {
/**
* 处理请求
* @param netSocket
*/
@Override
public void handle(NetSocket netSocket) {
TcpBufferHandlerWrapper bufferHandlerWrapper = new TcpBufferHandlerWrapper(buffer -> {
// 接受请求,解码
ProtocolMessage<RpcRequest> protocolMessage;
try {
protocolMessage = (ProtocolMessage<RpcRequest>) ProtocolMessageDecoder.decode(buffer);
} catch (IOException e) {
throw new RuntimeException("协议消息解码错误");
}
RpcRequest rpcRequest = protocolMessage.getBody();
// 处理请求,解码
// 构造响应结果对象
RpcResponse rpcResponse = new RpcResponse();
try {
Class<?> implClass = LocalRegistry.get(rpcRequest.getServiceName());
Method method = implClass.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
Object result = method.invoke(implClass.newInstance(), rpcRequest.getArgs());
// 封装返回结果
rpcResponse.setData(result);
rpcResponse.setDataType(method.getReturnType());
rpcResponse.setMessage("Success");
} catch (Exception e) {
e.printStackTrace();
rpcResponse.setMessage(e.getMessage());
rpcResponse.setException(e);
}
// 发送响应,编码
ProtocolMessage.Header header = protocolMessage.getHeader();
header.setType((byte) ProtocolMessageTypeEnum.RESPONSE.getKey());
ProtocolMessage<RpcResponse> responseProtocolMessage = new ProtocolMessage<>(header, rpcResponse);
try {
Buffer encode = ProtocolMessageEncoder.encode(responseProtocolMessage);
netSocket.write(encode);
} catch (IOException e) {
throw new RuntimeException("协议消息编码错误");
}
});
// 处理连接
netSocket.handler(bufferHandlerWrapper);
}
}
6.4.3 修改TcpClient
/**
* Vertx Tcp 客户端
*/
public class VertxTcpClient {
public static RpcResponse doRequest(RpcRequest rpcRequest, ServiceMetaInfo serviceMetaInfo) throws ExecutionException, InterruptedException {
// 1.创建 Vert.x 实例
Vertx vertx = Vertx.vertx();
// 2.监听端口处理请求
NetClient netClient = vertx.createNetClient();
CompletableFuture<RpcResponse> responseFuture = new CompletableFuture<>();
netClient.connect(serviceMetaInfo.getServicePort(), serviceMetaInfo.getServiceHost(),
result->{
if(result.succeeded()){
System.out.println("Connected to TCP server");
NetSocket socket = result.result();
// 发送数据
// 构造消息
ProtocolMessage<Object> protocolMessage = new ProtocolMessage<>();
ProtocolMessage.Header header = new ProtocolMessage.Header();
header.setMagic(ProtocolConstant.PROTOCOL_MAGIC);
header.setVersion(ProtocolConstant.PROTOCOL_VERSION);
header.setSerializer((byte) ProtocolMessageSerializerEnum.getEnumByValue(
RpcApplication.getRpcConfig().getSerializer()).getKey());
header.setType((byte) ProtocolMessageTypeEnum.REQUEST.getKey());
// 生成全局请求 ID
header.setRequestId(IdUtil.getSnowflakeNextId());
protocolMessage.setHeader(header);
protocolMessage.setBody(rpcRequest);
/**
* 编码写入请求
*/
try {
Buffer encodeBuffer = ProtocolMessageEncoder.encode(protocolMessage);
socket.write(encodeBuffer);
} catch (IOException e) {
throw new RuntimeException(e);
}
/**
* 接收响应解码
* 重点:将创建的 TcpBufferHandlerWrapper 注册到 socket.handler()
*/
socket.handler(new TcpBufferHandlerWrapper(buffer -> {
try {
ProtocolMessage<RpcResponse> rpcResponseProtocolMessage =
(ProtocolMessage<RpcResponse>) ProtocolMessageDecoder.decode(buffer);
responseFuture.complete(rpcResponseProtocolMessage.getBody());
} catch (IOException e) {
throw new RuntimeException("协议消息解码错误");
}
}));
}else{
System.err.println("Failed to connect to TCP server");
}
});
RpcResponse rpcResponse = responseFuture.get();
// 关闭连接
netClient.close();
return rpcResponse;
}
}
6.4.4 修改TcpServer
/**
* Vertx Tcp 服务器
*/
@Slf4j
public class VertxTcpServer implements HttpServer {
@Override
public void doStart(int port) {
// 1.创建 Vert.x 实例
Vertx vertx = Vertx.vertx();
// 2.创建TCP服务器
NetServer server = vertx.createNetServer();
// 3.处理请求
server.connectHandler(new TcpServerHandler());
// 4.启动TCP服务器并监听端口
server.listen(port,result->{
if(result.succeeded()){
System.out.println("Server is now listening on port " + port);
}else{
System.err.println("Failed to start server: " + result.cause());
}
});
}
public static void main(String[] args) {
new VertxTcpServer().doStart(8888);
}
}
6.4.4 继续修改代理对象
/**
* JDK 动态代理
*/
public class JdkServiceProxy implements InvocationHandler {
/**
* 调用代理
*
* @return
* @throws Throwable
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 本地处理 Object 类的方法,避免 RPC 分派
if (method.getDeclaringClass() == Object.class) {
switch (method.getName()) {
case "toString":
// 为代理对象本身提供一个有意义的字符串表示
return proxy.getClass().getName() + "@" +
Integer.toHexString(System.identityHashCode(proxy)) +
", with InvocationHandler " + this;
case "hashCode":
// 委托给处理器的 hashCode 或提供一个代理特定的 hashCode
return System.identityHashCode(proxy);
case "equals":
// 委托给处理器的 equals 或提供代理特定的相等性判断
return proxy == args[0]; // 代理对象的默认引用相等性
default:
// 对于其他 Object 方法,可以考虑在处理器上调用它们
return method.invoke(this, args);
}
}
// 1.指定序列化器
// JdkSerializer serializer = new JdkSerializer();
final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());
String serviceName = method.getDeclaringClass().getName();
// 2.发出请求(全部使用动态的参数)
RpcRequest rpcRequest = RpcRequest.builder()
.serviceName(serviceName)
.methodName(method.getName())
.parameterTypes(method.getParameterTypes())
.args(args)
.build();
try {
// 3.序列化对象
byte[] bodyBytes = serializer.serialize(rpcRequest);
// 从注册中心获取服务提供者请求地址
RpcConfig rpcConfig = RpcApplication.getRpcConfig();
// 实例化注册中心对象
Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());
ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
serviceMetaInfo.setServiceName(serviceName);
serviceMetaInfo.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION);
// 服务发现
List<ServiceMetaInfo> serviceMetaInfoList = registry.serviceDiscovery(serviceMetaInfo.getServiceKey());
if(CollUtil.isEmpty(serviceMetaInfoList)){
throw new RuntimeException("暂无服务地址");
}
ServiceMetaInfo selectedServiceMetaInfo = serviceMetaInfoList.get(0);
// 4.发出TCP请求
RpcResponse rpcResponse = VertxTcpClient.doRequest(rpcRequest,selectedServiceMetaInfo);
return rpcResponse.getData();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
}