SOFA 源碼分析 —— 服務(wù)發(fā)布過程

前言

SOFA 包含了 RPC 框架,底層通信框架是 bolt ,基于 Netty 4,今天將通過 SOFA—RPC 源碼中的例子,看看他是如何發(fā)布一個服務(wù)的。

示例代碼

下面的代碼在 com.alipay.sofa.rpc.quickstart.QuickStartServer 類下。

ServerConfig serverConfig = new ServerConfig()
    .setProtocol("bolt") // 設(shè)置一個協(xié)議,默認(rèn)bolt
    .setPort(9696) // 設(shè)置一個端口,默認(rèn)12200
    .setDaemon(false); // 非守護線程

ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
    .setInterfaceId(HelloService.class.getName()) // 指定接口
    .setRef(new HelloServiceImpl()) // 指定實現(xiàn)
    .setServer(serverConfig); // 指定服務(wù)端

providerConfig.export(); // 發(fā)布服務(wù)

首先,創(chuàng)建一個 ServerConfig ,包含了端口,協(xié)議等基礎(chǔ)信息,當(dāng)然,這些都是手動設(shè)定的,在該類加載的時候,會自動加載很多配置文件中的服務(wù)器默認(rèn)配置。比如 RpcConfigs 類,RpcRuntimeContext 上下文等。

然后呢,創(chuàng)建一個 ProviderConfig,也是個 config,不過多繼承了一個 AbstractInterfaceConfig 抽象類,該類是接口級別的配置,而 ServerConfig 是 服務(wù)器級別的配置。雖然都繼承了 AbstractIdConfig。

ProviderConfig 包含了接口名稱,接口指定實現(xiàn)類,還有服務(wù)器的配置。

最后,ProviderConfig 調(diào)用 export 發(fā)布服務(wù)。

展示給我的 API 很簡單,但內(nèi)部是如何實現(xiàn)的呢?

在看源碼之前,我們思考一下:如果我們自己來實現(xiàn),怎么弄?

RPC 框架簡單一點來說,就是使用動態(tài)代理和 Socket。

SOFA 使用 Netty 來做網(wǎng)絡(luò)通信框架,我們之前也寫過一個簡單的 Netty RPC,主要是通過 handler 的 channelRead 方法來實現(xiàn)。

SOFA 是這么操作的嗎?

一起來看看。

# 源碼分析

上面的示例代碼其實就是 3 個步驟,創(chuàng)建 ServerConfig,創(chuàng)建 ProviderConfig,調(diào)用 export 方法。

先看第一步,還是有點意思的。

雖然是空構(gòu)造方法,但 ServerConfig 的屬性都是自動初始化的,而他的父類 AbstractIdConfig 更有意思了,父類有 1 個地方值得注意:

static {
    RpcRuntimeContext.now();
}

熟悉類加載的同學(xué)都知道,這是為了主動加載 RpcRuntimeContext ,看名字是 RPC 運行時上下文,所謂上下文,大約就是我們?nèi)祟惲奶熘械?"老地方" 的意思。

這個上下文會在靜態(tài)塊中加載 Module(基于擴展點實現(xiàn)),注冊 JVM 關(guān)閉鉤子(類似 Tomcat)。還有很多配置信息。

然后呢?創(chuàng)建 ProviderConfig 對象。這個類比上面的那個類多繼承了一個 AbstractInterfaceConfig,接口級別的配置。比如有些方法我不想發(fā)布啊,比如權(quán)重啊,比如超時啊,比如具體的實現(xiàn)類啊等等,當(dāng)然還需要一個 ServerConfig 的屬性(注冊到 Server 中啊喂)。

最后就是發(fā)布了。export 方法。

ProviderCofing 擁有一個 export 方法,但并不是直接就在這里發(fā)布的,因為他是一個 config,不適合在config 里面做這些事情,違背單一職責(zé)。

