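I. Preface
This article walks through how the Eureka server implements its core service-governance methods. Most of the logic lives in the abstract class com.netflix.eureka.registry.AbstractInstanceRegistry.
For the client-side registration logic, see "Spring Cloud Eureka Source Code Analysis: Client Side".
II. Service Governance
Service governance covers service registration (register), lease renewal (renew), service fetching (getApplication), service cancellation (cancel), heartbeat replication (heartbeat), eviction (evict), and self-preservation.
Instance status management covers the overridden status (overriddenStatus), status updates (statusUpdate), deleting a status override (deleteStatusOverride), and the rules used to compute the effective status.
III. Source Code Analysis
Service registration (register)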
//com.netflix.eureka.registry.AbstractInstanceRegistry
/**
* Registers a new instance with a given duration.
*
* @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
* registrant is the instance being registered
* leaseDuration is the lease duration
* isReplication is true when the call is replication from a peer server, false when a client registers directly
*/
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock();
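// registry holds the registered instance info; registering essentially means adding the instance to the registry field.
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
// Increment the register counter
REGISTER.increment(isReplication);
// Initialize gMap if this is the first instance of the application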
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
// A lease is already cached: compare the dirty timestamps of the two copies
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
// InstanceInfo instead of the server local copy.
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
// The incoming registration carries older instance info, so keep the cached copy
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// The lease does not exist and hence it is a new registration
// First registration: increment expectedNumberOfClientsSendingRenews and recompute the self-preservation threshold
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to register it, increase the number of clients sending renews
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
updateRenewsPerMinThreshold();
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
// Carry over the previous service-up timestamp
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
// Store the lease in the registry
gMap.put(registrant.getId(), lease);
synchronized (recentRegisteredQueue) {
// recentRegisteredQueue records recent registrations as (registration time, app name plus instance id) pairs; it is used mainly for debugging and statistics
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
// This is where the initial state transfer of overridden status happens
// If the incoming overridden status is not UNKNOWN and the map has no entry for this id yet, store it
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
// If the map already holds an override, it wins: the map can be changed through statusUpdate, and the server-side value takes precedence
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
// Compute the effective status from the overridden status and the override rules (analyzed later in this article)
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
// If the status is UP, record the service-up timestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
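// Mark the action as ADDED: the instance was added to the server-side registry
registrant.setActionType(ActionType.ADDED);
// Append this registration to the recently-changed queue
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
// Update the last-updated timestamp
registrant.setLastUpdatedTimestamp();
// Invalidate the cached entries for this application, plus ALL_APPS and ALL_APPS_DELTA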
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
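Registration consists of the following steps:
- Initialize the application's gMap in registry (only when it does not exist yet)
- Increment expectedNumberOfClientsSendingRenews and recompute the self-preservation threshold
- Put the registrant into gMap
- Update the overridden status in overriddenInstanceStatusMap
- Compute and set the registrant's status
- Invalidate the response cache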
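The first and third steps can be sketched in isolation. The following is a minimal model, not Eureka's code: `RegistrySketch` and `Instance` are hypothetical names, and `Instance` stands in for InstanceInfo with only the fields the example needs. It shows the two-level map and the rule that an existing copy with a strictly newer dirty timestamp is kept.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Simplified model of the registry map. All names are illustrative, not Eureka's API. */
class RegistrySketch {

    /** Hypothetical stand-in for InstanceInfo with only the fields the example needs. */
    static final class Instance {
        final String appName;
        final String id;
        final long lastDirtyTimestamp;

        Instance(String appName, String id, long lastDirtyTimestamp) {
            this.appName = appName;
            this.id = id;
            this.lastDirtyTimestamp = lastDirtyTimestamp;
        }
    }

    // appName -> (instanceId -> instance), the same shape as the registry field
    private final ConcurrentHashMap<String, Map<String, Instance>> registry = new ConcurrentHashMap<>();

    /** Registers an instance and returns the copy that ends up stored. */
    Instance register(Instance registrant) {
        // computeIfAbsent stands in for the get / putIfAbsent / null-check sequence
        Map<String, Instance> gMap =
                registry.computeIfAbsent(registrant.appName, k -> new ConcurrentHashMap<>());
        Instance existing = gMap.get(registrant.id);
        // Strictly greater: on equal timestamps the incoming copy still wins
        Instance toStore =
                (existing != null && existing.lastDirtyTimestamp > registrant.lastDirtyTimestamp)
                        ? existing
                        : registrant;
        gMap.put(toStore.id, toStore);
        return toStore;
    }
}
```

Registering an older copy of an already known instance therefore returns the cached one, while a copy with an equal timestamp replaces it.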
Lease renewal (renew)
/**
* Marks the given instance of the given app name as renewed, and also marks whether it originated from
* replication.
*
* @see com.netflix.eureka.lease.LeaseManager#renew(java.lang.String, java.lang.String, boolean)
* appName is the application name
* id is the instance id
* isReplication is true when the call is replication from a peer server, false when a client renews directly
*/
public boolean renew(String appName, String id, boolean isReplication) {
// Increment the renew counter
RENEW.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
// Look up the lease by application name and instance id
if (gMap != null) {
leaseToRenew = gMap.get(id);
}
if (leaseToRenew == null) {
// Not found: increment the not-found counter
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
return false;
} else {
// Get the cached instance info
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// touchASGCache(instanceInfo.getASGName());
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
// The computed status (rules are described later); UNKNOWN most likely means the override was deleted, so the client has to register again
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
+ "; re-register required", instanceInfo.getId());
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
// If the computed status differs from the instance status, the computed status wins
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
logger.info(
"The instance status {} is different from overridden instance status {} for instance {}. "
+ "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
instanceInfo.getOverriddenStatus().name(),
instanceInfo.getId());
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
}
}
// Count this renewal in the last-minute rate
renewsLastMin.increment();
// Update the lease's renewal timestamp, which is later used to decide whether the lease has expired
leaseToRenew.renew();
return true;
}
}
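Renewal consists of the following steps:
- Check whether the instance id exists in registry; if not, return false immediately
- Compute the status from the overridden status and update the instance's status
- Update the renewal timestamp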
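The first and last steps can be illustrated with a small model. This is a sketch under simplifying assumptions: `RenewSketch` and its nested `Lease` are hypothetical, time is passed in explicitly so the behavior is deterministic, and the expiry rule is a plain "last renewal plus duration" check rather than a copy of Eureka's Lease class.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Simplified renewal model. Names and the expiry rule are illustrative assumptions. */
class RenewSketch {

    /** Hypothetical lease that only tracks its duration and last renewal time. */
    static final class Lease {
        final long durationMs;
        volatile long lastUpdateTimestamp;

        Lease(long durationMs, long now) {
            this.durationMs = durationMs;
            this.lastUpdateTimestamp = now;
        }
    }

    // appName -> (instanceId -> lease)
    private final Map<String, Map<String, Lease>> registry = new ConcurrentHashMap<>();

    void register(String appName, String id, long durationMs, long now) {
        registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>())
                .put(id, new Lease(durationMs, now));
    }

    /** Returns false when no lease exists, which tells the caller to register again. */
    boolean renew(String appName, String id, long now) {
        Map<String, Lease> gMap = registry.get(appName);
        Lease lease = (gMap == null) ? null : gMap.get(id);
        if (lease == null) {
            return false;
        }
        lease.lastUpdateTimestamp = now;
        return true;
    }

    /** True when the lease is missing or its deadline has passed. */
    boolean isExpired(String appName, String id, long now) {
        Map<String, Lease> gMap = registry.get(appName);
        Lease lease = (gMap == null) ? null : gMap.get(id);
        return lease == null || now > lease.lastUpdateTimestamp + lease.durationMs;
    }
}
```

A renewal for an unknown instance returns false without creating anything, which matches the "return immediately" branch described in the first step.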
Service fetching (getApplication)
Value getValue(final Key key, boolean useReadOnlyCache) {
Value payload = null;
try {
if (useReadOnlyCache) {
// The read-only cache is enabled: try it first
final Value currentPayload = readOnlyCacheMap.get(key);
if (currentPayload != null) {
payload = currentPayload;
} else {
// Miss in the read-only cache: fall back to the read-write cache
payload = readWriteCacheMap.get(key);
readOnlyCacheMap.put(key, payload);
}
} else {
payload = readWriteCacheMap.get(key);
}
} catch (Throwable t) {
logger.error("Cannot get value for key : {}", key, t);
}
return payload;
}
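Three-level cache
When a client fetches the registry, the server by default answers through three levels of caching.
readOnlyCacheMap (ConcurrentMap<Key, Value>): the read-only cache
readWriteCacheMap (LoadingCache<Key, Value>): the read-write cache
registry (ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>): the registry itself
A read goes to the read-only cache first and falls back to the read-write cache on a miss; the read-write cache in turn loads from the registry on a miss.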
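The read path and its staleness window can be sketched as follows. This is an illustrative model only: `ThreeLevelCacheSketch` is a hypothetical class, plain maps stand in for the Guava LoadingCache, and the registry is represented by a loader function supplied by the caller.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Illustrative three-level read path; not Eureka's ResponseCacheImpl. */
class ThreeLevelCacheSketch {
    private final Map<String, String> readOnly = new ConcurrentHashMap<>();
    private final Map<String, String> readWrite = new ConcurrentHashMap<>();
    // Stand-in for building a response from the registry
    private final Function<String, String> registryLoader;

    ThreeLevelCacheSketch(Function<String, String> registryLoader) {
        this.registryLoader = registryLoader;
    }

    String get(String key, boolean useReadOnlyCache) {
        if (!useReadOnlyCache) {
            return loadThroughReadWrite(key);
        }
        String value = readOnly.get(key);
        if (value == null) {
            value = loadThroughReadWrite(key);
            if (value != null) {
                readOnly.put(key, value);
            }
        }
        return value;
    }

    /** Read-write level: loads from the registry on a miss. */
    private String loadThroughReadWrite(String key) {
        return readWrite.computeIfAbsent(key, registryLoader);
    }

    /** Called when the registry changes: only the read-write level is invalidated. */
    void invalidate(String key) {
        readWrite.remove(key);
    }

    /** Stand-in for the periodic task that copies read-write values into the read-only level. */
    void refreshReadOnly() {
        for (String key : readOnly.keySet()) {
            String fresh = loadThroughReadWrite(key);
            if (fresh != null) {
                readOnly.put(key, fresh);
            }
        }
    }
}
```

In this model a client reading through the read-only level keeps seeing the old value after an invalidation until the refresh runs, which is the trade-off the layered design accepts in exchange for cheap reads.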
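Update mechanism
registry is updated by almost every service operation: some put or remove entries, others change properties of the cached objects.
readWriteCacheMap is updated through the invalidateCache method or by automatic expiry; the default expiry is 180 seconds.
readOnlyCacheMap is synchronized from the read-write cache every 30 seconds by default.
The code below shows how invalidateCache expires entries and how the readOnlyCacheMap refresh task works.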
@Override
public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
for (Key.KeyType type : Key.KeyType.values()) {
for (Version v : Version.values()) {
invalidate(
// Entries for the app itself, the full apps list, and the apps delta are all invalidated
new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full),
new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact),
new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full),
new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact),
new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full),
new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact)
);
if (null != vipAddress) {
invalidate(new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full));
}
if (null != secureVipAddress) {
invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full));
}
}
}
}
/**
* Invalidate the cache information given the list of keys.
*/
public void invalidate(Key... keys) {
for (Key key : keys) {
readWriteCacheMap.invalidate(key);
Collection<Key> keysWithRegions = regionSpecificKeys.get(key);
if (null != keysWithRegions && !keysWithRegions.isEmpty()) {
for (Key keysWithRegion : keysWithRegions) {
logger.debug("Invalidating the response cache key : {} {} {} {} {}",
key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
readWriteCacheMap.invalidate(keysWithRegion);
}
}
}
}
...
if (shouldUseReadOnlyResponseCache) {
timer.schedule(getCacheUpdateTask(),
new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
+ responseCacheUpdateIntervalMs),
responseCacheUpdateIntervalMs);
}
// responseCacheUpdateIntervalMs defaults to 30 seconds
...
// Refresh task for the read-only cache
private TimerTask getCacheUpdateTask() {
return new TimerTask() {
@Override
public void run() {
logger.debug("Updating the client cache from response cache");
for (Key key : readOnlyCacheMap.keySet()) {
try {
CurrentRequestVersion.set(key.getVersion());
Value cacheValue = readWriteCacheMap.get(key);
Value currentCacheValue = readOnlyCacheMap.get(key);
if (cacheValue != currentCacheValue) {
readOnlyCacheMap.put(key, cacheValue);
}
} catch (Throwable th) {
logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
}
}
}
};
}
Service cancellation (cancel)
/**
* {@link #cancel(String, String, boolean)} method is overridden by {@link PeerAwareInstanceRegistry}, so each
* cancel request is replicated to the peers. This is however not desired for expires which would be counted
* in the remote peers as valid cancellations, so self preservation mode would not kick-in.
*/
protected boolean internalCancel(String appName, String id, boolean isReplication) {
try {
read.lock();
// Increment the cancel counter
CANCEL.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToCancel = null;
if (gMap != null) {
// Remove the instance id from the registry
leaseToCancel = gMap.remove(id);
}
// Record the cancelled instance in the recent-cancellations queue
synchronized (recentCanceledQueue) {
recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
}
// Remove the instance id from the overridden-status map
InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
if (instanceStatus != null) {
logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
}
// The instance id was not in the registry: increment the not-found counter
if (leaseToCancel == null) {
CANCEL_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
return false;
} else {
// Set the eviction timestamp
leaseToCancel.cancel();
InstanceInfo instanceInfo = leaseToCancel.getHolder();
String vip = null;
String svip = null;
if (instanceInfo != null) {
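// Mark the action as DELETED: the instance was removed from the server-side registry
instanceInfo.setActionType(ActionType.DELETED);
// Append this cancellation to the recently-changed queue
recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
// Update the last-updated timestamp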
instanceInfo.setLastUpdatedTimestamp();
vip = instanceInfo.getVIPAddress();
svip = instanceInfo.getSecureVipAddress();
}
// Invalidate the cached entries for this application, plus ALL_APPS and ALL_APPS_DELTA
invalidateCache(appName, vip, svip);
logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
return true;
}
} finally {
read.unlock();
}
}
Cancellation consists of the following steps:
- Remove the instance id from registry
- Remove the instance id from overriddenInstanceStatusMap
- Invalidate the response cache
Heartbeat replication (heartbeat)
private static Builder handleHeartbeat(EurekaServerConfig config, InstanceResource resource, String lastDirtyTimestamp, String overriddenStatus, String instanceStatus) {
Response response = resource.renewLease(REPLICATION, overriddenStatus, instanceStatus, lastDirtyTimestamp);
int responseStatus = response.getStatus();
Builder responseBuilder = new Builder().setStatusCode(responseStatus);
if ("false".equals(config.getExperimental("bugfix.934"))) {
if (responseStatus == Status.OK.getStatusCode() && response.getEntity() != null) {
responseBuilder.setResponseEntity((InstanceInfo) response.getEntity());
}
} else {
if ((responseStatus == Status.OK.getStatusCode() || responseStatus == Status.CONFLICT.getStatusCode())
&& response.getEntity() != null) {
responseBuilder.setResponseEntity((InstanceInfo) response.getEntity());
}
}
return responseBuilder;
}
public Response renewLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("overriddenstatus") String overriddenStatus,
@QueryParam("status") String status,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
boolean isFromReplicaNode = "true".equals(isReplication);
boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
...
}
Heartbeat replication keeps peer servers eventually consistent with each other. It also ends up calling the renew method.
Eviction (evict)
// additionalLeaseMs is the compensation time
public void evict(long additionalLeaseMs) {
logger.debug("Running the evict task");
// No eviction while the server is in self-preservation mode
if (!isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
return;
}
// We collect first all expired items, to evict them in random order. For large eviction sets,
// if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
// the impact should be evenly distributed across all applications.
// Collect all expired leases into a list first, then evict them in random order,
// so that an application does not lose all of its instances before self-preservation kicks in
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
// To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
// triggering self-preservation. Without that we would wipe out full registry.
// Total number of instances currently registered
int registrySize = (int) getLocalRegistrySize();
// Total number of instances * renewal percent threshold (0.85)
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold;
// Evict min(number of expired leases, evictionLimit); with a 0.85 threshold one run evicts at most about 15% of the registry
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
// Pick a random item (Knuth shuffle algorithm)
// Pick the next instance at random (shuffle algorithm)
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
// Increment the expired counter
EXPIRED.increment();
logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
// Take the chosen instance offline through internalCancel
internalCancel(appName, id, false);
}
}
}
// Expiry check; the compensation time is added to the deadline (explained below)
public boolean isExpired(long additionalLeaseMs) {
return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
}
// Checks whether expired instances may be evicted
@Override
public boolean isLeaseExpirationEnabled() {
if (!isSelfPreservationModeEnabled()) {
// Self-preservation mode is disabled: return true, so expired instances may be evicted
// The self preservation mode is disabled, hence allowing the instances to expire.
return true;
}
// Self-preservation mode is enabled: eviction is allowed only while renewals in the last minute exceed the threshold; otherwise the server is in self-preservation
return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
}
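Eviction consists of the following steps:
- Return immediately if self-preservation is active; otherwise collect all leases whose renewal deadline has passed
- Compute how many to evict: the smaller of the number of expired leases and the eviction limit
- Use a shuffle to pick the instances to evict
- Call internalCancel to take each of them offline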
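The limit computation and the random selection can be tried out on their own. The sketch below is a standalone illustration: `EvictionSketch` and its method names are hypothetical, and the 0.85 threshold is passed in as a parameter instead of being read from the server configuration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

/** Standalone illustration of the eviction limit and the partial shuffle. */
class EvictionSketch {

    /** Maximum number of instances one eviction run may remove. */
    static int evictionLimit(int registrySize, double renewalPercentThreshold) {
        int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold);
        return registrySize - registrySizeThreshold;
    }

    /** Picks min(expired, limit) items uniformly at random without mutating the input. */
    static <T> List<T> pickToEvict(List<T> expired, int registrySize,
                                   double renewalPercentThreshold, Random random) {
        List<T> pool = new ArrayList<>(expired);
        int toEvict = Math.min(pool.size(), evictionLimit(registrySize, renewalPercentThreshold));
        for (int i = 0; i < toEvict; i++) {
            // Partial shuffle: swap a random remaining element into position i
            int next = i + random.nextInt(pool.size() - i);
            Collections.swap(pool, i, next);
        }
        return new ArrayList<>(pool.subList(0, toEvict));
    }
}
```

With 100 registered instances and a threshold of 0.85, the limit is 15, so even if 40 leases have expired only 15 of them are removed in one run, and the random order spreads those 15 across applications.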
Eviction time compensation
/**
* compute a compensation time defined as the actual time this task was executed since the prev iteration,
* vs the configured amount of time for execution. This is useful for cases where changes in time (due to
* clock skew or gc for example) causes the actual eviction task to execute later than the desired time
* according to the configured cycle.
*/
long getCompensationTimeMs() {
// Current timestamp
long currNanos = getCurrentTimeNano();
// Timestamp of the previous run
long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
if (lastNanos == 0l) {
return 0l;
}
// Time elapsed since the previous run
long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
// Compensation time = elapsed time - configured interval
long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();
return compensationTime <= 0l ? 0l : compensationTime;
}
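The compensation time exists because a GC pause or clock drift can make the actual interval between two runs of the eviction task longer than the configured interval, so that instances which have not really expired would be evicted. The task therefore records the time between two runs and adds the excess over the configured interval to the lease deadline, compensating for the time lost.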
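A small worked model makes the effect concrete. This is a sketch with hypothetical names (`CompensationSketch`, `compensationMs`, and the static `isExpired` helper); the current time is passed in so the arithmetic can be followed by hand, and the expiry check is simplified to the deadline comparison only.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative model of the compensation time; names are assumptions. */
class CompensationSketch {
    private final AtomicLong lastExecutionNanos = new AtomicLong(0L);
    private final long evictionIntervalMs;

    CompensationSketch(long evictionIntervalMs) {
        this.evictionIntervalMs = evictionIntervalMs;
    }

    /** Returns how much later than scheduled this run started, never negative. */
    long compensationMs(long nowNanos) {
        long last = lastExecutionNanos.getAndSet(nowNanos);
        if (last == 0L) {
            return 0L; // first run: nothing to compare against
        }
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(nowNanos - last);
        return Math.max(0L, elapsedMs - evictionIntervalMs);
    }

    /** Simplified deadline check with the compensation added to the deadline. */
    static boolean isExpired(long nowMs, long lastUpdateMs, long durationMs, long additionalLeaseMs) {
        return nowMs > lastUpdateMs + durationMs + additionalLeaseMs;
    }
}
```

If the task is configured to run every 60 seconds but the second run starts 63 seconds after the first, the compensation is 3000 ms, and a lease that is 2 seconds past its deadline is not treated as expired in that run.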
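Overridden status (overriddenStatus)
InstanceInfo has two status fields, status and overriddenStatus: the instance's status and its overridden status. Eureka defines the overridden status so that the status in the server-side registry can be changed without touching the real state of the client.
In most cases the switch is directly between InstanceStatus.UP and InstanceStatus.OUT_OF_SERVICE. InstanceStatus.UP means the instance receives traffic; InstanceStatus.OUT_OF_SERVICE means it is taken out of traffic.
Put more plainly: the real client stays untouched and healthy, while changing the overridden status on the server moves the instance's InstanceStatus from UP to OUT_OF_SERVICE and thereby takes it out of traffic.
// The key is the instance id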
protected final ConcurrentMap<String, InstanceStatus> overriddenInstanceStatusMap = CacheBuilder
.newBuilder().initialCapacity(500)
.expireAfterAccess(1, TimeUnit.HOURS)
.<String, InstanceStatus>build().asMap();
public enum InstanceStatus {
UP, // Ready to receive traffic
DOWN, // Do not send traffic- healthcheck callback failed
STARTING, // Just about starting- initializations to be done - do not
// send traffic
OUT_OF_SERVICE, // Intentionally shutdown for traffic
UNKNOWN;
public static InstanceStatus toEnum(String s) {
if (s != null) {
try {
return InstanceStatus.valueOf(s.toUpperCase());
} catch (IllegalArgumentException e) {
// ignore and fall through to unknown
logger.debug("illegal argument supplied to InstanceStatus.valueOf: {}, defaulting to {}", s, UNKNOWN);
}
}
return UNKNOWN;
}
}
Override rules for computing the instance status
/**
* A single rule that if matched it returns an instance status.
* The idea is to use an ordered list of such rules and pick the first result that matches.
*
* It is designed to be used by
* {@link AbstractInstanceRegistry#getOverriddenInstanceStatus(InstanceInfo, Lease, boolean)}
*
* Created by Nikos Michalakis on 7/13/16.
*/
public interface InstanceStatusOverrideRule {
/**
* Match this rule.
*
* @param instanceInfo The instance info whose status we care about.
* @param existingLease Does the instance have an existing lease already? If so let's consider that.
* @param isReplication When overriding consider if we are under a replication mode from other servers.
* @return A result with whether we matched and what we propose the status to be overriden to.
*/
StatusOverrideResult apply(final InstanceInfo instanceInfo, final Lease<InstanceInfo> existingLease, boolean isReplication);
}
/**
* Container for a result computed by an {@link InstanceStatusOverrideRule}.
*
* Created by Nikos Michalakis on 7/13/16.
*/
public class StatusOverrideResult {
public static StatusOverrideResult NO_MATCH = new StatusOverrideResult(false, null);
public static StatusOverrideResult matchingStatus(InstanceInfo.InstanceStatus status) {
return new StatusOverrideResult(true, status);
}
// Does the rule match?
private final boolean matches;
// The status computed by the rule.
private final InstanceInfo.InstanceStatus status;
private StatusOverrideResult(boolean matches, InstanceInfo.InstanceStatus status) {
this.matches = matches;
this.status = status;
}
public boolean matches() {
return matches;
}
public InstanceInfo.InstanceStatus status() {
return status;
}
}
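This defines the interface for override rules. Each rule implements apply, which reports whether the current instance matches the rule and, if it does, which InstanceInfo.InstanceStatus should be used.
Class diagram of Eureka's default implementations:

There are six implementations by default. One of them, AsgEnabledRule, is specific to AWS environments and is not covered here; the analysis focuses on the other five, shown in the diagram.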
OverrideExistsRule
/**
* This rule checks to see if we have overrides for an instance and if we do then we return those.
*/
public class OverrideExistsRule implements InstanceStatusOverrideRule {
private Map<String, InstanceInfo.InstanceStatus> statusOverrides;
public OverrideExistsRule(Map<String, InstanceInfo.InstanceStatus> statusOverrides) {
this.statusOverrides = statusOverrides;
}
@Override
public StatusOverrideResult apply(InstanceInfo instanceInfo, Lease<InstanceInfo> existingLease, boolean isReplication) {
InstanceInfo.InstanceStatus overridden = statusOverrides.get(instanceInfo.getId());
// If there are instance specific overrides, then they win - otherwise the ASG status
if (overridden != null) {
logger.debug("The instance specific override for instance {} and the value is {}",
instanceInfo.getId(), overridden.name());
return StatusOverrideResult.matchingStatus(overridden);
}
return StatusOverrideResult.NO_MATCH;
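}
}
This rule returns the value cached in statusOverrides, which is in fact AbstractInstanceRegistry.overriddenInstanceStatusMap. Because that map expires entries after access (expireAfterAccess), each lookup made by this rule also refreshes the entry's lifetime, so an override that is still in use does not expire.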
AlwaysMatchInstanceStatusRule
/**
* This rule matches always and returns the current status of the instance.
*/
public class AlwaysMatchInstanceStatusRule implements InstanceStatusOverrideRule {
@Override
public StatusOverrideResult apply(InstanceInfo instanceInfo,
Lease<InstanceInfo> existingLease,
boolean isReplication) {
logger.debug("Returning the default instance status {} for instance {}", instanceInfo.getStatus(),
instanceInfo.getId());
return StatusOverrideResult.matchingStatus(instanceInfo.getStatus());
}
}
This rule ignores the overridden status and simply returns the instance's current status.
LeaseExistsRule
/**
* This rule matches if we have an existing lease for the instance that is UP or OUT_OF_SERVICE.
*/
public class LeaseExistsRule implements InstanceStatusOverrideRule {
private static final Logger logger = LoggerFactory.getLogger(LeaseExistsRule.class);
@Override
public StatusOverrideResult apply(InstanceInfo instanceInfo,
Lease<InstanceInfo> existingLease,
boolean isReplication) {
// This is for backward compatibility until all applications have ASG names, otherwise while starting up
// the client status may override status replicated from other servers
if (!isReplication) {
InstanceInfo.InstanceStatus existingStatus = null;
if (existingLease != null) {
existingStatus = existingLease.getHolder().getStatus();
}
// Allow server to have its way when the status is UP or OUT_OF_SERVICE
if ((existingStatus != null) &&
(InstanceInfo.InstanceStatus.OUT_OF_SERVICE.equals(existingStatus) || InstanceInfo.InstanceStatus.UP.equals(existingStatus))) {
logger.debug("There is already an existing lease with status {} for instance {}",
existingLease.getHolder().getStatus().name(),
existingLease.getHolder().getId());
return StatusOverrideResult.matchingStatus(existingLease.getHolder().getStatus());
}
}
return StatusOverrideResult.NO_MATCH;
}
}
This rule applies only to client-initiated requests, not to replication between servers.
It decides based on existingLease: it matches when the instance already has a lease whose status is UP or OUT_OF_SERVICE and returns that status; otherwise it does not match.
DownOrStartingRule
/**
* This rule matches if the instance is DOWN or STARTING.
*/
public class DownOrStartingRule implements InstanceStatusOverrideRule {
private static final Logger logger = LoggerFactory.getLogger(DownOrStartingRule.class);
@Override
public StatusOverrideResult apply(InstanceInfo instanceInfo,
Lease<InstanceInfo> existingLease,
boolean isReplication) {
// ReplicationInstance is DOWN or STARTING - believe that, but when the instance says UP, question that
// The client instance sends STARTING or DOWN (because of heartbeat failures), then we accept what
// the client says. The same is the case with replica as well.
// The OUT_OF_SERVICE from the client or replica needs to be confirmed as well since the service may be
// currently in SERVICE
if ((!InstanceInfo.InstanceStatus.UP.equals(instanceInfo.getStatus()))
&& (!InstanceInfo.InstanceStatus.OUT_OF_SERVICE.equals(instanceInfo.getStatus()))) {
logger.debug("Trusting the instance status {} from replica or instance for instance {}",
instanceInfo.getStatus(), instanceInfo.getId());
return StatusOverrideResult.matchingStatus(instanceInfo.getStatus());
}
return StatusOverrideResult.NO_MATCH;
}
}
This rule matches when the requesting instance's status is neither UP nor OUT_OF_SERVICE, in practice DOWN or STARTING.
FirstMatchWinsCompositeRule
/**
* This rule takes an ordered list of rules and returns the result of the first match or the
* result of the {@link AlwaysMatchInstanceStatusRule}.
*/
public class FirstMatchWinsCompositeRule implements InstanceStatusOverrideRule {
private final InstanceStatusOverrideRule[] rules;
private final InstanceStatusOverrideRule defaultRule;
private final String compositeRuleName;
public FirstMatchWinsCompositeRule(InstanceStatusOverrideRule... rules) {
this.rules = rules;
this.defaultRule = new AlwaysMatchInstanceStatusRule();
// Let's build up and "cache" the rule name to be used by toString();
List<String> ruleNames = new ArrayList<>(rules.length+1);
for (int i = 0; i < rules.length; ++i) {
ruleNames.add(rules[i].toString());
}
ruleNames.add(defaultRule.toString());
compositeRuleName = ruleNames.toString();
}
@Override
public StatusOverrideResult apply(InstanceInfo instanceInfo, Lease<InstanceInfo> existingLease, boolean isReplication) {
for (int i = 0; i < this.rules.length; ++i) {
StatusOverrideResult result = this.rules[i].apply(instanceInfo, existingLease, isReplication);
if (result.matches()) {
return result;
}
}
return defaultRule.apply(instanceInfo, existingLease, isReplication);
}
}
This rule has no matching logic of its own. It stores the rules to run in an array and applies them in order, returning the first match. The order is fixed in the constructor of PeerAwareInstanceRegistryImpl.
@Inject
public PeerAwareInstanceRegistryImpl(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs, EurekaClient eurekaClient) {
super(serverConfig, clientConfig, serverCodecs);
this.eurekaClient = eurekaClient;
this.numberOfReplicationsLastMin = new MeasuredRate(1000 * 60 * 1);
// We first check if the instance is STARTING or DOWN, then we check explicit overrides,
// then we check the status of a potentially existing lease.
this.instanceStatusOverrideRule = new FirstMatchWinsCompositeRule(new DownOrStartingRule(),
new OverrideExistsRule(overriddenInstanceStatusMap), new LeaseExistsRule());
}
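From this order it follows that for a normal registration or renewal request, one whose reported status is not DOWN or STARTING, the overridden status stored in the map is matched first.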
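The effect of the ordering can be checked with a compact model of the chain. This is a sketch, not the Eureka classes: `OverrideRuleSketch`, its `Rule` interface, and the `resolve` method are hypothetical, and each rule is reduced to a lambda that returns an Optional instead of a StatusOverrideResult.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;

/** Compact model of the first-match-wins rule chain; names are illustrative. */
class OverrideRuleSketch {
    enum Status { UP, DOWN, STARTING, OUT_OF_SERVICE, UNKNOWN }

    /** A rule returns a status when it matches and an empty Optional otherwise. */
    interface Rule {
        Optional<Status> apply(String id, Status reported, Status existingLeaseStatus, boolean isReplication);
    }

    static Status resolve(Map<String, Status> overrides, String id, Status reported,
                          Status existingLeaseStatus, boolean isReplication) {
        // 1. Trust anything that is neither UP nor OUT_OF_SERVICE
        Rule downOrStarting = (i, r, e, rep) ->
                (r != Status.UP && r != Status.OUT_OF_SERVICE) ? Optional.of(r) : Optional.<Status>empty();
        // 2. An explicit override for this instance id
        Rule overrideExists = (i, r, e, rep) -> Optional.ofNullable(overrides.get(i));
        // 3. An existing lease that is UP or OUT_OF_SERVICE, for non-replication calls only
        Rule leaseExists = (i, r, e, rep) ->
                (!rep && (e == Status.UP || e == Status.OUT_OF_SERVICE)) ? Optional.of(e) : Optional.<Status>empty();

        for (Rule rule : Arrays.asList(downOrStarting, overrideExists, leaseExists)) {
            Optional<Status> result = rule.apply(id, reported, existingLeaseStatus, isReplication);
            if (result.isPresent()) {
                return result.get();
            }
        }
        // Default rule: always matches and returns the reported status
        return reported;
    }
}
```

With an OUT_OF_SERVICE override stored for an instance, a client that keeps reporting UP resolves to OUT_OF_SERVICE, while the same client reporting DOWN resolves to DOWN because the first rule matches before the override is consulted.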
Status update (statusUpdate)
/**
* Updates the status of an instance. Normally happens to put an instance
* between {@link InstanceStatus#OUT_OF_SERVICE} and
* {@link InstanceStatus#UP} to put the instance in and out of traffic.
*
* @param appName the application name of the instance.
* @param id the unique identifier of the instance.
* @param newStatus the new {@link InstanceStatus}.
* @param lastDirtyTimestamp last timestamp when this instance information was updated.
* @param isReplication true if this is a replication event from other nodes, false
* otherwise.
* @return true if the status was successfully updated, false otherwise.
*/
@Override
public boolean statusUpdate(String appName, String id,
InstanceStatus newStatus, String lastDirtyTimestamp,
boolean isReplication) {
try {
read.lock();
// Increment the status-update counter
STATUS_UPDATE.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> lease = null;
if (gMap != null) {
lease = gMap.get(id);
}
// No lease in the registry for this instance: return immediately
if (lease == null) {
return false;
} else {
// Renew the lease (updates its timestamp)
lease.renew();
InstanceInfo info = lease.getHolder();
// Lease is always created with its instance info object.
// This log statement is provided as a safeguard, in case this invariant is violated.
if (info == null) {
logger.error("Found Lease without a holder for instance id {}", id);
}
// If the cached status differs from the new status, update the cached instance
if ((info != null) && !(info.getStatus().equals(newStatus))) {
// Mark service as UP if needed
// The new status is UP and the previous one was not: record the service-up timestamp
if (InstanceStatus.UP.equals(newStatus)) {
lease.serviceUp();
}
// This is NAC overriden status
// Store the new status in the overridden-status map
overriddenInstanceStatusMap.put(id, newStatus);
// Set it for transfer of overridden status to replica on
// replica start up
// Set both the overridden status and the status of the instance to newStatus
info.setOverriddenStatus(newStatus);
long replicaDirtyTimestamp = 0;
info.setStatusWithoutDirty(newStatus);
if (lastDirtyTimestamp != null) {
replicaDirtyTimestamp = Long.valueOf(lastDirtyTimestamp);
}
// If the replication's dirty timestamp is more than the existing one, just update
// it to the replica's.
// If the incoming dirty timestamp is newer than the cached one, update the cached info's timestamp
if (replicaDirtyTimestamp > info.getLastDirtyTimestamp()) {
info.setLastDirtyTimestamp(replicaDirtyTimestamp);
}
// Mark the action as MODIFIED
info.setActionType(ActionType.MODIFIED);
// Append to the recently-changed queue
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
// Update the last-updated timestamp
info.setLastUpdatedTimestamp();
// Invalidate the cached entries for this application, plus ALL_APPS and ALL_APPS_DELTA
invalidateCache(appName, info.getVIPAddress(), info.getSecureVipAddress());
}
return true;
}
} finally {
read.unlock();
}
}
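This method is mainly used to switch between InstanceStatus.UP and InstanceStatus.OUT_OF_SERVICE, controlling purely on the server side whether an instance receives traffic. In essence it changes the value cached for the instance in ConcurrentMap<String, InstanceStatus> overriddenInstanceStatusMap.
Changing the overridden status, combined with the override rules, is what allows an instance's status to be changed on the server side only.
Deleting the status override (deleteStatusOverride)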
/**
* Removes status override for a give instance.
*
* @param appName the application name of the instance.
* @param id the unique identifier of the instance.
* @param newStatus the new {@link InstanceStatus}.
* @param lastDirtyTimestamp last timestamp when this instance information was updated.
* @param isReplication true if this is a replication event from other nodes, false
* otherwise.
* @return true if the status was successfully updated, false otherwise.
*/
@Override
public boolean deleteStatusOverride(String appName, String id,
InstanceStatus newStatus,
String lastDirtyTimestamp,
boolean isReplication) {
try {
read.lock();
// Increment the status-override-delete counter
STATUS_OVERRIDE_DELETE.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> lease = null;
if (gMap != null) {
lease = gMap.get(id);
}
if (lease == null) {
return false;
} else {
// Renew the lease (updates its timestamp)
lease.renew();
InstanceInfo info = lease.getHolder();
// Lease is always created with its instance info object.
// This log statement is provided as a safeguard, in case this invariant is violated.
if (info == null) {
logger.error("Found Lease without a holder for instance id {}", id);
}
InstanceStatus currentOverride = overriddenInstanceStatusMap.remove(id);
// remove returns the previous value if an override was cached
if (currentOverride != null && info != null) {
// Reset the overridden status to UNKNOWN
info.setOverriddenStatus(InstanceStatus.UNKNOWN);
// Set the status from the parameter; WithoutDirty means the dirty timestamp is not updated
info.setStatusWithoutDirty(newStatus);
long replicaDirtyTimestamp = 0;
if (lastDirtyTimestamp != null) {
replicaDirtyTimestamp = Long.valueOf(lastDirtyTimestamp);
}
// If the replication's dirty timestamp is more than the existing one, just update
// it to the replica's.
// If the incoming dirty timestamp is newer than the cached one, update the cached info's timestamp
if (replicaDirtyTimestamp > info.getLastDirtyTimestamp()) {
info.setLastDirtyTimestamp(replicaDirtyTimestamp);
}
// Mark the action as MODIFIED
info.setActionType(ActionType.MODIFIED);
// Append to the recently-changed queue
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
// Update the last-updated timestamp
info.setLastUpdatedTimestamp();
// Invalidate the cached entries for this application, plus ALL_APPS and ALL_APPS_DELTA
invalidateCache(appName, info.getVIPAddress(), info.getSecureVipAddress());
}
return true;
}
} finally {
read.unlock();
}
}
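Deleting a status override is implemented much like updating one; the difference is that one puts an entry into the map and the other removes it.
Self-preservation
The purpose of self-preservation is to prevent network jitter between the server and its clients from taking a large number of healthy clients offline, which would disrupt calls among the clients themselves.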
protected void updateRenewsPerMinThreshold() {
this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
* (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
* serverConfig.getRenewalPercentThreshold());
}
@Override
public boolean isLeaseExpirationEnabled() {
if (!isSelfPreservationModeEnabled()) {
// The self preservation mode is disabled, hence allowing the instances to expire.
return true;
}
return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
}
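Formula: renewals-per-minute threshold = expected number of renewing clients (the number of client instances) * renewals per minute per client * threshold percentage
If the number of renewals in the last minute is greater than the threshold, expired instances may be evicted; otherwise the server enters self-preservation and no instance is expired. See the description of evict for the detailed eviction rules.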
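As a worked example of the formula, the sketch below recomputes the threshold with explicit inputs. `SelfPreservationSketch` and its method names are hypothetical; the values 30 seconds and 0.85 are passed as parameters rather than read from configuration.

```java
/** Worked example of the self-preservation threshold; names are illustrative. */
class SelfPreservationSketch {

    /** Same arithmetic as updateRenewsPerMinThreshold, with explicit inputs. */
    static int renewsPerMinThreshold(int expectedClients, int renewalIntervalSeconds,
                                     double renewalPercentThreshold) {
        return (int) (expectedClients * (60.0 / renewalIntervalSeconds) * renewalPercentThreshold);
    }

    /** Same decision as isLeaseExpirationEnabled, with explicit inputs. */
    static boolean leaseExpirationEnabled(boolean selfPreservationEnabled, int threshold,
                                          long renewsInLastMin) {
        if (!selfPreservationEnabled) {
            return true; // expired instances may always be evicted
        }
        return threshold > 0 && renewsInLastMin > threshold;
    }
}
```

For 10 instances renewing every 30 seconds with a 0.85 threshold, each instance is expected to renew twice per minute, so the threshold is 10 * 2 * 0.85 = 17. Eviction stays enabled while more than 17 renewals arrive per minute and is suspended at 17 or fewer.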