前言
Eureka Client的應(yīng)用啟動時,在com.netflix.discovery.DiscoveryClient類的initScheduledTasks方法中,會做以下幾件事:
- 1、周期性更新服務(wù)列表;
- 3、周期性服務(wù)續(xù)約;
- 3、服務(wù)注冊邏輯;
概覽
以下圖片來自Netflix官方,圖中顯示Eureka Client會發(fā)起Renew向注冊中心做周期性續(xù)約,這樣其他Eureka client通過Get Registry請求就能獲取到新注冊應(yīng)用的相關(guān)信息:

來自官方文檔的指導(dǎo)信息
最準(zhǔn)確的說明信息來自Netflix的官方文檔,地址:
https://github.com/Netflix/eureka/wiki/Understanding-eureka-client-server-communication#renew
關(guān)于續(xù)約的理解:
- 1、Eureka client每隔三十秒發(fā)送一次心跳到Eureka server,這就是續(xù)約;
- 2、Eureka client續(xù)約的目的是告訴Eureka server自己還活著;
- 3、Eureka server若90秒內(nèi)未收到心跳,就從自己的服務(wù)列表中剔除該Eureka client;
- 4、建議不要改變心跳間隔,因為Eureka server是通過心跳來判斷Eureka client是否正常;
服務(wù)續(xù)約執(zhí)行簡要流程圖
下面這張圖大致描述了服務(wù)續(xù)約從Client端到Server端的大致流程,詳情如下:

