ElasticSearch的數(shù)據(jù)傳輸服務(wù)TransportService
ElasticSearch的數(shù)據(jù)傳輸服務(wù)是在TransportService類中實現(xiàn)的。TransportService的核心方法是sendRequest,如下圖所示:

從上面的代碼段可以看出幾個有用的信息:
- 首先看這一句:

這一句表明,每個被傳輸?shù)恼埱?,均包含一個requestId,根據(jù)requestId的生成函數(shù)newRequestId()的實現(xiàn)來看,requestId實際是一個自增的long型變量。這一變量的作用就是為了標(biāo)識每一次請求。在接收方處理完請求,并返回應(yīng)答時,需要將請求的requestId帶回,以便發(fā)送方收到應(yīng)答后,能夠確定是對哪次請求的應(yīng)答。
- 再來看這一句:

在發(fā)送傳輸請求時,同時指定了返回數(shù)據(jù)的回調(diào)句柄對象:TransportResponseHandler<T> hander。這一回調(diào)句柄對象被注冊到clientHandlers容器中。
clientHandlers可以看做是一個Map,map的key值是requestId,map的value值是對應(yīng)的TransportResponseHandler。可以通過clientHandlers.get(requestId)這樣的調(diào)用來獲取到對應(yīng)的ResponseHandler。
- 最后看這一段

在發(fā)送請求的時候,需要指定發(fā)送的目標(biāo)節(jié)點。如果目標(biāo)節(jié)點是本機(jī),直接調(diào)用sendLocalRequest方法即可,這一方法不需要通過網(wǎng)絡(luò)協(xié)議進(jìn)行傳輸。如果目標(biāo)節(jié)點不是本機(jī),則調(diào)用transport成員的sendRequest方法實現(xiàn)數(shù)據(jù)發(fā)送。
Transport組件
ElasticSearch的節(jié)點間數(shù)據(jù)傳輸組件被抽象成Transport接口。并通過構(gòu)造函數(shù)注入的方式注入到TransportService對象中,如下圖所示:

可以看到,transport對象和threadPool對象都是通過構(gòu)造函數(shù)注入的方式注入到TransportService中的。
,實際上,Transport這一接口的實現(xiàn)類僅有NettyTransport這一個。所以,可以認(rèn)為ElasticSearch的節(jié)點間通訊就是通過Netty來實現(xiàn)的。
NettyTransport
由于Netty基于攔截器模式實現(xiàn)的NIO通訊框架,因此Netty的響應(yīng)處理機(jī)制要通過如下代碼說明:

從上圖的代碼可以看出,ServerChannelPipelineFactory在pipeline上主要添加了兩個Handler,一個是SizeHeaderFrameDecoder,一個是MessageChannelHandler。
SizeHeaderFrameDecoder
SizeHeaderFrameDecoder在ChannelPipeline中被命名為“size”,考慮到Netty本身也內(nèi)置一個類似SizeHeaderFrameDecoder的Decoder,因此,很自然的理解為該Decoder是負(fù)責(zé)通過一個數(shù)據(jù)包長度的字段來指示包的長度的。而實際上,elasticsearch的SizeHeaderFrameDecoder的功能遠(yuǎn)比簡單的一個包長度復(fù)雜,Netty的數(shù)據(jù)包頭也不僅是一個包長度信息。下面詳細(xì)介紹一下Netty數(shù)據(jù)包的包頭數(shù)據(jù)結(jié)構(gòu)。
Netty數(shù)據(jù)包頭格式
NettyHeader
NettyHeader數(shù)據(jù)的格式如下:
字段名稱| 字段長度(字節(jié))| 說明|備注
----|------|----
MARKER_BYTES_SIZE| 2 | 起始標(biāo)識 | “ES”兩個大寫字母
MESSAGE_LENGTH_SIZE| 4 | 消息長度 | int型變量
REQUEST_ID_SIZE| 8 | 消息ID | long型變量,請求發(fā)起方自增生成
STATUS_SIZE | 1 |狀態(tài)變量|消息的flag集合,下面詳細(xì)說明
VERSION_ID_SIZE| 4 | 版本信息 |
STATUS字段
NettyHeader中的Status字段的意義在TransportStatus類中定義。STATUS字段主要包含三個標(biāo)志位:
- STATUS_REQRES
- STATUS_ERROR
- STATUS_COMPRESS
TransportStatus的代碼如下所示:

下圖圖示中列出了STATUS字段(單字節(jié))各個標(biāo)識位的位置和意義??梢钥闯?,只有后三位是有意義的。
7|6|5|4|3|3|2|1|0
----|----|----|----|----|----|----|----
-|-|-|-|-|-|壓縮標(biāo)識|Error標(biāo)識|response標(biāo)識(request為0,response為1)
MessageChannelHandler
MessageChannelHandler在ChannelPipeline中被命名為“dispatcher”,這說明該Decoder負(fù)責(zé)決定接收到的數(shù)據(jù)包該交給那個具體的業(yè)務(wù)邏輯去處理。在MessageChannelHandler的業(yè)務(wù)邏輯中,如下三個成員起了重要作用:
- transport
- threadPool
- transportServiceAdapter
這三個成員是通過構(gòu)造函數(shù)傳入的,如下圖所示:

