一、前言

服務(wù)導(dǎo)出的過程中,我們已經(jīng)獲取了一個(gè)代理對(duì)象。服務(wù)調(diào)用就是通過調(diào)用這個(gè)代理對(duì)象的方法。

Dubbo官方文檔給出了服務(wù)調(diào)用的具體過程。簡述一下就是客戶端通過代理對(duì)象發(fā)起調(diào)用,提前構(gòu)造好協(xié)議頭,然后將對(duì)象序列化成協(xié)議體,通過client(Netty)進(jìn)行網(wǎng)絡(luò)傳輸。
服務(wù)提供者的NettyServer接收到這個(gè)請(qǐng)求后會(huì)分發(fā)給業(yè)務(wù)線程池。由業(yè)務(wù)線程池調(diào)用具體的實(shí)現(xiàn)方法。
二、源碼分析
客戶端調(diào)用代碼
客戶端調(diào)用的代碼如下
public static void main(String[] args) throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/dubbo-consumer.xml");
context.start();
DemoService demoService = context.getBean("demoService", DemoService.class);
CompletableFuture<String> hello = demoService.sayHelloAsync("world");
System.out.println("result: " + hello.get());
}
在服務(wù)導(dǎo)出結(jié)束完成后,我們獲取DemoService實(shí)際是一個(gè)代理對(duì)象。通過該代理對(duì)象完成方法調(diào)用。最終會(huì)生成一個(gè)RPCInvocation對(duì)象調(diào)用MockClusterInvoker#invoke方法。

