Eureka源碼采用1.7.2版本
本人小白,此文為本人閱讀源碼筆記,如果您讀到本文,您需要自己甄別是否正確,文中的說(shuō)明只代表本人理解,不一定是正確的?。?!
心跳機(jī)制在于證明客戶端正常運(yùn)行,在代碼層面在于定時(shí)更新過(guò)期時(shí)間,防止自動(dòng)故障移除機(jī)制導(dǎo)致實(shí)例被摘除。
Eureka Client的心跳機(jī)制是在客戶端初始化時(shí)也構(gòu)建了一個(gè)心跳執(zhí)行的線程池
com.netflix.discovery.DiscoveryClient#DiscoveryClient(com.netflix.appinfo.ApplicationInfoManager, com.netflix.discovery.EurekaClientConfig, com.netflix.discovery.AbstractDiscoveryClientOptionalArgs, javax.inject.Provider<com.netflix.discovery.BackupRegistry>)
//初始化保持心跳的線程池
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
心跳任務(wù)的觸發(fā)在調(diào)度任務(wù)初始化時(shí)候
com.netflix.discovery.DiscoveryClient#initScheduledTasks
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
//任務(wù)線程
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
默認(rèn)是renewalIntervalInSecs延遲之后再進(jìn)行心跳發(fā)送,默認(rèn)是30S,并且定時(shí)發(fā)送的間隔也是30S。這里面核心的是
任務(wù)執(zhí)行線程HeartbeatThread
com.netflix.discovery.DiscoveryClient.HeartbeatThread#run
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
//發(fā)送心跳
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
return false;
}
}
這兩個(gè)方法沒(méi)什么可說(shuō)的,就是發(fā)送一個(gè)Http請(qǐng)求,請(qǐng)求的地址類似這樣的:
http://DESKTOP-1S6DCTA:8080/v2/apps/EUREKA/001
參數(shù):status,lastDirtyTimestamp
method:PUT
直接轉(zhuǎn)Server端吧,看看怎么處理心跳請(qǐng)求
com.netflix.eureka.resources.InstanceResource#renewLease
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#renew
public boolean renew(final String appName, final String id, final boolean isReplication) {
if (super.renew(appName, id, isReplication)) {
//集群節(jié)點(diǎn)同步心跳
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
return true;
}
return false;
}
核心方法還是調(diào)用了父類的renew(),主要的處理邏輯在那里面
com.netflix.eureka.registry.AbstractInstanceRegistry#renew
//服務(wù)續(xù)約方法
public boolean renew(String appName, String id, boolean isReplication) {
RENEW.increment(isReplication);
//獲取appName獲取服務(wù)實(shí)例
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
//通過(guò)ID獲取租約
leaseToRenew = gMap.get(id);
}
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
return false;
} else {
//獲取租約的服務(wù)實(shí)例
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// touchASGCache(instanceInfo.getASGName());
//獲取服務(wù)實(shí)例的狀態(tài)
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
+ "; re-register required", instanceInfo.getId());
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
//如果不相等則進(jìn)行覆蓋
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
Object[] args = {
instanceInfo.getStatus().name(),
instanceInfo.getOverriddenStatus().name(),
instanceInfo.getId()
};
logger.info(
"The instance status {} is different from overridden instance status {} for instance {}. "
+ "Hence setting the status to overridden status", args);
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
}
}
//記錄每一分鐘實(shí)際的心跳數(shù)量
renewsLastMin.increment();
//重置續(xù)約時(shí)間
leaseToRenew.renew();
return true;
}
}
主要流程:
- 依據(jù)服務(wù)名稱和服務(wù)ID獲取到租約
- 設(shè)置租約的覆蓋狀態(tài)
- 當(dāng)前分鐘內(nèi)心跳次數(shù)自增
- 重置續(xù)約時(shí)間
最核心就是更新服務(wù)實(shí)例的過(guò)期時(shí)間,防止自動(dòng)故障移除機(jī)制錯(cuò)誤的摘除,服務(wù)實(shí)例的方法如下:
lastUpdateTimestamp = System.currentTimeMillis() + duration;
這個(gè)是當(dāng)前時(shí)間+過(guò)期時(shí)間(90S),有個(gè)BUG,在自動(dòng)故障移除的時(shí)候會(huì)看到的
從這個(gè)源碼中可以看出一個(gè)場(chǎng)景啊,當(dāng)一個(gè)服務(wù)實(shí)例因?yàn)榫W(wǎng)絡(luò)故障長(zhǎng)時(shí)間未發(fā)送心跳,造成服務(wù)實(shí)例被摘除(3min)
那么后續(xù)網(wǎng)絡(luò)正常后,再次發(fā)送心跳會(huì)找不到對(duì)應(yīng)的實(shí)例
客戶端這塊是怎么解決的呢?看看客戶端代碼:
com.netflix.discovery.DiscoveryClient#renew
//如果當(dāng)前的服務(wù)實(shí)例在注冊(cè)中心已經(jīng)被移除
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
//重新注冊(cè)下
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
可以看到客戶端在發(fā)現(xiàn)服務(wù)實(shí)例未找到時(shí),直接進(jìn)行注冊(cè)了
我這里說(shuō)下,Eureka的一堆機(jī)制里調(diào)度任務(wù)太多,造成一些時(shí)間計(jì)算感覺(jué)不是很可靠