從上面的代碼可以看出來,threadPool和transportServiceAdapter均來自于transport對象,因此,對于MessageChannelHandler來說,transport是至關(guān)重要的。
transportServiceAdapter
MessageChannelHandler的核心邏輯從messageReceived方法展開。但是,在進(jìn)入messageReceived方法之前,我想先介紹一下transportServiceAdapter。這是后面關(guān)于messageReceived方法相關(guān)邏輯中需要涉及到的一個重要成員變量。
TransportServiceAdapter接口和TransportService類
transportServiceAdapter成員是TransportServiceAdapter接口的一個實現(xiàn)類,該接口的代碼如下所示:

該接口只有一個實現(xiàn)類,即TransportService.Adapter。其實,TransportServiceAdapter雖然命名為Adapter,但是,它的設(shè)計原意可能更接近門面模式。因為目標(biāo)是使用一個更簡單的接口來調(diào)用TransportService。TransportServiceAdapter接口有兩個主要的獲取消息處理句柄的方法,分別是:
- onResponseReceived
- getRequestHandler
下面針對這兩個函數(shù),來看一下TransportService.Adapter的代碼。
TransportService.Adapter.onResponseReceived

從代碼中可以看到,這部分代碼的主要邏輯是從clientHandlers容器中,獲取到response的處理句柄——ResponseHandler。關(guān)于clientHandlers,之前在介紹TransportService.sendRequest方法時,介紹過了。下面結(jié)合此部分代碼,重新回憶和梳理一下request的發(fā)送和響應(yīng)流程:
- 發(fā)送方構(gòu)建Request,在提交Request的同時,還需要提供responseHandler的響應(yīng)信息回調(diào)處理句柄對象。
- 發(fā)送方將構(gòu)建的Request對象和responseHandler句柄傳遞給TransportService的sendRequest方法。
- TransportService的sendRequest方法首先給request分配一個requestId,然后將requestId和responseHandler已key-value對的方式存儲到clientHandlers容器中。隨后,sendRequest調(diào)用transport成員變量的sendRequest方法執(zhí)行數(shù)據(jù)發(fā)送操作。
- 接收方接收到request,進(jìn)行處理,并返回response。(這部分操作在下面會進(jìn)一步描述)。然后通過請求的通道(channel)將response返回。
- 發(fā)送方通過Netty框架完成接收數(shù)據(jù)包的處理,根據(jù)數(shù)據(jù)包的status字段,判斷這是一個Response,然后調(diào)用MessageChannelHandler的相應(yīng)函數(shù)進(jìn)行處理。MessageChannelHandler最終通過調(diào)用TransportService.Adapter.onResponseReceived方法在TransportService的clientHandlers中根據(jù)requestId查找到該response對應(yīng)的handler處理句柄對象。
- 調(diào)用handler的handleResponse方法進(jìn)行返回結(jié)果的處理。根據(jù)handler的執(zhí)行線程選擇,可能在數(shù)據(jù)接收線程里面直接進(jìn)行處理,也可能在線程池調(diào)用線程進(jìn)行處理。
下面,首先對上述第5步的數(shù)據(jù)包接收處理過程進(jìn)行詳細(xì)描述。
TransportService.Adapter.getRequestHandler方法

代碼很簡單,就是直接調(diào)用requestHandlers的get方法。requestHandlers也是個map,key值是action,value值是RequestHandlerRegistry,這個Registry中包含相應(yīng)消息的hander句柄對象。
那么requestHandlers是由誰構(gòu)建的呢,這個requestHandler是在系統(tǒng)啟動時,由各個消息相應(yīng)的Action對象通過調(diào)用registerRequestHandler方法,注冊到TransportService中的。整個ElasticSearch各個模塊,其中大量功能是需要用到節(jié)點間通訊的。因此,ElasticSearch各個模塊均會調(diào)用TransportService的registerRequestHandler方法。下面以SearchServiceTransportAction為例進(jìn)行說明,代碼如下:

從代碼中可以看到,SearchServiceTransportAction中注冊了大量不同類型的Request的處理句柄。
MessageChannelHandler的messageReceived方法
MessageChannelHandler作為Netty的Decoder的實現(xiàn)類。需要重載messageReceived方法。在該方法中,根據(jù)消息的status信息,來決定如何對消息進(jìn)行處理。具體代碼如下:

可以看到,實際處理消息內(nèi)容的函數(shù)有如下幾個:
- handleRequest
- handlerResponseError
- handleResponse
以上這三個函數(shù)均是MessageChannelHandler的proteted方法。根據(jù)上面的業(yè)務(wù)邏輯,數(shù)據(jù)包的status標(biāo)志位中,只有response才會出現(xiàn)error的情況。
handleRequest函數(shù)
handleRequest函數(shù)的代碼如下圖所示:

上述代碼主要有三個需要注意的地方,已經(jīng)在上面的代碼中通過紅色方框標(biāo)出
獲取requestHandler
通過調(diào)用transportServiceAdapter.getRequestHandler方法實現(xiàn),這部分代碼在前面介紹transportServiceAdpater成員變量的時候,已經(jīng)進(jìn)行了較為詳細(xì)的說明。
執(zhí)行request的消息處理函數(shù)
從代碼上看,是根據(jù)request的處理句柄對象的執(zhí)行方式設(shè)定來決定是在當(dāng)前線程(Netty的消息處理線程)中進(jìn)行消息處理還是在特定的線程池中完成消息處理。
異常信息返回
如果在request消息處理過程中發(fā)生異常,則調(diào)用transportChannel.sendResponse(Throwable e)方法將錯誤信息返回給request請求節(jié)點。
handleResponse函數(shù)
handleResponse函數(shù)的代碼如下圖所示:

handleResponse方法中并無特殊需要注意的代碼。大致邏輯與handleRequest相同。