序
這篇文章來看一下DiscoveryClient類是怎么實(shí)現(xiàn)服務(wù)注冊(cè)的。
DiscoveryClient類有一個(gè)initScheduledTasks方法。下面是initScheduledTasks的源碼。
/**
* Initializes all scheduled tasks.
*/
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
);
scheduler.schedule(
cacheRefreshTask,
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
heartbeatTask = new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
);
scheduler.schedule(
heartbeatTask,
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
注釋說明這個(gè)方法初始化了所有的規(guī)定任務(wù)。服務(wù)注冊(cè)也是在這個(gè)方法里。
有一段if分支是clientConfig.shouldRegisterWithEureka(),判斷是否發(fā)起注冊(cè),其實(shí)就是獲取配置文件中的registration.enabled,系統(tǒng)默認(rèn)為true。
在這個(gè)分支內(nèi)會(huì)創(chuàng)建一個(gè)InstanceInfoReplicator類,注釋說明是“復(fù)制實(shí)例信息”。該類的完整代碼如下。
/**
* A task for updating and replicating the local instanceinfo to the remote server. Properties of this task are:
* - configured with a single update thread to guarantee sequential update to the remote server
* - update tasks can be scheduled on-demand via onDemandUpdate()
* - task processing is rate limited by burstSize
* - a new update task is always scheduled automatically after an earlier update task. However if an on-demand task
* is started, the scheduled automatic update task is discarded (and a new one will be scheduled after the new
* on-demand update).
*
* @author dliu
*/
翻譯一下這個(gè)類的注釋:這個(gè)任務(wù)用來更新和復(fù)制本地的實(shí)例信息到遠(yuǎn)程服務(wù)器。這個(gè)類的特征是:配置一個(gè)單獨(dú)的更新線程來保證順序更新到遠(yuǎn)程服務(wù)器;任務(wù)更新可以通過"onDemandUpdate()"按需安排;任務(wù)處理速率受burstSize限制;在較早的更新任務(wù)之后,新的更新任務(wù)始終會(huì)被自動(dòng)計(jì)劃。 但是,如果啟動(dòng)了某項(xiàng)按需任務(wù),則計(jì)劃的自動(dòng)更新任務(wù)將被廢棄(并且,按需更新之后將安排新的計(jì)劃任務(wù))。
InstanceInfoReplicator的start方法來開啟應(yīng)用實(shí)例信息復(fù)制器。
public void start(int initialDelayMs) {
if (started.compareAndSet(false, true)) {
instanceInfo.setIsDirty(); // for initial register
Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
執(zhí)行 instanceInfo.setIsDirty() 代碼塊,因?yàn)?InstanceInfo 剛被創(chuàng)建時(shí),在 Eureka-Server 不存在,也會(huì)被注冊(cè)。
InstanceInfoReplicator會(huì)執(zhí)行一個(gè)定時(shí)任務(wù),定時(shí)檢查 InstanceInfo 的狀態(tài)( status ) 屬性是否發(fā)生變化。若是,發(fā)起注冊(cè)。具體工作看一下run()方法的實(shí)現(xiàn)。run()方法中有一句discoveryClient.register();這個(gè)就是注冊(cè)的方法了。代碼如下。
/**
* Register with the eureka service by making the appropriate REST call.
*/
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
調(diào)用 DiscoveryClient#refreshInstanceInfo() 方法,刷新應(yīng)用實(shí)例信息。
調(diào)用 DiscoveryClient#register() 方法,Eureka-Client 向 Eureka-Server 注冊(cè)應(yīng)用實(shí)例。
調(diào)用 ScheduledExecutorService#schedule(...) 方法,再次延遲執(zhí)行任務(wù),并設(shè)置 scheduledPeriodicRef。通過這樣的方式,不斷循環(huán)定時(shí)執(zhí)行任務(wù)。