Eureka 續(xù)約源碼分析
1、Eureka Client發(fā)起續(xù)約
Eureka Client向Eureka Server發(fā)起注冊應(yīng)用實例成功后獲得租約,Eureka Client固定間隔向Eureka Server發(fā)起續(xù)約(renew),避免租約過期。
默認(rèn)情況下,租約有效期為90秒,續(xù)約頻率為30秒。兩者比例為1:3,保證在網(wǎng)絡(luò)異常等情況下,有三次重試的機(jī)會。
1)、初始化定時任務(wù)
Eureka Client在初始化過程中,創(chuàng)建心跳線程,固定間隔向Eureka Server發(fā)起續(xù)約。實現(xiàn)代碼如下
@Singleton
public class DiscoveryClient implements EurekaClient {
/**
* 初始化所有計劃的任務(wù)
*/
private void initScheduledTasks() {
//獲取注冊信息的定時任務(wù)
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
);
scheduler.schedule(
cacheRefreshTask,
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
//心跳定時任務(wù)
// Heartbeat timer
heartbeatTask = new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
);
scheduler.schedule(
heartbeatTask,
renewalIntervalInSecs, TimeUnit.SECONDS);
//服務(wù)實例同步定時任務(wù)
// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
// 注冊應(yīng)用實例狀態(tài)變更監(jiān)聽器
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
//初始化定時服務(wù)注冊任務(wù)
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
}
2)、發(fā)起續(xù)約
@Singleton
public class DiscoveryClient implements EurekaClient {
//最后成功向Eureka Server心跳時間戳
private volatile long lastSuccessfulHeartbeatTimestamp = -1;
private class HeartbeatThread implements Runnable {
public void run() {
// 調(diào)用續(xù)約方法
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
//服務(wù)續(xù)約
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
//發(fā)Restful請求,即心跳
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
//404錯誤會觸發(fā)注冊邏輯
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
//返回碼200表示心跳成功
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
}
AbstractJerseyEurekaHttpClient的renew()方法使用PUT請求調(diào)用Eureka Server的apps/${APP_NAME}/${INSTANCE_INFO_ID}接口,參數(shù)為status、lastDirtyTimestamp、overriddenstatus,實現(xiàn)續(xù)約。
繼續(xù)展開上面代碼段中的 eurekaTransport.registrationClient.sendHeartBeat方法,源碼在EurekaHttpClientDecorator類中:
@Override
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(final String appName,
final String id,
final InstanceInfo info,
final InstanceStatus overriddenStatus) {
return execute(new RequestExecutor<InstanceInfo>() {
@Override
public EurekaHttpResponse<InstanceInfo> execute(EurekaHttpClient delegate) {
//網(wǎng)絡(luò)處理委托給代理類完成
return delegate.sendHeartBeat(appName, id, info, overriddenStatus);
}
@Override
public RequestType getRequestType() {
//請求類型為心跳
return RequestType.SendHeartBeat;
}
});
}
繼續(xù)展開delegate.sendHeartBeat,多層調(diào)用一路展開,最終由JerseyApplicationClient類來完成操作,對應(yīng)源碼在父類AbstractJerseyEurekaHttpClient中,如下所示,主要工作是利用jersey庫的Restful Api將自身的信息PUT到Eureka server,注意:這里不是POST,也不是GET,而是PUT:
@Override
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
String urlPath = "apps/" + appName + '/' + id;
ClientResponse response = null;
try {
//請求參數(shù)有兩個:Eureka client自身狀態(tài)、自身關(guān)鍵信息(狀態(tài)、元數(shù)據(jù)等)最后一次變化的時間
WebResource webResource = jerseyClient.resource(serviceUrl)
.path(urlPath)
.queryParam("status", info.getStatus().toString())
.queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
if (overriddenStatus != null) {
webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
}
Builder requestBuilder = webResource.getRequestBuilder();
addExtraHeaders(requestBuilder);
//注意:這里不是POST,也不是GET,而是PUT
response = requestBuilder.put(ClientResponse.class);
EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
if (response.hasEntity() &&
!HTML.equals(response.getType().getSubtype())) { //don't try and deserialize random html errors from the server
eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
}
return eurekaResponseBuilder.build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
至此,Eureka client向服務(wù)續(xù)租的源碼就分析完畢了,過程相對簡單,DiscoveryClient、TimedSupervisorTask、JerseyApplicationClient等實例各司其職,定時發(fā)送PUT請求到Eureka server。
2、Eureka Server接收續(xù)約
Eureka Server接收續(xù)約核心流程如下圖:

1)、接收續(xù)約請求
@Produces({"application/xml", "application/json"})
public class InstanceResource {
@PUT
public Response renewLease(
// 是否是Replication模式 復(fù)制,同步
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("overriddenstatus") String overriddenStatus, // 實例的覆蓋狀態(tài)
@QueryParam("status") String status, // 實例狀態(tài)
// 實例信息在EurekClient端上次被修改的時間
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
boolean isFromReplicaNode = "true".equals(isReplication);
// 續(xù)約
boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
// Not found in the registry, immediately ask for a register
// 續(xù)租失敗,返回404,EurekaClient端收到404后會發(fā)起注冊請求
if (!isSuccess) {
logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
return Response.status(Status.NOT_FOUND).build();
}
// Check if we need to sync based on dirty time stamp, the client
// instance might have changed some value
// 比較InstanceInfo的lastDirtyTimestamp屬性
Response response;
if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
// 驗證傳入的lastDirtyTimestamp和EurekaServer端保存的lastDirtyTimestamp是否相同
response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
// Store the overridden status since the validation found out the node that replicates wins
if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
&& (overriddenStatus != null)
&& !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
&& isFromReplicaNode) {
registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
}
} else {
// 續(xù)約成功,返回200
response = Response.ok().build();
}
logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
return response;
}
}
PeerAwareInstanceRegistryImpl中調(diào)用了父類AbstractInstanceRegistry的renew(...)方法續(xù)約實例信息
@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
public boolean renew(final String appName, final String id, final boolean isReplication) {
// 調(diào)用父類里的renew(appName, id, isReplication)方法續(xù)約
if (super.renew(appName, id, isReplication)) {
// 如果是續(xù)約請求則向其他EurekaServer節(jié)點同步續(xù)約信息
// 如果是同步信息請求則直接返回
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
return true;
}
return false;
}
}
2)、續(xù)約應(yīng)用實例信息
調(diào)用了AbstractInstanceRegistry的renew(...)方法,續(xù)約實例信息,代碼如下:
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
public boolean renew(String appName, String id, boolean isReplication) {
// 增加續(xù)約次數(shù)到監(jiān)控
RENEW.increment(isReplication);
// 獲取應(yīng)用名對應(yīng)的租約,即根據(jù)實例名稱取出實例信息集合
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
// 根據(jù)實例id取出具體實例租約信息
leaseToRenew = gMap.get(id);
}
// 租約不存在
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
return false;
} else {
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// touchASGCache(instanceInfo.getASGName());
// 獲得實例的覆蓋狀態(tài)
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
// 實例覆蓋狀態(tài)為UNKNOWN,續(xù)租失敗
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
+ "; re-register required", instanceInfo.getId());
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
// 實例狀態(tài)與覆蓋狀態(tài)不一致
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
logger.info(
"The instance status {} is different from overridden instance status {} for instance {}. "
+ "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
instanceInfo.getOverriddenStatus().name(),
instanceInfo.getId());
// 強(qiáng)行把實例的覆蓋狀態(tài)設(shè)為實例狀態(tài)
// 即status = overriddenInstanceStatus
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
}
}
// 新增續(xù)租每分鐘次數(shù)
renewsLastMin.increment();
// 續(xù)租(設(shè)置lastUpdateTimestamp(租約最后更新時間))
leaseToRenew.renew();
return true;
}
}
}
public class Lease<T> {
enum Action {
Register, Cancel, Renew
};
private volatile long lastUpdateTimestamp;
public void renew() {
// 設(shè)置租約最后更新時間戳
lastUpdateTimestamp = System.currentTimeMillis() + duration;
}
}
續(xù)約的整個過程修改租約的過期時間,即使并發(fā)請求,也不會對數(shù)據(jù)的一致性產(chǎn)生影響,因此不需要像注冊操作一樣加鎖。
3)、eureka引入overriddenstatus用來解決狀態(tài)被覆蓋問題
客戶端調(diào)用updateStatus方法時,同時更新server端實例的status和overriddenStatus狀態(tài)。
客戶端調(diào)用renew方法時,也要更新server端實例的status和overriddenstatus狀態(tài),但是有以下規(guī)則的
(1):如果客戶端上傳的實例狀態(tài)是down或者starting,表明客戶端是重啟或者h(yuǎn)ealthCheck失敗。此時這個實例不能作為服務(wù)提供服務(wù)。因此即使客戶端調(diào)用updateStatus把實例狀態(tài)更新為up,也是沒用的。此時客戶端實例的準(zhǔn)確狀態(tài)就是down或者starting。
(2):如果客戶端的實例是up或者out_of_service,此時是不可信的。就像第二大節(jié)介紹的那樣。有可能client端的實例狀態(tài)已被改變,此時要使用overriddenstatus狀態(tài)作為當(dāng)前實例的狀態(tài),避免被覆蓋。
(3):(2)中的overriddenstatus有可能不存在,緩存失效,此時要使用server端已經(jīng)存在的實例的狀態(tài)。
參考:
https://xinchen.blog.csdn.net/article/details/82915355
https://blog.csdn.net/qq_40378034/article/details/119079180