Motan 調(diào)用執(zhí)行流程

客戶端服務(wù)接口所持有的對(duì)象為RefererInvocationHandler,jdk的代理實(shí)現(xiàn),invoke調(diào)用,先封裝DefaultRequest,遍歷clusters,如果存在降級(jí)的則跳過(guò),默認(rèn)選第一個(gè)非降級(jí)的cluster

  • clusters處理
for (Cluster<T> cluster : clusters) {
    String protocolSwitcher = MotanConstants.PROTOCOL_SWITCHER_PREFIX + cluster.getUrl().getProtocol();

    Switcher switcher = switcherService.getSwitcher(protocolSwitcher);
    // 跳過(guò)降級(jí)的cluster
    if (switcher != null && !switcher.isOn()) {
         continue;
    }
    // request設(shè)置各種數(shù)據(jù)
    // 最后通過(guò)cluster.call調(diào)用
    response = cluster.call(request);
}
  • 高可用策略處理
// cluster.call,通過(guò)配置的HA策略 來(lái)實(shí)現(xiàn)調(diào)用(默認(rèn)配置:FailoverHaStrategy)
public Response call(Request request) {
    if (available.get()) {
        try {
            // loadBalance將在haStrategy中處理
            return haStrategy.call(request, loadBalance);
        } catch (Exception e) {
            return callFalse(request, e);
        }
    }
    return callFalse(request, new MotanServiceException(MotanErrorMsgConstant.SERVICE_UNFOUND));
}
  • 負(fù)載均衡處理
// 選擇Refer,通過(guò)loadBalance.selectToHolder(request, referers);選擇
List<Referer<T>> referers = selectReferers(request, loadBalance);
URL refUrl = referers.get(0).getUrl();
//
// 從第一個(gè)url中獲取重試次數(shù)
int tryCount = refUrl.getMethodParameter(request.getMethodName(), 
        request.getParamtersDesc(), URLParamType.retries.getName(),
        URLParamType.retries.getIntValue());
// 如果有問(wèn)題,則設(shè)置為不重試
if (tryCount < 0) {
  tryCount = 0;
}
for (int i = 0; i <= tryCount; i++) {
  // 選擇一個(gè)服務(wù)
  Referer<T> refer = referers.get(i % referers.size());
  try {
    // 設(shè)置重試次數(shù)
    request.setRetries(i);
    // 開(kāi)始遠(yuǎn)程調(diào)用
    return refer.call(request);
  } catch (RuntimeException e) {
  // 對(duì)于業(yè)務(wù)異常,直接拋出
  if (ExceptionUtil.isBizException(e)) {
    throw e;
  } else if (i >= tryCount) {
    throw e;
  }
  LoggerUtil.warn(String.format("FailoverHaStrategy Call false for request:%s error=%s", request, e.getMessage()));
}
// selectToHolder的處理(選擇refers)
public void selectToHolder(Request request, List<Referer<T>> refersHolder) {
  List<Referer<T>> referers = this.referers;

  if (referers == null) {
    throw new MotanServiceException(this.getClass().getSimpleName() + " No available referers for call : referers_size= 0 "
                    + MotanFrameworkUtil.toString(request));
  }

  if (referers.size() > 1) {
    // 多于一個(gè),繼續(xù)選(這里就根據(jù)配置的負(fù)載均衡策略來(lái)處理,
    // 默認(rèn)的是RoundRobinLoadBalance),但也會(huì)返回多個(gè)refer
    doSelectToHolder(request, refersHolder);
  } else if (referers.size() == 1 && referers.get(0).isAvailable()) {
    // 只有一個(gè),只記錄當(dāng)前的
    refersHolder.add(referers.get(0));
  }
  if (refersHolder.isEmpty()) {
    throw new MotanServiceException(this.getClass().getSimpleName() + " No available referers for call : referers_size="
                    + referers.size() + " " + MotanFrameworkUtil.toString(request));
  }
}
  • refer.call最后在DefaultRpcProtocol的DefaultRpcReferer中通過(guò)NettyClient發(fā)出請(qǐng)求
protected Response doCall(Request request) {
    try {
      // 為了能夠?qū)崿F(xiàn)跨group請(qǐng)求,需要使用server端的group。
      request.setAttachment(URLParamType.group.getName(), serviceUrl.getGroup());
      return client.request(request);
    } catch (TransportException exception) {
      throw new MotanServiceException("DefaultRpcReferer call Error: url=" + url.getUri(), exception);
    }
}
  • 最終通過(guò)netty將消息發(fā)出
ChannelFuture writeFuture = this.channel.write(request);
// 注冊(cè)一個(gè)回調(diào),用來(lái)處理調(diào)用情況
response.addListener(new FutureListener() {
    @Override
    public void operationComplete(Future future) throws Exception {
        if (future.isSuccess() || (future.isDone() && ExceptionUtil.isBizException(future.getException()))) {
            // 成功的調(diào)用 
            nettyClient.resetErrorCount();
        } else {
            // 失敗的調(diào)用 
            nettyClient.incrErrorCount();
        }
    }
});

服務(wù)器端通過(guò)Netty接收客戶端的請(qǐng)求,入口為NettyChannelHandler的messageReceived

  • netty 處理
if (message instanceof Request) {
    processRequest(ctx, e);  // 處理客戶端發(fā)來(lái)的請(qǐng)求
} else if (message instanceof Response) {
    processResponse(ctx, e);
}
//
// 使用線程池處理調(diào)用請(qǐng)求
threadPoolExecutor.execute(new Runnable() {
    @Override
    public void run() {
        try{
            RpcContext.init(request);
            processRequest(ctx, request, processStartTime);
        }finally{
            RpcContext.destroy();
        }
    }
});
  • ProviderProtectedMessageRouter處理
// key:motan-demo-rpc/com.weibo.motan.demo.service.MotanDemoService/1.0
String serviceKey = MotanFrameworkUtil.getServiceKey(request);
// 根據(jù)key獲取Provider(DefaultProvider)
Provider<?> provider = providers.get(serviceKey);
// 通過(guò)反射調(diào)用
Method method = lookup(request);
Object value = method.invoke(proxyImpl, request.getArguments());
  • 然后進(jìn)入service層代碼處理,后續(xù)就是結(jié)果返回處理了。

大概的示意圖:


call.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 前言 本文繼續(xù)分析dubbo的cluster層,此層封裝多個(gè)提供者的路由及負(fù)載均衡,并橋接注冊(cè)中心,以Invoke...
    Java大生閱讀 1,055評(píng)論 0 0
  • pyspark.sql模塊 模塊上下文 Spark SQL和DataFrames的重要類: pyspark.sql...
    mpro閱讀 9,920評(píng)論 0 13
  • 原文:https://developer.android.com/reference/android/media/...
    thebestofrocky閱讀 6,373評(píng)論 0 6
  • “陽(yáng)歷年孩子們放假了?回來(lái)一趟吧,帶孩子來(lái)玩兩天?!?“媽,倆孩子都快考試了,放了寒假再去吧?!?“我們老了,是不...
    涼涼笙閱讀 668評(píng)論 8 11
  • 早上被電話叫醒,睜眼一看已經(jīng)7:13,是婷媽媽打電話看我們收拾好了沒(méi)。我很焦急地告訴她我才醒,婷媽媽說(shuō)那她們...
    段師傅貼膜閱讀 149評(píng)論 0 0

友情鏈接更多精彩內(nèi)容