Nacos Server 源碼分析

版本

Nacos 2.5.1

啟動(dòng)方式

console模塊下的com.alibaba.nacos.Nacos,啟動(dòng)spring應(yīng)用
使用Intellij IDEA啟動(dòng)時(shí),可以使用VM options-Dnacos.standalone=true,啟動(dòng)單機(jī)模式

配置文件位于console/src/main/resources/application.properties
sql建表語(yǔ)句位于config/src/main/resources/META-INF/mysql-schema.sql

開發(fā)的集群模式啟動(dòng)
application.properties中配置集群列表"nacos.member.list=192.168.3.10:8648,192.168.3.10:8748,192.168.3.10:8848"
注意不要寫127.0.0.1

啟動(dòng)命令VM options填寫"-Dserver.port=8848
-Dnacos.home=E:\Gits\nacos\nacos-cluster-0"

nacos.home的相關(guān)代碼位于com.alibaba.nacos.sys.env.EnvUtil#getNacosHome

默認(rèn)端口號(hào)

端口 與主端口的偏移量 描述
8848 0 Nacos HTTP API 端口,用于Nacos AdminAPI及HTTP OpenAPI的訪問(wèn)
9848 1000 客戶端gRPC請(qǐng)求服務(wù)端端口,用于客戶端向服務(wù)端發(fā)起連接和請(qǐng)求
9849 1001 服務(wù)端gRPC請(qǐng)求服務(wù)端端口,用于服務(wù)間同步等
7848 -1000 Jraft請(qǐng)求服務(wù)端端口,用于處理服務(wù)端間的Raft相關(guān)請(qǐng)求

端口 與主端口的偏移量 描述

8080 獨(dú)立配置 Nacos控制臺(tái)端口,訪問(wèn)Nacos控制臺(tái)及Nacos控制臺(tái)的API

啟動(dòng)流程

com.alibaba.nacos.core.code.SpringApplicationRunListener調(diào)用StartingApplicationListener加了新的配置源EnvUtil.getApplicationConfFileResource()

GrpcClusterClient#connectToServer啟動(dòng)連接其它nacos服務(wù)實(shí)例的grpc端口。
另一種實(shí)現(xiàn)GrpcSdkClient#connectToServer是nacos sdk客戶端去連接nacos服務(wù)端的grpc端口。這里包含了根據(jù)8848計(jì)算其它端口號(hào)的邏輯。

NotifyCenter & Event

NotifyCenter負(fù)責(zé)發(fā)布事件,由Subscriber響應(yīng)事件并處理。EventPublisher居中負(fù)責(zé)事件的轉(zhuǎn)發(fā)。
每個(gè)Event類型對(duì)應(yīng)一個(gè)EventPublisher。nacos的事件響應(yīng)都是異步的,跨線程的。
閱讀代碼時(shí),根據(jù)事件的類名,尋找對(duì)應(yīng)處理的Subscriber即可,Subscriber#onEvent為響應(yīng)函數(shù)入口。

事件發(fā)布

com.alibaba.nacos.common.notify.NotifyCenter#publishEvent(com.alibaba.nacos.common.notify.Event) 靜態(tài)函數(shù)

  1. 如果事件是慢事件SlowEvent,則使用INSTANCE.sharePublisher發(fā)布事件
  2. NotifyCenter#publisherMap中,以事件類名為key,獲取對(duì)應(yīng)的EventPublisher。
    然后調(diào)用publisher.publish(event);執(zhí)行EventPublisher內(nèi)的預(yù)置邏輯

EventPublisher

  1. 可以使用NotifyCenter#registerToPublisher()函數(shù),使用NotifyCenter#DEFAULT_PUBLISHER_FACTORY,默認(rèn)的工廠類,構(gòu)建默認(rèn)的EventPublisher實(shí)現(xiàn)DefaultPublisher。之后再通過(guò)NotifyCenter#registerSubscriber()函數(shù)單獨(dú)注冊(cè)Subscriber

  2. 可以使用NotifyCenter#registerSubscriber()函數(shù),入?yún)⒅刑峁?code>Subscriber。
    入?yún)⒁部梢蕴峁?code>EventPublisher的工廠函數(shù),如果不提供,則使用默認(rèn)的工廠類,構(gòu)建默認(rèn)的DefaultPublisher。

SubscriberSmartSubscriber的區(qū)別就是Subscriber的接口只能指定一個(gè)關(guān)聯(lián)的Event類型,而SmartSubscriber的新接口可以指定多個(gè)。

這塊感覺設(shè)計(jì)的有些混亂,有些可以通過(guò)Subscriber的接口知道他關(guān)聯(lián)哪些Event。有些卻需要再注冊(cè)Subscriber時(shí)再指定關(guān)聯(lián)哪些Event。

DefaultPublisher

DefaultPublisher 是一個(gè)單獨(dú)的線程,初始化時(shí)傳入隊(duì)列長(zhǎng)度參數(shù),初始化啟動(dòng)線程。

接受事件時(shí)將事件加入隊(duì)列。
線程一刻不停的從隊(duì)列中取出事件,遍歷所有注冊(cè)進(jìn)來(lái)的Subscriber,使用Subscriber#executor異步執(zhí)行Subscriber#onEvent事件。

詳細(xì)過(guò)程是線程從隊(duì)列中取出事件,交由執(zhí)行receiveEvent(event);。
receiveEvent(event);中,遍歷DefaultPublisher#subscribers中所有注冊(cè)進(jìn)來(lái)的的Subscriber,Subscriber#scopeMatches 確認(rèn)事件匹配當(dāng)前Subscriber。
通過(guò)Event#sequence確保事件有序執(zhí)行。
最終使用Subscriber#executor中的線程異步執(zhí)行Subscriber#onEvent事件,如果Subscriber#executor為null,則使用當(dāng)前DefaultPublisher的線程執(zhí)行Subscriber#onEvent。

其它 EventPublisher

NamingEventPublisher
TraceEventPublisherFactory
邏輯差不太多,區(qū)別點(diǎn)主要是一個(gè)EventPublisher,可以支持多種Event。內(nèi)部通過(guò)一個(gè)map,key為Event,value為Subscriber集合。

接受請(qǐng)求

配置列表 模糊查詢