SOFA 使用了一個 Bootstrap 類來進行操作。和大部分服務(wù)器類似,這里就是啟動服務(wù)器的地方。因為這個類會多線程使用,比如并發(fā)的發(fā)布服務(wù)。而不是一個一個慢慢的發(fā)布服務(wù)。所以他不是單例的,而是和 Config 一起使用的,并緩存在 map 中。

ProviderBootstrap 目前有 3 個實現(xiàn):Rest,Bolt,Dubbo。Bolt 是他的默認(rèn)實現(xiàn)。

export 方法默認(rèn)有個實現(xiàn)(Dubbo 的話就要重寫了)。主要邏輯是執(zhí)行 doExport 方法,其中包括延遲加載邏輯。

而 doExport 方法中,就是 SOFA 發(fā)布服務(wù)的邏輯所在了。

樓主將方法的異常處理邏輯去除,整體如下:

 private void doExport() {
        if (exported) {
            return;
        }
        String key = providerConfig.buildKey();
        String appName = providerConfig.getAppName();
        // 檢查參數(shù)
        checkParameters();
        // 注意同一interface,同一uniqleId,不同server情況
        AtomicInteger cnt = EXPORTED_KEYS.get(key); // 計數(shù)器
        if (cnt == null) { // 沒有發(fā)布過
            cnt = CommonUtils.putToConcurrentMap(EXPORTED_KEYS, key, new AtomicInteger(0));
        }
        int c = cnt.incrementAndGet();
        int maxProxyCount = providerConfig.getRepeatedExportLimit();
        if (maxProxyCount > 0) {
          // 超過最大數(shù)量,直接拋出異常
        }
        // 構(gòu)造請求調(diào)用器
        providerProxyInvoker = new ProviderProxyInvoker(providerConfig);
        // 初始化注冊中心
        if (providerConfig.isRegister()) {
            List<RegistryConfig> registryConfigs = providerConfig.getRegistry();
            if (CommonUtils.isNotEmpty(registryConfigs)) {
                for (RegistryConfig registryConfig : registryConfigs) {
                    RegistryFactory.getRegistry(registryConfig); // 提前初始化Registry
                }
            }
        }
        // 將處理器注冊到server
        List<ServerConfig> serverConfigs = providerConfig.getServer();
        for (ServerConfig serverConfig : serverConfigs) {
            Server server = serverConfig.buildIfAbsent();
            // 注冊序列化接口
            server.registerProcessor(providerConfig, providerProxyInvoker);
            if (serverConfig.isAutoStart()) {
                server.start();
            }
        }

        // 注冊到注冊中心
        providerConfig.setConfigListener(new ProviderAttributeListener());
        register();

        // 記錄一些緩存數(shù)據(jù)
        RpcRuntimeContext.cacheProviderConfig(this);
        exported = true;
    }

主要邏輯如下:

  1. 根據(jù) providerConfig 創(chuàng)建一個 key 和 AppName。
  2. 檢驗同一個服務(wù)多次發(fā)布的次數(shù)。
  3. 創(chuàng)建一個 ProviderProxyInvoker, 其中包含了過濾器鏈,而過濾器鏈的最后一鏈就是對接口實現(xiàn)類的調(diào)用。
  4. 初始化注冊中心,創(chuàng)建 Server(會有多個Server,因為可能配置了多個協(xié)議)。
  5. 將 config 和 invoker 注冊到 Server 中。內(nèi)部是將其放進了一個 Map 中。
  6. 啟動 Server。啟動 Server 其實就是啟動 Netty 服務(wù),并創(chuàng)建一個 RpcHandler,也就是 Netty 的 Handler,這個 RpcHandler 內(nèi)部含有一個數(shù)據(jù)結(jié)構(gòu),包含接口級別的 invoker。所以,當(dāng)請求進入的時候,RpcHandler 的 channelRead 方法會被調(diào)用,然后間接的調(diào)用 invoker 方法。
  7. 成功啟動后,注冊到注冊中心。將數(shù)據(jù)緩存到 RpcRuntimeContext 的一個 Set 中。

