简易消费者
- 1.注册服务
- 2.启动HTTP服务器,等待请求
- 3.通过rpc框架获取代理对象并调用代理对象的方法
- 4.指定序列化器,构造RPC Request
- 5.将请求序列化为字节数组,发送HTTP请求
- 6.等待接收HTTP响应
- 7.通过web服务器监听并接收consumer的请求
- 8.经过请求处理器将请求反序列化RPC Request
- 9.使用本地服务注册器查询 provider 中的对应服务并进行反射调用
- 10.将 provider 返回的结果构造为RPC Response,并将RPC Response序列化为字节数组
- 11.rpc发送HTTP响应
- 12.consumer接收响应
- 13.consumer将响应反序列化为RPC Request
- 14.consumer获取响应中的结果
1.1 模块划分与介绍
首先需要初始化四个模块,分别是:
- example-common:公共模块,提供公共的对象和方法
- example-provider:提供者模块,提供服务给消费者调用
- example-consumer:消费者模块,向提供者请求服务调用
- todaysaturday-rpc-easy:提供 rpc 框架的核心方法
公共模块提供公共的对象及属性,提供者提供服务,消费者请求服务,rpc 作为中介平衡提供者和消费者之间的请求与调用
2.1 模块创建流程
新建项目文件夹(todaysaturday-rpc)
–> 打开IDEA
–> File -> New
-> Module
-> Maven Archetype
-> Name:输入模块、Location:选择项目位置、JDK:选择11、Archetype:选择QuickStart
-> 点击Create
3.1 基本模块
3.1.1 公共模块
用户实体类(User)
/**
* 用户
*/
public class User implements Serializable {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
用户功能接口(UserService)
/**
* 用户服务
*/
public interface UserService {
/**
* 获取用户
*
* @param user
* @return
*/
User getUser(User user);
}
3.1.2 提供者模块
引入依赖
<dependencies>
<dependency>
<!-- RPC 框架 -->
<groupId>com.todaysaturday</groupId>
<artifactId>todaysaturday-rpc-easy</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<!-- 公共模块 -->
<groupId>com.todaysaturday</groupId>
<artifactId>example-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- https://doc.hutool.cn/ -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.16</version>
</dependency>
<!-- https://projectlombok.org/ -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
服务实现类(UserServiceImpl)
/**
* 用户服务实现类
*/
public class UserServiceImpl implements UserService {
@Override
public User getUser(User user) {
System.out.println("用户名:"+user.getName());
return null;
}
}
服务提供者启动类(EasyProviderExample)
/**
* 简易服务提供者示例
*/
public class EasyProviderExample {
public static void main( String[] args ){
// 注册服务
LocalRegistry.register(UserService.class.getName(), UserServiceImpl.class);
// 启动 web 服务
VertxHttpServer httpServer = new VertxHttpServer();
httpServer.doStart(8080);
}
}
3.1.3 消费者模块
引入依赖
<dependencies>
<dependency>
<!-- RPC 框架 -->
<groupId>com.todaysaturday</groupId>
<artifactId>todaysaturday-rpc-easy</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<!-- 公共模块 -->
<groupId>com.todaysaturday</groupId>
<artifactId>example-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- https://doc.hutool.cn/ -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.16</version>
</dependency>
<!-- https://projectlombok.org/ -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
服务消费者启动类(EasyConsumerExample)
/**
* 服务消费者启动类
*/
public class EasyConsumerExample {
public static void main( String[] args ){
// 创建静态代理
// UserService userService = new UserServiceProxy();
// 创建JDK动态代理
// UserService userService = JdkServiceProxyFactory.getProxy(UserService.class);
// 创建CGlib动态代理
UserService userService = CGlibServiceProxyFactory.getProxy(UserService.class);
User user = new User();
user.setName("todaysaturday");
User newUser = userService.getUser(user);
if(newUser == null){
System.out.println("user == null");
}else{
System.out.println("user == "+ newUser.getName());
}
}
}
4.1 RPC核心模块
创建Web服务器(Vert.x)
如果需要让服务提供者提供可远程访问的服务,那么就需要创建web服务器,能够接受处理请求、并返回响应
定义HTTP服务接口(HttpServer)
/**
* HTTP服务器接口
*/
public interface HttpServer {
/**
* 启动服务器
* @param port
*/
void doStart(int port);
}
创建Vert.x服务实现类(VertxHttpServer)
/**
* vertx 服务器
*/
public class VertxHttpServer implements HttpServer{
/**
* 启动服务器
* @param port
*/
@Override
public void doStart(int port) {
// 1.创建 Vert.x 实例
Vertx vertx = Vertx.vertx();
// 2.创建HTTP服务器
io.vertx.core.http.HttpServer server = vertx.createHttpServer();
// 3.监听端口并处理请求
server.requestHandler(new HttpServerHandler());
// 4.启动HTTP服务器并监听端口
server.listen(port,result->{
if(result.succeeded()){
System.out.println("Server is now listening on port " + port);
}else{
System.err.println("Failed to start server: " + result.cause());
}
});
}
}
创建本地服务注册器(LocalRegistry)
本地服务注册器 | 服务注册中心 |
---|---|
根据服务名获取到对应的实现类 ConcurrentHashMap<String,Class<?>> |
管理注册的服务,提供服务信息给消费者 |
存储注册信息(Map<String,Class<?>>) | |
注册服务(map.put()) | |
获取服务(map.get()) | |
删除服务(map.remove()) |
/**
* 本地服务注册器
*/
public class LocalRegistry {
/**
* 使用线程安全的 concurrentHashMap 存储服务注册信息,
* key 为服务名称、value 为服务的实现类
*/
/**
* 存储注册信息
*/
private static final Map<String,Class<?>> map =new ConcurrentHashMap<>();
/**
* 注册服务
* @param serviceName 服务名称
* @param implClass 服务
*/
public static void register(String serviceName,Class<?> implClass){
map.put(serviceName,implClass);
}
/**
* 获取服务
* @param serviceName
* @return
*/
public static Class<?> get(String serviceName){
return map.get(serviceName);
}
/**
* 删除服务
* @param serviceName
*/
public static void remove(String serviceName){
map.remove(serviceName);
}
}
创建序列化器
创建序列化器接口,定义序列化 serializer
方法和反序列化 deserialize
方法
序列化器接口(Serializer)
/**
* 序列化器接口
*/
public interface Serializer {
/**
* 序列化
*
* @param object
* @param <T>
* @return
* @throws IOException
*/
<T> byte[] serialize(T object) throws IOException;
/**
* 反序列化
*
* @param bytes
* @param type
* @param <T>
* @return
* @throws IOException
*/
<T> T deserialize(byte[] bytes, Class<T> type) throws IOException;
}
JDK 序列化器实现类(JdkSerializer)
/**
* JDK 序列化器
*/
public class JdkSerializer implements Serializer{
/**
* 将对象序列化为字节数组
* @param object
* @return
* @param <T>
* @throws IOException
*/
@Override
public <T> byte[] serialize(T object) throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
objectOutputStream.writeObject(object);
objectOutputStream.close();
return outputStream.toByteArray();
}
/**
* 将字节数组反序列化为对象
* @param bytes
* @param type
* @return
* @param <T>
* @throws IOException
*/
@Override
public <T> T deserialize(byte[] bytes, Class<T> type) throws IOException {
ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
try {
return (T) objectInputStream.readObject();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
objectInputStream.close();
}
}
}
创建请求处理器
RPCRequest请求类
/**
* RPC 请求
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RpcRequest implements Serializable {
/**
* 服务名称
*/
private String serviceName;
/**
* 方法名称
*/
private String methodName;
/**
* 参数类型列表
*/
private Class<?>[] parameterTypes;
/**
* 参数列表
*/
private Object[] args;
}
RPCResponse响应类
/**
* RPC 响应
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RpcResponse implements Serializable {
/**
* 响应数据
*/
private Object data;
/**
* 响应数据类型(预留)
*/
private Class<?> dataType;
/**
* 响应信息
*/
private String message;
/**
* 异常信息
*/
private Exception exception;
}
请求处理器类(HttpServerHandler)
/**
* 请求处理器
*/
public class HttpServerHandler implements Handler<HttpServerRequest> {
/**
* 反序列化请求为对象,并从请求对象中获取参数。
* 根据服务名称从本地注册器中获取到对应的服务实现类。
* 通过反射机制调用方法,得到返回结果。
* 对返回结果进行封装和序列化,并写入到响应中。
*/
@Override
public void handle(HttpServerRequest request) {
// 1.指定序列化器
final JdkSerializer serializer = new JdkSerializer();
// 记录日志
System.out.println("Received request: " + request.method() + " " + request.uri());
// 2.异步处理 HTTP 请求
request.bodyHandler(body->{
byte[] bytes = body.getBytes();
RpcRequest rpcRequest = null;
try {
// 3.反序列化对象
rpcRequest = serializer.deserialize(bytes,RpcRequest.class);
} catch (IOException e) {
e.printStackTrace();
}
// 4.构造响应结果对象
RpcResponse rpcResponse = new RpcResponse();
// 如果请求为null,直接返回
if(rpcRequest == null){
rpcResponse.setMessage("rpcRequest is null");
doResponse(request,rpcResponse,serializer);
return;
}
// 5.获取要调用的服务实现类,通过反射调用
Class<?> implClass = LocalRegistry.get(rpcRequest.getServiceName());
try {
// 6.通过方法名找到目标方法
Method method = implClass.getMethod(rpcRequest.getMethodName(),rpcRequest.getParameterTypes());
// 7.创建实例并反射调用方法
Object result = method.invoke(implClass.newInstance(), rpcRequest.getArgs());
// 8.封装返回结果
rpcResponse.setData(result);
rpcResponse.setDataType(method.getReturnType());
rpcResponse.setMessage("success");
} catch (Exception e) {
e.printStackTrace();
rpcResponse.setMessage(e.getMessage());
rpcResponse.setException(e);
}
// 响应
doResponse(request, rpcResponse, serializer);
});
}
/**
* 响应
* @param request
* @param rpcResponse
* @param serializer
*/
private void doResponse(HttpServerRequest request, RpcResponse rpcResponse, JdkSerializer serializer) {
HttpServerResponse httpServerResponse = request.response()
.putHeader("content-type","application/json");
try {
// 序列化
byte[] serializered = serializer.serializer(rpcResponse);
httpServerResponse.end(Buffer.buffer(serializered));
} catch (IOException e) {
e.printStackTrace();
}
}
}
创建代理
静态代理(UserServiceProxy)
/**
* 静态代理,只代理某个接口/功能
*/
public class UserServiceProxy implements UserService {
@Override
public User getUser(User user) {
// 1.指定序列化器
JdkSerializer serializer = new JdkSerializer();
// 2.发出请求
RpcRequest rpcRequest = RpcRequest.builder()
.serviceName(UserService.class.getName())
.methodName("getUser")
.parameterTypes(new Class[]{User.class})
.args(new Object[]{user})
.build();
try {
// 3.序列化对象
byte[] bodyBytes = serializer.serializer(rpcRequest);
byte[] result;
// 4.发出HTTP请求
try(HttpResponse httpResponse = HttpRequest.post("http://localhost:8080")
.body(bodyBytes)
.execute()){
result = httpResponse.bodyBytes();
}
// 5.反序列化对象
RpcResponse rpcResponse = serializer.deserialize(result, RpcResponse.class);
return (User) rpcResponse.getData();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
}
动态代理
JDK动态代理
/**
* JDK 动态代理
*/
public class JdkServiceProxy implements InvocationHandler {
/**
* 调用代理
*
* @return
* @throws Throwable
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 1.指定序列化器
JdkSerializer serializer = new JdkSerializer();
// 2.发出请求(全部使用动态的参数)
RpcRequest rpcRequest = RpcRequest.builder()
.serviceName(method.getDeclaringClass().getName())
.methodName(method.getName())
.parameterTypes(method.getParameterTypes())
.args(args)
.build();
try {
// 3.序列化对象
byte[] bodyBytes = serializer.serializer(rpcRequest);
byte[] result;
// 4.发出HTTP请求
try(HttpResponse httpResponse = HttpRequest.post("http://localhost:8080")
.body(bodyBytes)
.execute()){
result = httpResponse.bodyBytes();
}
// 5.反序列化对象
RpcResponse rpcResponse = serializer.deserialize(result, RpcResponse.class);
return (User) rpcResponse.getData();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
}
JDK动态代理工厂
/**
* 服务代理工厂(用于创建代理对象)
*/
public class JdkServiceProxyFactory {
/**
* 根据服务类获取代理对象
* @param serviceClass
* @return
* @param <T>
*/
public static<T> T getProxy(Class<T> serviceClass){
T t = (T) Proxy.newProxyInstance(
serviceClass.getClassLoader(),
new Class[]{serviceClass},
new JdkServiceProxy());
return t;
}
}
5.1 项目扩展
CGlib代理
引入依赖
<!-- CGlib 动态代理依赖 -->
<dependency>
<groupId>cglib</groupId>
<artifactId>cglib</artifactId>
<version>3.3.0</version>
</dependency>
CGlib代理实现
/**
* CGlib 动态代理
*/
public class CGlibServiceProxy implements MethodInterceptor {
/**
*
* @param o
* @param method
* @param objects
* @param methodProxy
* @return
*/
@Override
public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy){
// 指定序列化器
JdkSerializer serializer = new JdkSerializer();
// 发请求
// builder构造链
RpcRequest rpcRequest = RpcRequest.builder()
.serviceName(method.getDeclaringClass().getName()) // 调用的远程服务接口名
.methodName(method.getName()) // 方法名
.parameterTypes(method.getParameterTypes()) // 获取方法参数类型
.args(objects) // 实际参数
.build();
try {
// 将请求序列化为字节数组
byte[] bodyBytes = serializer.serialize(rpcRequest);
byte[] result;
// 将序列化的请求作为请求体发送
try(HttpResponse httpResponse = HttpRequest.post("http://localhost:8080")
.body(bodyBytes)
.execute()){
// 接收响应
result = httpResponse.bodyBytes();
}
// 将返回的结果进行反序列化为响应对象
RpcResponse rpcResponse = serializer.deserialize(result, RpcResponse.class);
// 返回响应数据
return rpcResponse.getData();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
CGlib代理工厂实现
/**
* CGlib代理工厂
*/
public class CGlibServiceProxyFactory {
/**
* 根据服务类获取代理对象
* @param serviceClass
* @return
* @param <T>
*/
public static <T> T getProxy(Class<T> serviceClass) {
Enhancer enhancer = new Enhancer();
enhancer.setSuperclass(serviceClass); // 设置(被代理类)
enhancer.setCallback(new CGlibServiceProxy()); // 设置回调
return (T) enhancer.create(); // 创建代理对象
}
}