獲取服務(wù)Client 端流程
??我們先看下面這張圖片,這張圖片簡單描述了下我們Client是如何獲取到Server已續(xù)約實例信息的流程:
??從圖片中我們可以知曉大致流程就是Client會自己開啟一個定時任務(wù),然后根據(jù)不同的條件去調(diào)用Server端接口得到所有已續(xù)約服務(wù)的信息,然后合并到自己的緩存數(shù)據(jù)中。下面我們詳情了解下上述流程在源碼中的具體實現(xiàn)。
獲取服務(wù)Client端源碼分析
??我們先來看看服務(wù)獲取定時任務(wù)的初始化。那我們的服務(wù)獲取定時任務(wù)什么時候會被初始化呢,那肯定是我們啟用我們Eureka Client的時候唄,當我們啟動Client時,Eureka會先處理相關(guān)的配置,然后初始化我們Client的相關(guān)信息,我們的定時任務(wù)也就是此時進行的初始化,更具體地說我們的服務(wù)續(xù)約定時任務(wù)就是在DiscoveryClient這個類中initScheduledTasks方法中被初始化的。具體代碼如下:
private final ScheduledExecutorService scheduler;
private void initScheduledTasks() {
…省略其他代碼
// 初始化定時拉取服務(wù)注冊信息
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
…省略其他代碼
}
??由此可見,我們的定時任務(wù)其實是Client進行初始化完成的,并且還是使用ScheduledExecutorService線程池來完成我們的定時任務(wù)。我們下面就看看CacheRefreshThread這個類是如何實現(xiàn)獲取服務(wù)的流程:
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
}
??不多說,我們接著看refreshRegistry()方法:
@VisibleForTesting
void refreshRegistry() {
…省略部分代碼
boolean success = fetchRegistry(remoteRegionsModified); // 獲取實例信息
if (success) {
registrySize = localRegionApps.get().size();
lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
}
…省略部分代碼
}
??這里不做太多解釋,我們接著看fetchRegistry()方法:
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// If the delta is disabled or if it is the first time, get all
// applications
Applications applications = getApplications();
//1. 是否禁用增量更新;
//2. 是否對某個region特別關(guān)注;
//3. 外部調(diào)用時是否通過入?yún)⒅付ㄈ扛拢? //4. 本地還未緩存有效的服務(wù)列表信息;
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
getAndStoreFullRegistry(); // 全量更新
} else {
getAndUpdateDelta(applications); // 增量更新
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// Notify about cache refresh before updating the instance remote status
onCacheRefreshed();
// Update remote status based on refreshed data held in the cache
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
??由此可見,fetchRegistry 方法主要是判斷我們獲取實例信息是進行全量更新還是增量更新。如果條件成立,則我們會進行全量更新,否則則進行批量更新。下面我們簡單介紹下getAndStoreFullRegistry() 全量更新、getAndUpdateDelta(applications)批量更新方法:
// 全量更新
private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
Applications apps = null;
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
// 調(diào)用服務(wù)端接口得到全量的實例信息
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
// 將實例信息存進Applications
apps = httpResponse.getEntity();
}
logger.info("The response status is {}", httpResponse.getStatusCode());
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
}
?? getAndStoreFullRegistry() 簡單來說就是通過調(diào)用Eureka服務(wù)端提供的http接口獲取到全部實例信息,然后將實例信息存進我們的Applications中。
// 批量更新
private void getAndUpdateDelta(Applications applications) throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
// 得到增量的實例信息
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
if (delta == null) { // 如果增量信息為空,則進行一次全量更新
logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
+ "Hence got the full registry.");
getAndStoreFullRegistry();
}
//考慮到多線程同步問題,這里通過CAS來確保請求發(fā)起到現(xiàn)在是線程安全的,
//如果這期間fetchRegistryGeneration變了,就表示其他線程也做了類似操作,因此放棄本次響應(yīng)的數(shù)據(jù)
else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
String reconcileHashCode = "";
if (fetchRegistryUpdateLock.tryLock()) {
try {
// 合并增量實例信息
updateDelta(delta);
// 用合并了增量數(shù)據(jù)之后的本地數(shù)據(jù)來生成一致性哈希碼
reconcileHashCode = getReconcileHashCode(applications);
} finally {
fetchRegistryUpdateLock.unlock();
}
} else {
logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
}
// There is a diff in number of instances for some reason
//Eureka server在返回增量更新數(shù)據(jù)時,也會返回服務(wù)端的一致性哈希碼,
//理論上每次本地緩存數(shù)據(jù)經(jīng)歷了多次增量更新后,計算出的一致性哈希碼應(yīng)該是和服務(wù)端一致的,
//如果發(fā)現(xiàn)不一致,就證明本地緩存的服務(wù)列表信息和Eureka server不一致了,需要做一次全量更新
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
}
} else {
logger.warn("Not updating application delta as another thread is updating it already");
logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
}
}
??getAndUpdateDelta 方法簡單來說,就是首先調(diào)用Eureka服務(wù)端提供的增量信息接口得到增量實例信息,然后進行判斷,如果增量為null,為了數(shù)據(jù)準確性,調(diào)用一下全量更新實例接口更新實例信息。如果增量信息不為空,則進行一個cas處理,如果成功,則進行增量信息的合并。最后會再進行一次判斷,判斷從服務(wù)端得到的批量實例信息計算得到的HashCode是否和從服務(wù)端得到的實例信息HashCode值是否相等,如果不相等則會調(diào)用reconcileAndLogDifference 方法,再次進行全量更新實例信息。下面我們就簡單看下reconcileAndLogDifference這個方法:
private void reconcileAndLogDifference(Applications delta, String reconcileHashCode) throws Throwable {
logger.debug("The Reconcile hashcodes do not match, client : {}, server : {}. Getting the full registry",
reconcileHashCode, delta.getAppsHashCode());
RECONCILE_HASH_CODES_MISMATCH.increment();
long currentUpdateGeneration = fetchRegistryGeneration.get();
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
// 調(diào)用服務(wù)端接口得到全量的實例信息
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
Applications serverApps = httpResponse.getEntity();
if (serverApps == null) {
logger.warn("Cannot fetch full registry from the server; reconciliation failure");
return;
}
if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
localRegionApps.set(this.filterAndShuffle(serverApps));
getApplications().setVersion(delta.getVersion());
logger.debug(
"The Reconcile hashcodes after complete sync up, client : {}, server : {}.",
getApplications().getReconcileHashCode(),
delta.getAppsHashCode());
} else {
logger.warn("Not setting the applications map as another thread has advanced the update generation");
}
}
?? 由此可見,reconcileAndLogDifference方法和我們getAndStoreFullRegistry方法調(diào)用的接口一樣,都是調(diào)用Eureka服務(wù)端提供的全量實例信息接口,然后更新我們自己的實例信息。
總的來說,獲取服務(wù)Client端的執(zhí)行流程就可以分為以下兩步:
?? 1.初始化一個定時任務(wù),默認30s
?? 2.定時任務(wù)中根據(jù)不同的情況調(diào)用不同的方法來更新本地實例緩存信息
在定時任務(wù)中獲取實例信息我們也可以分為以下幾步:
?? 1.判斷是否需要全量更新
?? 2.條件成立則進行全量更新
???? a.將全量更新數(shù)據(jù)更新到本地緩存中
??3.條件不成立則進行批量更新
?? ??a.判斷批量更新數(shù)據(jù)是否為空,是則進行一次全量更新
?? ??b.將批量更新數(shù)據(jù)合并到本地緩存中
?? ??c.判斷批量更新數(shù)據(jù)計算得到的HashCode是否和服務(wù)端傳過來的HashCode相等,如果不相等,說明數(shù)據(jù)有問題,需要進行一次全量更新
??下面為自己總結(jié)的Eureka相關(guān)的知識點,有興趣地小伙伴可以看一看,當然再點下贊就更棒了,創(chuàng)作不易!
??Eureka系列(一)Eureka功能介紹
??Eureka系列(二) 服務(wù)注冊Server端具體實現(xiàn)
??Eureka系列(三)獲取服務(wù)Client端具體實現(xiàn)
??Eureka系列(四) 獲取服務(wù)Server端具體實現(xiàn)
??Eureka系列(五) 服務(wù)續(xù)約流程具體實現(xiàn)
??Eureka系列(六) TimedSupervisorTask類解析
??Eureka系列(七) 服務(wù)下線Server端具體實現(xiàn)
??Eureka系列(八)服務(wù)剔除具體實現(xiàn)
??Eureka系列(九)Eureka自我保護機制