Eureka系列(五) 服務(wù)續(xù)約流程具體實(shí)現(xiàn)

服務(wù)續(xù)約執(zhí)行簡(jiǎn)要流程圖

??下面這張圖大致描述了服務(wù)續(xù)約從Client端到Server端的大致流程,詳情如下:
Eureka 服務(wù)續(xù)約.jpg

服務(wù)續(xù)約Client源碼分析

??我們先來看看服務(wù)續(xù)約定時(shí)任務(wù)的初始化。那我們的服務(wù)續(xù)約定時(shí)任務(wù)什么時(shí)候會(huì)被初始化呢,那肯定是我們啟用我們Eureka Client的時(shí)候,當(dāng)我們啟動(dòng)Client時(shí),Eureka會(huì)先處理相關(guān)的配置,然后初始化我們Client的相關(guān)信息,我們的定時(shí)任務(wù)也就是此時(shí)進(jìn)行的初始化,具體來說我們的服務(wù)續(xù)約定時(shí)任務(wù)就是在DiscoveryClient這個(gè)類中initScheduledTasks方法中被初始化的。代碼如下:

private final ScheduledExecutorService scheduler;
private void initScheduledTasks() {
     …省略其他代碼
    // 初始化定時(shí)服務(wù)續(xù)約任務(wù)
    scheduler.schedule(
        new TimedSupervisorTask(
                "heartbeat",
                scheduler,
                heartbeatExecutor,
                renewalIntervalInSecs,
                TimeUnit.SECONDS,
                expBackOffBound,
                new HeartbeatThread()
        ),
        renewalIntervalInSecs, TimeUnit.SECONDS);
     …省略其他代碼   
}

??由此可見,我們的定時(shí)任務(wù)其實(shí)是Client進(jìn)行初始化完成的,并且還是使用ScheduledExecutorService線程池來完成我們的定時(shí)任務(wù)。接下來我們看看HeartbeatThread類相關(guān)的源碼:

private class HeartbeatThread implements Runnable {
    public void run() {
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}

??HeartbeatThread類中run方法調(diào)用了renew方法,我們接著看下去:

/**
* Renew with the eureka service by making the appropriate REST call
*/
boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        // 調(diào)用Eureka Server提供的服務(wù)續(xù)約Http接口
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == 404) {
            REREGISTER_COUNTER.increment();
            logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        return httpResponse.getStatusCode() == 200;
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
        return false;
    }
}

?? 通過eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null) 這行代碼,我們則可以知道Client端的續(xù)約其實(shí)就是用調(diào)用Server提供的接口,然后Server端實(shí)現(xiàn)了服務(wù)續(xù)約的操作。


服務(wù)續(xù)約Server源碼實(shí)現(xiàn)

??由上面可見,我們Client的續(xù)約操作是直接調(diào)用的Server端提供的接口,以此來實(shí)現(xiàn)的具體操作,那我們接下來就看看Server是如何進(jìn)行服務(wù)續(xù)約操作的,源碼如下:

@PUT
public Response renewLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("overriddenstatus") String overriddenStatus,
            @QueryParam("status") String status,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        boolean isFromReplicaNode = "true".equals(isReplication);
        // 進(jìn)行服務(wù)續(xù)約操作
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

        // Not found in the registry, immediately ask for a register
        if (!isSuccess) {
            logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }
        // Check if we need to sync based on dirty time stamp, the client
        // instance might have changed some value
        Response response = null;
        if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
            response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
            // Store the overridden status since the validation found out the node that replicates wins
            if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                    && (overriddenStatus != null)
                    && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                    && isFromReplicaNode) {
                registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
            }
        } else {
            response = Response.ok().build();
        }
        logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
        return response;
    }

??由registry.renew(app.getName(), id, isFromReplicaNode);這里我們可以看到調(diào)用了renew()方法來實(shí)現(xiàn)續(xù)約操作,那我們就看下這個(gè)方法的實(shí)現(xiàn):

Override
    public boolean renew(final String appName, final String serverId,
            boolean isReplication) {
        log("renew " + appName + " serverId " + serverId + ", isReplication {}"
                + isReplication);
        List<Application> applications = getSortedApplications();
        for (Application input : applications) {
            if (input.getName().equals(appName)) {
                InstanceInfo instance = null;
                for (InstanceInfo info : input.getInstances()) {
                    if (info.getId().equals(serverId)) {
                        instance = info;
                        break;
                    }
                }
                publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId,
                        instance, isReplication));
                break;
            }
        }
        return super.renew(appName, serverId, isReplication);
    }

??我們可以看到方法最后調(diào)用了父類的renew()方法,我們繼續(xù)看下去:

public boolean renew(final String appName, final String id, final boolean isReplication) {
        if (super.renew(appName, id, isReplication)) {
            // 向其他server發(fā)送服務(wù)續(xù)約消息
            replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
            return true;
        }
        return false;
    }
 public boolean renew(String appName, String id, boolean isReplication) {
        RENEW.increment(isReplication);
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            leaseToRenew = gMap.get(id);
        }
        if (leaseToRenew == null) {
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                                    instanceInfo.getOverriddenStatus().name(),
                                    instanceInfo.getId());
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);

                }
            }
            renewsLastMin.increment();
            leaseToRenew.renew();
            return true;
        }
    }
public void renew() {
        //  更新上次更新時(shí)間
        lastUpdateTimestamp = System.currentTimeMillis() + duration;
}

??由上面的代碼可見,Eureka Server服務(wù)續(xù)約的流程可大致分為以下幾步:
????1.更新實(shí)例狀態(tài)
????2.更新實(shí)例上次更新時(shí)間
????3.向其他Server發(fā)送服務(wù)續(xù)約消息


??下面為自己總結(jié)的Eureka相關(guān)的知識(shí)點(diǎn),有興趣地小伙伴可以看一看,當(dāng)然再點(diǎn)下贊就更棒了,創(chuàng)作不易!
??Eureka系列(一)Eureka功能介紹
??Eureka系列(二) 服務(wù)注冊(cè)Server端具體實(shí)現(xiàn)
??Eureka系列(三)獲取服務(wù)Client端具體實(shí)現(xiàn)
??Eureka系列(四) 獲取服務(wù)Server端具體實(shí)現(xiàn)
??Eureka系列(五) 服務(wù)續(xù)約流程具體實(shí)現(xiàn)
??Eureka系列(六) TimedSupervisorTask類解析
??Eureka系列(七) 服務(wù)下線Server端具體實(shí)現(xiàn)
??Eureka系列(八)服務(wù)剔除具體實(shí)現(xiàn)
??Eureka系列(九)Eureka自我保護(hù)機(jī)制

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容