Apollo 8 — ConfigService 異步輪詢接口的實(shí)現(xiàn)

源碼

Apollo 長(zhǎng)輪詢的實(shí)現(xiàn),是通過(guò)客戶端輪詢 /notifications/v2 接口實(shí)現(xiàn)的。具體代碼在 com.ctrip.framework.apollo.configservice.controller.NotificationControllerV2.java。

這個(gè)類(lèi)也是實(shí)現(xiàn)了 ReleaseMessageListener 監(jiān)控,表明他是一個(gè)消息監(jiān)聽(tīng)器,當(dāng)有新的消息時(shí),就會(huì)調(diào)用他的 hanlderMessage 方法。這個(gè)具體我們后面再說(shuō)。

該類(lèi)只有一個(gè) rest 接口: pollNotification 方法。返回值是 DeferredResult,這是 Spring 支持 Servlet 3 的一個(gè)類(lèi),關(guān)于異步同步的不同,可以看筆者的另一篇文章 異步 Servlet 和同步 Servlet 的性能測(cè)試。

該接口提供了幾個(gè)參數(shù):

  1. appId appId
  2. cluster 集群名稱(chēng)
  3. notificationsAsString 通知對(duì)象的 json 字符串
  4. dataCenter,idc 屬性
  5. clientIp 客戶端 IP, 非必傳,為了擴(kuò)展吧估計(jì)

大家有么有覺(jué)得少了什么? namespace 。

當(dāng)然,沒(méi)有 namespace 這個(gè)重要的參數(shù)是不存在的。

參數(shù)在 notificationsAsString 中??蛻舳藭?huì)將自己所有的 namespace 傳遞到服務(wù)端進(jìn)行查詢。

是時(shí)候上源碼了。

@RequestMapping(method = RequestMethod.GET)
  public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> pollNotification(
      @RequestParam(value = "appId") String appId,// appId
      @RequestParam(value = "cluster") String cluster,// default
      @RequestParam(value = "notifications") String notificationsAsString,// json 對(duì)象 List<ApolloConfigNotification>
      @RequestParam(value = "dataCenter", required = false) String dataCenter,// 基本用不上, idc 屬性
      @RequestParam(value = "ip", required = false) String clientIp) {

    List<ApolloConfigNotification> notifications =// 轉(zhuǎn)換成對(duì)象
          gson.fromJson(notificationsAsString, notificationsTypeReference);
          
    // Spring 的異步對(duì)象: timeout 60s, 返回304
    DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper();
    Set<String> namespaces = Sets.newHashSet();
    Map<String, Long> clientSideNotifications = Maps.newHashMap();
    Map<String, ApolloConfigNotification> filteredNotifications = filterNotifications(appId, notifications);// 過(guò)濾一下名字
    // 循環(huán)
    for (Map.Entry<String, ApolloConfigNotification> notificationEntry : filteredNotifications.entrySet()) {
      // 拿出 key
      String normalizedNamespace = notificationEntry.getKey();
      // 拿出 value
      ApolloConfigNotification notification = notificationEntry.getValue();
      /* 添加到 namespaces Set */
      namespaces.add(normalizedNamespace);
      // 添加到 client 端的通知, key 是 namespace, values 是 messageId
      clientSideNotifications.put(normalizedNamespace, notification.getNotificationId());
      // 如果不相等, 記錄客戶端名字
      if (!Objects.equals(notification.getNamespaceName(), normalizedNamespace)) {
        // 記錄 key = 標(biāo)準(zhǔn)名字, value = 客戶端名字
        deferredResultWrapper.recordNamespaceNameNormalizedResult(notification.getNamespaceName(), normalizedNamespace);
      }
    }// 記在 namespaces 集合, clientSideNotifications 也put (namespace, notificationId)

    // 組裝得到需要觀察的 key,包括公共的.
    Multimap<String, String> watchedKeysMap =
        watchKeysUtil.assembleAllWatchKeys(appId, cluster, namespaces, dataCenter);// namespaces 是集合
    // 得到 value; 這個(gè) value 也就是 appId + cluster + namespace
    Set<String> watchedKeys = Sets.newHashSet(watchedKeysMap.values());
    // 從緩存得到最新的發(fā)布消息
    List<ReleaseMessage> latestReleaseMessages =// 根據(jù) key 從緩存得到最新發(fā)布的消息.
        releaseMessageService.findLatestReleaseMessagesGroupByMessages(watchedKeys);

    /* 如果不關(guān)閉, 這個(gè)請(qǐng)求將會(huì)一直持有一個(gè)數(shù)據(jù)庫(kù)連接. 影響并發(fā)能力. 這是一個(gè) hack 操作*/
    entityManagerUtil.closeEntityManager();
    // 計(jì)算出新的通知
    List<ApolloConfigNotification> newNotifications =
        getApolloConfigNotifications(namespaces, clientSideNotifications, watchedKeysMap,
            latestReleaseMessages);
    // 不是空, 理解返回結(jié)果, 不等待
    if (!CollectionUtils.isEmpty(newNotifications)) {
      deferredResultWrapper.setResult(newNotifications);
    } else {
      // 設(shè)置 timeout 回調(diào):打印日志
      deferredResultWrapper
          .onTimeout(() -> logWatchedKeys(watchedKeys, "Apollo.LongPoll.TimeOutKeys"));
      // 設(shè)置完成回調(diào):刪除 key
      deferredResultWrapper.onCompletion(() -> {
        //取消注冊(cè)
        for (String key : watchedKeys) {
          deferredResults.remove(key, deferredResultWrapper);
        }
      });

      //register all keys 注冊(cè)
      for (String key : watchedKeys) {
        this.deferredResults.put(key, deferredResultWrapper);
      }
    }
    // 立即返回
    return deferredResultWrapper.getResult();/** @see DeferredResultHandler 是關(guān)鍵 */
  }