一起來詳細(xì)看看。

Invoker 怎么構(gòu)造的?很簡單,最主要的就是過濾器。關(guān)于過濾器,我們之前已經(jīng)寫過一篇文章了。不再贅述。

關(guān)鍵看看 Server 是如何構(gòu)造的。

關(guān)鍵代碼 serverConfig.buildIfAbsent(),類似 HashMap 的 putIfAbsent。如果不存在就創(chuàng)建。

Server 接口目前有 2 個實現(xiàn),bolt 和 rest。當(dāng)然,Server 也是基于擴展的,所以,不用怕,可以隨便增加實現(xiàn)。

關(guān)鍵代碼在 ServerFactory 的 getServer 中,其中會獲取擴展點的 Server,然后,執(zhí)行 Server 的 init 方法,我們看看默認(rèn) bolt 的 init 方法。

    @Override
    public void init(ServerConfig serverConfig) {
        this.serverConfig = serverConfig;
        // 啟動線程池
        bizThreadPool = initThreadPool(serverConfig);
        boltServerProcessor = new BoltServerProcessor(this);
    }

保存了 serverConfig 的引用,啟動了一個業(yè)務(wù)線程池,創(chuàng)建了一個 BoltServerProcessor 對象。

第一:這個線程池會在 Bolt 的 RpcHandler 中被使用,也就是說,復(fù)雜業(yè)務(wù)都是在這個線程池執(zhí)行,不會影響 Netty 的 IO 線程。

第二:BoltServerProcessor 非常重要,他的構(gòu)造方法包括了當(dāng)前的 BoltServer,所以他倆是互相依賴的。關(guān)鍵點來了:

BoltServerProcessor 實現(xiàn)了 UserProcessor 接口,而 Bolt 的 RpcHandler 持有一個 Map<String, UserProcessor<?>>,所以,當(dāng) RpcHandler 被執(zhí)行 channelRead 方法的時候,一定會根據(jù)接口名稱找到對應(yīng)的 UserProcessor,并執(zhí)行他的 handlerRequest 方法。

那么,RpcHandler 是什么時候創(chuàng)建并放置到 RpcHandler 中的呢?

具體是這樣的:在 server.start() 執(zhí)行的時候,該方法會初始化 Netty 的 Server,在 SOFA 中,叫 RpcServer,將 BoltServerProcessor 放置到名叫 userProcessors 的 Map 中。然后,當(dāng) RpcServer 啟動的時候,也就是 start 方法,會執(zhí)行一個 init 方法,該方法內(nèi)部就是設(shè)置 Netty 各種屬性的地方,包括 Hander,其中有 2 行代碼對我們很重要:

final RpcHandler rpcHandler = new RpcHandler(true, this.userProcessors);
pipeline.addLast("handler", rpcHandler);

創(chuàng)建了一個 RpcHandler,并添加到 pipeline 中,這個 Handler 的構(gòu)造參數(shù)就是包含所有 BoltServerProcessor 的 Map。

所以,總的流程就是:

每個接口都會創(chuàng)建一個 providerConfig 對象,這個對象會創(chuàng)建對應(yīng)的 invoker 對象(包含過濾器鏈),這兩個對象都會放到 BoltServer 的 invokerMap 中,而 BoltServer 還包含其他對象,比如 BoltServerProcessor(繼承 UserProcessor), RpcServer(依賴 RpcHandler)。當(dāng)初始化 BoltServerProcessor 的時候,會傳入 this(BoltServer),當(dāng)初始化 RpcServer 的時候,會傳入 BoltServerProcessor 到 RpcServer 的 Map 中。在 RpcHandler 初始化的時候,又會將 RpcServer 的 Map 傳進自己的內(nèi)部。完成最終的依賴。
當(dāng)請求進入,RpcHandler 調(diào)用對應(yīng)的 UserProcessor 的 handlerRequest 方法,而該方法中,會調(diào)用對應(yīng)的 invoker,invoker 調(diào)用過濾器鏈,知道調(diào)用真正的實現(xiàn)類。