fuzzySearchConfig:419, ConfigController (com.alibaba.nacos.config.server.controller)
invoke0:-1, NativeMethodAccessorImpl (sun.reflect)
invoke:62, NativeMethodAccessorImpl (sun.reflect)
invoke:43, DelegatingMethodAccessorImpl (sun.reflect)
invoke:498, Method (java.lang.reflect)
doInvoke:205, InvocableHandlerMethod (org.springframework.web.method.support)
invokeForRequest:150, InvocableHandlerMethod (org.springframework.web.method.support)
invokeAndHandle:117, ServletInvocableHandlerMethod (org.springframework.web.servlet.mvc.method.annotation)
invokeHandlerMethod:903, RequestMappingHandlerAdapter (org.springframework.web.servlet.mvc.method.annotation)
handleInternal:809, RequestMappingHandlerAdapter (org.springframework.web.servlet.mvc.method.annotation)
handle:87, AbstractHandlerMethodAdapter (org.springframework.web.servlet.mvc.method)
doDispatch:1072, DispatcherServlet (org.springframework.web.servlet)
doService:965, DispatcherServlet (org.springframework.web.servlet)
processRequest:1006, FrameworkServlet (org.springframework.web.servlet)
doGet:898, FrameworkServlet (org.springframework.web.servlet)
service:529, HttpServlet (javax.servlet.http)
service:883, FrameworkServlet (org.springframework.web.servlet)
service:623, HttpServlet (javax.servlet.http)
internalDoFilter:199, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilter:51, WsFilter (org.apache.tomcat.websocket.server)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilterInternal:42, XssFilter (com.alibaba.nacos.console.filter)
doFilter:117, OncePerRequestFilter (org.springframework.web.filter)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilterInternal:91, CorsFilter (org.springframework.web.filter)
doFilter:117, OncePerRequestFilter (org.springframework.web.filter)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilter:84, ParamCheckerFilter (com.alibaba.nacos.core.paramcheck)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilter:109, NacosHttpTpsFilter (com.alibaba.nacos.core.control.http)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilter:69, AuthFilter (com.alibaba.nacos.core.auth)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilter:67, NacosWebFilter (com.alibaba.nacos.config.server.filter)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilterInternal:218, FilterChainProxy (org.springframework.security.web)
doFilter:190, FilterChainProxy (org.springframework.security.web)
invokeDelegate:354, DelegatingFilterProxy (org.springframework.web.filter)
doFilter:267, DelegatingFilterProxy (org.springframework.web.filter)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilterInternal:100, RequestContextFilter (org.springframework.web.filter)
doFilter:117, OncePerRequestFilter (org.springframework.web.filter)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilterInternal:93, FormContentFilter (org.springframework.web.filter)
doFilter:117, OncePerRequestFilter (org.springframework.web.filter)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilterInternal:96, WebMvcMetricsFilter (org.springframework.boot.actuate.metrics.web.servlet)
doFilter:117, OncePerRequestFilter (org.springframework.web.filter)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilterInternal:201, CharacterEncodingFilter (org.springframework.web.filter)
doFilter:117, OncePerRequestFilter (org.springframework.web.filter)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilter:57, HttpRequestContextFilter (com.alibaba.nacos.core.context.remote)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
invoke:168, StandardWrapperValve (org.apache.catalina.core)
invoke:90, StandardContextValve (org.apache.catalina.core)
invoke:482, AuthenticatorBase (org.apache.catalina.authenticator)
invoke:130, StandardHostValve (org.apache.catalina.core)
invoke:93, ErrorReportValve (org.apache.catalina.valves)
invoke:74, StandardEngineValve (org.apache.catalina.core)
invoke:660, AbstractAccessLogValve (org.apache.catalina.valves)
service:346, CoyoteAdapter (org.apache.catalina.connector)
service:396, Http11Processor (org.apache.coyote.http11)
process:63, AbstractProcessorLight (org.apache.coyote)
process:937, AbstractProtocol$ConnectionHandler (org.apache.coyote)
doRun:1791, NioEndpoint$SocketProcessor (org.apache.tomcat.util.net)
run:52, SocketProcessorBase (org.apache.tomcat.util.net)
runWorker:1190, ThreadPoolExecutor (org.apache.tomcat.util.threads)
run:659, ThreadPoolExecutor$Worker (org.apache.tomcat.util.threads)
run:63, TaskThread$WrappingRunnable (org.apache.tomcat.util.threads)
run:748, Thread (java.lang)

HttpRequestContextFilter
ThreadLocal RequestContext中填入http請(qǐng)求等信息,RequestContext類是nacos自定義的類。

AuthFilter
鑒權(quán)功能,@Secured注解相關(guān)鑒權(quán)功能

ParamCheckerFilter
@ExtractorManager.Extractor注解解析請(qǐng)求參數(shù),使用AbstractParamChecker對(duì)請(qǐng)求參數(shù)進(jìn)行通用的檢查。

XssFilter
response設(shè)置"Content-Security-Policy"header,xss相關(guān)

NacosWebFilter
"/v1/cs/*"專屬的filter
request設(shè)置encoding UTF-8,response設(shè)置content type header 為 json

NacosHttpTpsFilter
"/v1/ns/", "/v2/ns/", "/v1/cs/", "/v2/cs/"專屬的filter
@Secured注解相關(guān)鑒權(quán)功能
應(yīng)該是tps流量控制插件,但是默認(rèn)實(shí)現(xiàn)是無(wú)管控

查詢配置

比如配置列表的模糊查詢接口ConfigController#fuzzySearchConfig,調(diào)用ConfigInfoPersistService對(duì)數(shù)據(jù)庫(kù)進(jìn)行查詢。

默認(rèn)是derby數(shù)據(jù)庫(kù),通過(guò)EmbeddedConfigInfoPersistServiceImpl實(shí)現(xiàn)類查詢,
內(nèi)部邏輯就是根據(jù)參數(shù)組裝sql,調(diào)用PaginationHelper.fetchPageLimit。然后是BaseDatabaseOperate,使用JdbcTemplate對(duì)數(shù)據(jù)庫(kù)進(jìn)行查詢,先查count,再查列表數(shù)據(jù)。

配置為mysql后使用的是ExternalConfigInfoPersistServiceImpl實(shí)現(xiàn)類,與derby的流程一致,唯一區(qū)別是PaginationHelper.fetchPageLimit內(nèi)直接調(diào)用JdbcTemplate對(duì)數(shù)據(jù)庫(kù)進(jìn)行查詢。

獲取配置詳情接口也是普通的查詢mysql

發(fā)布修改配置

com.alibaba.nacos.config.server.service.ConfigOperationService#publishConfig
先是一些aop

CapacityManagementAspect
檢查配置文件大小是否超過(guò)限制,默認(rèn)關(guān)閉,不檢查。
大小限制數(shù)據(jù)來(lái)自于group_capacity tenant_capacity表,有rest接口可以寫入數(shù)據(jù),但沒看到webui界面有相關(guān)設(shè)置。

ConfigChangeAspect
為config change plugin提供aop接入點(diǎn),默認(rèn)沒有插件

RequestLogAspect
MetricsMonitor統(tǒng)計(jì)publish次數(shù),就寫config耗時(shí)

業(yè)務(wù)邏輯:

  1. 修改mysql內(nèi)的配置時(shí),使用事務(wù)
    事務(wù)內(nèi)進(jìn)行更新配置,插入配置歷史(his_config_info表)兩個(gè)操作
    其中更新配置語(yǔ)句是UPDATE config_info SET ... WHERE data_id=? AND group_id=? AND tenant_id=? AND (md5=? OR md5 IS NULL OR md5='')
    where語(yǔ)句額外使用md5作為判斷條件。md5是根據(jù)配置文件內(nèi)容計(jì)算出來(lái)的,web界面打開配置時(shí)會(huì)從后端得到當(dāng)前配置的md5,調(diào)用配置更新接口時(shí)會(huì)傳入就舊配置的md5,這樣后端可以確保這期間沒有其他人修改過(guò)這個(gè)配置文件。

  2. 發(fā)布配置修改事件ConfigDataChangeEvent
    NotifyCenter.publishEvent(event);進(jìn)行事件發(fā)布,NotifyCenter會(huì)找到該消息對(duì)應(yīng)的DefaultPublisher線程異步處理事件
    默認(rèn)ConfigDataChangeEvent事件有兩個(gè)監(jiān)聽器AsyncNotifyServiceExternalDumpService。我們先向下看,后面再描述這個(gè)異步流程。

  3. 緊接著的同步操作是向"com.alibaba.nacos.config.traceLog"logger中寫入操作日志,這里只記錄日志內(nèi)容的md5,操作者ip等,沒有日志的具體內(nèi)容

  4. 繼續(xù)講解ConfigDataChangeEvent事件的異步操作

其實(shí)此事件就是讓nacos集群的所有節(jié)點(diǎn)執(zhí)行ExternalDumpService#dump。本地會(huì)監(jiān)聽該事件,執(zhí)行ExternalDumpService#dump。AsyncNotifyService是向其它nacos節(jié)點(diǎn)發(fā)送grpc消息,讓其他節(jié)點(diǎn)執(zhí)行ExternalDumpService#dump。

AsyncNotifyService
MetricsMonitor統(tǒng)計(jì)配置變更次數(shù)
ServerMemberManager獲取其它nacos實(shí)例地址
com.alibaba.nacos.config.server.utils.ConfigExecutor#executeAsyncNotify再次進(jìn)行異步執(zhí)行
通過(guò)grpc向nacos內(nèi)的其他節(jié)點(diǎn)發(fā)送ConfigChangeClusterSyncRequest消息。
grpc服務(wù)定義于nacos_grpc_service.proto文件內(nèi),是一個(gè)通用的消息服務(wù),
其它nacos節(jié)點(diǎn)的GrpcRequestAcceptor是處理該消息的服務(wù)實(shí)現(xiàn)類。GrpcRequestAcceptor#request內(nèi)部進(jìn)行消息分發(fā),最終由ConfigChangeClusterSyncRequestHandler#handle處理,這里的內(nèi)部也是調(diào)用ExternalDumpService#dump進(jìn)行處理。

