Spring Cloud: Eureka Service Renewal (Heartbeat Mechanism)

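Preface

When a Eureka Client application starts, the initScheduledTasks method of the com.netflix.discovery.DiscoveryClient class sets up the following:

  • 1. Periodic refresh of the service list;
  • 2. Periodic service renewal;
  • 3. The service registration logic;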

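Overview

Netflix's official architecture diagram shows that a Eureka Client periodically sends Renew requests to the registry, so that other Eureka clients can obtain information about newly registered applications through Get Registry requests.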

Guidance from the official documentation

The most accurate description comes from Netflix's official documentation:
https://github.com/Netflix/eureka/wiki/Understanding-eureka-client-server-communication#renew

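Key points about renewal:

  • 1. The Eureka client sends a heartbeat to the Eureka server every 30 seconds; this is the renewal;
  • 2. The purpose of the renewal is to tell the Eureka server that the client is still alive;
  • 3. If the Eureka server receives no heartbeat for 90 seconds, it removes that Eureka client from its registry;
  • 4. Changing the heartbeat interval is not recommended, because the Eureka server relies on heartbeats to judge whether Eureka clients are healthy;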

Service renewal flow in brief

The flow of a service renewal runs from the client side to the server side; the details are as follows.

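Eureka renewal source code analysis

1. Eureka Client initiates the renewal

After a Eureka Client successfully registers an application instance with the Eureka Server, it obtains a lease. The client then sends a renewal (renew) to the Eureka Server at a fixed interval so that the lease does not expire.

By default the lease is valid for 90 seconds and a renewal is sent every 30 seconds. The 1:3 ratio between the renewal interval and the lease duration gives the client three attempts before the lease expires, for example when the network is unstable.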
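
The relationship between the two defaults can be checked with a small calculation. The sketch below is illustrative only: the class and method names are hypothetical and not part of Eureka. It simply divides the lease duration by the renewal interval to show how many heartbeats fit into one lease window.

```java
// Hypothetical helper, not part of Eureka: relates lease duration to renewal interval.
final class LeaseTimingSketch {

    /** Number of heartbeats that can be sent within one lease duration. */
    static int heartbeatsPerLease(int leaseDurationSecs, int renewalIntervalSecs) {
        if (leaseDurationSecs <= 0 || renewalIntervalSecs <= 0) {
            throw new IllegalArgumentException("durations must be positive");
        }
        return leaseDurationSecs / renewalIntervalSecs;
    }

    public static void main(String[] args) {
        // Defaults described above: 90 s lease, 30 s renewal interval.
        System.out.println(heartbeatsPerLease(90, 30));
    }
}
```

In Spring Cloud the two values correspond to the eureka.instance.lease-expiration-duration-in-seconds and eureka.instance.lease-renewal-interval-in-seconds properties.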

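1) Initializing the scheduled tasks

During initialization, the Eureka Client creates a heartbeat thread that sends renewals to the Eureka Server at a fixed interval. The implementation is as follows: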

@Singleton
public class DiscoveryClient implements EurekaClient {

    /**
     * Initializes all scheduled tasks.
     */
    private void initScheduledTasks() {
        // Scheduled task that fetches registry information
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            cacheRefreshTask = new TimedSupervisorTask(
                    "cacheRefresh",
                    scheduler,
                    cacheRefreshExecutor,
                    registryFetchIntervalSeconds,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new CacheRefreshThread()
            );
            scheduler.schedule(
                    cacheRefreshTask,
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }

        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

            // Heartbeat timer: the scheduled task that sends renewals
            heartbeatTask = new TimedSupervisorTask(
                    "heartbeat",
                    scheduler,
                    heartbeatExecutor,
                    renewalIntervalInSecs,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new HeartbeatThread()
            );
            scheduler.schedule(
                    heartbeatTask,
                    renewalIntervalInSecs, TimeUnit.SECONDS);

            // InstanceInfo replicator: the scheduled task that syncs instance info to the server
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize

            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };
            
            // Register the listener for instance status changes
            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }

            // Start the scheduled registration (instance info replication) task
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }
}
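
Both timers above are wrapped in a TimedSupervisorTask together with an expBackOffBound value. As far as I understand the Eureka source, the supervisor reschedules itself after every run, doubles its delay when a run times out (capped at the base interval multiplied by expBackOffBound), and returns to the base interval after a successful run. The following is a simplified model of that delay calculation only. The class and method names are hypothetical, and the bound of 10 used in the example is an assumed value.

```java
// Simplified model of the delay calculation; not the actual TimedSupervisorTask code.
final class BackoffSketch {

    /**
     * @param currentDelayMs  delay used for the run that just finished
     * @param timedOut        whether that run timed out
     * @param baseDelayMs     configured interval (for example the renewal interval)
     * @param expBackOffBound maximum multiple of the base interval
     * @return the delay to use before the next run
     */
    static long nextDelayMs(long currentDelayMs, boolean timedOut, long baseDelayMs, int expBackOffBound) {
        long maxDelayMs = baseDelayMs * expBackOffBound;
        if (timedOut) {
            // Back off exponentially, but never beyond the configured bound.
            return Math.min(maxDelayMs, currentDelayMs * 2);
        }
        // A successful run resets the delay to the base interval.
        return baseDelayMs;
    }

    public static void main(String[] args) {
        long base = 30_000L;
        long delay = base;
        for (int i = 1; i <= 5; i++) {
            delay = nextDelayMs(delay, true, base, 10);
            System.out.println("after timeout " + i + ": " + delay + " ms");
        }
        System.out.println("after success: " + nextDelayMs(delay, false, base, 10) + " ms");
    }
}
```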

2) Sending the renewal

@Singleton
public class DiscoveryClient implements EurekaClient {

    // Timestamp of the last successful heartbeat to the Eureka Server
    private volatile long lastSuccessfulHeartbeatTimestamp = -1;

    private class HeartbeatThread implements Runnable {
        public void run() {
            // Call the renew method
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }
    
    // Service renewal
    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            // Send the RESTful request, i.e. the heartbeat
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
            // A 404 response triggers the registration logic
            if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            // Status code 200 means the heartbeat succeeded
            return httpResponse.getStatusCode() == Status.OK.getStatusCode();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }   
}
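
The branch logic in renew() can be condensed into a single decision on the HTTP status code. The sketch below is a hypothetical condensation for illustration, not Eureka code: the class name and the BooleanSupplier standing in for register() are assumptions.

```java
import java.util.function.BooleanSupplier;

// Hypothetical condensation of the status handling in renew(); not Eureka code.
final class RenewOutcomeSketch {

    /**
     * @param statusCode HTTP status returned for the heartbeat
     * @param register   stand-in for the client's register() call
     * @return true if the instance is known to the server after this call
     */
    static boolean handleHeartbeatStatus(int statusCode, BooleanSupplier register) {
        if (statusCode == 404) {
            // The server does not know this instance: register again.
            return register.getAsBoolean();
        }
        // Only 200 counts as a successful heartbeat.
        return statusCode == 200;
    }

    public static void main(String[] args) {
        System.out.println(handleHeartbeatStatus(200, () -> false));
        System.out.println(handleHeartbeatStatus(404, () -> true));
        System.out.println(handleHeartbeatStatus(500, () -> true));
    }
}
```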

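The sendHeartBeat() method of AbstractJerseyEurekaHttpClient issues a PUT request to the Eureka Server endpoint apps/${APP_NAME}/${INSTANCE_INFO_ID}, passing status, lastDirtyTimestamp and (when set) overriddenstatus as query parameters. This is what performs the renewal.

Expanding the eurekaTransport.registrationClient.sendHeartBeat call from the code above, the source is in the EurekaHttpClientDecorator class: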

@Override
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(final String appName,
                                                      final String id,
                                                      final InstanceInfo info,
                                                      final InstanceStatus overriddenStatus) {
    return execute(new RequestExecutor<InstanceInfo>() {
        @Override
        public EurekaHttpResponse<InstanceInfo> execute(EurekaHttpClient delegate) {
            // Delegate the network call to the wrapped client
            return delegate.sendHeartBeat(appName, id, info, overriddenStatus);
        }

        @Override
        public RequestType getRequestType() {
            // The request type is heartbeat
            return RequestType.SendHeartBeat;
        }
    });
}

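Following delegate.sendHeartBeat through several layers of calls, the work is ultimately done by the JerseyApplicationClient class. The corresponding source is in its parent class AbstractJerseyEurekaHttpClient, shown below. Its main job is to use the Jersey library's RESTful API to PUT the client's own information to the Eureka server. Note that the request is a PUT, not a POST or a GET: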

@Override
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
    String urlPath = "apps/" + appName + '/' + id;
    ClientResponse response = null;
    try {
        // Two query parameters: the client's own status, and the time its key information (status, metadata, etc.) last changed
        WebResource webResource = jerseyClient.resource(serviceUrl)
                .path(urlPath)
                .queryParam("status", info.getStatus().toString())
                .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
        if (overriddenStatus != null) {
            webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
        }
        Builder requestBuilder = webResource.getRequestBuilder();
        addExtraHeaders(requestBuilder);
        // Note: this is a PUT, not a POST or a GET
        response = requestBuilder.put(ClientResponse.class);
        EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
        if (response.hasEntity() &&
                !HTML.equals(response.getType().getSubtype())) { //don't try and deserialize random html errors from the server
            eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
        }
        return eurekaResponseBuilder.build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}

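This completes the analysis of the client-side source for lease renewal. The process is fairly simple: DiscoveryClient, TimedSupervisorTask, JerseyApplicationClient and related instances each play their part in periodically sending a PUT request to the Eureka server.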
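
To make the shape of the heartbeat request concrete, here is a minimal sketch that builds (but does not send) an equivalent PUT request with the JDK's java.net.http API (Java 11 or later). It is not how Eureka does it, since Eureka uses Jersey as shown above. The class name, the server URL and the instance values are all assumptions for illustration, and the values are not URL-encoded, so they must not contain reserved characters.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Illustrative only: rebuilds the heartbeat URL with the JDK HTTP client instead of Jersey.
final class HeartbeatRequestSketch {

    static HttpRequest build(String serviceUrl, String appName, String instanceId,
                             String status, long lastDirtyTimestamp) {
        // Same path and query parameters as AbstractJerseyEurekaHttpClient.sendHeartBeat.
        String url = serviceUrl + "apps/" + appName + "/" + instanceId
                + "?status=" + status
                + "&lastDirtyTimestamp=" + lastDirtyTimestamp;
        return HttpRequest.newBuilder(URI.create(url))
                .PUT(HttpRequest.BodyPublishers.noBody()) // a heartbeat carries no body
                .build();
    }

    public static void main(String[] args) {
        // Assumed local server address and instance values.
        HttpRequest request = build("http://localhost:8761/eureka/", "DEMO", "demo-1", "UP", 1700000000000L);
        System.out.println(request.method() + " " + request.uri());
    }
}
```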

2. Eureka Server receives the renewal

The core flow by which the Eureka Server handles a renewal is as follows.

1) Receiving the renewal request

@Produces({"application/xml", "application/json"})
public class InstanceResource {

    @PUT
    public Response renewLease(
            // Whether this is a replication (peer sync) request
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            
            @QueryParam("overriddenstatus") String overriddenStatus,    // overridden status of the instance

            @QueryParam("status") String status,    // status of the instance

            // Time the instance info was last modified on the Eureka Client side
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        boolean isFromReplicaNode = "true".equals(isReplication);
        // Renew the lease
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

        // Not found in the registry, immediately ask for a register
        // Renewal failed: return 404; on receiving it the Eureka Client sends a registration request
        if (!isSuccess) {
            logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }
        // Check if we need to sync based on dirty time stamp, the client
        // instance might have changed some value
        // Compare the lastDirtyTimestamp property of the InstanceInfo
        Response response;
        if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
            // Check whether the incoming lastDirtyTimestamp matches the one stored on the Eureka Server
            response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
            // Store the overridden status since the validation found out the node that replicates wins
            if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                    && (overriddenStatus != null)
                    && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                    && isFromReplicaNode) {
                registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
            }
        } else {
            // Renewal succeeded: return 200
            response = Response.ok().build();
        }
        logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
        return response;
    }
}

PeerAwareInstanceRegistryImpl calls the renew(...) method of its parent class AbstractInstanceRegistry to renew the instance information:

@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

    public boolean renew(final String appName, final String id, final boolean isReplication) {
        // Call the parent class's renew(appName, id, isReplication) method to renew the lease
        if (super.renew(appName, id, isReplication)) {
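            // If this is a renewal request from a client, replicate the renewal to the other Eureka Server nodes;
            // if it is itself a replication request from a peer, replicateToPeers returns without forwarding it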
            replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
            return true;
        }
        return false;
    }
}

2) Renewing the application instance

The renew(...) method of AbstractInstanceRegistry is called to renew the instance information. The code is as follows:

public abstract class AbstractInstanceRegistry implements InstanceRegistry {

    public boolean renew(String appName, String id, boolean isReplication) {
        // Increment the renewal counter for monitoring
        RENEW.increment(isReplication);
        // Look up the leases for this application name, i.e. the map from instance id to lease
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            // Get the lease of the specific instance by instance id
            leaseToRenew = gMap.get(id);
        }
        // The lease does not exist
        if (leaseToRenew == null) {
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                // Get the overridden status of the instance
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                
                // The overridden status is UNKNOWN: the renewal fails
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                
                // The instance status differs from the overridden status
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                                    instanceInfo.getOverriddenStatus().name(),
                                    instanceInfo.getId());
                    
                    // Force the instance status to the overridden status,
                    // i.e. status = overriddenInstanceStatus
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
                }
            }
            // Increment the renewals-per-minute counter
            renewsLastMin.increment();
            // Renew the lease (sets lastUpdateTimestamp, the lease's last-update time)
            leaseToRenew.renew();
            return true;
        }
    }
}

public class Lease<T> {

    enum Action {
        Register, Cancel, Renew
    };
    
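    private volatile long lastUpdateTimestamp;
    // Lease duration in milliseconds (set in the constructor, omitted here)
    private long duration;

    public void renew() {
        // Set the lease's last-update timestamp. Note that it stores now + duration
        // rather than the time of the renewal itself.
        lastUpdateTimestamp = System.currentTimeMillis() + duration;
    }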
}

The whole renewal only updates the lease's expiration time. Even concurrent requests do not affect data consistency, so unlike registration, no lock is needed.
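
In the Eureka source, Lease.isExpired compares the current time with lastUpdateTimestamp + duration, and its Javadoc notes that because renew() already adds duration, an instance that stops ungracefully effectively expires after 2 * duration. Below is a minimal sketch of that interaction, assuming a simplified lease with an injected clock value. The class is hypothetical and omits the eviction timestamp and the additional lease time that the real class handles.

```java
// Hypothetical, simplified lease; the current time is passed in so the behavior is easy to test.
final class LeaseSketch {

    private final long durationMs;
    private volatile long lastUpdateTimestamp;

    LeaseSketch(long durationMs, long nowMs) {
        this.durationMs = durationMs;
        this.lastUpdateTimestamp = nowMs;
    }

    /** Mirrors Lease.renew(): stores now + duration, not just now. */
    void renew(long nowMs) {
        lastUpdateTimestamp = nowMs + durationMs;
    }

    /** Mirrors the core comparison in Lease.isExpired(): adds duration a second time. */
    boolean isExpired(long nowMs) {
        return nowMs > lastUpdateTimestamp + durationMs;
    }

    public static void main(String[] args) {
        LeaseSketch lease = new LeaseSketch(90_000L, 0L);
        lease.renew(0L);
        // With a 90 s lease renewed at t = 0, expiry only happens after 180 s.
        System.out.println(lease.isExpired(90_001L));
        System.out.println(lease.isExpired(180_001L));
    }
}
```

A plain timestamp write like this is also why no lock is needed: the last writer simply wins, and any of the concurrent values is a valid renewal time.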

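3) Eureka introduces overriddenstatus to keep a status from being overwritten

When a client calls the updateStatus method, both the status and the overriddenStatus of the instance on the server are updated.

When a client calls the renew method, the server-side status and overriddenstatus of the instance are also updated, subject to the following rules:

  • (1): If the status reported by the client is down or starting, the client is restarting or its healthCheck has failed, and the instance cannot serve requests. Even if a client calls updateStatus to set the instance status to up, it has no effect: the accurate status of the instance is down or starting.

  • (2): If the status reported by the client is up or out_of_service, it cannot be trusted, because the instance status may already have been changed on the server. In this case the overriddenstatus is used as the current status of the instance so that it is not overwritten.

  • (3): The overriddenstatus in (2) may not exist, for example because the cache entry has expired. In that case the status of the instance that already exists on the server is used.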
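
The three rules can be read as an ordered chain in which the first rule that applies wins. The sketch below expresses that reading in plain Java. It is an interpretation of the rules listed above, not Eureka's implementation: the class, enum and method names are hypothetical, null stands for a missing value, and the final fallback to the client-reported status is an assumption.

```java
// Hypothetical sketch of the rule order described above; not Eureka's implementation.
final class OverrideRuleSketch {

    enum Status { UP, DOWN, STARTING, OUT_OF_SERVICE, UNKNOWN }

    /**
     * @param clientStatus     status reported by the client in the renewal
     * @param overriddenStatus overridden status stored on the server, or null if absent
     * @param existingStatus   status of the instance already held by the server, or null
     */
    static Status resolve(Status clientStatus, Status overriddenStatus, Status existingStatus) {
        // Rule (1): DOWN or STARTING reported by the client is always trusted.
        if (clientStatus == Status.DOWN || clientStatus == Status.STARTING) {
            return clientStatus;
        }
        // Rule (2): otherwise an existing overridden status takes precedence.
        if (overriddenStatus != null) {
            return overriddenStatus;
        }
        // Rule (3): no override available, so use what the server already has.
        if (existingStatus != null) {
            return existingStatus;
        }
        // Assumed fallback: nothing else is known, so accept the client's status.
        return clientStatus;
    }

    public static void main(String[] args) {
        System.out.println(resolve(Status.DOWN, Status.UP, Status.UP));
        System.out.println(resolve(Status.UP, Status.OUT_OF_SERVICE, Status.UP));
        System.out.println(resolve(Status.UP, null, Status.OUT_OF_SERVICE));
    }
}
```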
