從頭學(xué)習(xí)SpringCloud(四)Eureka服務(wù)注冊(cè)

這篇文章來看一下DiscoveryClient類是怎么實(shí)現(xiàn)服務(wù)注冊(cè)的。

DiscoveryClient類有一個(gè)initScheduledTasks方法。下面是initScheduledTasks的源碼。

/**
     * Initializes all scheduled tasks.
     */
    private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            cacheRefreshTask = new TimedSupervisorTask(
                    "cacheRefresh",
                    scheduler,
                    cacheRefreshExecutor,
                    registryFetchIntervalSeconds,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new CacheRefreshThread()
            );
            scheduler.schedule(
                    cacheRefreshTask,
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }

        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

            // Heartbeat timer
            heartbeatTask = new TimedSupervisorTask(
                    "heartbeat",
                    scheduler,
                    heartbeatExecutor,
                    renewalIntervalInSecs,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new HeartbeatThread()
            );
            scheduler.schedule(
                    heartbeatTask,
                    renewalIntervalInSecs, TimeUnit.SECONDS);

            // InstanceInfo replicator
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize

            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };

            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }

            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }

注釋說明這個(gè)方法初始化了所有的規(guī)定任務(wù)。服務(wù)注冊(cè)也是在這個(gè)方法里。
有一段if分支是clientConfig.shouldRegisterWithEureka(),判斷是否發(fā)起注冊(cè),其實(shí)就是獲取配置文件中的registration.enabled,系統(tǒng)默認(rèn)為true。
在這個(gè)分支內(nèi)會(huì)創(chuàng)建一個(gè)InstanceInfoReplicator類,注釋說明是“復(fù)制實(shí)例信息”。該類的完整代碼如下。

/**
 * A task for updating and replicating the local instanceinfo to the remote server. Properties of this task are:
 * - configured with a single update thread to guarantee sequential update to the remote server
 * - update tasks can be scheduled on-demand via onDemandUpdate()
 * - task processing is rate limited by burstSize
 * - a new update task is always scheduled automatically after an earlier update task. However if an on-demand task
 *   is started, the scheduled automatic update task is discarded (and a new one will be scheduled after the new
 *   on-demand update).
 *
 *   @author dliu
 */

翻譯一下這個(gè)類的注釋:這個(gè)任務(wù)用來更新和復(fù)制本地的實(shí)例信息到遠(yuǎn)程服務(wù)器。這個(gè)類的特征是:配置一個(gè)單獨(dú)的更新線程來保證順序更新到遠(yuǎn)程服務(wù)器;任務(wù)更新可以通過"onDemandUpdate()"按需安排;任務(wù)處理速率受burstSize限制;在較早的更新任務(wù)之后,新的更新任務(wù)始終會(huì)被自動(dòng)計(jì)劃。 但是,如果啟動(dòng)了某項(xiàng)按需任務(wù),則計(jì)劃的自動(dòng)更新任務(wù)將被廢棄(并且,按需更新之后將安排新的計(jì)劃任務(wù))。
InstanceInfoReplicator的start方法來開啟應(yīng)用實(shí)例信息復(fù)制器。

public void start(int initialDelayMs) {
        if (started.compareAndSet(false, true)) {
            instanceInfo.setIsDirty();  // for initial register
            Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

執(zhí)行 instanceInfo.setIsDirty() 代碼塊,因?yàn)?InstanceInfo 剛被創(chuàng)建時(shí),在 Eureka-Server 不存在,也會(huì)被注冊(cè)。

InstanceInfoReplicator會(huì)執(zhí)行一個(gè)定時(shí)任務(wù),定時(shí)檢查 InstanceInfo 的狀態(tài)( status ) 屬性是否發(fā)生變化。若是,發(fā)起注冊(cè)。具體工作看一下run()方法的實(shí)現(xiàn)。run()方法中有一句discoveryClient.register();這個(gè)就是注冊(cè)的方法了。代碼如下。

/**
     * Register with the eureka service by making the appropriate REST call.
     */
    boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
    }

調(diào)用 DiscoveryClient#refreshInstanceInfo() 方法,刷新應(yīng)用實(shí)例信息。
調(diào)用 DiscoveryClient#register() 方法,Eureka-Client 向 Eureka-Server 注冊(cè)應(yīng)用實(shí)例。
調(diào)用 ScheduledExecutorService#schedule(...) 方法,再次延遲執(zhí)行任務(wù),并設(shè)置 scheduledPeriodicRef。通過這樣的方式,不斷循環(huán)定時(shí)執(zhí)行任務(wù)。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容