Spark消息通信原理

一.NettyRpcEnv主要組件

子組件TransportConf,Dispatcher,TransportClientFactory,TransportServer

  • TransportConf 為RPC框架的中的配置類

  • Dispatcher 可以有效提高NettyRpcEnv消息異步處理能力和并行處理能力,負責將RPC消息路由到應該對此消息處理的RpcEndpoint端點。

  • TransportContext 是NettyRpcEnv提供服務端和客戶端能力的前提,內部的NettyRpcHandler用于接受遠程客戶端或服務端發(fā)送過來的消息,并將ByteBuffer反序列化成RequestMessage,調用相應處理消息的方法。

  • TransportClientFactory 是NettyRpcEnv向遠端服務發(fā)起請求的基礎,Spark與遠端RpcEnv進行通信都依賴于其生成的TransportClient。

  • TransportServer 為NettyRpcEnv提供了對外接受請求,處理請求,回復客戶端的服務。

二.Dispatcher的構成介紹

endpoints:端點實例名稱與端點數據EndpointData之間映射關系的緩存,有了這個緩存,就可以使用端點名稱從中快速獲取或者刪除EndpointData

endpointRefs:端點實例RpcEndpoint與端點實例引用RpcEndpointRef之間的映射關系的緩存,可以使用端點實例從中快速獲取或者刪除端點實例的引用。

receivers:存儲端點數據EndpointData的阻塞隊列,只有Inbox中有消息的EndpointData才會被放入到此阻塞隊列。

threadpool:用于對消息進行調度的線程池,此線程池運行的任務都是MessageLoop。

三.接受消息的處理過程

Dispatcher中MessageLoop的執(zhí)行流程
  1. 調用Inbox的post方法,將消息放入到message列表中
  2. 將有消息的Inbox相關聯的EndpointData放入到receivers中
  3. MessageLoop每次循環(huán)首先從receiver中獲取EndpointData
  4. 執(zhí)行EndpointData中Inbox中的process方法對消息進行具體的處理。

四.發(fā)送消息到遠端和本地的調用流程

NettyRpcEndpointRef中的ask方法和send方法都是首先將message封裝成RequestMessage,然后通過調用NettyRpcEnv的send方法和ask方法對消息目的地進行判斷,如果是發(fā)送到本地的消息,就調用Dispatcher中對應的postLocalMessage/ postOneWayMessage方法發(fā)送到本地的RpcEndpoint對應EndpointData中的Inbox中。

如果是發(fā)送到遠程RpcEndpoint的消息,則調用NettyRpcEnv的postToOutbox方法,從outboxes中根據遠端地址,取出相應的Outbox,然后將消息放入到遠端RpcEndpoint的地址所對應的Outbox的message列表中。然后Outbox中會調用drainOutbox方法不斷循環(huán),從messages列表中取得OutboxMessage,通過TransportClient向外發(fā)送消息到對應的NettyRpcEnv中的RpcEndpoint。

五.總結(Spark消息通信全過程)

RPC客戶端發(fā)送請求流程
  1. 調用NettyRpcEndpointRef的send和ask方法,向本地節(jié)點的RpcEndpoint發(fā)送消息,由于是在同一節(jié)點,所以直接調用Dispatcher的postLocalMessage或postOneWayMessage方法,將消息放入EndpointData內部的Inbox的message列表中,此EndpointData也會被加入到Dispatcher中的消息隊列receivers,消息隊列中有消息,觸發(fā)MessageLoop線程處理消息,執(zhí)行EndpointData中Inbox中的process方法對消息進行具體的處理,其實最后調用的都是RpcEndpoint中的receiveAndReply或者receive等方法。

  2. 通過NettyRpcEndpointRef的send方法和ask方法向遠端節(jié)點的RpcEndpoint發(fā)送消息,在這種情況下,首先將消息封裝成OutboxMessage,然后放入到遠端RpcEndpoint的地址所對應的Outbox的message列表中。

  3. 每個Outbox中會調用drainOutbox方法不斷循環(huán),從messages列表中取得OutboxMessage。

  4. Outbox中會使用內部的TransportClient向遠端的NettyRpcEnv發(fā)送OutboxMessage。

  5. 和遠端的NettyRpcEnv的TransportServer建立了連接后,請求消息首先經過Netty管道的處理,然后經由NettyRpcHandler的處理,最后來自服務端NettyRpcServer的回復消息會觸發(fā)NettyRpcHandler的receive方法,進而調用Dispatcher的postRemoteMessage或者postOneWayMessage方法。首先是根據端點名稱endpointName從緩存endpoints中獲取EndpointData,將消息放入到EndpointData內部的Inbox的message列表中,然后將EndpointData推入到receviers中,最后觸發(fā)MessageLoop線程處理消息,執(zhí)行EndpointData中Inbox中的process方法對消息進行具體的處理,其實最后調用的都是RpcEndpoint中的receiveAndReply或者receive等方法。

?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容