ExternalDumpService#dump
TaskManager增加新的任務(wù)DumpTask,再次進(jìn)行異步處理。
名為"com.alibaba.nacos.server.DumpTaskManager"的TaskManager.ProcessRunnable線程處理這些任務(wù)。任務(wù)key為"dataid+groupid+namespaceid"。根據(jù)任務(wù)key選擇Processor,這里會(huì)選擇默認(rèn)Processor。任務(wù)交由com.alibaba.nacos.config.server.service.dump.processor.DumpProcessor#process處理。
DumpProcessor#process先去庫(kù)中查詢最新的配置信息,然后調(diào)用com.alibaba.nacos.config.server.service.dump.DumpConfigHandler#configDump直接處理填充好的ConfigDumpEvent事件。com.alibaba.nacos.config.server.service.ConfigCacheService#dump
如果內(nèi)存中該配置文件的md5發(fā)生了變化,將會(huì)
寫入本地文件緩存,位于"{nacos.home}/data/config-data/"下
更新內(nèi)存中緩存的md5值

NotifyCenter發(fā)布LocalDataChangeEvent事件,事件對(duì)應(yīng)的EventPublisher中有LongPollingServiceRpcConfigChangeNotifier兩個(gè)subscriber。
轉(zhuǎn)到對(duì)應(yīng)的EventPublisher線程,開始執(zhí)行兩個(gè)subscriber。

LongPollingService
ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey));
通知LongPolling的客戶端配置有變化。舊版本使用長(zhǎng)輪詢的方式,這里會(huì)拿到空的客戶端列表,也就沒有任何操作。

RpcConfigChangeNotifier
在本機(jī)查找關(guān)心這個(gè)配置文件的所有nacos客戶端,
RpcPushTask中,向客戶端異步發(fā)送ConfigChangeNotifyRequest請(qǐng)求,ConfigChangeNotifyRequest只是一個(gè)普通Java類,經(jīng)由GrpcUtils.convert()函數(shù)轉(zhuǎn)化為Payloadgrpc消息。這個(gè)消息只發(fā)送配置文件名稱,沒有具體配置內(nèi)容。如果推送失敗,會(huì)使用指數(shù)退避的時(shí)間持續(xù)重試,直到達(dá)到重試次數(shù)。

發(fā)布灰度配置邏輯與上面的類似,一個(gè)事務(wù)內(nèi),插入config_info_gray表,插入his_config_info表。事務(wù)成功后,發(fā)布ConfigDataChangeEvent事件。然后發(fā)布LocalDataChangeEvent事件,向所有監(jiān)聽這個(gè)配置文件的nacos客戶端推送ConfigChangeNotifyRequest。

表結(jié)構(gòu)

  1. config_info表
    主鍵id
    tenant_id,groupid,dataid 一起是一個(gè)唯一索引,其中tenant_id就是namespace

配置文件直接原封不動(dòng)存入content字段
md5判斷表內(nèi)容是否有變更
插入時(shí)唯一索引避免重復(fù)插入
變更時(shí)where語(yǔ)句加上md5字段,確保只有一個(gè)人變更成功,同時(shí)舊配置插入歷史記錄表his_config_info。

  1. config_info_gray 表
    主鍵id
    data_id,group_id,tenant_id,gray_name 一起是一個(gè)唯一索引,其中tenant_id就是namespace

nacos 集群

集群模式下,即使只啟動(dòng)一臺(tái)服務(wù)器,客戶端連接這一臺(tái)服務(wù)器也可以正常變更配置。對(duì)于配置中心而言,nacos的各個(gè)實(shí)例就像是無(wú)狀態(tài)的實(shí)例,可以任意擴(kuò)縮容,沒有其他影響。不過(guò)太多的話會(huì)增加mysql的負(fù)載,因?yàn)閷?shí)例會(huì)不停地輪詢mysql的用戶角色表,配置歷史,配置灰度表,確保數(shù)據(jù)緩存與mysql一致。

局限

灰度發(fā)布 beta發(fā)布

每個(gè)配置文件,只能同時(shí)存在一種beta。不過(guò)看后端代碼應(yīng)該是支持多個(gè)灰度,但是web頁(yè)面不支持。
beta區(qū)分實(shí)例是能到ip地址,一個(gè)ip起兩個(gè)服務(wù)無(wú)法區(qū)分,同時(shí)生效
灰度發(fā)布生效期間,無(wú)法更改主配置,無(wú)法再次修改beta配置

服務(wù)注冊(cè)中心

核心邏輯簡(jiǎn)要概述

nacos允許一個(gè)客戶端實(shí)例注冊(cè)多個(gè)微服務(wù),即一個(gè){ip:port}可以提供多個(gè)服務(wù)
ServiceStorage#serviceDataIndexes相當(dāng)于一個(gè)緩存。獲取服務(wù)信息及實(shí)例列表時(shí),直接從緩存中返回?cái)?shù)據(jù)。
新的微服務(wù)實(shí)例注冊(cè)或是取消注冊(cè)時(shí),主動(dòng)更新這個(gè)緩存。

nacos客戶端在調(diào)用其他微服務(wù)時(shí),需要關(guān)心這一微服務(wù)的實(shí)例列表變化。通過(guò)SubscribeServiceRequestgrpc消息告知某一個(gè)nacos server實(shí)例,它需要訂閱這個(gè)服務(wù)的實(shí)例列表變化信息。這一個(gè)nacos server實(shí)例會(huì)在將來(lái)向這個(gè)nacos 客戶端推送NotifySubscriberRequest消息,包含微服務(wù)的實(shí)例列表。
所以每個(gè)nacos server實(shí)例只會(huì)通知部分nacos客戶端,就是建立了推送消息連接的那部分nacos客戶端。

nacos客戶端發(fā)送InstanceRequestgrpc消息向某一個(gè)nacos server實(shí)例注冊(cè)當(dāng)前微服務(wù)實(shí)例。nacos server實(shí)例在本機(jī)記錄好新實(shí)例信息后,會(huì)通過(guò)DistroDataRequestgrpc消息,distro協(xié)議廣播至其它全部nacos server實(shí)例。這樣所有的nacos server實(shí)例都有了這個(gè)新的微服務(wù)實(shí)例信息了。然后每個(gè)nacos server實(shí)例主動(dòng)更新ServiceStorage#serviceDataIndexes這個(gè)緩存。
然后每個(gè)nacos server實(shí)例會(huì)向在各自nacos server實(shí)例訂閱的nacos客戶端推送微服務(wù)實(shí)例列表變化消息。
比如nacos客戶端A在nacos server實(shí)例1上訂閱了消息,nacos客戶端B在nacos server實(shí)例2上訂閱了消息。那么nacos server實(shí)例1向nacos客戶端A推送變化消息,nacos server實(shí)例2向nacos客戶端B推送變化消息,

接下來(lái)先陳述一些重要的數(shù)據(jù)結(jié)構(gòu),它們用于微服務(wù),微服務(wù)實(shí)例列表,訂閱信息的內(nèi)存查詢與內(nèi)存存儲(chǔ)

數(shù)據(jù)結(jié)構(gòu)

  1. POJO

1.1 Service
表示一個(gè)微服務(wù),僅包含微服務(wù)的最基本的標(biāo)識(shí)信息,常用于map中的key

  • namespace group name 三個(gè)字段一起確定一個(gè)微服務(wù),ServiceequalshashCode只和這三個(gè)字段有關(guān)。

1.2 ServiceInfo
表示一個(gè)微服務(wù)的詳細(xì)信息

  • 包含 namespace group name 三個(gè)字段
  • List<Instance> hosts
    表示該服務(wù)的實(shí)例列表

1.3 Instance
表示一個(gè)微服務(wù)實(shí)例的詳細(xì)信息

  • 包含ip port metadata等

