
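How does Eureka Server decide that a service is unavailable?
Eureka checks the health of every service provider through heartbeat renewal.
Detecting an unavailable service actually involves two pieces of logic.
- Eureka Server periodically checks the health of the service providers.
- Eureka Client periodically renews its registration while it is running.
Eureka's heartbeat renewal mechanism is shown in the figure below.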

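- On startup, the client starts a heartbeat task that sends a heartbeat request to the server every 30s.
- The server keeps the time of the last heartbeat for every instance and updates it whenever a heartbeat arrives from the client.
- On startup, the server starts a scheduled task that runs every 60s and checks whether each instance's last heartbeat is more than 90s old. If it is, the instance is considered expired and is evicted.
The times involved in this flow can be changed with the following configuration.
# How long the server waits for the next heartbeat after the last one it received from the client. If no heartbeat arrives within this time, the instance is removed.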
eureka.instance.lease-expiration-duration-in-seconds=90
# Interval at which the server cleans up expired instances. The default is 60000 milliseconds, i.e. 60 seconds.
eureka.server.eviction-interval-timer-in-ms=60000
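The timing rules above can be sketched as a small in-memory model. This is only an illustration under simplifying assumptions: the class HeartbeatRegistrySketch and its methods are hypothetical names, not Eureka's classes, and timestamps are passed in explicitly so the behavior is easy to follow.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the heartbeat and eviction timing; not Eureka's code.
class HeartbeatRegistrySketch {
    // Mirrors eureka.instance.lease-expiration-duration-in-seconds=90
    static final long LEASE_EXPIRATION_MS = 90_000;

    // instance id -> time (ms) at which the last heartbeat was received
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    // Called whenever a heartbeat arrives from a client.
    void heartbeat(String instanceId, long nowMs) {
        lastHeartbeat.put(instanceId, nowMs);
    }

    // Called by the periodic eviction task; removes and returns the expired instances.
    List<String> evictExpired(long nowMs) {
        List<String> evicted = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() > LEASE_EXPIRATION_MS) {
                evicted.add(entry.getKey());
                it.remove();
            }
        }
        return evicted;
    }

    boolean contains(String instanceId) {
        return lastHeartbeat.containsKey(instanceId);
    }
}
```

With heartbeats from "a" at 0 ms and "b" at 60000 ms, an eviction pass at 100000 ms removes only "a", because only its last heartbeat is more than 90s old.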
Client-side heartbeat flow
Heartbeat renewal is initiated by the client and runs every 30s.
DiscoveryClient.initScheduledTasks
Back in the DiscoveryClient.initScheduledTasks method, the heartbeat task is scheduled:
private void initScheduledTasks() {
    // omitted....
    heartbeatTask = new TimedSupervisorTask(
            "heartbeat",
            scheduler,
            heartbeatExecutor,
            renewalIntervalInSecs,
            TimeUnit.SECONDS,
            expBackOffBound,
            new HeartbeatThread()
    );
    scheduler.schedule(
            heartbeatTask,
            renewalIntervalInSecs, TimeUnit.SECONDS);
    // omitted....
}
renewalIntervalInSecs defaults to 30, so the heartbeat task runs every 30s.
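The idea of a periodic heartbeat task can be sketched with the JDK scheduler. This is a simplified stand-in: it uses scheduleWithFixedDelay instead of the self-rescheduling TimedSupervisorTask shown above, it ignores timeouts and backoff, and HeartbeatSchedulerSketch and runHeartbeats are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Simplified stand-in for heartbeat scheduling; not how TimedSupervisorTask is implemented.
class HeartbeatSchedulerSketch {
    // Runs `heartbeat` every intervalMs. Returns true once it has run `expected` times,
    // or false if that does not happen within timeoutMs.
    static boolean runHeartbeats(Runnable heartbeat, long intervalMs, int expected, long timeoutMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(expected);
        try {
            scheduler.scheduleWithFixedDelay(() -> {
                heartbeat.run();
                remaining.countDown();
            }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
            return remaining.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

In a real client the interval would be 30 seconds; a much shorter interval is convenient when experimenting with the sketch.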
HeartbeatThread
This thread's implementation is simple: it calls renew(), and if the renewal succeeds it updates the timestamp of the last successful heartbeat.
private class HeartbeatThread implements Runnable {
    public void run() {
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}
The renew() method sends the heartbeat to the Eureka Server endpoint "apps/" + appName + '/' + id.
boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        // Send the heartbeat for this instance to the server
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
        // 404: the server has no record of this instance, so register again
        if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
            REREGISTER_COUNTER.increment();
            logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        // Any status other than 200 counts as a failed renewal
        return httpResponse.getStatusCode() == Status.OK.getStatusCode();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
        return false;
    }
}
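The branching in renew() comes down to a decision on the HTTP status code of the heartbeat response. A minimal sketch of that decision follows; RenewDecisionSketch, Outcome and decide are hypothetical names, and the registerSucceeds parameter stands in for the result of calling register().

```java
// Hypothetical sketch of the decision renew() makes from the heartbeat's HTTP status.
class RenewDecisionSketch {
    enum Outcome { RENEWED, RE_REGISTERED, FAILED }

    // registerSucceeds stands in for the result of register() after a 404 response.
    static Outcome decide(int statusCode, boolean registerSucceeds) {
        if (statusCode == 404) {
            // The server does not know the instance: the client registers again.
            return registerSucceeds ? Outcome.RE_REGISTERED : Outcome.FAILED;
        }
        // Only 200 counts as a successful renewal.
        return statusCode == 200 ? Outcome.RENEWED : Outcome.FAILED;
    }
}
```

For example, decide(404, true) yields RE_REGISTERED, while decide(500, true) yields FAILED because no re-registration is attempted for statuses other than 404.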
Handling the heartbeat on the server
On the server, the renewal is handled by the renewLease method of the InstanceResource class in the com.netflix.eureka.resources package. The code is as follows:
@PUT
public Response renewLease(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("overriddenstatus") String overriddenStatus,
        @QueryParam("status") String status,
        @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    boolean isFromReplicaNode = "true".equals(isReplication);
    // Call renew to renew the lease
    boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
    // Not found in the registry, immediately ask for a register
    if (!isSuccess) { // renewal failed: reply 404 so that the client registers again
        logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
        return Response.status(Status.NOT_FOUND).build();
    }
    // Check if we need to sync based on dirty time stamp, the client
    // instance might have changed some value
    Response response;
    // Compare the client's lastDirtyTimestamp with the server's copy; if they disagree, the client may have to register again
    if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
        response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
        // Store the overridden status since the validation found out the node that replicates wins
        if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                && (overriddenStatus != null)
                && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                && isFromReplicaNode) {
            registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
        }
    } else {
        response = Response.ok().build(); // renewal succeeded: reply 200
    }
    logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
    return response;
}
InstanceRegistry.renew
The renew implementation is shown below. It does two main things:
- finds the instance in the registry that matches the current request
- publishes an EurekaInstanceRenewedEvent
@Override
public boolean renew(final String appName, final String serverId,
        boolean isReplication) {
    log("renew " + appName + " serverId " + serverId + ", isReplication {}"
            + isReplication);
    // Fetch all registered applications
    List<Application> applications = getSortedApplications();
    for (Application input : applications) { // iterate over them
        if (input.getName().equals(appName)) { // the application the renewing client belongs to
            InstanceInfo instance = null;
            for (InstanceInfo info : input.getInstances()) { // scan the application's instances for the one with the matching id
                if (info.getId().equals(serverId)) {
                    instance = info;
                    break;
                }
            }
            // Publish an EurekaInstanceRenewedEvent. Eureka Server itself does not handle this event; we can listen for it to do things such as monitoring.
            publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId,
                    instance, isReplication));
            break;
        }
    }
    return super.renew(appName, serverId, isReplication);
}
super.renew
public boolean renew(final String appName, final String id, final boolean isReplication) {
    if (super.renew(appName, id, isReplication)) { // call the parent class's renew; if it succeeds
        replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication); // replicate the heartbeat to every peer node in the cluster
        return true;
    }
    return false;
}
AbstractInstanceRegistry.renew
In this method, the server looks up the application's instance map, finds the lease of the instance, and calls Lease.renew() to renew it.
public boolean renew(String appName, String id, boolean isReplication) {
    RENEW.increment(isReplication);
    Map<String, Lease<InstanceInfo>> gMap = registry.get(appName); // look up the instances by application name
    Lease<InstanceInfo> leaseToRenew = null;
    if (gMap != null) {
        leaseToRenew = gMap.get(id); // the lease of the instance to renew
    }
    if (leaseToRenew == null) { // no lease: the instance is not registered, so the renewal fails
        RENEW_NOT_FOUND.increment(isReplication);
        logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
        return false;
    } else { // the instance exists
        InstanceInfo instanceInfo = leaseToRenew.getHolder(); // basic information about the instance
        if (instanceInfo != null) { // instance information is present
            // touchASGCache(instanceInfo.getASGName());
            // Get the instance's overridden status
            InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                    instanceInfo, leaseToRenew, isReplication);
            if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) { // an unknown status also fails the renewal
                logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                        + "; re-register required", instanceInfo.getId());
                RENEW_NOT_FOUND.increment(isReplication);
                return false;
            }
            // If the instance's current status differs from the overridden status, apply the overridden status
            if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                logger.info(
                        "The instance status {} is different from overridden instance status {} for instance {}. "
                                + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                        overriddenInstanceStatus.name(),
                        instanceInfo.getId());
                instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
            }
        }
        // Count this renewal toward the last-minute renewal total
        renewsLastMin.increment();
        leaseToRenew.renew(); // renew the lease
        return true;
    }
}
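Renewing the lease simply updates the timestamp the server keeps for the last heartbeat. Note that the stored value is the current time plus the lease duration.
public void renew() {
    lastUpdateTimestamp = System.currentTimeMillis() + duration;
}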
Eureka's self-preservation mechanism
In practice, heartbeat detection carries some uncertainty. A service provider may be healthy, yet a network problem prevents its heartbeats from arriving within 90s, and the healthy service is then evicted by mistake.
To avoid this, Eureka provides a self-preservation mechanism. Put simply, once self-preservation takes effect, Eureka Server protects the registered instances so that they are not evicted when their leases expire, which makes the Eureka cluster more robust and stable.
Once the server is in self-preservation mode:
- Eureka Server stops removing instances that would otherwise be evicted for missing heartbeats. If a provider goes down abnormally during this period, consumers receive a stale instance and their calls fail, so consumers need fault-tolerance measures such as retries and circuit breakers.
- Eureka Server still accepts registrations and queries for new services, but does not replicate them to the other nodes. This keeps the current node available.
The mechanism is controlled by eureka.server.enable-self-preservation: true enables it and false disables it. It is enabled by default, and keeping it enabled is recommended in production.
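How should self-preservation be designed so that it reliably tells communication delays caused by a network fault apart from services that have actually gone down?
Eureka's approach: if the number of heartbeats received in the last minute falls below 85% of the expected number, Eureka Server assumes there is a network fault between the clients and the registry and automatically enters self-preservation mode.
The 85% threshold can be set with the following configuration:
# Self-preservation renewal percentage, 0.85 by default
eureka.server.renewal-percent-threshold=0.85
That leaves one question: 85% of what? The baseline is an expected number of renewals, computed as follows:
// self-preservation threshold = number of registered instances * renewals per minute (60s / client renewal interval) * renewal percent threshold
For example, with 100 registered instances, a renewal interval of 30s and a percent threshold of 0.85, the expected number of renewals is:
self-preservation threshold = 100 * 60 / 30 * 0.85 = 170.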
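The arithmetic above can be checked with a few lines of Java. This is a sketch of the formula only; RenewalThresholdSketch and renewsPerMinThreshold are illustrative names, and the result is truncated to an int in the same way as the formula's integer threshold.

```java
// Sketch of the threshold arithmetic; class and method names are illustrative.
class RenewalThresholdSketch {
    // threshold = instances * (60s / renewal interval) * percent threshold, truncated to an int
    static int renewsPerMinThreshold(int expectedClients, int renewalIntervalSeconds, double percentThreshold) {
        return (int) (expectedClients * (60.0 / renewalIntervalSeconds) * percentThreshold);
    }
}
```

Calling renewsPerMinThreshold(100, 30, 0.85) gives 170, matching the worked example; with a single instance the result is 1, because 1.7 is truncated.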
Setting the renewal threshold
In the contextInitialized method of the EurekaServerBootstrap class, initEurekaServerContext is called to perform the initialization.
public void contextInitialized(ServletContext context) {
    try {
        initEurekaEnvironment();
        initEurekaServerContext();
        context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
    }
    catch (Throwable e) {
        log.error("Cannot bootstrap eureka server :", e);
        throw new RuntimeException("Cannot bootstrap eureka server :", e);
    }
}
Continuing on:
protected void initEurekaServerContext() throws Exception {
    EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig();
    //...
    registry.openForTraffic(applicationInfoManager, registryCount);
}
The openForTraffic method initializes expectedNumberOfClientsSendingRenews. This value is the number of clients expected to send renewals each minute, and it depends on the number of instances registered with Eureka Server.
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
    this.expectedNumberOfClientsSendingRenews = count; // the initial value is 1.
    updateRenewsPerMinThreshold();
    logger.info("Got {} instances from neighboring DS node", count);
    logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    super.postInit();
}
updateRenewsPerMinThreshold
Next, updateRenewsPerMinThreshold is called. It updates the minimum number of renewals per minute, that is, the threshold for the total number of renewals Eureka Server expects to receive from client instances each minute. If the actual number falls below this threshold, self-preservation is triggered.
protected void updateRenewsPerMinThreshold() {
    this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
            * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
            * serverConfig.getRenewalPercentThreshold());
}
// self-preservation threshold = number of registered instances * renewals per minute (60s / client renewal interval) * renewal percent threshold
- getExpectedClientRenewalIntervalSeconds: the client renewal interval, 30s by default
- getRenewalPercentThreshold: the self-preservation renewal percent threshold, 0.85 by default. In other words, the renewals received each minute must exceed 85% of the expected number.
What changes the expected values
The two values expectedNumberOfClientsSendingRenews and numberOfRenewsPerMinThreshold change whenever a new service registers or an existing service goes offline.
PeerAwareInstanceRegistryImpl.cancel
When a service provider goes offline voluntarily, Eureka Server has to remove that provider's address, and the heartbeat renewal threshold has to change as well. The update is visible in PeerAwareInstanceRegistryImpl.cancel.
Call path: PeerAwareInstanceRegistryImpl.cancel -> AbstractInstanceRegistry.cancel -> internalCancel
Once a service has gone offline, one fewer client is expected to send renewals, so the count is adjusted here.
protected boolean internalCancel(String appName, String id, boolean isReplication) {
    //....
    synchronized (lock) {
        if (this.expectedNumberOfClientsSendingRenews > 0) {
            // Since the client wants to cancel it, reduce the number of clients to send renews.
            this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
            updateRenewsPerMinThreshold();
        }
    }
}
PeerAwareInstanceRegistryImpl.register
When a new service provider registers with Eureka Server, the number of clients expected to send renewals has to increase, so the register method handles that.
Call path: register -> super.register (AbstractInstanceRegistry)
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    //....
    // The lease does not exist and hence it is a new registration
    synchronized (lock) {
        if (this.expectedNumberOfClientsSendingRenews > 0) {
            // Since the client wants to register it, increase the number of clients sending renews
            this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
            updateRenewsPerMinThreshold();
        }
    }
}
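How the expected client count and the threshold move together on registration and cancellation can be sketched as a small counter. The class ExpectedRenewsSketch and its method names are hypothetical; the sketch only mirrors the guarded increment and decrement shown in the two excerpts above and recomputes the threshold after each change.

```java
// Hypothetical model of the expected client count and the derived threshold; not Eureka's code.
class ExpectedRenewsSketch {
    private final int renewalIntervalSeconds;
    private final double percentThreshold;
    private int expectedClients;
    private int renewsPerMinThreshold;

    ExpectedRenewsSketch(int initialClients, int renewalIntervalSeconds, double percentThreshold) {
        this.expectedClients = initialClients;
        this.renewalIntervalSeconds = renewalIntervalSeconds;
        this.percentThreshold = percentThreshold;
        updateThreshold();
    }

    // A new instance registered: one more client is expected to send renewals.
    synchronized void onRegister() {
        if (expectedClients > 0) {
            expectedClients++;
            updateThreshold();
        }
    }

    // An instance cancelled its lease: one fewer client is expected to send renewals.
    synchronized void onCancel() {
        if (expectedClients > 0) {
            expectedClients--;
            updateThreshold();
        }
    }

    private void updateThreshold() {
        renewsPerMinThreshold = (int) (expectedClients * (60.0 / renewalIntervalSeconds) * percentThreshold);
    }

    synchronized int expectedClients() {
        return expectedClients;
    }

    synchronized int threshold() {
        return renewsPerMinThreshold;
    }
}
```

Starting from 1 expected client with a 30s interval and a 0.85 factor, two registrations raise the count to 3 and the threshold to 5; one cancellation then lowers them to 2 and 3.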
Refreshing the self-preservation threshold every 15 minutes
PeerAwareInstanceRegistryImpl.scheduleRenewalThresholdUpdateTask
The self-preservation threshold is refreshed every 15 minutes.
private void updateRenewalThreshold() {
    try {
        // 1. Count the application instances
        Applications apps = eurekaClient.getApplications();
        int count = 0;
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                if (this.isRegisterable(instance)) {
                    ++count;
                }
            }
        }
        synchronized (lock) {
            // Update threshold only if the threshold is greater than the
            // current expected threshold or if self preservation is disabled.
            // Recompute expectedNumberOfClientsSendingRenews and numberOfRenewsPerMinThreshold when count exceeds
            // renewalPercentThreshold * expectedNumberOfClientsSendingRenews, or when self-preservation is disabled
            if ((count) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfClientsSendingRenews)
                    || (!this.isSelfPreservationModeEnabled())) {
                this.expectedNumberOfClientsSendingRenews = count;
                updateRenewsPerMinThreshold();
            }
        }
        logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
    } catch (Throwable e) {
        logger.error("Cannot update renewal threshold", e);
    }
}
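The update condition in updateRenewalThreshold can be isolated as a pure function, which makes the rule easier to reason about. This is a sketch under that simplification; ThresholdRefreshSketch and refreshExpectedClients are hypothetical names, and the function returns the expected client count that would result from one refresh.

```java
// Hypothetical extraction of the refresh rule; returns the expected client count after a refresh.
class ThresholdRefreshSketch {
    static int refreshExpectedClients(int currentExpected, int count,
                                      double percentThreshold, boolean selfPreservationEnabled) {
        // Adopt the freshly counted value only if it exceeds percentThreshold * currentExpected,
        // or if self-preservation is disabled.
        if (count > percentThreshold * currentExpected || !selfPreservationEnabled) {
            return count;
        }
        return currentExpected;
    }
}
```

With 100 expected clients and a 0.85 factor, a fresh count of 90 replaces the old value, whereas a count of 80 is ignored while self-preservation is enabled. The effect is that a sudden large drop in visible instances does not lower the threshold.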
Triggering self-preservation
The postInit method of AbstractInstanceRegistry starts an EvictionTask, which checks whether self-preservation needs to take effect.
This method also runs during EurekaServerBootstrap startup; it is invoked at the end of openForTraffic.
protected void postInit() {
    renewsLastMin.start(); // start a timer that counts renewals per minute; the counter is reset to 0 every 60s
    if (evictionTaskRef.get() != null) {
        evictionTaskRef.get().cancel();
    }
    evictionTaskRef.set(new EvictionTask()); // schedule the EvictionTask, which runs every 60s
    evictionTimer.schedule(evictionTaskRef.get(),
            serverConfig.getEvictionIntervalTimerInMs(),
            serverConfig.getEvictionIntervalTimerInMs());
}
The EvictionTask code is as follows.
private final AtomicLong lastExecutionNanosRef = new AtomicLong(0L);
@Override
public void run() {
    try {
        // Get the compensation time in milliseconds
        long compensationTimeMs = getCompensationTimeMs();
        logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
        evict(compensationTimeMs);
    } catch (Throwable e) {
        logger.error("Could not run the evict task", e);
    }
}
The evict method
public void evict(long additionalLeaseMs) {
    logger.debug("Running the evict task");
    // If self-preservation is in effect, return immediately; there is nothing more to do
    if (!isLeaseExpirationEnabled()) {
        logger.debug("DS: lease expiration is currently disabled.");
        return;
    }
    // The rest of the method evicts the expired instances.
}
isLeaseExpirationEnabled
- Checks whether self-preservation is enabled (it is by default). If it is disabled, leases are always allowed to expire.
- Otherwise decides whether self-preservation should take effect by checking whether the number of renewals received in the last minute is greater than numberOfRenewsPerMinThreshold.
public boolean isLeaseExpirationEnabled() {
    if (!isSelfPreservationModeEnabled()) {
        // The self preservation mode is disabled, hence allowing the instances to expire.
        return true;
    }
    return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
}
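The same decision can be written as a pure function to see how the inputs interact. This is a sketch only; LeaseExpirationSketch is a hypothetical class, and the three parameters stand in for isSelfPreservationModeEnabled(), numberOfRenewsPerMinThreshold and getNumOfRenewsInLastMin().

```java
// Hypothetical pure-function version of the check; parameters replace the registry's fields.
class LeaseExpirationSketch {
    static boolean isLeaseExpirationEnabled(boolean selfPreservationEnabled,
                                            int renewsPerMinThreshold,
                                            long renewsInLastMin) {
        if (!selfPreservationEnabled) {
            // Self-preservation is switched off: expired leases may always be evicted.
            return true;
        }
        // Eviction is allowed only while renewals stay strictly above the threshold.
        return renewsPerMinThreshold > 0 && renewsInLastMin > renewsPerMinThreshold;
    }
}
```

With a threshold of 170, 171 renewals in the last minute allow eviction, while 170 or fewer put the server into self-preservation because the comparison is strict.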