A Light Read of the Eureka Source Code: Service Heartbeat

This walkthrough uses Eureka version 1.7.2.

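I'm a beginner, and this article is just my notes from reading the source code. If you are reading it, please judge for yourself whether it is correct: the explanations only reflect my own understanding and may well be wrong!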

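The heartbeat exists to prove that the client is still running normally. At the code level, it periodically pushes back the instance's expiration time so that the automatic failure-eviction mechanism does not remove the instance.
On the Eureka Client side, this starts during client initialization, when the client also builds a dedicated thread pool for executing heartbeats: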

com.netflix.discovery.DiscoveryClient#DiscoveryClient(com.netflix.appinfo.ApplicationInfoManager, com.netflix.discovery.EurekaClientConfig, com.netflix.discovery.AbstractDiscoveryClientOptionalArgs, javax.inject.Provider<com.netflix.discovery.BackupRegistry>)

            // Initialize the thread pool that keeps the heartbeat going
            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff
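The `// use direct handoff` comment refers to the SynchronousQueue: it holds no tasks at all, so a submitted task is handed straight to an idle thread, or a new thread is started up to the maximum pool size, and anything beyond that is rejected. Below is a small standalone sketch of that behavior using only the JDK, not Eureka code; the class and method names are my own, and a maximum of 2 threads is just an assumption for the demo.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DirectHandoffDemo {

    // Submits `tasks` blocking tasks to a pool shaped like heartbeatExecutor
    // (1 core thread, at most `maxThreads`, SynchronousQueue) and counts rejections.
    static int countRejected(int maxThreads, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, maxThreads, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep the worker busy, like a slow heartbeat call
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // no idle thread and the pool is already at its maximum size
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println(countRejected(2, 3)); // prints 1
    }
}
```

With two threads already busy, the third task has nowhere to go and is rejected instead of queued. As far as I can tell, TimedSupervisorTask catches this RejectedExecutionException and only counts it, so a stuck heartbeat does not pile up queued requests.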

The heartbeat task itself is started when the scheduled tasks are initialized:

com.netflix.discovery.DiscoveryClient#initScheduledTasks

        scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            // the task thread
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);
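TimedSupervisorTask does more than run HeartbeatThread on a fixed interval. As I read it, it waits for each run with renewalIntervalInSecs as the timeout, doubles the next delay when a run times out (capped at the interval multiplied by expBackOffBound), and goes back to the base interval after a successful run. The sketch below models only that delay arithmetic; `nextDelay` is a hypothetical helper, and the bound of 10 is just a value chosen for illustration.

```java
class BackoffDelaySketch {

    // Hypothetical helper modeled on the delay bookkeeping in TimedSupervisorTask:
    // success resets the delay to the base interval; a timeout doubles it,
    // capped at baseMillis * expBackOffBound.
    static long nextDelay(long currentDelay, long baseMillis, int expBackOffBound, boolean timedOut) {
        long maxDelay = baseMillis * expBackOffBound;
        if (!timedOut) {
            return baseMillis;
        }
        return Math.min(maxDelay, currentDelay * 2);
    }

    public static void main(String[] args) {
        long base = 30_000L; // renewalIntervalInSecs = 30s
        long delay = base;
        for (int i = 0; i < 5; i++) {
            delay = nextDelay(delay, base, 10, true);
            System.out.println(delay); // 60000, 120000, 240000, 300000, 300000
        }
        System.out.println(nextDelay(delay, base, 10, false)); // 30000
    }
}
```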

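By default, the first heartbeat is sent after a delay of renewalIntervalInSecs (30s by default), and later heartbeats are also sent every 30s, because TimedSupervisorTask reschedules itself after every run.
The core of all this is the task thread HeartbeatThread: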

com.netflix.discovery.DiscoveryClient.HeartbeatThread#run

    public void run() {
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }

    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            // send the heartbeat
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == 404) {
                REREGISTER_COUNTER.increment();
                logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == 200;
        } catch (Throwable e) {
            logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
            return false;
        }
    }
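The branching in renew() can be tried out without a server. In this minimal sketch, the HTTP call and register() are replaced by hypothetical suppliers (`sendHeartbeat`, `register`), and it catches RuntimeException where the original catches Throwable. The return value plays the same role as above: it decides whether run() updates lastSuccessfulHeartbeatTimestamp.

```java
import java.util.function.BooleanSupplier;
import java.util.function.IntSupplier;

class RenewSketch {

    // Mirrors the branches of DiscoveryClient#renew: 404 -> re-register,
    // 200 -> success, any other status or a transport failure -> failure.
    static boolean renew(IntSupplier sendHeartbeat, BooleanSupplier register) {
        try {
            int status = sendHeartbeat.getAsInt();
            if (status == 404) {
                return register.getAsBoolean(); // the server no longer knows this instance
            }
            return status == 200;
        } catch (RuntimeException e) {
            return false; // e.g. a connection error counts as a failed heartbeat
        }
    }

    public static void main(String[] args) {
        System.out.println(renew(() -> 200, () -> false)); // true
        System.out.println(renew(() -> 404, () -> true));  // true, via re-registration
        System.out.println(renew(() -> 500, () -> true));  // false
        System.out.println(renew(() -> { throw new IllegalStateException("timeout"); }, () -> true)); // false
    }
}
```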

There isn't much to say about these two methods: they just send an HTTP request to a URL like this:

http://DESKTOP-1S6DCTA:8080/v2/apps/EUREKA/001

Parameters: status, lastDirtyTimestamp

Method: PUT
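To make the request shape concrete, here is a sketch that only builds (and does not send) such a PUT request with the JDK 11 java.net.http API. This is not how the Eureka client does it; in this version it goes through its own Jersey-based transport, as far as I can tell. The base URL comes from the example above, while `status=UP` and the timestamp value are assumed example values.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

class HeartbeatRequestSketch {

    // Builds a PUT request shaped like the heartbeat described above:
    // {serviceUrl}apps/{appName}/{instanceId}?status=...&lastDirtyTimestamp=...
    static HttpRequest buildHeartbeat(String serviceUrl, String appName, String instanceId,
                                      String status, long lastDirtyTimestamp) {
        String uri = serviceUrl + "apps/" + appName + "/" + instanceId
                + "?status=" + URLEncoder.encode(status, StandardCharsets.UTF_8)
                + "&lastDirtyTimestamp=" + lastDirtyTimestamp;
        return HttpRequest.newBuilder(URI.create(uri))
                .PUT(HttpRequest.BodyPublishers.noBody()) // the parameters travel in the query string
                .build();
    }

    public static void main(String[] args) {
        HttpRequest req = buildHeartbeat("http://DESKTOP-1S6DCTA:8080/v2/", "EUREKA", "001", "UP", 42L);
        // prints: PUT http://DESKTOP-1S6DCTA:8080/v2/apps/EUREKA/001?status=UP&lastDirtyTimestamp=42
        System.out.println(req.method() + " " + req.uri());
    }
}
```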

Now let's go straight to the server side and see how it handles the heartbeat request:

com.netflix.eureka.resources.InstanceResource#renewLease
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#renew

    public boolean renew(final String appName, final String id, final boolean isReplication) {
        if (super.renew(appName, id, isReplication)) {
            // replicate the heartbeat to the other peer nodes in the cluster
            replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
            return true;
        }
        return false;
    }
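The isReplication flag is what keeps peers from bouncing the same heartbeat back and forth: a heartbeat that arrived from another Eureka node is renewed locally but not replicated again. A simplified, hypothetical model of that guard follows. The real replicateToPeers also skips the node's own URL via PeerEurekaNodes#isThisMyUrl, which is reduced here to a plain string comparison, and the peer URLs are made up.

```java
import java.util.ArrayList;
import java.util.List;

class ReplicationSketch {

    // Returns the peers this node would forward a heartbeat to.
    static List<String> peersToNotify(List<String> peerUrls, String myUrl, boolean isReplication) {
        List<String> targets = new ArrayList<>();
        if (isReplication) {
            return targets; // already a replication: forwarding again could loop between peers
        }
        for (String url : peerUrls) {
            if (!url.equals(myUrl)) { // never replicate to yourself
                targets.add(url);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        List<String> peers = List.of("http://peer1:8761/eureka/", "http://peer2:8761/eureka/",
                "http://self:8761/eureka/");
        System.out.println(peersToNotify(peers, "http://self:8761/eureka/", false)); // [http://peer1:8761/eureka/, http://peer2:8761/eureka/]
        System.out.println(peersToNotify(peers, "http://self:8761/eureka/", true));  // []
    }
}
```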

The core work is still done by the parent class's renew(), which holds the main processing logic:

com.netflix.eureka.registry.AbstractInstanceRegistry#renew

    // service renewal (lease renewal) method
    public boolean renew(String appName, String id, boolean isReplication) {
        RENEW.increment(isReplication);
        // look up the instances registered under appName
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            // get the lease by instance ID
            leaseToRenew = gMap.get(id);
        }
        if (leaseToRenew == null) {
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
            // get the instance that holds the lease
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                // work out the instance's overridden status
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                // if the two statuses differ, overwrite with the overridden status
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    Object[] args = {
                            instanceInfo.getStatus().name(),
                            instanceInfo.getOverriddenStatus().name(),
                            instanceInfo.getId()
                    };
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", args);
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
                }
            }
            // count the heartbeats actually received in the last minute
            renewsLastMin.increment();
            // reset the renewal (expiry) time
            leaseToRenew.renew();
            return true;
        }
    }

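Main flow:

  • Look up the lease by service name and instance ID
  • Set the lease's overridden status (if the instance's current status differs from it)
  • Increment the heartbeat count for the current minute
  • Reset the renewal time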
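The four steps above can be sketched with a plain two-level map, shaped like the `registry` field (appName, then instance ID, then lease). This is a simplified model, not the real AbstractInstanceRegistry: the overridden-status step is left out, `renewsLastMin` is an ordinary counter instead of a per-minute rate, and timestamps are passed in explicitly to keep the sketch deterministic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class RegistryRenewSketch {

    // Minimal stand-in for Lease<InstanceInfo>: only the timestamps matter here.
    static final class Lease {
        final long durationMillis;
        volatile long lastUpdateTimestamp;

        Lease(long durationMillis, long now) {
            this.durationMillis = durationMillis;
            this.lastUpdateTimestamp = now; // registration time
        }

        void renew(long now) {
            lastUpdateTimestamp = now + durationMillis; // same arithmetic as Lease#renew
        }
    }

    // appName -> (instanceId -> lease)
    final Map<String, Map<String, Lease>> registry = new ConcurrentHashMap<>();
    // simplified: a plain counter, never reset per minute
    final AtomicLong renewsLastMin = new AtomicLong();

    void register(String appName, String id, long durationMillis, long now) {
        registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>())
                .put(id, new Lease(durationMillis, now));
    }

    boolean renew(String appName, String id, long now) {
        Map<String, Lease> gMap = registry.get(appName);
        Lease lease = (gMap == null) ? null : gMap.get(id);
        if (lease == null) {
            return false; // unknown instance: the client ends up seeing a 404
        }
        renewsLastMin.incrementAndGet();
        lease.renew(now);
        return true;
    }

    public static void main(String[] args) {
        RegistryRenewSketch r = new RegistryRenewSketch();
        r.register("EUREKA", "001", 90_000L, 0L);
        System.out.println(r.renew("EUREKA", "001", 30_000L)); // true
        System.out.println(r.renew("EUREKA", "002", 30_000L)); // false
        System.out.println(r.registry.get("EUREKA").get("001").lastUpdateTimestamp); // 120000
    }
}
```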

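The most important step is updating the instance's expiration time, so that the automatic failure-eviction mechanism does not remove it by mistake. The lease's method (Lease#renew) does this: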

lastUpdateTimestamp = System.currentTimeMillis() + duration;

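This sets the timestamp to the current time plus the lease duration (90s by default). There is a BUG here, which shows up in the automatic failure eviction: Lease#isExpired adds duration again when checking, so an instance is only treated as expired roughly 2 × 90s = 180s after its last heartbeat.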
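This BUG is easiest to see with concrete numbers. Lease#renew already adds duration to the timestamp, and, as I read it, Lease#isExpired adds duration again when it checks. The sketch below copies that arithmetic (ignoring evictionTimestamp) with the current time passed in as a parameter. The helper names are my own.

```java
class LeaseExpirySketch {

    // Lease#renew: lastUpdateTimestamp = now + duration
    static long renew(long now, long durationMillis) {
        return now + durationMillis;
    }

    // Same shape as Lease#isExpired(additionalLeaseMs): duration is added a second time.
    static boolean isExpired(long now, long lastUpdateTimestamp, long durationMillis, long additionalLeaseMs) {
        return now > lastUpdateTimestamp + durationMillis + additionalLeaseMs;
    }

    public static void main(String[] args) {
        long duration = 90_000L;
        long last = renew(0L, duration); // last heartbeat at t = 0
        System.out.println(isExpired(150_000L, last, duration, 0L)); // false, although 90s passed long ago
        System.out.println(isExpired(180_001L, last, duration, 0L)); // true, only after about 2 * duration
    }
}
```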

This code suggests a scenario: if an instance sends no heartbeats for a long time because of a network failure, it gets evicted (after roughly 3 minutes).
Once the network recovers, the next heartbeat will not find the corresponding instance on the server.
How does the client deal with this? Let's look at the client code:

com.netflix.discovery.DiscoveryClient#renew

    // if the current instance has already been removed from the registry
    if (httpResponse.getStatusCode() == 404) {
        REREGISTER_COUNTER.increment();
        logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
        long timestamp = instanceInfo.setIsDirtyWithTime();
        // register again
        boolean success = register();
        if (success) {
            instanceInfo.unsetIsDirty(timestamp);
        }
        return success;
    }

So when the client learns that its instance can no longer be found, it simply registers again.
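The timestamp passed from setIsDirtyWithTime() to unsetIsDirty() matters. As I read InstanceInfo, the dirty flag is only cleared if nothing marked the instance dirty after that timestamp, so a change made while re-registration was in flight is not lost. Below is a minimal model of that rule. It uses explicit timestamps instead of System.currentTimeMillis(), which is my simplification, and the class name is hypothetical.

```java
class DirtyFlagSketch {

    private boolean dirty;
    private long lastDirtyTimestamp;

    // Like InstanceInfo#setIsDirtyWithTime: mark dirty and return the marking time.
    synchronized long setIsDirtyWithTime(long now) {
        dirty = true;
        lastDirtyTimestamp = now;
        return now;
    }

    // Like InstanceInfo#unsetIsDirty(long): clear the flag only if no newer
    // change marked the instance dirty after unsetDirtyTimestamp.
    synchronized void unsetIsDirty(long unsetDirtyTimestamp) {
        if (lastDirtyTimestamp <= unsetDirtyTimestamp) {
            dirty = false;
        }
    }

    synchronized boolean isDirty() {
        return dirty;
    }

    public static void main(String[] args) {
        DirtyFlagSketch info = new DirtyFlagSketch();
        long t = info.setIsDirtyWithTime(1_000L); // 404 received, about to re-register
        info.setIsDirtyWithTime(1_005L);          // another change happens meanwhile
        info.unsetIsDirty(t);                     // re-registration succeeded
        System.out.println(info.isDirty());       // true: the newer change is still pending
    }
}
```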

One more remark: Eureka's various mechanisms rely on a lot of scheduled tasks, so some of the timing calculations do not feel entirely reliable to me.

© Copyright belongs to the author. For reprints or content collaboration, please contact the author.
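Platform statement: the article's content (including any images or videos) was uploaded and published by the author and represents only the author's own views. Jianshu is an information publishing platform and only provides information storage services.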