1.4 ConnectionBasedClient
表示一個(gè)nacos client實(shí)例
與微服務(wù)實(shí)例的區(qū)別是,因?yàn)閚acos支持一個(gè)客戶端實(shí)例注冊(cè)多個(gè)微服務(wù),所以一個(gè)ConnectionBasedClient可以關(guān)聯(lián)多個(gè)Instance。不過(guò)要注意的是ConnectionBasedClientInstance只是邏輯上有關(guān)聯(lián),代碼中沒有關(guān)聯(lián)在一起,代碼中使用InstancePublishInfo來(lái)表示微服務(wù)實(shí)例信息。

  • String connectionId
    表示該nacos client實(shí)例的id,默認(rèn)由時(shí)間戳 ip port拼接而成,見AddressTransportFilter#transportReadyGrpcServerConstants#ATTR_TRANS_KEY_CONN_ID相關(guān)代碼。
  • <Service, InstancePublishInfo> publishers
    表示該nacos client實(shí)例關(guān)聯(lián)的多個(gè)服務(wù)Service,以及每個(gè)服務(wù)對(duì)應(yīng)的微服務(wù)實(shí)例信息InstancePublishInfo。
  • <Service, Subscriber> subscribers
    表示該nacos client實(shí)例訂閱的多個(gè)服務(wù)Service,以及訂閱信息Subscriber。一般是該實(shí)例會(huì)調(diào)用其它服務(wù),才會(huì)訂閱那個(gè)服務(wù)的實(shí)例列表變化。

1.5 InstancePublishInfo
表示一個(gè)微服務(wù)實(shí)例的信息。
Instance邏輯上的信息大致一致,但InstancePublishInfo僅作為ConnectionBasedClient這類Client類的屬性字段使用。重點(diǎn)表示某一個(gè)Client對(duì)外提供微服務(wù)時(shí)的實(shí)例信息。在有一些細(xì)微的區(qū)別,比如Instance可以用于對(duì)外提供實(shí)例的全部信息,比如Instance#ephemeral表示該實(shí)例是臨時(shí)實(shí)例還是永久實(shí)例。但InstancePublishInfo沒有ephemeral字段,因?yàn)?code>InstancePublishInfo總是屬于ConnectionBasedClient這類Client類的屬性,而ephemeral是Client的屬性,Client才會(huì)帶著ephemeral信息。

  • 包含ip port 等信息

1.6 Subscriber
InstancePublishInfo類似,僅作為ConnectionBasedClient這類Client類的屬性字段使用。重點(diǎn)表示某一個(gè)Client需要調(diào)用其它微服務(wù)時(shí),需要訂閱該微服務(wù)的實(shí)例列表變化事件。

  • 包含ip port 等信息
  1. 核心邏輯所在的類

2.1 ServiceStorage
通過(guò)Service查詢?cè)摲?wù)的詳細(xì)信息及實(shí)例列表信息。
相當(dāng)于一個(gè)查詢緩存,查詢時(shí)直接查出數(shù)據(jù),不需要再?gòu)母鱾€(gè)Client類中收集實(shí)例列表。
實(shí)例注冊(cè)等處會(huì)拋出ServiceEvent.ServiceChangedEvent事件,事件監(jiān)聽器會(huì)更新這一緩存。

  • ServiceStorage#getPushData
    ServiceManagerClientServiceIndexesManager,ClientManager等處構(gòu)建出該服務(wù)的詳細(xì)信息及實(shí)例列表信息,記錄在serviceDataIndexesmap中,然后返回。
    相當(dāng)于該緩存的數(shù)據(jù)更新操作。一般在服務(wù)實(shí)例注冊(cè),去注冊(cè)時(shí)主動(dòng)調(diào)用。

  • ServiceStorage#getData
    serviceDataIndexesmap中讀取服務(wù)的詳細(xì)信息及實(shí)例列表信息。如果map中存在,直接返回map數(shù)據(jù),如果不存在,通過(guò)getPushData構(gòu)建緩存并返回。
    相當(dāng)于讀緩存的操作。如果緩存沒有則更新緩存。

2.2 ClientServiceIndexesManager 通過(guò)Service查詢相關(guān)ClientId列表
這是一個(gè)為了方便查詢而存在的索引類。
publisherIndexes 存儲(chǔ)了所有 Service 及其對(duì)應(yīng)的 實(shí)例列表所在的clientId集合
subscriberIndexes 存儲(chǔ)了 Service 及訂閱這個(gè)服務(wù)的clientId集合,但需要注意的是只存儲(chǔ)了與當(dāng)前nacos server建立推送連接的nacos client。所以nacos server實(shí)例的subscriberIndexes合在一起才是所有的訂閱client全集。但是每個(gè)nacos server實(shí)例只存儲(chǔ)了一部分訂閱client,因?yàn)橐粋€(gè)nacos client只與一個(gè)nacos server實(shí)例建立服務(wù)器推送連接,所以只有那個(gè)建立連接的nacos server實(shí)例才會(huì)推送訂閱變化消息。

2.3 ConnectionBasedClientManager 通過(guò)ClientId查詢具體Client詳細(xì)信息

以上均為@Component注解的單例Bean。

2.2 ServiceManager
通過(guò)Service查詢Service。
這里的Service不包含服務(wù)的詳細(xì)信息,也不包含服務(wù)的實(shí)例列表信息。
這個(gè)類不是@Component注解的Bean,但依舊是單例。

  • ServiceManager.getInstance()
    靜態(tài)方法獲取單例

  • ServiceManager#containSingleton()
    可以查詢服務(wù)是否存在。

  • ServiceManager#getSingleton()
    通過(guò)Service查詢Service。查詢時(shí)只需要提供namespace group name字段的Service,但查出來(lái)的Service還包含了ephemeral revision等信息。
    如果Service不存在,則添加這個(gè)Service。

2.5 核心Component類的map字段

字段名稱 字段類型 描述
ServiceStorage#serviceDataIndexes Map<Service, ServiceInfo> ServiceInfo包含了所有服務(wù)詳細(xì)信息以及全部實(shí)例列表信息
ServiceManager#singletonRepository Map<Service, Service> 根據(jù)Service查詢Service,具體見本節(jié)上文
ClientServiceIndexesManager#publisherIndexes Map<Service, Set<String>> value為 String 類型的 clientId 集合,包含了所有服務(wù)及其實(shí)例ClientId
ClientServiceIndexesManager#subscriberIndexes Map<Service, Set<String>> value為 String 類型的 clientId 集合。包含了部分服務(wù)及訂閱的實(shí)例ClientId,是與當(dāng)前nacos server實(shí)例建立推送連接的那部分Client
ConnectionBasedClientManager#clients Map<String, ConnectionBasedClient> key為 String 類型的 clientId ,ConnectionBasedClient 為實(shí)例信息,包含了所有Client信息,即使是那些沒有與當(dāng)前nacos server建立連接的Client
ConnectionBasedClient#publishers Map<Service, InstancePublishInfo> 該nacos client實(shí)例提供的全部微服務(wù)實(shí)例信息。一個(gè)nacos client實(shí)例可以對(duì)外提供多個(gè)微服務(wù)。key為 Service ,value 為 InstancePublishInfo,是當(dāng)前微服務(wù)實(shí)例的信息
ConnectionBasedClient#subscribers Map<Service, Subscriber>

結(jié)合關(guān)鍵代碼進(jìn)行重要流程簡(jiǎn)介

  1. 一些重要查詢函數(shù)
  • Service singleton = ServiceManager.getInstance().getSingleton(service);
    提供namespace group name字段的Service,查出或添加Service。

  • ServiceStorage#getPushData
    相當(dāng)于該緩存的數(shù)據(jù)更新操作。一般在服務(wù)實(shí)例注冊(cè),去注冊(cè)時(shí)主動(dòng)調(diào)用。

  • ServiceStorage#getData
    相當(dāng)于讀緩存的操作。如果緩存沒有則更新緩存。

  1. 微服務(wù)實(shí)例的注冊(cè)
    InstanceRequest grpc消息,InstanceRequestHandler#handle處理該消息
    2.1 Service singleton = ServiceManager.getInstance().getSingleton(service);
    至此ServiceManager完成Service的添加。

