應網(wǎng)友要求,周末看了一下ClusterService這個類,梳理了一下;看看ES是如何響應和處理一個clusterStateChange的。
什么是ClusterState
在我看來,ClusterState就是整個ES進程的所有狀態(tài)的邏輯封裝,里面包含了各式各樣的功能的Settings和Configs,每個nodes最終都需要維護一個一致的ClusterState屬性。在5.0 以后,為了節(jié)省網(wǎng)絡帶寬,ES允許相鄰的兩個版本可以只發(fā)送 diff 數(shù)據(jù),而不需要每次都發(fā)送整個全量的clusterState。

ClusterService的成員變量不多,重點關注幾個它和外部溝通的變量:

這里有三類比較重要的變量
-
clusterStatePublisher是用來把本機產(chǎn)生的clusterStateChange事件發(fā)布到所有nodes 用 -
stateAppliers當產(chǎn)生一個新event時,這些消費者會消費這些changeEvent -
clusterStateListener當完成一個state 的更新后,觸發(fā)listener通知其改變了
另外還有三個同名的方法,如下圖,用于給外部調(diào)用提交一個更新事件。

ClusterService::SubmitStateUpdateTask()
調(diào)用鏈從這里開始,如果ES希望去發(fā)布一個changeEvent 那么就需要調(diào)用這三個同名方法之一;那么誰會去觸發(fā)一個changeEvent呢?那就多著了,比如Mapping 改變了,shard掛了,有新節(jié)點加進來,and so on; 有興趣的點擊調(diào)用鏈自己去看一下,這里不再贅述。

在
submitStateUpdateTask() 方法里,會根據(jù)入?yún)⒌?code>config、executor、task 等,根據(jù)優(yōu)先級等因素最后用一個UpdateTask類來封裝并存入隊列。UpdateTask 主要起的就是一個調(diào)度作用。最后還是回調(diào)了ClusterService的runTasks() 方法。
ClusterService::runTasks()
runTasks 做了一些基本校驗之后,就進入方法publishAndAppliyChanges()方法,這是一個非常核心的方法,不過做的事情是比較簡單的:
- 根據(jù)最新狀態(tài)重新確定和節(jié)點的連接

- 如果其本身就是一個Master節(jié)點,那么就需要把這個
changeState推送到所有節(jié)點去,其中的ClusterStatePublisher下面會介紹
-
更新集群配置
- 通知所有的appliers 去應用這個變更,
ClusterStateAppliers也會稍后介紹
-
應用變更
-
通知listeners 變更
ClusterService ::clusterStatePublisher
ClusterService 本身是沒有集群其他節(jié)點信息的,因此如果它是一個Master,并且它要把一個stateEvent發(fā)布出去,唯有通過ZenDiscovery,那么很簡單,它只需要把ZenDiscovery:publish()掛進來就可以了,初始化代碼在Node.java的初始化部分

Publish()的一句核心語句就是

PublishClusterStateAction
publishClusterState 是一個PublishClusterStateAction ,顧名思義它主要負責和各個節(jié)點間的clusterChangeEvent的發(fā)送和接受任務;
首先留意一下在什么情況下發(fā)送的是diffState,什么情況下發(fā)送fullState到其他節(jié)點(和ES版本有關),相對應的兩個方法是sendClusterStateDiff()和sendClusterStateFull()
在PublishClusterStateAction里最總要的兩對方法就是
-
sendClusterStateToNode()和handleIncomingClusterStateRequest() -
sendCommitToNode()和
handleIncomingClusterStateRequest()
那這個是什么意思呢?還記得嗎,ES的Master要向所有node發(fā)送一個狀態(tài)變更的時候,需要有兩個過程
- 調(diào)用
sendClusterStateToNode()向所有nodes發(fā)送這個狀態(tài)變更的通知,其他節(jié)點接收到的話調(diào)用handleIncomingClusterStateRequest()來處理這個事件,這時它僅僅把這個事件存放在queue里而不是立刻應用這個狀態(tài)變更,因為它需要等待Master的命令,況且,這個事件的前面說不準還有一堆的狀態(tài)變更還沒有響應的,所以其實它什么也沒做,回復一個確認 - Master會調(diào)用
sendCommitToNode()到所有的nodes ,當然,之前它必須得到過半數(shù)的確認說收到這個變更才行,那么所有收到這個請求的nodes就會調(diào)用handleIncomingClusterStateRequest()去消費這個變更,注意在此之前這些nodes需要消費完queue之前的所有變更才行。而Master會啟動一個timer來等待Response。
仔細閱讀上面4個方法的代碼,就是做上面兩件事情的,注意代碼里仍然有對diff 和full State的判斷。
ClusterService::clusterStateAppliers 和ClusterService::clusterStateListeners
其實這兩個都是一種listener,需要感知各種狀態(tài)變更的modules都需要往這里來注冊自己的appliers或者listeners 來消費這些狀態(tài)變更,至于他們之間的差別,主要就是前者是在應用之前需要做些什么,后者是應用了變更之后需要做些什么,有興趣的可以點進去這些調(diào)用棧去看什么modules 注冊了這些事件。

至此整個ClusterService 處理ClusterState 的調(diào)用鏈就基本走了一遍了,回顧一下我畫了一張圖來幫助記憶