而大概的 UML 圖就是下面這樣的:

image.png

紅色部分是 RPC 的核心,包含 Solt 的 Server,實現(xiàn) UserProcessor 接口的 BoltServerProcessor,業(yè)務(wù)線程池,存儲所有接口實現(xiàn)的 Map。

綠色部分是 Bolt 的接口和類,只要實現(xiàn)了 UserProcessor 接口,就能將具體實現(xiàn)替換,也既是處理具體數(shù)據(jù)的邏輯。

最后,看看關(guān)鍵類 BoltServerProcessor ,他是融合 RPC 和 Bolt 的膠水類。

該類會注冊一個序列化器替代 Bolt 默認(rèn)的。handleRequest 方法是這個類的核心方法。有很多邏輯,主要看這里:

// 查找服務(wù)
Invoker invoker = boltServer.findInvoker(serviceName);
// 真正調(diào)用
response = doInvoke(serviceName, invoker, request);

/**
 * 找到服務(wù)端Invoker
 *
 * @param serviceName 服務(wù)名
 * @return Invoker對象
 */
public Invoker findInvoker(String serviceName) {
    return invokerMap.get(serviceName);
}

根據(jù)服務(wù)名稱,從 Map 中找到服務(wù),然后調(diào)用 invoker 的 invoker 方法。

再看看 Netty 到 BoltServerProcessor 的 handlerRequest 的調(diào)用鏈,使用 IDEA 的 Hierarchy 功能,查看該方法,最后停留在 ProcessTast 中,一個 Runnable.

image.png

根據(jù)經(jīng)驗,這個類肯定是被放到線程池了。什么時候放的呢?看看他的構(gòu)造方法的 Hierarchy。

image.png

從圖中可以看到 ,Bolt 的 RpcHandler 的 channelRead 最終會調(diào)用 ProcessTask 的 構(gòu)造方法。

那么 BoltServer 的用戶線程池什么時候使用呢?還是使用 IDEA 的 Hierarchy 功能。

image.png

其實也是在這個過程中,當(dāng)用戶沒有設(shè)置線程池,則使用系統(tǒng)線程池。

總結(jié)

好了,關(guān)于 SOFA 的服務(wù)發(fā)布和服務(wù)的接收過程,就介紹完了,可以說,整個框架還是非常輕量級的。基本操作就是:內(nèi)部通過在 Netty的 Handler 中保存一個存儲服務(wù)實現(xiàn)的 Map 完成遠(yuǎn)程調(diào)用。

其實和我們之前用 Netty 寫的小 demo 類似。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,688評論 19 139
  • dubbo暴露服務(wù)有兩種情況,一種是設(shè)置了延遲暴露(比如delay="5000"),另外一種是沒有設(shè)置延遲暴露或者...
    加大裝益達閱讀 21,414評論 5 36
  • 時序圖 在講解源碼前,先看下官方文檔提供的時序圖,后面的講解基本是這個路線,但是會更細(xì)節(jié)化 大致邏輯 首先服務(wù)的實...
    土豆肉絲蓋澆飯閱讀 2,963評論 2 3
  • 這是一份遲來的愛,致愛“利”絲。 當(dāng)一個奮斗多年小有成就、自認(rèn)為可以和業(yè)界老大平起平坐的男人,來到對手的面前邀約比...
    諸葛不亮008閱讀 473評論 0 0
  • 過了這個本該皓月當(dāng)空的月圓夜,不曉得多少人跟我一樣,這年過得也沒什么感覺,稀里糊涂,一晃就過去了。一回到家除了幾個...
    一晌貪玩閱讀 704評論 1 1

友情鏈接更多精彩內(nèi)容