2.2 Client client = clientManager.getClient(clientId); 根據(jù)ClientId查出Client
Instance轉(zhuǎn)為InstancePublishInfo,Client.publishers加入InstancePublishInfo
至此Client新增了實(shí)例信息。

發(fā)布ClientEvent.ClientChangedEvent事件
Client#generateSyncData生成最新的ClientSyncData通過(guò)DistroDataRequestgrpc消息廣播到nacos server所有其他節(jié)點(diǎn)。其他節(jié)點(diǎn)也就收到了這個(gè)新的Client和微服務(wù)實(shí)例信息,幾乎是也走了一遍微服務(wù)實(shí)例的注冊(cè)的流程。

2.3 Client更新lastUpdatedTime, revision字段,發(fā)布ClientOperationEvent.ClientRegisterServiceEvent事件
ClientServiceIndexesManager監(jiān)聽該事件,ClientServiceIndexesManager#publisherIndexes加入Service與對(duì)應(yīng)的clientId。
發(fā)布ServiceEvent.ServiceChangedEvent事件
NamingSubscriberServiceV2Impl#onEvent監(jiān)聽該事件。
直接調(diào)用ServiceStorage#getPushData,從Client等中加載Service及其實(shí)例列表信息,寫入到ServiceStorage#serviceDataIndexes。這里是一處ServiceStorage#serviceDataIndexes的數(shù)據(jù)更新點(diǎn)。
ClientServiceIndexesManager#subscriberIndexes中獲取與當(dāng)前nacos server實(shí)例建立推送連接的,訂閱該服務(wù)的ClientId,向當(dāng)前nacos server實(shí)例中的所有訂閱該服務(wù)的Client推送NotifySubscriberRequest消息,發(fā)送實(shí)例列表。由于ClientEvent.ClientChangedEvent事件會(huì)廣播到所有nacos server實(shí)例,其它實(shí)例內(nèi)部也會(huì)發(fā)布ServiceEvent.ServiceChangedEvent事件,然后向各自實(shí)例的訂閱Client子集推送NotifySubscriberRequest消息。

  1. 查詢微服務(wù)對(duì)應(yīng)的實(shí)例列表
    ServiceQueryRequest grpc消息為例,查詢某一服務(wù)關(guān)聯(lián)的實(shí)例列表

通過(guò)ServiceStorage#getData獲取實(shí)例列表
ServiceStorage#getData是直接從ServiceStorage#serviceDataIndexes這個(gè)Map<Service, ServiceInfo>中獲取服務(wù)實(shí)例列表,直接返回服務(wù)實(shí)例列表信息。
如果ServiceStorage#serviceDataIndexes沒有這個(gè)key,則通過(guò)ServiceStorage#getPushData加載。
先通過(guò)ServiceManager#singletonRepositoryMap<Service, Service>,確認(rèn)Service是否存在,不存在直接返回。
ServiceStorage#getAllInstancesFromIndex加載實(shí)例列表。
ClientServiceIndexesManager#publisherIndexes<Service, Set<String>>加載clientId列表
ConnectionBasedClientManager#clients<String, ConnectionBasedClient>獲取Client
client.AbstractClient#publishers<Service, InstancePublishInfo>獲取每個(gè)Client的實(shí)例地址
至此可以拿到服務(wù)對(duì)應(yīng)的實(shí)例列表,最后放回ServiceStorage#serviceDataIndexes

  1. 訂閱服務(wù)
    客戶端向某個(gè)nacos server實(shí)例訂閱他關(guān)心的服務(wù)實(shí)例列表變化信息。
    SubscribeServiceRequest grpc消息,SubscribeServiceRequestHandler.handle處理該消息
    ServiceStorage#getData獲取服務(wù)及實(shí)例列表并返回客戶端。

查詢到Client,在Client#subscribers中加入這個(gè)Subscriber,發(fā)布ClientSubscribeServiceEvent事件。
ClientServiceIndexesManager響應(yīng)事件,ClientServiceIndexesManager#subscriberIndexes中加入這個(gè)Subscriber。
如果是首次加入,則發(fā)布ServiceEvent.ServiceSubscribedEvent事件。
NamingSubscriberServiceV2Impl#onEvent響應(yīng)這個(gè)事件,向剛剛訂閱的nacos客戶端推送服務(wù)端消息 NotifySubscriberRequest,包含了最新的服務(wù)及實(shí)例列表

所以首次訂閱時(shí),SubscribeServiceRequest訂閱消息會(huì)返回實(shí)例列表,服務(wù)器還會(huì)立刻推送一次NotifySubscriberRequest消息,再次返回實(shí)例列表。

  1. EmptyServiceAutoCleanerV2默認(rèn)每60秒執(zhí)行EmptyServiceAutoCleanerV2#cleanEmptyService

后面開始代碼詳解

查詢接口

服務(wù)實(shí)例列表

GET http://{ip}:{port}/nacos/v1/ns/catalog/services?pageNo=1&pageSize=10&withInstances=true

com.alibaba.nacos.naming.controllers.CatalogController#listDetail 接受請(qǐng)求,后續(xù)全部為內(nèi)存操作

CatalogController#listDetail => CatalogServiceV2Impl#pageListServiceDetail

ServiceManager#namespaceSingletonMaps map中獲取namespace key對(duì)應(yīng)的全部Service列表。
針對(duì)每個(gè)Service
NamingMetadataManager#serviceMetadataMap map中獲取這個(gè)Service對(duì)應(yīng)的metadata
ServiceStorage#serviceDataIndexes map中獲取這個(gè)Service對(duì)應(yīng)的ServiceInfoServiceInfo#hosts記錄了所有的實(shí)例信息。

grpc 接受請(qǐng)求

grpc服務(wù)定義于nacos_grpc_service.proto文件內(nèi),是一個(gè)通用的消息服務(wù),
GrpcRequestAcceptor#request內(nèi)部進(jìn)行消息分發(fā),最終由RequestHandler#handle處理。
RequestHandler的實(shí)現(xiàn)類繼承RequestHandler抽象類時(shí)會(huì)通過(guò)泛型聲明它關(guān)聯(lián)的具體的消息類型

查詢服務(wù)的實(shí)例列表消息 ServiceQueryRequest

與http的服務(wù)實(shí)例列表接口的邏輯大致一致,都是從一個(gè)數(shù)據(jù)源取的數(shù)據(jù)

服務(wù)實(shí)例注冊(cè)消息 InstanceRequest

InstanceRequest
InstanceRequestHandler.handle => InstanceRequestHandler#registerInstance

EphemeralClientOperationServiceImpl#registerInstance

  1. Service singleton = ServiceManager.getInstance().getSingleton(service);
    ServiceManager#getSingleton
    ServiceManager#singletonRepository這個(gè)map中加入當(dāng)前Service。key value均為 <Service, Service>。
    如果原map中沒有,是一個(gè)新的Service,會(huì)發(fā)布MetadataEvent.ServiceMetadataEvent事件。
    ServiceManager#namespaceSingletonMaps中加入服務(wù)信息。他的key是namespace,value是Service Set。

NamingMetadataManager響應(yīng)MetadataEvent.ServiceMetadataEvent事件。僅處理該Service的Metadata的數(shù)據(jù)過(guò)期問(wèn)題。

  1. Client client = clientManager.getClient(clientId);
    獲取到ConnectionBasedClient,也就是當(dāng)前服務(wù)實(shí)例對(duì)應(yīng)的ConnectionBasedClient類對(duì)象。
  1. client.addServiceInstance(singleton, instanceInfo);
    ConnectionBasedClient.publishers內(nèi)加入<Service service, InstancePublishInfo instancePublishInfo> 條目信息。

3.1 發(fā)布ClientEvent.ClientChangedEvent事件