MockClusterInvoker#invoke
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
// 獲取mock配置
String value = getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
//no mock
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {
if (logger.isWarnEnabled()) {
logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + getUrl());
}
//force:direct mock
result = doMockInvoke(invocation, null);
} else {
//fail-mock
try {
result = this.invoker.invoke(invocation);
//fix:#4585
if(result.getException() != null && result.getException() instanceof RpcException){
RpcException rpcException= (RpcException)result.getException();
if(rpcException.isBiz()){
throw rpcException;
}else {
result = doMockInvoke(invocation, rpcException);
}
}
} catch (RpcException e) {
if (e.isBiz()) {
throw e;
}
if (logger.isWarnEnabled()) {
logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + getUrl(), e);
}
result = doMockInvoke(invocation, e);
}
}
return result;
}
這個(gè)方法主要是根據(jù)mock配置決定是否調(diào)用mock方法
- mock無配置調(diào)用真實(shí)方法
- mock為force則強(qiáng)制走mock方法
- mock為true,真實(shí)方法調(diào)用失敗后執(zhí)行mock方法
AbstractClusterInvoker#invoke
@Override
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
// binding attachments into invocation.
Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
}
// 調(diào)用directory.list 主要是做路由過濾
List<Invoker<T>> invokers = list(invocation);
// 過濾完成通過SPI機(jī)制獲取loadBalance實(shí)現(xiàn)類
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
// 調(diào)用子類方法
return doInvoke(invocation, invokers, loadbalance);
}
這段模板代碼主要邏輯是:
- 綁定attachement到invocation
- 通過RegistryDirectory過濾Invoker
- 通過SPI機(jī)制獲取負(fù)載均衡實(shí)現(xiàn)
- 執(zhí)行子類的doInvoke方法
最終這里是會(huì)調(diào)用到FailoverClusterInvoker執(zhí)行doInvoker方法
FailoverClusterInvoker#doInvoke
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
// 省略代碼
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
// 負(fù)載均衡中選擇一個(gè)Invoker
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
// 執(zhí)行方法
Result result = invoker.invoke(invocation);
// 省略代碼
return result;
} catch (RpcException e) {
// 省略代碼
} catch (Throwable e) {
// 省略代碼
}
}
throw new RpcException();
}
這個(gè)方法主要是完成了重試機(jī)制的邏輯
- 獲取重試次數(shù)并循環(huán)執(zhí)行
- 根據(jù)負(fù)載均衡策略選擇一個(gè)Invoker
- 執(zhí)行子類的doInvoke方法
最終調(diào)用到DubboInvoker的doInvoke方法
DubboInvoker#doInvoke
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
// 獲取client
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
// 判斷是否是oneWay方式調(diào)用
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
// 發(fā)送
currentClient.send(inv, isSent);
// 返回null
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
result.setExecutor(executor);
return result;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
這段代碼的主要邏輯是
- 獲取client對(duì)象
- 根據(jù)有無返回值判斷調(diào)用方式是否是oneway
- oneway通過client發(fā)起請(qǐng)求,返回一個(gè)異步執(zhí)行結(jié)果的返回值
- 非oneway則獲取回調(diào)線程池,發(fā)送請(qǐng)求,返回一個(gè)Future對(duì)象。
服務(wù)端調(diào)用代碼
客戶端默認(rèn)是通過Netty進(jìn)行發(fā)起請(qǐng)求調(diào)用,對(duì)于服務(wù)端主要是通過NettyServerHandler#channelRead方法進(jìn)行接收消息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
handler.received(channel, msg);
}
服務(wù)端接收消息默認(rèn)是所有消息派發(fā)至業(yè)務(wù)線程池,也就是AllChannelHandler#received
@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getPreferredExecutorService(message);
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if(message instanceof Request && t instanceof RejectedExecutionException){
sendFeedback(channel, (Request) message, t);
return;
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
這里將消息封裝成ChannelEventRunnable扔到業(yè)務(wù)線程池執(zhí)行。接下來會(huì)將消息解碼后調(diào)用到HeaderExchangeHandler#handleRequest
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
if (req.isBroken()) {
Object data = req.getData();
String msg;
if (data == null) {
msg = null;
} else if (data instanceof Throwable) {
msg = StringUtils.toString((Throwable) data);
} else {
msg = data.toString();
}
res.setErrorMessage("Fail to decode request due to: " + msg);
res.setStatus(Response.BAD_REQUEST);
channel.send(res);
return;
}
// find handler by message class.
Object msg = req.getData();
try {
// 執(zhí)行方法
CompletionStage<Object> future = handler.reply(channel, msg);
future.whenComplete((appResult, t) -> {
try {
if (t == null) {
res.setStatus(Response.OK);
res.setResult(appResult);
} else {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
channel.send(res);
} catch (RemotingException e) {
logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
}
});
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
}
}
這里調(diào)用具體的handler執(zhí)行reply方法,最終調(diào)用到DubboProtocol中ExchangeHandler的實(shí)現(xiàn)
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
if (!(message instanceof Invocation)) {
throw new RemotingException(channel, "Unsupported request: "
+ (message == null ? null : (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
// need to consider backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || !methodsStr.contains(",")) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
+ " not found in callback service interface ,invoke will be ignored."
+ " please update the api interface. url is:"
+ invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
Result result = invoker.invoke(inv);
return result.thenApply(Function.identity());
}
這里就是獲取具體的Invoker對(duì)象執(zhí)行方法調(diào)用
三、總結(jié)
客戶端通過接口調(diào)用某個(gè)方法,實(shí)際調(diào)用到代理類。代理類從cluster中獲取Invoker對(duì)象,并進(jìn)行router進(jìn)行過濾。緊接著通過loadBalance進(jìn)行負(fù)載均衡。獲取到Invoker對(duì)象后會(huì)根據(jù)協(xié)議構(gòu)造請(qǐng)求頭,然后將參數(shù)序列化后構(gòu)造回請(qǐng)求體,最后通過Client進(jìn)行遠(yuǎn)程調(diào)用。
服務(wù)端通過NettryServer監(jiān)聽請(qǐng)求,根據(jù)協(xié)議反序列化成對(duì)象,再按照派發(fā)策略派發(fā)消息。默認(rèn)是All,也就是所有請(qǐng)求扔給業(yè)務(wù)線程池。業(yè)務(wù)線程會(huì)獲取Invoker對(duì)象,并調(diào)用真實(shí)類,最終將結(jié)果返回。
[toc]
一、前言
[圖片上傳失敗...(image-2b0c4e-1632559794086)]
服務(wù)導(dǎo)出的過程中,我們已經(jīng)獲取了一個(gè)代理對(duì)象。服務(wù)調(diào)用就是通過調(diào)用這個(gè)代理對(duì)象的方法。
[圖片上傳失敗...(image-5b88ab-1632559794086)]
Dubbo官方文檔給出了服務(wù)調(diào)用的具體過程。簡述一下就是客戶端通過代理對(duì)象發(fā)起調(diào)用,提前構(gòu)造好協(xié)議頭,然后將對(duì)象序列化成協(xié)議體,通過client(Netty)進(jìn)行網(wǎng)絡(luò)傳輸。
服務(wù)提供者的NettyServer接收到這個(gè)請(qǐng)求后會(huì)分發(fā)給業(yè)務(wù)線程池。由業(yè)務(wù)線程池調(diào)用具體的實(shí)現(xiàn)方法。
二、源碼分析
客戶端調(diào)用代碼
客戶端調(diào)用的代碼如下
public static void main(String[] args) throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/dubbo-consumer.xml");
context.start();
DemoService demoService = context.getBean("demoService", DemoService.class);
CompletableFuture<String> hello = demoService.sayHelloAsync("world");
System.out.println("result: " + hello.get());
}
在服務(wù)導(dǎo)出結(jié)束完成后,我們獲取DemoService實(shí)際是一個(gè)代理對(duì)象。通過該代理對(duì)象完成方法調(diào)用。最終會(huì)生成一個(gè)RPCInvocation對(duì)象調(diào)用MockClusterInvoker#invoke方法。

MockClusterInvoker#invoke
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
// 獲取mock配置
String value = getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
//no mock
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {
if (logger.isWarnEnabled()) {
logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + getUrl());
}
//force:direct mock
result = doMockInvoke(invocation, null);
} else {
//fail-mock
try {
result = this.invoker.invoke(invocation);
//fix:#4585
if(result.getException() != null && result.getException() instanceof RpcException){
RpcException rpcException= (RpcException)result.getException();
if(rpcException.isBiz()){
throw rpcException;
}else {
result = doMockInvoke(invocation, rpcException);
}
}
} catch (RpcException e) {
if (e.isBiz()) {
throw e;
}
if (logger.isWarnEnabled()) {
logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + getUrl(), e);
}
result = doMockInvoke(invocation, e);
}
}
return result;
}
這個(gè)方法主要是根據(jù)mock配置決定是否調(diào)用mock方法
- mock無配置調(diào)用真實(shí)方法
- mock為force則強(qiáng)制走mock方法
- mock為true,真實(shí)方法調(diào)用失敗后執(zhí)行mock方法
AbstractClusterInvoker#invoke
@Override
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
// binding attachments into invocation.
Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
}
// 調(diào)用directory.list 主要是做路由過濾
List<Invoker<T>> invokers = list(invocation);
// 過濾完成通過SPI機(jī)制獲取loadBalance實(shí)現(xiàn)類
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
// 調(diào)用子類方法
return doInvoke(invocation, invokers, loadbalance);
}
這段模板代碼主要邏輯是:
- 綁定attachement到invocation
- 通過RegistryDirectory過濾Invoker
- 通過SPI機(jī)制獲取負(fù)載均衡實(shí)現(xiàn)
- 執(zhí)行子類的doInvoke方法
最終這里是會(huì)調(diào)用到FailoverClusterInvoker執(zhí)行doInvoker方法
FailoverClusterInvoker#doInvoke
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
// 省略代碼
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
// 負(fù)載均衡中選擇一個(gè)Invoker
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
// 執(zhí)行方法
Result result = invoker.invoke(invocation);
// 省略代碼
return result;
} catch (RpcException e) {
// 省略代碼
} catch (Throwable e) {
// 省略代碼
}
}
throw new RpcException();
}
這個(gè)方法主要是完成了重試機(jī)制的邏輯
- 獲取重試次數(shù)并循環(huán)執(zhí)行
- 根據(jù)負(fù)載均衡策略選擇一個(gè)Invoker
- 執(zhí)行子類的doInvoke方法
最終調(diào)用到DubboInvoker的doInvoke方法
DubboInvoker#doInvoke
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
// 獲取client
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
// 判斷是否是oneWay方式調(diào)用
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
// 發(fā)送
currentClient.send(inv, isSent);
// 返回null
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
result.setExecutor(executor);
return result;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
這段代碼的主要邏輯是
- 獲取client對(duì)象
- 根據(jù)有無返回值判斷調(diào)用方式是否是oneway
- oneway通過client發(fā)起請(qǐng)求,返回一個(gè)異步執(zhí)行結(jié)果的返回值
- 非oneway則獲取回調(diào)線程池,發(fā)送請(qǐng)求,返回一個(gè)Future對(duì)象。
服務(wù)端調(diào)用代碼
客戶端默認(rèn)是通過Netty進(jìn)行發(fā)起請(qǐng)求調(diào)用,對(duì)于服務(wù)端主要是通過NettyServerHandler#channelRead方法進(jìn)行接收消息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
handler.received(channel, msg);
}
服務(wù)端接收消息默認(rèn)是所有消息派發(fā)至業(yè)務(wù)線程池,也就是AllChannelHandler#received
@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getPreferredExecutorService(message);
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if(message instanceof Request && t instanceof RejectedExecutionException){
sendFeedback(channel, (Request) message, t);
return;
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
這里將消息封裝成ChannelEventRunnable扔到業(yè)務(wù)線程池執(zhí)行。接下來會(huì)將消息解碼后調(diào)用到HeaderExchangeHandler#handleRequest
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
if (req.isBroken()) {
Object data = req.getData();
String msg;
if (data == null) {
msg = null;
} else if (data instanceof Throwable) {
msg = StringUtils.toString((Throwable) data);
} else {
msg = data.toString();
}
res.setErrorMessage("Fail to decode request due to: " + msg);
res.setStatus(Response.BAD_REQUEST);
channel.send(res);
return;
}
// find handler by message class.
Object msg = req.getData();
try {
// 執(zhí)行方法
CompletionStage<Object> future = handler.reply(channel, msg);
future.whenComplete((appResult, t) -> {
try {
if (t == null) {
res.setStatus(Response.OK);
res.setResult(appResult);
} else {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
channel.send(res);
} catch (RemotingException e) {
logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
}
});
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
}
}
這里調(diào)用具體的handler執(zhí)行reply方法,最終調(diào)用到DubboProtocol中ExchangeHandler的實(shí)現(xiàn)
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
if (!(message instanceof Invocation)) {
throw new RemotingException(channel, "Unsupported request: "
+ (message == null ? null : (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
// need to consider backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || !methodsStr.contains(",")) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
+ " not found in callback service interface ,invoke will be ignored."
+ " please update the api interface. url is:"
+ invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
Result result = invoker.invoke(inv);
return result.thenApply(Function.identity());
}
這里就是獲取具體的Invoker對(duì)象執(zhí)行方法調(diào)用
三、總結(jié)
客戶端通過接口調(diào)用某個(gè)方法,實(shí)際調(diào)用到代理類。代理類從cluster中獲取Invoker對(duì)象,并進(jìn)行router進(jìn)行過濾。緊接著通過loadBalance進(jìn)行負(fù)載均衡。獲取到Invoker對(duì)象后會(huì)根據(jù)協(xié)議構(gòu)造請(qǐng)求頭,然后將參數(shù)序列化后構(gòu)造回請(qǐng)求體,最后通過Client進(jìn)行遠(yuǎn)程調(diào)用。
服務(wù)端通過NettryServer監(jiān)聽請(qǐng)求,根據(jù)協(xié)議反序列化成對(duì)象,再按照派發(fā)策略派發(fā)消息。默認(rèn)是All,也就是所有請(qǐng)求扔給業(yè)務(wù)線程池。業(yè)務(wù)線程會(huì)獲取Invoker對(duì)象,并調(diào)用真實(shí)類,最終將結(jié)果返回。