ElasticSearch數(shù)據(jù)傳輸機(jī)制

ElasticSearch的數(shù)據(jù)傳輸服務(wù)TransportService

ElasticSearch的數(shù)據(jù)傳輸服務(wù)是在TransportService類中實現(xiàn)的。TransportService的核心方法是sendRequest,如下圖所示:

sendRequest方法

從上面的代碼段可以看出幾個有用的信息:

  1. 首先看這一句:
生成requestId

這一句表明,每個被傳輸?shù)恼埱?,均包含一個requestId,根據(jù)requestId的生成函數(shù)newRequestId()的實現(xiàn)來看,requestId實際是一個自增的long型變量。這一變量的作用就是為了標(biāo)識每一次請求。在接收方處理完請求,并返回應(yīng)答時,需要將請求的requestId帶回,以便發(fā)送方收到應(yīng)答后,能夠確定是對哪次請求的應(yīng)答。

  1. 再來看這一句:
響應(yīng)回調(diào)句柄對象

在發(fā)送傳輸請求時,同時指定了返回數(shù)據(jù)的回調(diào)句柄對象:TransportResponseHandler<T> hander。這一回調(diào)句柄對象被注冊到clientHandlers容器中。
clientHandlers可以看做是一個Map,map的key值是requestId,map的value值是對應(yīng)的TransportResponseHandler。可以通過clientHandlers.get(requestId)這樣的調(diào)用來獲取到對應(yīng)的ResponseHandler。

  1. 最后看這一段
調(diào)用不同方法發(fā)送Request

在發(fā)送請求的時候,需要指定發(fā)送的目標(biāo)節(jié)點。如果目標(biāo)節(jié)點是本機(jī),直接調(diào)用sendLocalRequest方法即可,這一方法不需要通過網(wǎng)絡(luò)協(xié)議進(jìn)行傳輸。如果目標(biāo)節(jié)點不是本機(jī),則調(diào)用transport成員的sendRequest方法實現(xiàn)數(shù)據(jù)發(fā)送。

Transport組件

ElasticSearch的節(jié)點間數(shù)據(jù)傳輸組件被抽象成Transport接口。并通過構(gòu)造函數(shù)注入的方式注入到TransportService對象中,如下圖所示:

將transport對象注入到TransportService對象中

可以看到,transport對象和threadPool對象都是通過構(gòu)造函數(shù)注入的方式注入到TransportService中的。
,實際上,Transport這一接口的實現(xiàn)類僅有NettyTransport這一個。所以,可以認(rèn)為ElasticSearch的節(jié)點間通訊就是通過Netty來實現(xiàn)的。

NettyTransport

由于Netty基于攔截器模式實現(xiàn)的NIO通訊框架,因此Netty的響應(yīng)處理機(jī)制要通過如下代碼說明:

Netty的ChannelPipeline設(shè)置

從上圖的代碼可以看出,ServerChannelPipelineFactory在pipeline上主要添加了兩個Handler,一個是SizeHeaderFrameDecoder,一個是MessageChannelHandler。

SizeHeaderFrameDecoder

SizeHeaderFrameDecoder在ChannelPipeline中被命名為“size”,考慮到Netty本身也內(nèi)置一個類似SizeHeaderFrameDecoder的Decoder,因此,很自然的理解為該Decoder是負(fù)責(zé)通過一個數(shù)據(jù)包長度的字段來指示包的長度的。而實際上,elasticsearch的SizeHeaderFrameDecoder的功能遠(yuǎn)比簡單的一個包長度復(fù)雜,Netty的數(shù)據(jù)包頭也不僅是一個包長度信息。下面詳細(xì)介紹一下Netty數(shù)據(jù)包的包頭數(shù)據(jù)結(jié)構(gòu)。

Netty數(shù)據(jù)包頭格式

NettyHeader

NettyHeader數(shù)據(jù)的格式如下:

字段名稱| 字段長度(字節(jié))| 說明|備注
----|------|----
MARKER_BYTES_SIZE| 2 | 起始標(biāo)識 | “ES”兩個大寫字母
MESSAGE_LENGTH_SIZE| 4 | 消息長度 | int型變量
REQUEST_ID_SIZE| 8 | 消息ID | long型變量,請求發(fā)起方自增生成
STATUS_SIZE | 1 |狀態(tài)變量|消息的flag集合,下面詳細(xì)說明
VERSION_ID_SIZE| 4 | 版本信息 |