DistroClientDataProcessor#onEvent響應(yīng)。
=> DistroClientDataProcessor#syncToAllServer
=> DistroProtocol#sync
后續(xù)詳細(xì)流程位于DistroProtocol#sync DistroProtocol#onReceive 兩節(jié)
簡(jiǎn)單說(shuō)就是將Client和新加入的微服務(wù)實(shí)例信息廣播到其它nacos server節(jié)點(diǎn)。其他節(jié)點(diǎn)如果沒有相同的微服務(wù)實(shí)例信息,也執(zhí)行一遍與EphemeralClientOperationServiceImpl#registerInstance類似的微服務(wù)實(shí)例注冊(cè)邏輯,代碼位于DistroClientDataProcessor#upgradeClient。其它nacos server的微服務(wù)實(shí)例注冊(cè)邏輯內(nèi)部也會(huì)再次執(zhí)行DistroProtocol#sync進(jìn)行廣播。但是接收方如果已經(jīng)存在該實(shí)例相同的信息時(shí),將不再執(zhí)行微服務(wù)實(shí)例注冊(cè)邏輯,也就不會(huì)繼續(xù)進(jìn)行廣播了。

  1. 發(fā)布 ClientOperationEvent.ClientRegisterServiceEvent

ClientServiceIndexesManager#onEvent響應(yīng)
=> ClientServiceIndexesManager#handleClientOperation
=> ClientServiceIndexesManager#addPublisherIndexes

ClientServiceIndexesManager#publisherIndexes加入<Service, Set<clientId>>條目
發(fā)布ServiceEvent.ServiceChangedEvent事件

4.1 NamingSubscriberServiceV2Impl#onEvent 響應(yīng) ServiceEvent.ServiceChangedEvent事件

主要做的事情為

  • ServiceStorage記錄這個(gè)服務(wù)的新實(shí)例信息,包括 ServiceStorage#serviceClusterIndexServiceStorage#serviceDataIndexes。
  • 向訂閱該Service的所有Subscriber推送NotifySubscriberRequest消息

詳細(xì)的過(guò)程如下

4.1.1 delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
后面的 DistroProtocol#sync 一節(jié)會(huì)描述類似的 delayTaskEngine.addTask的詳細(xì)過(guò)程。我們現(xiàn)在直接介紹重要部分。

4.1.2 PushDelayTask默認(rèn)是合并500ms內(nèi)的相同key的任務(wù),key為 Service,通過(guò)PushDelayTask#merge函數(shù)合并。合并邏輯為合并推送的targetClients列表或是推送全部。

這里的 delayTaskEngine 是 PushDelayTaskExecuteEnginePushDelayTaskExecuteEngine.taskProcessors為空,只能使用默認(rèn)的PushDelayTaskProcessor處理PushDelayTask

4.1.3
PushDelayTaskProcessor#process =>
NamingExecuteTaskDispatcher.getInstance() .dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask));

NacosExecuteTaskExecuteEngine#addTask
NamingExecuteTaskDispatcher.executeEngine中的taskProcessors為空,defaultTaskProcessor也為空。所以會(huì)使用NacosExecuteTaskExecuteEngine#executeWorkers執(zhí)行任務(wù)。執(zhí)行的是剛剛封裝的PushExecuteTask。

4.1.4 PushExecuteTask.run
4.1.4.1 PushDataWrapper wrapper = generatePushData();
=> ServiceStorage#getPushData
Service數(shù)據(jù)復(fù)制為ServiceInfo,復(fù)制Service的基本信息

ServiceStorage#getAllInstancesFromIndex 生成該服務(wù)的所有實(shí)例列表
ClientServiceIndexesManager#publisherIndexes獲取該服務(wù)的所有clientid列表
使用每個(gè)clientid從ConnectionBasedClientManager#clients獲取對(duì)應(yīng)的Client,然后從AbstractClient#publishers獲取實(shí)例信息InstancePublishInfo
InstancePublishInfo轉(zhuǎn)為Instance,從NamingMetadataManager#instanceMetadataMap中獲取meta信息。
ServiceStorage#serviceClusterIndex中加<Service service, Set<String> clusters>條目。clusters默認(rèn)只有一個(gè)DEFAULT,來(lái)自于UtilsAndCommons#DEFAULT_CLUSTER_NAME

ServiceStorage#serviceDataIndexes 加入 <Service singleton, ServiceInfo result>條目,也就是服務(wù)對(duì)應(yīng)服務(wù)實(shí)例列表。

繼續(xù)回到generatePushData()
NamingMetadataManager#instanceMetadataMap中獲取meta信息,和剛剛的ServiceInfo合并為PushDataWrapper

4.1.4.2 getTargetClientIds()
ClientServiceIndexesManager#subscriberIndexes獲取所以訂閱該服務(wù)的clientId客戶端id。
針對(duì)每一個(gè)客戶端id,Client client = clientManager.getClient(each)
AbstractClient#subscribers獲取Subscriber

4.1.4.3 針對(duì)每一個(gè)訂閱者
delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper, new ServicePushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));

PushExecutorDelegate.doPushWithCallback
=> getPushExecuteService(clientId, subscriber)
默認(rèn)SpiImplPushExecutorHolder#pushExecutors為空,最終返回PushExecutorRpcImpl

PushExecutorRpcImpl#doPushWithCallback
根據(jù)PushDataWrapper生成ServiceInfo,根據(jù)規(guī)則過(guò)濾一些實(shí)例。
使用RpcPushService#pushWithCallback向訂閱者客戶端推送NotifySubscriberRequest消息

推送成功回調(diào) PushExecuteTask.ServicePushCallback#onSuccess
發(fā)布PushServiceTraceEvent事件,默認(rèn)沒有監(jiān)聽該事件

  1. 發(fā)布 MetadataEvent.InstanceMetadataEvent
    NamingMetadataManager#onEvent響應(yīng)事件
    => NamingMetadataManager#handleInstanceMetadataEvent
    NamingMetadataManager#expiredMetadataInfos 移除該服務(wù)的metadata過(guò)期標(biāo)記
  1. 發(fā)布RegisterInstanceTraceEvent事件
    回到InstanceRequestHandler#registerInstance最后還會(huì)發(fā)布RegisterInstanceTraceEvent事件
    默認(rèn)沒有監(jiān)聽器監(jiān)聽此事件

訂閱服務(wù)實(shí)例變更消息 SubscribeServiceRequest

SubscribeServiceRequestHandler.handle => InstanceRequestHandler#registerInstance

構(gòu)建Subscriber對(duì)象
構(gòu)建ServiceInfo對(duì)象,包含Service以及實(shí)例列表。
來(lái)自于ServiceStorage#getData,先從ServiceStorage#serviceDataIndexes直接獲取ServiceInfo對(duì)象。如果沒有使用ServiceStorage#getPushData構(gòu)建ServiceInfo對(duì)象,從ServiceManager#getSingleton獲取Service,從ClientServiceIndexesManager#publisherIndexes,ConnectionBasedClientManager#clients,AbstractClient#publishers等中獲取實(shí)例列表,共同構(gòu)建ServiceInfo對(duì)象。

判斷SubscribeServiceRequest消息參數(shù)中的subscribe是true or false
true 訂閱 EphemeralClientOperationServiceImpl#subscribeService 發(fā)布SubscribeServiceTraceEvent
false 取消訂閱 EphemeralClientOperationServiceImpl#unsubscribeService 發(fā)布UnsubscribeServiceTraceEvent

然后返回ServiceInfo對(duì)象

  1. EphemeralClientOperationServiceImpl#subscribeService
    1.1 ConnectionBasedClientManager#clients獲取當(dāng)前發(fā)送訂閱消息的Client
    AbstractClient#addServiceSubscriber AbstractClient#subscribers中加入Subscriber,這里包含了想要訂閱的服務(wù)信息

1.2 發(fā)布 ClientOperationEvent.ClientSubscribeServiceEvent
ClientServiceIndexesManager#onEvent 響應(yīng)事件
=> ClientServiceIndexesManager#handleClientOperation
=> ClientServiceIndexesManager#addSubscriberIndexes
ClientServiceIndexesManager#subscriberIndexes 中加入希望監(jiān)聽這個(gè)Service的ClientId, <Service service, Set<String clientId>>