注釋寫(xiě)了很多了,再簡(jiǎn)單說(shuō)說(shuō)邏輯:

  1. 解析 JSON 字符串為 List< ApolloConfigNotification> 對(duì)象。
  2. 創(chuàng)建 Spring 異步對(duì)象。
  3. 處理過(guò)濾 namespace。
  4. 根據(jù) namespace 生成需要監(jiān)聽(tīng)的 key,格式為 appId + cluster + namespace,包括公共 namespace。并獲取最新的 Release 信息。
  5. 關(guān)閉 Spring 實(shí)例管理器,釋放數(shù)據(jù)庫(kù)資源。
  6. 根據(jù)剛剛得到的 ReleaseMessage,和客戶端的 ReleaseMessage 的版本進(jìn)行對(duì)比,生成新的配置通知對(duì)象集合。
  7. 如果不是空 —— 立即返回給客戶端,結(jié)束此次調(diào)用。如果沒(méi)有,進(jìn)入第 8 步。
  8. 設(shè)置 timeout 回調(diào)方法 —— 打印日志。再設(shè)置完成回調(diào)方法:刪除注冊(cè)的 key。
  9. 對(duì)客戶端感興趣的 key 進(jìn)行注冊(cè),這些 key 都對(duì)應(yīng)著 deferredResultWrapper 對(duì)象,可以認(rèn)為他就是客戶端。
  10. 返回 Spring 異步對(duì)象。該請(qǐng)求將被異步掛起。

Apollo 的 DeferredResultWrapper 保證了 Spring 的 DeferredResult 對(duì)象,泛型內(nèi)容是 List<ApolloConfigNotification>, 構(gòu)造這個(gè)對(duì)象,默認(rèn)的 timeout 是 60 秒,即掛起 60 秒。同時(shí),對(duì) setResult 方法進(jìn)行包裝,加入了對(duì)客戶端 key 和服務(wù)端 key 的一個(gè)映射(大小寫(xiě)不一致) 。

我們剛剛說(shuō),Apollo 會(huì)將這些 key 注冊(cè)起來(lái)。那么什么時(shí)候使用呢,異步對(duì)象被掛起,又是上面時(shí)候被喚醒呢?

答案就在 handleMessage 方法里。我們剛剛說(shuō)他是一個(gè)監(jiān)聽(tīng)器,當(dāng)消息掃描器掃描到新的消息時(shí),會(huì)通知所有的監(jiān)聽(tīng)器,也就是執(zhí)行 handlerMessage 方法。方法內(nèi)容如下:

