eureka-server: how service registration works, cluster synchronization, and Eureka source code analysis


Tracing a registration with the debugger leads to InstanceRegistry


ApplicationResource #addInstance
 registry.register(info, "true".equals(isReplication));
 
InstanceRegistry #register
    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        // Spring Cloud publishes an EurekaInstanceRegisteredEvent
        handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
 
        // delegate to the parent class's register
        super.register(info, isReplication);
    }
 

PeerAwareInstanceRegistryImpl #register

/**                                                                                                           
 * Registers the information about the {@link InstanceInfo} and replicates                                    
 * this information to all peer eureka nodes. If this is replication event                                    
 * from other replica nodes then it is not replicated.                                                        
 *                                                                                                            
 * @param info                                                                                                
 *            the {@link InstanceInfo} to be registered and replicated.                                       
 * @param isReplication                                                                                       
 *            true if this is a replication event from other replica nodes,                                   
 *            false otherwise.                                                                                
 */                                                                                                           
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    // the lease duration defaults to 90 seconds
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    // register in the local registry
    super.register(info, leaseDuration, isReplication);
    // replicate to the other cluster nodes
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

AbstractInstanceRegistry#register: service registration

Spring keeps its singleton beans in a ConcurrentHashMap, so what data structure does Eureka use for its registry? As you might guess, it is also a map.

ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<>();

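The registry is therefore equivalent to <applicationName, Map<instanceId, Lease>>: all instances of the same service are grouped under one application name, which makes it easy for upper-layer callers to load-balance across them.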
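A minimal sketch of this two-level structure, assuming simplified hypothetical types: `SimpleLease` stands in for `Lease<InstanceInfo>` and `TwoLevelRegistry` for the `registry` field of `AbstractInstanceRegistry`. It uses `computeIfAbsent` where the real `register` method below uses a get/`putIfAbsent` sequence, and it leaves out timestamps, status overrides and caches.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for Lease<InstanceInfo>: only the application name and instance id.
final class SimpleLease {
    final String appName;
    final String instanceId;

    SimpleLease(String appName, String instanceId) {
        this.appName = appName;
        this.instanceId = instanceId;
    }
}

// Hypothetical two-level registry: appName -> (instanceId -> lease).
final class TwoLevelRegistry {
    private final ConcurrentHashMap<String, Map<String, SimpleLease>> registry = new ConcurrentHashMap<>();

    void register(SimpleLease lease) {
        // Atomically create the per-application map on first use
        // (the real code does this with get + putIfAbsent).
        Map<String, SimpleLease> group =
                registry.computeIfAbsent(lease.appName, name -> new ConcurrentHashMap<>());
        // A known instance id is overwritten; a new id adds one more instance to the group.
        group.put(lease.instanceId, lease);
    }

    int instanceCount(String appName) {
        Map<String, SimpleLease> group = registry.get(appName);
        return group == null ? 0 : group.size();
    }

    int applicationCount() {
        return registry.size();
    }
}
```

Registering `ORDER-SERVICE` on `host-a:8080` and `host-b:8080` yields one group holding two instances; registering `host-a:8080` again replaces that entry instead of adding a third.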

/**
 * Registers a new instance with a given duration.
 *
 * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
 */
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        read.lock();
        // Look up the group for this application name: several instances can share one application
        // name, but each has a distinct instance id (this enables clustering and load balancing)
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        // Within the group, instances are told apart by instance id;
        // try to fetch an existing lease for this id
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // Retain the last dirty timestamp without overwriting it, if there is already a lease
        if (existingLease != null && (existingLease.getHolder() != null)) {
            // lastDirtyTimestamp of the instance already in the registry
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            // lastDirtyTimestamp of the instance being registered
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
            // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
            // InstanceInfo instead of the server local copy.
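            // If the existing timestamp is greater (a larger timestamp means a more recent change),
            // keep the existing instance instead of the one being registered: on conflict, the newest wins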
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                        " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                registrant = existingLease.getHolder();
            }
        } else {
            // The lease does not exist and hence it is a new registration
            synchronized (lock) {
                // expected number of clients sending renewals (heartbeats)
                if (this.expectedNumberOfClientsSendingRenews > 0) {
                    // Since the client wants to register it, increase the number of clients sending renews
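                    // A new client is registering, so expect one more client sending renewals;
                    // updateRenewsPerMinThreshold() converts this to renewals per minute (2 per client at the default 30 s interval)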
                    this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                    updateRenewsPerMinThreshold();
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        // Store the lease under the instance id (replacing any existing lease for the same id)
        gMap.put(registrant.getId(), lease);
        synchronized (recentRegisteredQueue) {
            // record the instance in the recently-registered queue
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
        }
        // This is where the initial state transfer of overridden status happens
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                            + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }
        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);
        // If the lease is registered with UP status, set lease service up timestamp
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
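        // mark the instance as ADDED
        registrant.setActionType(ActionType.ADDED);
        // add it to the recently-changed queue, which keeps instances changed within roughly the last
        // three minutes (by default) and is used to serve delta (incremental) fetches
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        // set the last-updated timestamp
        registrant.setLastUpdatedTimestamp();
        // invalidate the response cache entries for this app/VIP so clients see the change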
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        read.unlock();
    }
}
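The conflict rule in the middle of `register` reduces to one strict comparison of `lastDirtyTimestamp`. A sketch with a hypothetical `RegistrantInfo` type that carries only an id and a timestamp (it is not Eureka's `InstanceInfo`):

```java
// Hypothetical stand-in for InstanceInfo: only what the conflict rule needs.
final class RegistrantInfo {
    final String id;
    final long lastDirtyTimestamp;

    RegistrantInfo(String id, long lastDirtyTimestamp) {
        this.id = id;
        this.lastDirtyTimestamp = lastDirtyTimestamp;
    }
}

final class RegistrationConflict {
    // Same comparison as in AbstractInstanceRegistry#register: the existing copy is kept only
    // when it is strictly newer; on a tie the incoming copy wins ('>' rather than '>=').
    static RegistrantInfo choose(RegistrantInfo existing, RegistrantInfo incoming) {
        if (existing != null && existing.lastDirtyTimestamp > incoming.lastDirtyTimestamp) {
            return existing;
        }
        return incoming;
    }
}
```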

updateRenewsPerMinThreshold()

protected void updateRenewsPerMinThreshold() {
    this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
            * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
            * serverConfig.getRenewalPercentThreshold());
}

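expectedNumberOfClientsSendingRenews: the number of clients expected to send heartbeats (renewals)
ExpectedClientRenewalIntervalSeconds: the expected interval between a client's heartbeats, in seconds (default 30)
RenewalPercentThreshold: the renewal percentage threshold (default 85%, i.e. 0.85)
numberOfRenewsPerMinThreshold: the heartbeat-per-minute threshold; if the heartbeats the server receives within a minute do not exceed it, self-preservation mode is triggered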

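An example makes this clear:
suppose there are 100 clients, each sending a heartbeat every 30 s. If all of them are healthy, the server should receive 200 heartbeats per minute.
If the server receives no more than 200 × 85% = 170 heartbeats in a minute, self-preservation is triggered.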
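The arithmetic of the example can be checked with a small sketch. `renewsPerMinThreshold` repeats the formula of `updateRenewsPerMinThreshold()`; `selfPreservationActive` is a hypothetical helper assumed to mirror `PeerAwareInstanceRegistryImpl#isLeaseExpirationEnabled()` with self-preservation switched on, where lease expiration stays enabled only while the last minute's renewals are strictly greater than the threshold.

```java
// Hypothetical helper; renewsPerMinThreshold repeats the formula of updateRenewsPerMinThreshold().
final class RenewThreshold {
    // expected clients * renewals per client per minute * percentage threshold, truncated to int
    static int renewsPerMinThreshold(int expectedClients, int renewalIntervalSeconds, double percentThreshold) {
        return (int) (expectedClients * (60.0 / renewalIntervalSeconds) * percentThreshold);
    }

    // Assumed to mirror isLeaseExpirationEnabled() with self-preservation switched on:
    // expiration stays enabled only while last-minute renewals are strictly above the threshold.
    static boolean selfPreservationActive(int renewsLastMin, int threshold) {
        boolean leaseExpirationEnabled = threshold > 0 && renewsLastMin > threshold;
        return !leaseExpirationEnabled;
    }

    public static void main(String[] args) {
        int threshold = renewsPerMinThreshold(100, 30, 0.85);
        System.out.println(threshold);                              // 170
        System.out.println(selfPreservationActive(180, threshold)); // false: healthy traffic
        System.out.println(selfPreservationActive(150, threshold)); // true: too few heartbeats
    }
}
```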

/**
     * Replicates all eureka actions to peer eureka nodes except for replication
     * traffic to this node.
     *
     */
    private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            // If this is a replication request from another node, count it in the last-minute replication counter
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            // If it is a replication already, do not replicate again as this will create a poison replication
            // No peer nodes, or this request is itself a replication from another Eureka Server: return
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }
            // Iterate over the peer Eureka Server nodes and send a replication request to each
            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                // send the replication request
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }

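Eureka Server cluster data synchronization

Data synchronization

Eureka Servers register with one another to form a Eureka Server cluster, and the servers synchronize service information among themselves to keep it (eventually) consistent. When a service provider sends a registration request to one registry, that registry forwards the request to the other registries it is connected to in the cluster, which is how the registries stay in sync. Thanks to this synchronization, if two service providers register with two different registries, the information for both can be obtained from either registry.

Source code analysis

A Eureka Server cluster makes no primary/replica distinction: all nodes play the same role (in effect, there are no roles) and are fully peer-to-peer.

Package that implements the cluster functionality: com.netflix.eureka.cluster

Strictly speaking, the "synchronization" discussed below is replication.

1. Cluster node initialization

Eureka Server wraps cluster node management in the PeerEurekaNodes class.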

/**
 * Helper class to manage lifecycle of a collection of {@link PeerEurekaNode}s.
 *
 * @author Tomasz Bak
 */
@Singleton
public class PeerEurekaNodes {
    // application instance registry
    protected final PeerAwareInstanceRegistry registry;
    // Eureka Server configuration
    protected final EurekaServerConfig serverConfig;
    // Eureka Client configuration
    protected final EurekaClientConfig clientConfig;
    // Eureka Server codecs (encoding/decoding)
    protected final ServerCodecs serverCodecs;
    // application instance info manager
    private final ApplicationInfoManager applicationInfoManager;
    // peer (cluster) node list
    private volatile List<PeerEurekaNode> peerEurekaNodes = Collections.emptyList();
    // peer node URL set
    private volatile Set<String> peerEurekaNodeUrls = Collections.emptySet();
    // scheduled task thread pool
    private ScheduledExecutorService taskExecutor;

    @Inject
    public PeerEurekaNodes(
            PeerAwareInstanceRegistry registry,
            EurekaServerConfig serverConfig,
            EurekaClientConfig clientConfig,
            ServerCodecs serverCodecs,
            ApplicationInfoManager applicationInfoManager) {
        this.registry = registry;
        this.serverConfig = serverConfig;
        this.clientConfig = clientConfig;
        this.serverCodecs = serverCodecs;
        this.applicationInfoManager = applicationInfoManager;
    }
    // ... (remaining members omitted)
}

The peerEurekaNodes, peerEurekaNodeUrls and taskExecutor fields are not set in the constructor; they are initialized in PeerEurekaNodes#start(). Let's look at the implementation of start() next.

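PeerEurekaNodes#start() starts cluster node management and mainly does the following:

  • initializes the scheduled task thread pool
  • initializes the cluster node info via updatePeerEurekaNodes
  • creates the task that refreshes the cluster node info at a fixed interval (default 10 minutes, configurable)
  • has the thread pool run that task periodically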
public void start() {
    //1. initialize the scheduled task thread pool
    taskExecutor = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                    // daemon thread (runs in the background)
                    thread.setDaemon(true);
                    return thread;
                }
            }
    );
    try {
        //2. initialize the cluster node info
        updatePeerEurekaNodes(resolvePeerUrls());
        //3. create the task that periodically refreshes the cluster node info
        Runnable peersUpdateTask = new Runnable() {
            @Override
            public void run() {
                try {
                    updatePeerEurekaNodes(resolvePeerUrls());
                } catch (Throwable e) {
                    logger.error("Cannot update the replica Nodes", e);
                }
            }
        };
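        //4. scheduleWithFixedDelay creates and runs a periodic action that first runs after the given
        //   initial delay, then with the given delay between the end of one run and the start of the next.
        //   If any run throws an exception, later runs are suppressed (hence the try/catch in the task
        //   above); otherwise the task only stops when the executor is cancelled or shut down.
        //   Arguments: command - the task to run; initialDelay - delay before the first run;
        //   delay - delay between the end of one run and the start of the next (default 10 minutes);
        //   unit - time unit of initialDelay and delay (here TimeUnit.MILLISECONDS)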
        taskExecutor.scheduleWithFixedDelay(
                peersUpdateTask,
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                TimeUnit.MILLISECONDS
        );
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
    for (PeerEurekaNode node : peerEurekaNodes) {
        logger.info("Replica node URL:  {}", node.getServiceUrl());
    }
}
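The scheduling pattern used by `start()` can be reproduced with the JDK alone. `PeriodicUpdater` is a hypothetical helper, not part of Eureka: one daemon thread, an initial synchronous update, then `scheduleWithFixedDelay`. The try/catch matters because an exception escaping a scheduled task suppresses all of its later runs.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper reproducing the scheduling pattern of PeerEurekaNodes#start().
final class PeriodicUpdater {
    static ScheduledExecutorService startUpdater(Runnable update, long intervalMs) {
        // A single daemon thread, so the scheduler never keeps the JVM alive on its own.
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
            thread.setDaemon(true);
            return thread;
        });
        // The first update runs synchronously, like updatePeerEurekaNodes(resolvePeerUrls()) in start().
        update.run();
        // Catch everything: an exception escaping a scheduled task would suppress all later runs.
        Runnable safeTask = () -> {
            try {
                update.run();
            } catch (Throwable e) {
                System.err.println("Cannot update the replica nodes: " + e);
            }
        };
        executor.scheduleWithFixedDelay(safeTask, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
        return executor;
    }
}
```

In Eureka the interval comes from `serverConfig.getPeerEurekaNodesUpdateIntervalMs()`; a short interval is only useful for trying the sketch out.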

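2. Updating the cluster node info

As start() shows, Eureka Server uses a scheduled thread to refresh the cluster node info periodically, which is how it dynamically discovers and tracks the cluster nodes. The update itself is performed by updatePeerEurekaNodes(resolvePeerUrls()); we look at its implementation below.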

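Cluster sync steps (replicateToPeers, shown earlier)
  1. If there are no peer nodes, return.
  2. isReplication indicates whether the request is a replication. isReplication = true means another Eureka Server sent this sync request, so it must not be replicated any further; otherwise the servers would replicate to each other in an endless loop.
  3. Iterate over the peer nodes, skipping this node itself.
  4. Send the sync request by calling replicateInstanceActionsToPeers.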
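These steps, and why `isReplication` prevents an endless loop, can be illustrated with a hypothetical in-memory cluster. `PeerNode` is an illustration, not Eureka's `PeerEurekaNode`, and the HTTP call is replaced by a direct method call.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory model of replicateToPeers(): every node is a peer, and a request
// marked isReplication=true is applied locally but never forwarded again.
final class PeerNode {
    final String url;
    final List<PeerNode> peers = new ArrayList<>();
    final Set<String> registered = new HashSet<>();
    int registerCalls = 0;

    PeerNode(String url) {
        this.url = url;
    }

    void register(String instanceId, boolean isReplication) {
        registerCalls++;
        registered.add(instanceId);
        // A replication request must not be replicated again, otherwise the
        // nodes would keep forwarding the same registration to each other.
        if (isReplication) {
            return;
        }
        for (PeerNode peer : peers) {
            if (peer.url.equals(url)) {
                continue; // never replicate to yourself
            }
            peer.register(instanceId, true);
        }
    }
}
```

With three nodes that all list each other (and themselves), one client registration at node A results in exactly one `register` call on every node.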

PS: About the PeerEurekaNode mentioned here: the PeerEurekaNodes cluster node list is kept up to date by the thread started when the server starts, which refreshes the cluster node info every 10 minutes by default.

/**
 * Given new set of replica URLs, destroy {@link PeerEurekaNode}s no longer available, and
 * create new ones.
 *
 * @param newPeerUrls peer node URLs; this collection should have local node's URL filtered out
 */
protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
    if (newPeerUrls.isEmpty()) {
        logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
        return;
    }
    // Nodes to remove: old URLs minus the new ones; what remains is no longer available
    Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
    toShutdown.removeAll(newPeerUrls);
    // Nodes to add: new URLs minus the old ones; what remains is newly added
    Set<String> toAdd = new HashSet<>(newPeerUrls);
    toAdd.removeAll(peerEurekaNodeUrls);
    // If both sets are empty, nothing was added or removed: no update needed, return
    if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
        return;
    }
    // Start from all currently known nodes and remove peers no longer available
    List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
    // drop old nodes that are no longer available
    if (!toShutdown.isEmpty()) {
        logger.info("Removing no longer available peer nodes {}", toShutdown);
        int i = 0;
        while (i < newNodeList.size()) {
            PeerEurekaNode eurekaNode = newNodeList.get(i);
            if (toShutdown.contains(eurekaNode.getServiceUrl())) {
                // remove the unavailable node and shut it down
                newNodeList.remove(i);
                eurekaNode.shutDown();
            } else {
                i++;
            }
        }
    }
    // Add new peers
    if (!toAdd.isEmpty()) {
        logger.info("Adding new peer nodes {}", toAdd);
        for (String peerUrl : toAdd) {
            newNodeList.add(createPeerEurekaNode(peerUrl));
        }
    }
    // replace the node list and the node URL set
    this.peerEurekaNodes = newNodeList;
    this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}

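updatePeerEurekaNodes(resolvePeerUrls()) updates the nodes from the new set of cluster URLs passed in:

  • checks whether the incoming URL set requires an update
  • removes, and shuts down, old nodes that are not in the new URL set
  • creates nodes for new URLs not in the old node set, via createPeerEurekaNode(peerUrl)
  • reassigns the node list and the node URL set to complete the update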
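The diff at the heart of `updatePeerEurekaNodes` is two set differences. A sketch with a hypothetical `PeerUrlDiff` class that only computes what to shut down and what to add:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper reproducing the diff step of updatePeerEurekaNodes():
// toShutdown = old - new, toAdd = new - old.
final class PeerUrlDiff {
    final Set<String> toShutdown;
    final Set<String> toAdd;

    PeerUrlDiff(Set<String> oldUrls, List<String> newUrls) {
        toShutdown = new HashSet<>(oldUrls);
        toShutdown.removeAll(newUrls);
        toAdd = new HashSet<>(newUrls);
        toAdd.removeAll(oldUrls);
    }

    // Mirrors the early return: same URLs, possibly in a different order, means no change.
    boolean noChange() {
        return toShutdown.isEmpty() && toAdd.isEmpty();
    }
}
```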

3. Creating a node

The new URL set passed to updatePeerEurekaNodes(resolvePeerUrls()) comes from resolvePeerUrls(). This method essentially parses the configuration entries prefixed with eureka.serviceUrl and dynamically picks up configuration updates. Next, the method that creates a node:

protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
    // create a client for talking to the remote node
    HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
    // extract the host of the new node
    String targetHost = hostFromUrl(peerEurekaNodeUrl);
    if (targetHost == null) {
        targetHost = "host";
    }
    // create the new node
    return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
}
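`hostFromUrl` itself is not shown above; a hypothetical equivalent built on `java.net.URI`, including the `"host"` fallback, could look like this. `URI.getHost()` returns `null` for host names it cannot parse as a server-based authority (for example names containing `_`), which is one case where the fallback would kick in.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper in the spirit of createPeerEurekaNode(): extract the host part of a
// peer URL and fall back to the literal "host" when it cannot be determined.
final class PeerHost {
    static String targetHost(String peerEurekaNodeUrl) {
        String host;
        try {
            host = new URI(peerEurekaNodeUrl).getHost();
        } catch (URISyntaxException e) {
            host = null; // unparsable URL
        }
        return host == null ? "host" : host;
    }
}
```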

4. Cluster node data synchronization

Let's look at the PeerEurekaNode constructor, which creates the new node, and at what data has to be synchronized between cluster nodes.

PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
                                 HttpReplicationClient replicationClient, EurekaServerConfig config,
                                 int batchSize, long maxBatchingDelayMs,
                                 long retrySleepTimeMs, long serverUnavailableSleepTimeMs) {
    this.registry = registry;
    this.targetHost = targetHost;
    this.replicationClient = replicationClient;
    this.serviceUrl = serviceUrl;
    this.config = config;
    this.maxProcessingDelayMs = config.getMaxTimeForReplication();
    String batcherName = getBatcherName();
    // initialize the replication task processor
    ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
    // initialize the batching task dispatcher
    this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
            batcherName,
            config.getMaxElementsInPeerReplicationPool(),
            batchSize,
            config.getMaxThreadsForPeerReplication(),
            maxBatchingDelayMs,
            serverUnavailableSleepTimeMs,
            retrySleepTimeMs,
            taskProcessor
    );
    // initialize the non-batching (single-task) dispatcher
    this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher(
            targetHost,
            config.getMaxElementsInStatusReplicationPool(),
            config.getMaxThreadsForStatusReplication(),
            maxBatchingDelayMs,
            serverUnavailableSleepTimeMs,
            retrySleepTimeMs,
            taskProcessor
    );
}

The PeerEurekaNode constructor does the following:

  • creates the replication task processor ReplicationTaskProcessor
  • creates the batching task dispatcher
  • creates the single-task (non-batching) dispatcher
    Note: Eureka wraps the data synchronization between nodes into fine-grained ReplicationTask objects, one task per data operation, and hands each task to a TaskDispatcher, which processes it asynchronously.
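The idea of handing tasks to a dispatcher can be sketched with a simplified, single-threaded queue. `PendingTasks` is hypothetical and far simpler than Eureka's `TaskDispatcher` (no worker threads, no retries); it assumes two behaviors of the batching dispatcher: a newer task with the same task id replaces the pending one, and a task whose expiry time has passed is dropped instead of being sent. The task-id strings in the test are illustrative.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, single-threaded model of a replication task queue (not Eureka's TaskDispatcher).
final class PendingTasks {
    private static final class Holder {
        final String payload;
        final long expiryTime;

        Holder(String payload, long expiryTime) {
            this.payload = payload;
            this.expiryTime = expiryTime;
        }
    }

    // LinkedHashMap keeps first-arrival order; put() on an existing key replaces the
    // value but keeps the key's original position.
    private final Map<String, Holder> pending = new LinkedHashMap<>();

    void process(String taskId, String payload, long expiryTime) {
        pending.put(taskId, new Holder(payload, expiryTime));
    }

    // Removes every pending task and returns the payloads that have not expired yet.
    List<String> drain(long now) {
        List<String> ready = new ArrayList<>();
        Iterator<Holder> it = pending.values().iterator();
        while (it.hasNext()) {
            Holder holder = it.next();
            it.remove();
            if (holder.expiryTime > now) {
                ready.add(holder.payload);
            }
        }
        return ready;
    }

    int size() {
        return pending.size();
    }
}
```

Deduplicating by id means that, for example, a burst of heartbeats for the same instance collapses into the most recent one before it is sent.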

Next, let's look at which replication tasks PeerEurekaNode can create:

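  • register: when this Eureka Server registers a new instance, it also queues an asynchronous replication task (with an expiry time) that replicates the new instance to the other cluster nodes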
public void register(final InstanceInfo info) throws Exception {
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
    // queue a replication task of type register (new instance) in the dispatcher
    batchingDispatcher.process(
            taskId("register", info),
            new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                public EurekaHttpResponse<Void> execute() {
                    return replicationClient.register(info);
                }
            },
            expiryTime
    );
}
  • cancel: deregistration task; when an instance deregisters from this node, the change is replicated to the remote cluster nodes
public void cancel(final String appName, final String id) throws Exception {
    long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
    // queue a replication task of type cancel in the dispatcher
    batchingDispatcher.process(
            taskId("cancel", appName, id),
            new InstanceReplicationTask(targetHost, Action.Cancel, appName, id) {
                @Override
                public EurekaHttpResponse<Void> execute() {
                    return replicationClient.cancel(appName, id);
                }
                @Override
                public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
                    super.handleFailure(statusCode, responseEntity);
                    if (statusCode == 404) {
                        logger.warn("{}: missing entry.", getTaskName());
                    }
                }
            },
            expiryTime
    );
}
  • heartbeat: heartbeat replication task; when an instance sends a heartbeat (lease renewal) to this node, it is replicated to the remote cluster nodes
public void heartbeat(final String appName, final String id,
                      final InstanceInfo info, final InstanceStatus overriddenStatus,
                      boolean primeConnection) throws Throwable {
    if (primeConnection) {
        // We do not care about the result for priming request.
        replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
        return;
    }
    ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
        @Override
        public EurekaHttpResponse<InstanceInfo> execute() throws Throwable {
            return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
        }
        @Override
        public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
            super.handleFailure(statusCode, responseEntity);
            if (statusCode == 404) {
                logger.warn("{}: missing entry.", getTaskName());
                if (info != null) {
                    logger.warn("{}: cannot find instance id {} and hence replicating the instance with status {}",
                            getTaskName(), info.getId(), info.getStatus());
                    register(info);
                }
            } else if (config.shouldSyncWhenTimestampDiffers()) {
                InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity;
                if (peerInstanceInfo != null) {
                    syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
                }
            }
        }
    };
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
    // queue a replication task of type heartbeat in the dispatcher
    batchingDispatcher.process(taskId("heartbeat", info), replicationTask, expiryTime);
}
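The failure handling of the heartbeat task above can be summarized as a small decision function. `HeartbeatFailurePolicy` is a hypothetical restatement of its `handleFailure`, with booleans in place of the real `InstanceInfo` objects and server config.

```java
// Hypothetical restatement of the heartbeat task's handleFailure() shown above.
final class HeartbeatFailurePolicy {
    enum Reaction { REREGISTER, SYNC_IF_TIMESTAMP_DIFFERS, NONE }

    static Reaction onFailure(int statusCode, boolean hasLocalInfo,
                              boolean syncWhenTimestampDiffers, boolean peerReturnedInfo) {
        if (statusCode == 404) {
            // The peer does not know the instance: replicate a full registration instead.
            return hasLocalInfo ? Reaction.REREGISTER : Reaction.NONE;
        }
        if (syncWhenTimestampDiffers && peerReturnedInfo) {
            // Other failures: compare the peer's copy with the local one if enabled.
            return Reaction.SYNC_IF_TIMESTAMP_DIFFERS;
        }
        return Reaction.NONE;
    }
}
```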

5. Processing cluster node replication tasks

The PeerEurekaNode constructor shows that replication tasks are processed by ReplicationTaskProcessor; here is its source.

/**
 * Processes a single ReplicationTask
 */
@Override
public ProcessingResult process(ReplicationTask task) {
    try {
        // call the task's execute method to perform it
        EurekaHttpResponse<?> httpResponse = task.execute();
        // inspect the task's result
        int statusCode = httpResponse.getStatusCode();
        Object entity = httpResponse.getEntity();
        if (logger.isDebugEnabled()) {
            logger.debug("Replication task {} completed with status {}, (includes entity {})", task.getTaskName(), statusCode, entity != null);
        }
        if (isSuccess(statusCode)) {
            task.handleSuccess();
        } else if (statusCode == 503) {
            logger.debug("Server busy (503) reply for task {}", task.getTaskName());
            return ProcessingResult.Congestion;
        } else {
            task.handleFailure(statusCode, entity);
            return ProcessingResult.PermanentError;
        }
    } catch (Throwable e) {
        if (maybeReadTimeOut(e)) {
            logger.error("It seems to be a socket read timeout exception, it will retry later. if it continues to happen and some eureka node occupied all the cpu time, you should set property 'eureka.server.peer-node-read-timeout-ms' to a bigger value", e);
            // a read timeout is more like congestion than a transient error; return Congestion for a longer delay
            return ProcessingResult.Congestion;
        } else if (isNetworkConnectException(e)) {
            logNetworkErrorSample(task, e);
            return ProcessingResult.TransientError;
        } else {
            logger.error("{}: {} Not re-trying this exception because it does not seem to be a network exception",
                    peerId, task.getTaskName(), e);
            return ProcessingResult.PermanentError;
        }
    }
    return ProcessingResult.Success;
}

Single-task processing

  • calls the task's execute method to perform the remote replication
  • analyzes the result returned by the remote node
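The decision table of the single-task `process` method can be restated as a hypothetical `ReplicationOutcome` helper. Mapping `SocketTimeoutException` to the read-timeout case and `ConnectException` to the network-error case is an assumption standing in for `maybeReadTimeOut()` and `isNetworkConnectException()`, whose bodies are not shown here.

```java
import java.net.ConnectException;
import java.net.SocketTimeoutException;

// Hypothetical restatement of ReplicationTaskProcessor#process(ReplicationTask).
final class ReplicationOutcome {
    enum Result { SUCCESS, CONGESTION, TRANSIENT_ERROR, PERMANENT_ERROR }

    static Result fromStatus(int statusCode) {
        if (statusCode >= 200 && statusCode < 300) {
            return Result.SUCCESS;
        }
        if (statusCode == 503) {
            return Result.CONGESTION;      // peer busy: retry later
        }
        return Result.PERMANENT_ERROR;     // task.handleFailure(...) is called, no retry
    }

    static Result fromException(Throwable e) {
        if (e instanceof SocketTimeoutException) {
            return Result.CONGESTION;      // read timeout: back off for longer
        }
        if (e instanceof ConnectException) {
            return Result.TRANSIENT_ERROR; // network problem: retry
        }
        return Result.PERMANENT_ERROR;     // anything else is not retried
    }
}
```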
/**
 * Processes a batch of ReplicationTask objects
 */
@Override
public ProcessingResult process(List<ReplicationTask> tasks) {
    ReplicationList list = createReplicationListOf(tasks);
    try {
        EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
        int statusCode = response.getStatusCode();
        if (!isSuccess(statusCode)) {
            if (statusCode == 503) {
                logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
                return ProcessingResult.Congestion;
            } else {
                // Unexpected error returned from the server. This should ideally never happen.
                logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size());
                return ProcessingResult.PermanentError;
            }
        } else {
            handleBatchResponse(tasks, response.getEntity().getResponseList());
        }
    } catch (Throwable e) {
        if (maybeReadTimeOut(e)) {
            logger.error("It seems to be a socket read timeout exception, it will retry later. if it continues to happen and some eureka node occupied all the cpu time, you should set property 'eureka.server.peer-node-read-timeout-ms' to a bigger value", e);
            // a read timeout is more like congestion than a transient error; return Congestion for a longer delay
            return ProcessingResult.Congestion;
        } else if (isNetworkConnectException(e)) {
            logNetworkErrorSample(null, e);
            return ProcessingResult.TransientError;
        } else {
            logger.error("Not re-trying this exception because it does not seem to be a network exception", e);
            return ProcessingResult.PermanentError;
        }
    }
    return ProcessingResult.Success;
}

Batch processing sends a whole group of tasks to the remote node at once:

  • builds a ReplicationList from the task list
  • calls the batch replication endpoint (a REST API) to send the list to the remote node
  • analyzes the result returned by the remote node
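A sketch of the batch path, assuming a hypothetical `BatchSender`: pending tasks are cut into lists of at most `batchSize` (the constructor parameter shown earlier), each list goes out in one request, and the HTTP status of that one request decides the fate of the whole list, as in `process(List<ReplicationTask>)`. The real code additionally inspects the per-task responses when the batch succeeds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToIntFunction;

// Hypothetical sketch of batch replication: chunk the tasks, send each chunk in one call,
// and let the single HTTP status decide the outcome for the whole chunk.
final class BatchSender {
    enum Result { SUCCESS, CONGESTION, PERMANENT_ERROR }

    static <T> List<List<T>> chunk(List<T> tasks, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < tasks.size(); from += batchSize) {
            int to = Math.min(from + batchSize, tasks.size());
            batches.add(new ArrayList<>(tasks.subList(from, to)));
        }
        return batches;
    }

    // submitBatchUpdates stands in for the REST call and returns its HTTP status code.
    static <T> Result send(List<T> batch, ToIntFunction<List<T>> submitBatchUpdates) {
        int statusCode = submitBatchUpdates.applyAsInt(batch);
        if (statusCode >= 200 && statusCode < 300) {
            return Result.SUCCESS;         // per-task responses would be handled next
        }
        if (statusCode == 503) {
            return Result.CONGESTION;      // peer busy: the batch is rescheduled after a delay
        }
        return Result.PERMANENT_ERROR;     // unexpected status: the whole batch is discarded
    }
}
```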

III. Summary

Data synchronization is implemented mainly by the PeerEurekaNodes and PeerEurekaNode classes.

1. Cluster node initialization: PeerEurekaNodes#start()

  • initializes the scheduled task thread pool
  • initializes the cluster node info via updatePeerEurekaNodes
  • creates the task that refreshes the cluster node info at a fixed interval (default 10 minutes, configurable)
  • has the thread pool run that task periodically

2. Updating the cluster node info: updatePeerEurekaNodes(resolvePeerUrls())

  • checks whether the incoming URL set requires an update
  • removes, and shuts down, old nodes that are not in the new URL set
  • creates nodes for new URLs not in the old node set, via createPeerEurekaNode(peerUrl)
  • reassigns the node list and the node URL set to complete the update
  • resolvePeerUrls() essentially parses the configuration entries prefixed with eureka.serviceUrl and dynamically picks up configuration updates

3. Creating a node: createPeerEurekaNode(String peerEurekaNodeUrl)

4. Cluster node data synchronization: the PeerEurekaNode constructor

  • creates the replication task processor ReplicationTaskProcessor
  • creates the batching task dispatcher
  • creates the single-task dispatcher
  • replication tasks PeerEurekaNode can create: register, cancel, heartbeat, statusUpdate, deleteStatusOverride

5. Processing replication tasks: ReplicationTaskProcessor.process()

  • single-task processing
  • batch processing

Overall summary

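1: @EnableEurekaServer registers a Marker bean that marks the application as a registry (EurekaServerAutoConfiguration only takes effect when that bean is present)
2: EurekaServerAutoConfiguration registers a filter that intercepts Jersey requests and dispatches them to the resource classes (such as ApplicationResource)
3: Service registration stores the instance information in a ConcurrentHashMap<name, Map<id, Lease>>
4: On a registration conflict, the instance with the newest lastDirtyTimestamp wins
5: When the heartbeats the server receives within a minute fall to 85% of the expected number or below, self-preservation mode is triggered
6: Lease#renew has a known bug: it adds duration one extra time, so an instance that stops renewing expires only after about 2 × duration; a separate expireTime field would have been the correct way to track expiry
7: Cluster sync: an instance first registers with one server, which then iterates over the other server nodes in the cluster and calls register on each of them;
isReplication=true marks a registration that comes from cluster sync, so it is not replicated again, which prevents endless re-registration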
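Point 6 can be made concrete with a hypothetical, clock-injectable model of the `renew()`/`isExpired()` arithmetic in Eureka's `Lease`. The real class reads `System.currentTimeMillis()` and also takes `additionalLeaseMs` and an eviction timestamp into account, which this sketch omits; the comment on `Lease#isExpired` in the Eureka source acknowledges that expiry ends up at 2 × duration and that it is left unfixed because of the impact on existing users.

```java
// Hypothetical model of Lease's renew()/isExpired() arithmetic with an injected clock.
final class LeaseModel {
    private final long durationMs;
    private long lastUpdateTimestamp;

    LeaseModel(long durationMs, long registrationTimestamp) {
        this.durationMs = durationMs;
        this.lastUpdateTimestamp = registrationTimestamp; // as in Lease's constructor
    }

    // The bug: renew() already adds duration to the timestamp ...
    void renew(long now) {
        lastUpdateTimestamp = now + durationMs;
    }

    // ... and isExpired() adds it again, so a renewed lease lives about 2 * duration.
    boolean isExpired(long now) {
        return now > lastUpdateTimestamp + durationMs;
    }
}
```

With the default 90 s duration, an instance that renewed at t = 0 and then vanished is only considered expired after t = 180 s, not 90 s.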

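Platform statement: The article content (including any images or videos) was uploaded and published by the author and represents only the author's own views. Jianshu is an information publishing platform and only provides information storage services.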