如果是首次加入,Set<String clientId> 中首次加入這個(gè)ClientId,
則發(fā)布 ServiceEvent.ServiceSubscribedEvent 事件

1.2.1 ServiceEvent.ServiceSubscribedEvent 事件
NamingSubscriberServiceV2Impl#onEvent 響應(yīng)這個(gè)事件
delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(), subscribedEvent.getClientId()));
構(gòu)建默認(rèn)延遲500毫秒的PushDelayTask,任務(wù)中包含了Service和新增訂閱的ClientId,注意PushDelayTask.pushToAll = false,并且指定了接受的客戶端PushDelayTask#targetClients為剛剛訂閱的客戶端
PushDelayTask的主要邏輯就是向剛剛訂閱的nacos客戶端推送服務(wù)端消息 NotifySubscriberRequest,包含了最新的服務(wù)及實(shí)例列表

PushDelayTask的具體執(zhí)行邏輯如下
1.2.2 后面的 DistroProtocol#sync 一節(jié)會(huì)描述類似的 delayTaskEngine.addTask的詳細(xì)過(guò)程。我們現(xiàn)在直接介紹重要部分。
1.2.2.1 PushDelayTask會(huì)進(jìn)行合并 PushDelayTask#merge,合并的邏輯就是合并要推送的nacos server列表。
1.2.2.2 PushDelayTaskExecuteEngine#taskProcessors為空,由PushDelayTaskExecuteEngine#defaultTaskProcessor處理PushDelayTask,也就是PushDelayTaskExecuteEngine.PushDelayTaskProcessor

PushDelayTaskExecuteEngine.PushDelayTaskProcessor#process
=> NamingExecuteTaskDispatcher#dispatchAndExecuteTask 執(zhí)行 PushExecuteTask任務(wù)
=> PushDelayTaskExecuteEngine#addTask

1.2.2.3 PushDelayTaskExecuteEngine#taskProcessors為空,PushDelayTaskExecuteEngine#defaultTaskProcessor為null
使用PushDelayTaskExecuteEngine#executeWorkers執(zhí)行PushExecuteTask任務(wù)

1.2.2.4 PushExecuteTask#run
PushExecuteTask#generatePushData 生成 PushDataWrapper
ServiceInfo數(shù)據(jù)來(lái)自于ServiceStorage#getPushData,ServiceMetadata來(lái)自于NamingMetadataManager#getServiceMetadata,一起封裝為PushDataWrapper

PushExecuteTask#getTargetClientIds 獲取需要發(fā)送的客戶端。
由于這個(gè)PushDelayTask構(gòu)建時(shí),PushDelayTask.pushToAll = false,并且指定了接受的客戶端PushDelayTask#targetClients為剛剛訂閱的客戶端,所以只會(huì)向剛剛訂閱的客戶端發(fā)送
PushExecutorDelegate#doPushWithCallback
=> PushExecutorRpcImpl#doPushWithCallback
=> RpcPushService#pushWithCallback
向剛剛訂閱的nacos客戶端推送服務(wù)端消息 NotifySubscriberRequest,包含了最新的服務(wù)及實(shí)例列表

  1. 發(fā)布SubscribeServiceTraceEvent
    回到EphemeralClientOperationServiceImpl#subscribeService函數(shù)的末尾,發(fā)布SubscribeServiceTraceEvent
    不過(guò)默認(rèn)沒有監(jiān)聽器響應(yīng)這個(gè)事件

DistroProtocol

DistroProtocol#sync

給其他所有nacos server節(jié)點(diǎn)除了自身,發(fā)送消息。
針對(duì)每個(gè)nacos server節(jié)點(diǎn)和微服務(wù)實(shí)例,積攢默認(rèn)1秒的消息,合并為1個(gè)消息。
經(jīng)過(guò)層層異步調(diào)用到達(dá)DistroDelayTaskProcessor#process,根據(jù)DistroProtocol#sync中不同的action,生成不同的Task,DistroSyncDeleteTask或是DistroSyncChangeTask。然后在異步執(zhí)行這個(gè)Task。最終通過(guò)DistroClientTransportAgent#syncData()向其它nacos server實(shí)例發(fā)送grpc消息DistroDataRequest,消息中包含了action以及必要的微服務(wù)實(shí)例信息。

詳細(xì)流程如下:

  1. 封裝為默認(rèn)1秒的DistroDelayTask,通過(guò)NacosDelayTaskExecuteEngine#addTask,將任務(wù)加入到NacosDelayTaskExecuteEngine#tasks中。這里NacosDelayTaskExecuteEngine的實(shí)現(xiàn)是DistroDelayTaskExecuteEngine。DistroTaskEngineHolder構(gòu)建DistroDelayTaskExecuteEngine時(shí),會(huì)加入默認(rèn)的DistroDelayTaskProcessor`。

addTask函數(shù)中,先用key查找是否已存在任務(wù),如果存在,newTask.merge(existTask);新舊任務(wù)合并。然后放回map。
key為DistroKey,使用resourceKey resourceType targetServer共同確定。
比如resourceKey = Client.getClientId() resourceType="Nacos:Naming:v2:ClientData" targetServer=target nacos server ip:port。
也就是同一個(gè)微服務(wù)實(shí)例,同一個(gè)nacos server實(shí)例,同一個(gè)resourceType,在1秒內(nèi)的任務(wù)會(huì)被合并為一個(gè)。

DistroDelayTask的合并邏輯就是根據(jù)createTime字段,保留最新的任務(wù)的action,舍棄舊的action。

  1. NacosDelayTaskExecuteEngine初始化時(shí),會(huì)啟動(dòng)一個(gè)線程,每100ms執(zhí)行一次ProcessRunnable。

也就是每100ms,ProcessRunnable中會(huì)遍歷NacosDelayTaskExecuteEngine#tasks中所有的任務(wù),取出已經(jīng)到時(shí)間的任務(wù)。對(duì)每個(gè)已經(jīng)到時(shí)間的任務(wù)執(zhí)行
NacosTaskProcessor processor = getProcessor(taskKey);
processor.process(task)

DistroDelayTaskExecuteEngine中的NacosTaskProcessor processor = getProcessor(taskKey);實(shí)現(xiàn)是將DistroKey.resourceType作為key尋找NacosTaskProcessor,比如"Nacos:Naming:v2:ClientData"

默認(rèn)DistroDelayTaskExecuteEngine#taskProcessors是空的,沒有注冊(cè)任何NacosTaskProcessor,最終所有getProcessor走的都是defaultTaskProcessor DistroDelayTaskProcessor

DistroDelayTaskProcessor#process
根據(jù)action,構(gòu)建DistroSyncDeleteTask或是DistroSyncChangeTask,加入到DistroExecuteTaskExecuteEngine#addTask。

  1. DistroExecuteTaskExecuteEngine#addTask內(nèi)部根據(jù)DistroKey尋找NacosTaskProcessor,執(zhí)行processor.process。
    如果NacosTaskProcessor為null,則通過(guò)DistroKeyNacosExecuteTaskExecuteEngine#executeWorkers中找一個(gè)TaskExecuteWorker,執(zhí)行worker.process。

默認(rèn)DistroExecuteTaskExecuteEngine#taskProcessors也是空的,沒有注冊(cè)任何NacosTaskProcessor,而且DistroExecuteTaskExecuteEngine#defaultTaskProcessor也是null。最終均由TaskExecuteWorker#process執(zhí)行。

TaskExecuteWorker#process則是將任務(wù)加入到TaskExecuteWorker#queue隊(duì)列中。InnerWorker會(huì)不停的從這個(gè)隊(duì)列中取出任務(wù),并執(zhí)行Runnable.run,也就是DistroSyncDeleteTask.run或是DistroSyncChangeTask.run。二者共用AbstractDistroExecuteTask#run

  1. AbstractDistroExecuteTask#run內(nèi)部
    DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(type);
    根據(jù)DistroKey.resourceTypeDistroComponentHolder#transportAgentMap中尋找對(duì)應(yīng)的DistroTransportAgent。默認(rèn)只注冊(cè)了DistroClientTransportAgent,也只用到了他。

