服務(wù)續(xù)約執(zhí)行簡(jiǎn)要流程圖
??下面這張圖大致描述了服務(wù)續(xù)約從Client端到Server端的大致流程,詳情如下:
服務(wù)續(xù)約Client源碼分析
??我們先來看看服務(wù)續(xù)約定時(shí)任務(wù)的初始化。那我們的服務(wù)續(xù)約定時(shí)任務(wù)什么時(shí)候會(huì)被初始化呢,那肯定是我們啟用我們Eureka Client的時(shí)候,當(dāng)我們啟動(dòng)Client時(shí),Eureka會(huì)先處理相關(guān)的配置,然后初始化我們Client的相關(guān)信息,我們的定時(shí)任務(wù)也就是此時(shí)進(jìn)行的初始化,具體來說我們的服務(wù)續(xù)約定時(shí)任務(wù)就是在DiscoveryClient這個(gè)類中initScheduledTasks方法中被初始化的。代碼如下:
private final ScheduledExecutorService scheduler;
private void initScheduledTasks() {
…省略其他代碼
// 初始化定時(shí)服務(wù)續(xù)約任務(wù)
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
…省略其他代碼
}
??由此可見,我們的定時(shí)任務(wù)其實(shí)是Client進(jìn)行初始化完成的,并且還是使用ScheduledExecutorService線程池來完成我們的定時(shí)任務(wù)。接下來我們看看HeartbeatThread類相關(guān)的源碼:
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
??HeartbeatThread類中run方法調(diào)用了renew方法,我們接著看下去:
/**
* Renew with the eureka service by making the appropriate REST call
*/
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
// 調(diào)用Eureka Server提供的服務(wù)續(xù)約Http接口
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
?? 通過eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null) 這行代碼,我們則可以知道Client端的續(xù)約其實(shí)就是用調(diào)用Server提供的接口,然后Server端實(shí)現(xiàn)了服務(wù)續(xù)約的操作。
服務(wù)續(xù)約Server源碼實(shí)現(xiàn)
??由上面可見,我們Client的續(xù)約操作是直接調(diào)用的Server端提供的接口,以此來實(shí)現(xiàn)的具體操作,那我們接下來就看看Server是如何進(jìn)行服務(wù)續(xù)約操作的,源碼如下:
@PUT
public Response renewLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("overriddenstatus") String overriddenStatus,
@QueryParam("status") String status,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
boolean isFromReplicaNode = "true".equals(isReplication);
// 進(jìn)行服務(wù)續(xù)約操作
boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
// Not found in the registry, immediately ask for a register
if (!isSuccess) {
logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
return Response.status(Status.NOT_FOUND).build();
}
// Check if we need to sync based on dirty time stamp, the client
// instance might have changed some value
Response response = null;
if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
// Store the overridden status since the validation found out the node that replicates wins
if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
&& (overriddenStatus != null)
&& !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
&& isFromReplicaNode) {
registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
}
} else {
response = Response.ok().build();
}
logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
return response;
}
??由registry.renew(app.getName(), id, isFromReplicaNode);這里我們可以看到調(diào)用了renew()方法來實(shí)現(xiàn)續(xù)約操作,那我們就看下這個(gè)方法的實(shí)現(xiàn):
Override
public boolean renew(final String appName, final String serverId,
boolean isReplication) {
log("renew " + appName + " serverId " + serverId + ", isReplication {}"
+ isReplication);
List<Application> applications = getSortedApplications();
for (Application input : applications) {
if (input.getName().equals(appName)) {
InstanceInfo instance = null;
for (InstanceInfo info : input.getInstances()) {
if (info.getId().equals(serverId)) {
instance = info;
break;
}
}
publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId,
instance, isReplication));
break;
}
}
return super.renew(appName, serverId, isReplication);
}
??我們可以看到方法最后調(diào)用了父類的renew()方法,我們繼續(xù)看下去:
public boolean renew(final String appName, final String id, final boolean isReplication) {
if (super.renew(appName, id, isReplication)) {
// 向其他server發(fā)送服務(wù)續(xù)約消息
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
return true;
}
return false;
}
public boolean renew(String appName, String id, boolean isReplication) {
RENEW.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
leaseToRenew = gMap.get(id);
}
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
return false;
} else {
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// touchASGCache(instanceInfo.getASGName());
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
+ "; re-register required", instanceInfo.getId());
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
logger.info(
"The instance status {} is different from overridden instance status {} for instance {}. "
+ "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
instanceInfo.getOverriddenStatus().name(),
instanceInfo.getId());
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
}
}
renewsLastMin.increment();
leaseToRenew.renew();
return true;
}
}
public void renew() {
// 更新上次更新時(shí)間
lastUpdateTimestamp = System.currentTimeMillis() + duration;
}
??由上面的代碼可見,Eureka Server服務(wù)續(xù)約的流程可大致分為以下幾步:
????1.更新實(shí)例狀態(tài)
????2.更新實(shí)例上次更新時(shí)間
????3.向其他Server發(fā)送服務(wù)續(xù)約消息
??下面為自己總結(jié)的Eureka相關(guān)的知識(shí)點(diǎn),有興趣地小伙伴可以看一看,當(dāng)然再點(diǎn)下贊就更棒了,創(chuàng)作不易!
??Eureka系列(一)Eureka功能介紹
??Eureka系列(二) 服務(wù)注冊(cè)Server端具體實(shí)現(xiàn)
??Eureka系列(三)獲取服務(wù)Client端具體實(shí)現(xiàn)
??Eureka系列(四) 獲取服務(wù)Server端具體實(shí)現(xiàn)
??Eureka系列(五) 服務(wù)續(xù)約流程具體實(shí)現(xiàn)
??Eureka系列(六) TimedSupervisorTask類解析
??Eureka系列(七) 服務(wù)下線Server端具體實(shí)現(xiàn)
??Eureka系列(八)服務(wù)剔除具體實(shí)現(xiàn)
??Eureka系列(九)Eureka自我保護(hù)機(jī)制