@Override
public void handleMessage(ReleaseMessage message, String channel) {

  String content = message.getMessage();
  if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(content)) {
    return;
  }
  String changedNamespace = retrieveNamespaceFromReleaseMessage.apply(content);

  //create a new list to avoid ConcurrentModificationException 構(gòu)造一個(gè)新 list ,防止并發(fā)失敗
  List<DeferredResultWrapper> results = Lists.newArrayList(deferredResults.get(content));

  // 創(chuàng)建通知對(duì)象
  ApolloConfigNotification configNotification = new ApolloConfigNotification(changedNamespace, message.getId());
  configNotification.addMessage(content, message.getId());

  //do async notification if too many clients 如果有大量的客戶端(100)在等待,使用線程池異步處理
  if (results.size() > bizConfig.releaseMessageNotificationBatch()) {
    // 大量通知批量處理
    largeNotificationBatchExecutorService.submit(() -> {
      for (int i = 0; i < results.size(); i++) { // 循環(huán)
        /*
         * 假設(shè)一個(gè)公共 Namespace 有10W 臺(tái)機(jī)器使用,如果該公共 Namespace 發(fā)布時(shí)直接下發(fā)配置更新消息的話,
         * 就會(huì)導(dǎo)致這 10W 臺(tái)機(jī)器一下子都來(lái)請(qǐng)求配置,這動(dòng)靜就有點(diǎn)大了,而且對(duì) Config Service 的壓力也會(huì)比較大。
         * 即"驚群效應(yīng)"
         */
        if (i > 0 && i % bizConfig.releaseMessageNotificationBatch() == 0) {// 如果處理了一批客戶端,休息一下(100ms)
            TimeUnit.MILLISECONDS.sleep(bizConfig.releaseMessageNotificationBatchIntervalInMilli());
        }
        results.get(i).setResult(configNotification);// 通知每個(gè)等待的 HTTP 請(qǐng)求
      }
    });
    return;
  }

  // 否則,同步處理
  for (DeferredResultWrapper result : results) {
    result.setResult(configNotification);
  }
}

筆者去除了一些日志和一些數(shù)據(jù)判斷。大致的邏輯如下:

  1. 消息類(lèi)型必須是 “apollo-release”。然后拿到消息里的 namespace 內(nèi)容。
  2. 根據(jù) namespace 從注冊(cè)器里拿出 Spring 異步對(duì)象集合
  3. 創(chuàng)建通知對(duì)象。
  4. 如果有超過(guò) 100 個(gè)客戶端在等待,那么就使用線程池批量執(zhí)行通知。否則就同步慢慢執(zhí)行。
  5. 每處理 100 個(gè)客戶端就休息 100ms,防止發(fā)生驚群效應(yīng),導(dǎo)致大量客戶端調(diào)用配置獲取接口,引起服務(wù)抖動(dòng)。
  6. 循環(huán)調(diào)用 Spring 異步對(duì)象的 setResult 方法,讓其立即返回。

具體的流程圖如下:

其中,灰色區(qū)域是掃描器的異步線程,黃色區(qū)域是接口的同步線程。他們共享 deferredResults 這個(gè)線程安全的 Map,實(shí)現(xiàn)異步解耦和實(shí)時(shí)通知客戶端。

總結(jié)

好了,這就是 Apollo 的長(zhǎng)輪詢接口,客戶端會(huì)不斷的輪詢服務(wù)器,服務(wù)器會(huì) Hold住 60 秒,這是通過(guò) Servlet 3 的異步 + NIO 來(lái)實(shí)現(xiàn)的,能夠保持萬(wàn)級(jí)連接(Tomcat 默認(rèn) 10000)。

通過(guò)一個(gè)線程安全的 Map + 監(jiān)聽(tīng)器,讓掃描器線程和 HTTP 線程共享 Spring 異步對(duì)象,即實(shí)現(xiàn)了消息實(shí)時(shí)通知,也讓?xiě)?yīng)用程序?qū)崿F(xiàn)異步解耦。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容