如果transportAgent.supportCallbackTransport(),則執(zhí)行doExecuteWithCallback(new DistroExecuteCallback());。默認(rèn)DistroClientTransportAgent.supportCallbackTransport為true。

  1. DistroSyncChangeTask#doExecuteWithCallback
    DistroData distroData = getDistroData(type);傳入DistroKey.resourceType,也就是"Nacos:Naming:v2:ClientData"。
    DistroComponentHolder#dataStorageMap,這個(gè)map中只注冊(cè)了一個(gè)條目,最終獲取到DistroClientDataProcessor。
    DistroClientDataProcessor#getDistroData根據(jù)DistroKey.resourceKeyConnectionBasedClientManager#getClient獲取到當(dāng)前連接的微服務(wù)客戶端數(shù)據(jù)ConnectionBasedClient。

ConnectionBasedClient#generateSyncData生成ClientSyncData,里邊包含了微服務(wù)實(shí)例的相關(guān)完整數(shù)據(jù),同時(shí)ClientSyncData#attributes加入了"revision"條目,也就是這一ClientSyncData的版本號(hào)
再封裝為DistroData

  1. DistroClientTransportAgent#syncData()
    封裝為DistroDataRequest,通過(guò)ClusterRpcClientProxy#asyncRequest經(jīng)grpc發(fā)送出去。

DistroProtocol#onReceive

DistroDataRequest的請(qǐng)求處理,在接收方的nacos server實(shí)例內(nèi)部
DistroDataRequestHandler#handle
=> DistroProtocol#onReceive
=> DistroClientDataProcessor#processData
從消息中反序列化出ClientSyncData
=> DistroClientDataProcessor#handlerClientSyncData

  1. clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
    ClientManagerDelegate#syncClientConnected
    => ConnectionBasedClientManager#syncClientConnected
    根據(jù)attributes中的"connectionType"獲取不同類型的ClientFactory,默認(rèn)ConnectionBasedClientFactory
    ConnectionBasedClientFactory構(gòu)建出ConnectionBasedClient
    ConnectionBasedClientManager#clients中如果不存在,才加入新的<connectionId, Client> 條目,此時(shí)新的Client內(nèi)部沒有具體連接信息。
  1. Client client = clientManager.getClient(clientSyncData.getClientId());
    取出 上一步新建的Client

  2. upgradeClient(client, clientSyncData);
    DistroClientDataProcessor#upgradeClient

3.1 ServiceManager#getSingleton
服務(wù)實(shí)例注冊(cè)消息 一節(jié)中提過(guò)
ServiceManager#singletonRepository ServiceManager#namespaceSingletonMaps 中加入Service

3.2 判斷當(dāng)前Client內(nèi)部是否存在相同的微服務(wù)實(shí)例信息
如果不存在,則會(huì)執(zhí)行一遍與EphemeralClientOperationServiceImpl#registerInstance內(nèi)類似的服務(wù)實(shí)例注冊(cè)邏輯。然后繼續(xù)向其它nacos server實(shí)例廣播該服務(wù)的注冊(cè)消息。但是其它nacos server實(shí)例再次執(zhí)行DistroProtocol#onReceive => upgradeClient(client, clientSyncData); 時(shí),在當(dāng)前這一步,發(fā)現(xiàn)已經(jīng)存在相同的微服務(wù)實(shí)例信息,則不再執(zhí)行后續(xù)的服務(wù)實(shí)例注冊(cè)邏輯,也就阻止了無(wú)盡的消息廣播。

接下來(lái)的邏輯與EphemeralClientOperationServiceImpl#registerInstance內(nèi)的服務(wù)實(shí)例注冊(cè)邏輯類似。
3.2.1 client.addServiceInstance(singleton, instancePublishInfo);
AbstractClient#publishers中加入微服務(wù)實(shí)例信息
發(fā)布ClientEvent.ClientChangedEvent事件。
該事件會(huì)再次觸發(fā)
=> DistroClientDataProcessor#syncToAllServer
=> DistroProtocol#sync
一直到當(dāng)前函數(shù)的流程,但是接收方如果已經(jīng)有這個(gè)微服務(wù)實(shí)例信息的話,就不會(huì)執(zhí)行client.addServiceInstance(singleton, instancePublishInfo);,也就不會(huì)繼續(xù)產(chǎn)生ClientEvent.ClientChangedEvent事件

3.2.2 ClientOperationEvent.ClientRegisterServiceEvent事件
3.2.3 MetadataEvent.InstanceMetadataEvent事件

3.3 如果當(dāng)前Client存在了外來(lái)ClientSyncData中沒有的Service信息,則刪除本地該實(shí)例的Service信息,發(fā)布ClientOperationEvent.ClientDeregisterServiceEvent事件。
也就是一個(gè)nacos client實(shí)例是可以提供多個(gè)Service的,這里在同步一個(gè)nacos client實(shí)例的Service列表。

3.4 client.setRevision(clientSyncData.getAttributes().<Integer>getClientAttribute(ClientConstants.REVISION, 0));
同步本地的Client的版本號(hào)

至此,沒看到如果收到低版本號(hào)的Client消息時(shí)應(yīng)該忽略的邏輯,猜測(cè)這里是有問(wèn)題的。會(huì)將本地的客戶端信息由新版本降低為低版本舊數(shù)據(jù)的可能。

一些問(wèn)題

  1. ServiceInfo#lastRefTime 應(yīng)該是nacos客戶端用于定期輪詢更新緩存用的。nacos server端應(yīng)該是用不到這個(gè)字段。

  2. 如果同一時(shí)刻,實(shí)例1向serverA發(fā)送deregister,實(shí)例2向serverB發(fā)送deregister,兩個(gè)server互相廣播消息,最終是如何一致的。
    以serverA為例,收到實(shí)例1的變化時(shí),會(huì)移除實(shí)例1的實(shí)例信息,然后更新ServiceStorage#serviceDataIndexes緩存。收到實(shí)例2的變化時(shí),會(huì)移除實(shí)例2的實(shí)例信息,然后再更新緩存。無(wú)論這些消息、步驟如何異步,如何亂序。最終都可以保證有一個(gè)更新緩存的步驟是在最后發(fā)生的。這次更新緩存久一點(diǎn)更新出正確的結(jié)果。

  3. 如果一個(gè)客戶端快速重啟,向serverA發(fā)送deregister,向serverB發(fā)送register。nacos server會(huì)如何顯示這一實(shí)例的狀態(tài)。
    nacos-spring-cloud沒有使用@RefreshScope注解,應(yīng)該不會(huì)出現(xiàn)這種狀況。
    如果出現(xiàn)這種情況,感覺極端情況下是會(huì)不停的重復(fù)廣播注冊(cè)、去注冊(cè)消息的。但是DelayTask的merge與心跳機(jī)制最終應(yīng)該會(huì)統(tǒng)一穩(wěn)定為已注冊(cè)狀態(tài)。

斷點(diǎn)

  1. 發(fā)送 grpc請(qǐng)求
    io.grpc.internal.ClientCallImpl#sendMessage
ClientStream clientStream = ((RetriableStream) this.stream).state.drainedSubstreams.iterator().next().stream;


String local = clientStream.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR).toString();
String remote = clientStream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR).toString();

return "send: " + method.getFullMethodName() + " " + local + " -> " + remote + "\n" + message;
  1. 接受 grpc 請(qǐng)求
    io.grpc.stub.ServerCalls.UnaryServerCallHandler.UnaryServerCallListener#onHalfClose method.invoke(request, responseObserver);一行
String local = call.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR).toString();
String remote = call.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR).toString();

return "receive: " + call.method.getFullMethodName() + " " + remote + " -> " + local + "\n" + request;

可以通過(guò)過(guò)濾掉頻繁的jraft消息
!((ServerCallImpl) call).method.getFullMethodName().contains("jraft")

  1. 接受 http請(qǐng)求
    org.springframework.web.servlet.DispatcherServlet#doDispatch
"receive: " + request.getMethod() + " " +request.getRequestURL() + " params: " + new Gson().toJson(request.getParameterMap())
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容