
在上一篇文章中,講解了反射 與動態(tài)代理基本概念,沒有看過點擊此傳送門。
接下來看看,動態(tài)代理在RPC中是如何使用的。在講解過程中會去除掉一些無關的代碼,如果讀者相應看全部的代碼,可以去開源項目的倉庫中下載相關源碼,進一步的專研,以下也會給出鏈接。
實例
1.第一個實例取自黃勇的輕量級分布式 RPC 框架 ,由于實現(xiàn)中通信框架使用了Netty,所以在分析中會有部分Netty代碼的信息,不過不用擔心,即使不懂Netty,講解的過程中會盡量避免,并會突出反射與動態(tài)代理在其中的作用。
在rpc-simple-client中HelloClient.Class有如下代碼
HelloService helloService = rpcProxy.create(HelloService.class);
String result = helloService.hello("World");
System.out.println(result);
這個代碼做的是什么事呢?通過一個代理生成helloService對象,執(zhí)行hello方法。
在我們印象中執(zhí)行方法,最終都會執(zhí)行的是接口中實現(xiàn)的方法。那事實是這樣嗎?看下面的分析。
在rpcProxy代碼如下:
public <T> T create(final Class<?> interfaceClass, final String serviceVersion) {
// 創(chuàng)建動態(tài)代理對象
return (T) Proxy.newProxyInstance(
interfaceClass.getClassLoader(),
new Class<?>[]{interfaceClass},
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 創(chuàng)建 RPC 請求對象并設置請求屬性
RpcRequest request = new RpcRequest();
request.setRequestId(UUID.randomUUID().toString());
request.setInterfaceName(method.getDeclaringClass().getName());
request.setServiceVersion(serviceVersion);
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);
// 獲取 RPC 服務地址
if (serviceDiscovery != null) {
String serviceName = interfaceClass.getName();
if (StringUtil.isNotEmpty(serviceVersion)) {
serviceName += "-" + serviceVersion;
}
serviceAddress = serviceDiscovery.discover(serviceName);
LOGGER.debug("discover service: {} => {}", serviceName, serviceAddress);
}
if (StringUtil.isEmpty(serviceAddress)) {
throw new RuntimeException("server address is empty");
}
// 從 RPC 服務地址中解析主機名與端口號
String[] array = StringUtil.split(serviceAddress, ":");
String host = array[0];
int port = Integer.parseInt(array[1]);
// 創(chuàng)建 RPC 客戶端對象并發(fā)送 RPC 請求
RpcClient client = new RpcClient(host, port);
long time = System.currentTimeMillis();
RpcResponse response = client.send(request);
LOGGER.debug("time: {}ms", System.currentTimeMillis() - time);
if (response == null) {
throw new RuntimeException("response is null");
}
// 返回 RPC 響應結果
if (response.hasException()) {
throw response.getException();
} else {
return response.getResult();
}
}
}
);
}
從上面的代碼可以看出經(jīng)過了代理,執(zhí)行hello方法,其實是發(fā)起一個請求。既然是一個請求,就是要涉及Client端與Server端,上面其實是一個Clent端代碼。
那我們看看Server做了什么,去掉一個和本文所介紹不相關的代碼,在RpcServerHandler中可以看核心代碼如下
public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request) throws Exception {
Object result = handle(request);
response.setResult(result);
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); //返回,異步關閉連接
}
其中hanlde中重要實現(xiàn)如下
// 獲取反射調(diào)用所需的參數(shù),這些都是Client端傳輸給我們的。
Class<?> serviceClass = serviceBean.getClass();
String methodName = request.getMethodName();
Class<?>[] parameterTypes = request.getParameterTypes();
Object[] parameters = request.getParameters();
// 使用 CGLib 執(zhí)行反射調(diào)用
FastClass serviceFastClass = FastClass.create(serviceClass);
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
return serviceFastMethod.invoke(serviceBean, parameters);
2.第二個實例取自xxl-job分布式任務調(diào)度平臺
說明:此開源項目的,RPC通信是用Jetty來實現(xiàn)的。
在xxl-job-admin中XxlJobTrigger.Class的runExecutor有如下
ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address); //根據(jù)地址拿到執(zhí)行器
runResult = executorBiz.run(triggerParam);
做了很簡單的是取出執(zhí)行器,觸發(fā)執(zhí)行。但是進入getExecutorBiz方法你會發(fā)現(xiàn)如下
executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address,
accessToken).getObject();
executorBizRepository.put(address, executorBiz);
return executorBiz;
是不是很熟悉,沒錯,動態(tài)代理,看是NetComClientProxy的實現(xiàn):
在結構上是不是和第一個實例中的rpcProxy代碼,很相似呢。
new NetComClientProxy(ExecutorBiz.class, address, accessToken).getObject();做了什么呢
public Object getObject() throws Exception {
return Proxy.newProxyInstance(Thread.currentThread()
.getContextClassLoader(), new Class[] { iface },
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// request封裝
RpcRequest request = new RpcRequest();
request.setServerAddress(serverAddress);
request.setCreateMillisTime(System.currentTimeMillis());
request.setAccessToken(accessToken);
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);
// send發(fā)送
RpcResponse response = client.send(request);
// valid response
if (response == null) {
logger.error(">>>>>>>>>>> xxl-rpc netty response not found.");
throw new Exception(">>>>>>>>>>> xxl-rpc netty response not found.");
}
if (response.isError()) {
throw new RuntimeException(response.getError());
} else {
return response.getResult();
}
}
});
}
依舊是封裝了一個RpcRequest ,發(fā)送請求。所以在 runResult = executorBiz.run(triggerParam)
其實是在發(fā)送一個請求。上面是Client端代碼,照舊,接著看Server代碼,你會發(fā)現(xiàn)還是似成相識。去掉與本文無關的代碼,得到如下:
在xxl-job-core中JettyServerHandler.Class有
RpcResponse rpcResponse = NetComServerFactory.invokeService(rpcRequest, null);
點擊進入:
public static RpcResponse invokeService(RpcRequest request, Object serviceBean) {
Class<?> serviceClass = serviceBean.getClass(); //類名
String methodName = request.getMethodName(); //方法名run
Class<?>[] parameterTypes = request.getParameterTypes(); //參數(shù)類型
Object[] parameters = request.getParameters(); //具體參數(shù)
FastClass serviceFastClass = FastClass.create(serviceClass);
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
// 使用 CGLib 執(zhí)行反射調(diào)用
Object result = serviceFastMethod.invoke(serviceBean, parameters);
response.setResult(result);
} catch (Throwable t) {
t.printStackTrace();
response.setError(t.getMessage());
}
return response;
}
根據(jù)反射生成具體的類,來執(zhí)行相關的方法,達到想要的目的。
上面兩個實例的過程可以用下圖概括:

由于本人水平有限,有什么問題可以評論,喜歡的可以關注。