STATUS字段

NettyHeader中的Status字段的意義在TransportStatus類中定義。STATUS字段主要包含三個標(biāo)志位:

  • STATUS_REQRES
  • STATUS_ERROR
  • STATUS_COMPRESS

TransportStatus的代碼如下所示:

TransportStatus.java

下圖圖示中列出了STATUS字段(單字節(jié))各個標(biāo)識位的位置和意義??梢钥闯?,只有后三位是有意義的。

7|6|5|4|3|3|2|1|0
----|----|----|----|----|----|----|----
-|-|-|-|-|-|壓縮標(biāo)識|Error標(biāo)識|response標(biāo)識(request為0,response為1)

MessageChannelHandler

MessageChannelHandler在ChannelPipeline中被命名為“dispatcher”,這說明該Decoder負(fù)責(zé)決定接收到的數(shù)據(jù)包該交給那個具體的業(yè)務(wù)邏輯去處理。在MessageChannelHandler的業(yè)務(wù)邏輯中,如下三個成員起了重要作用:

  • transport
  • threadPool
  • transportServiceAdapter

這三個成員是通過構(gòu)造函數(shù)傳入的,如下圖所示:

MessageChannelHandler的構(gòu)造函數(shù)

從上面的代碼可以看出來,threadPool和transportServiceAdapter均來自于transport對象,因此,對于MessageChannelHandler來說,transport是至關(guān)重要的。

transportServiceAdapter

MessageChannelHandler的核心邏輯從messageReceived方法展開。但是,在進(jìn)入messageReceived方法之前,我想先介紹一下transportServiceAdapter。這是后面關(guān)于messageReceived方法相關(guān)邏輯中需要涉及到的一個重要成員變量。

TransportServiceAdapter接口和TransportService類

transportServiceAdapter成員是TransportServiceAdapter接口的一個實現(xiàn)類,該接口的代碼如下所示:

TransportServiceAdapter接口

該接口只有一個實現(xiàn)類,即TransportService.Adapter。其實,TransportServiceAdapter雖然命名為Adapter,但是,它的設(shè)計原意可能更接近門面模式。因為目標(biāo)是使用一個更簡單的接口來調(diào)用TransportService。TransportServiceAdapter接口有兩個主要的獲取消息處理句柄的方法,分別是:

  • onResponseReceived
  • getRequestHandler
    下面針對這兩個函數(shù),來看一下TransportService.Adapter的代碼。
TransportService.Adapter.onResponseReceived
TransportService.Adapter.onResponseReceived

從代碼中可以看到,這部分代碼的主要邏輯是從clientHandlers容器中,獲取到response的處理句柄——ResponseHandler。關(guān)于clientHandlers,之前在介紹TransportService.sendRequest方法時,介紹過了。下面結(jié)合此部分代碼,重新回憶和梳理一下request的發(fā)送和響應(yīng)流程:

  1. 發(fā)送方構(gòu)建Request,在提交Request的同時,還需要提供responseHandler的響應(yīng)信息回調(diào)處理句柄對象。
  2. 發(fā)送方將構(gòu)建的Request對象和responseHandler句柄傳遞給TransportService的sendRequest方法。
  3. TransportService的sendRequest方法首先給request分配一個requestId,然后將requestId和responseHandler已key-value對的方式存儲到clientHandlers容器中。隨后,sendRequest調(diào)用transport成員變量的sendRequest方法執(zhí)行數(shù)據(jù)發(fā)送操作。
  4. 接收方接收到request,進(jìn)行處理,并返回response。(這部分操作在下面會進(jìn)一步描述)。然后通過請求的通道(channel)將response返回。
  5. 發(fā)送方通過Netty框架完成接收數(shù)據(jù)包的處理,根據(jù)數(shù)據(jù)包的status字段,判斷這是一個Response,然后調(diào)用MessageChannelHandler的相應(yīng)函數(shù)進(jìn)行處理。MessageChannelHandler最終通過調(diào)用TransportService.Adapter.onResponseReceived方法在TransportService的clientHandlers中根據(jù)requestId查找到該response對應(yīng)的handler處理句柄對象。
  6. 調(diào)用handler的handleResponse方法進(jìn)行返回結(jié)果的處理。根據(jù)handler的執(zhí)行線程選擇,可能在數(shù)據(jù)接收線程里面直接進(jìn)行處理,也可能在線程池調(diào)用線程進(jìn)行處理。
    下面,首先對上述第5步的數(shù)據(jù)包接收處理過程進(jìn)行詳細(xì)描述。
TransportService.Adapter.getRequestHandler方法
TransportService.Adapter.getRequestHandler

代碼很簡單,就是直接調(diào)用requestHandlers的get方法。requestHandlers也是個map,key值是action,value值是RequestHandlerRegistry,這個Registry中包含相應(yīng)消息的hander句柄對象。

那么requestHandlers是由誰構(gòu)建的呢,這個requestHandler是在系統(tǒng)啟動時,由各個消息相應(yīng)的Action對象通過調(diào)用registerRequestHandler方法,注冊到TransportService中的。整個ElasticSearch各個模塊,其中大量功能是需要用到節(jié)點間通訊的。因此,ElasticSearch各個模塊均會調(diào)用TransportService的registerRequestHandler方法。下面以SearchServiceTransportAction為例進(jìn)行說明,代碼如下:

SearchServiceTransportAction

從代碼中可以看到,SearchServiceTransportAction中注冊了大量不同類型的Request的處理句柄。

MessageChannelHandler的messageReceived方法

MessageChannelHandler作為Netty的Decoder的實現(xiàn)類。需要重載messageReceived方法。在該方法中,根據(jù)消息的status信息,來決定如何對消息進(jìn)行處理。具體代碼如下:

MessageChannelHandler的消息分發(fā)邏輯

可以看到,實際處理消息內(nèi)容的函數(shù)有如下幾個:

  • handleRequest
  • handlerResponseError
  • handleResponse
    以上這三個函數(shù)均是MessageChannelHandler的proteted方法。根據(jù)上面的業(yè)務(wù)邏輯,數(shù)據(jù)包的status標(biāo)志位中,只有response才會出現(xiàn)error的情況。

handleRequest函數(shù)

handleRequest函數(shù)的代碼如下圖所示:

MessageChannelHandler.handleRequest

上述代碼主要有三個需要注意的地方,已經(jīng)在上面的代碼中通過紅色方框標(biāo)出

獲取requestHandler

通過調(diào)用transportServiceAdapter.getRequestHandler方法實現(xiàn),這部分代碼在前面介紹transportServiceAdpater成員變量的時候,已經(jīng)進(jìn)行了較為詳細(xì)的說明。

執(zhí)行request的消息處理函數(shù)

從代碼上看,是根據(jù)request的處理句柄對象的執(zhí)行方式設(shè)定來決定是在當(dāng)前線程(Netty的消息處理線程)中進(jìn)行消息處理還是在特定的線程池中完成消息處理。

異常信息返回

如果在request消息處理過程中發(fā)生異常,則調(diào)用transportChannel.sendResponse(Throwable e)方法將錯誤信息返回給request請求節(jié)點。

handleResponse函數(shù)

handleResponse函數(shù)的代碼如下圖所示:

MessageChannelHandler.handleResponse

handleResponse方法中并無特殊需要注意的代碼。大致邏輯與handleRequest相同。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,680評論 19 139
  • 從三月份找實習(xí)到現(xiàn)在,面了一些公司,掛了不少,但最終還是拿到小米、百度、阿里、京東、新浪、CVTE、樂視家的研發(fā)崗...
    時芥藍(lán)閱讀 42,869評論 11 349
  • 10#數(shù)據(jù)類型 合并數(shù)組和非合并數(shù)組 合并數(shù)組:存儲方式是連續(xù)的,中間沒有閑置空間。例如,32bit的寄存器,可以...
    constant007閱讀 33,627評論 0 18
  • Android 自定義View的各種姿勢1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 179,308評論 25 708
  • 休息不代表一定要睡覺,其實只是換個腦子去工作而已。 優(yōu)秀不夠,你是否無可替代
    芣苢_0413閱讀 301評論 0 0

友情鏈接更多精彩內(nèi)容