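Spring Cloud Eureka Source Code Analysis: Client Side

I. Introduction

The Eureka client is mainly responsible for three things:

  1. Registering the service instance
  2. Renewing the service instance's lease (heartbeat)
  3. Fetching the registry from the server

A few key classes in the source deserve attention:

Class                      Description
EurekaClientConfigBean     Configuration bean that holds all eureka.client.xxx parameters
EurekaInstanceConfigBean   Configuration bean that holds all eureka.instance.xxx parameters
InstanceInfo               Wrapper class for the service instance information
ApplicationInfoManager     Wraps the application information registered with Eureka (instance info, configuration, instance status, and so on) and manages operations on the instance
CloudEurekaClient          The client class that initiates operations such as register, fetch, and heartbeat

II. Source Code Analysis

1. Bean injection by the startup configuration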

EurekaClientAutoConfiguration

EurekaClientConfigBean
    @Bean
    @ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT)
    public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
        return new EurekaClientConfigBean();
    }

EurekaClientConfigBean is bound to the configuration prefix eureka.client; settings read from the configuration file are stored in this bean.

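Common settings:
Parameter                                       Description
defaultZone                                     Address of the default zone (set under serviceUrl)
transport                                       Transport (communication) settings
registryFetchIntervalSeconds                    Interval between registry fetches from the Eureka server (default 30s)
instanceInfoReplicationIntervalSeconds          Interval at which instance changes are replicated to the Eureka server (default 30s)
initialInstanceInfoReplicationIntervalSeconds   Initial delay before instance info is first replicated to the Eureka server (default 40s)
eurekaServiceUrlPollIntervalSeconds             Interval at which changes to the Eureka server addresses are polled (default 300s)
registerWithEureka                              Whether this instance registers with the Eureka server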

EurekaInstanceConfigBean
    @Bean
    @ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT)
    public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils,
            ManagementMetadataProvider managementMetadataProvider) {
        String hostname = getProperty("eureka.instance.hostname");
        boolean preferIpAddress = Boolean.parseBoolean(getProperty("eureka.instance.prefer-ip-address"));
        String ipAddress = getProperty("eureka.instance.ip-address");
        boolean isSecurePortEnabled = Boolean.parseBoolean(getProperty("eureka.instance.secure-port-enabled"));

        String serverContextPath = env.getProperty("server.servlet.context-path", "/");
        int serverPort = Integer.parseInt(env.getProperty("server.port", env.getProperty("port", "8080")));

        Integer managementPort = env.getProperty("management.server.port", Integer.class);
        String managementContextPath = env.getProperty("management.server.servlet.context-path");
        Integer jmxPort = env.getProperty("com.sun.management.jmxremote.port", Integer.class);
        EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils);

        instance.setNonSecurePort(serverPort);
        instance.setInstanceId(getDefaultInstanceId(env));
        instance.setPreferIpAddress(preferIpAddress);
        instance.setSecurePortEnabled(isSecurePortEnabled);
        if (StringUtils.hasText(ipAddress)) {
            instance.setIpAddress(ipAddress);
        }

        if (isSecurePortEnabled) {
            instance.setSecurePort(serverPort);
        }

        if (StringUtils.hasText(hostname)) {
            instance.setHostname(hostname);
        }
        String statusPageUrlPath = getProperty("eureka.instance.status-page-url-path");
        String healthCheckUrlPath = getProperty("eureka.instance.health-check-url-path");

        if (StringUtils.hasText(statusPageUrlPath)) {
            instance.setStatusPageUrlPath(statusPageUrlPath);
        }
        if (StringUtils.hasText(healthCheckUrlPath)) {
            instance.setHealthCheckUrlPath(healthCheckUrlPath);
        }

        ManagementMetadata metadata = managementMetadataProvider.get(instance, serverPort, serverContextPath,
                managementContextPath, managementPort);

        if (metadata != null) {
            instance.setStatusPageUrl(metadata.getStatusPageUrl());
            instance.setHealthCheckUrl(metadata.getHealthCheckUrl());
            if (instance.isSecurePortEnabled()) {
                instance.setSecureHealthCheckUrl(metadata.getSecureHealthCheckUrl());
            }
            Map<String, String> metadataMap = instance.getMetadataMap();
            metadataMap.computeIfAbsent("management.port", k -> String.valueOf(metadata.getManagementPort()));
        }
        else {
            // without the metadata the status and health check URLs will not be set
            // and the status page and health check url paths will not include the
            // context path so set them here
            if (StringUtils.hasText(managementContextPath)) {
                instance.setHealthCheckUrlPath(managementContextPath + instance.getHealthCheckUrlPath());
                instance.setStatusPageUrlPath(managementContextPath + instance.getStatusPageUrlPath());
            }
        }

        setupJmxPort(instance, jmxPort);
        return instance;
    }

EurekaInstanceConfigBean is bound to the configuration prefix eureka.instance; instance settings read from the configuration file are stored in this bean.

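Common settings:
Parameter                          Description
instanceId                         Instance ID registered with the Eureka server; the default is built as hostname:application name:port
hostname                           Host name to register; registration uses the host name by default
preferIpAddress                    Whether to register by IP address instead of host name
ipAddress                          IP address to register
leaseRenewalIntervalInSeconds      Interval between lease renewals sent to the Eureka server (default 30s)
leaseExpirationDurationInSeconds   Lease expiration time reported to the Eureka server (default 90s); without a renewal in this window the server removes the instance
initialStatus                      Initial status registered with the Eureka server; default UP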
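
The relationship between the two lease settings can be illustrated with a little arithmetic. The sketch below is a simplified model written for this article, not the Eureka server's actual eviction code; LeaseModel and its methods are hypothetical names.

```java
// Simplified model of lease expiry; not the Eureka server's actual eviction code.
final class LeaseModel {

    /** Returns true when no renewal arrived within durationSecs of the last one. */
    static boolean isExpired(long lastRenewalMs, long nowMs, int durationSecs) {
        return nowMs > lastRenewalMs + durationSecs * 1000L;
    }

    /** Number of whole renewal intervals that fit inside the expiration window. */
    static int renewalsPerLease(int renewalIntervalSecs, int durationSecs) {
        return durationSecs / renewalIntervalSecs;
    }

    public static void main(String[] args) {
        // Defaults from the table: renew every 30s, lease lasts 90s.
        System.out.println(renewalsPerLease(30, 90));    // 3
        System.out.println(isExpired(0L, 60_000L, 90));  // false
        System.out.println(isExpired(0L, 91_000L, 90));  // true
    }
}
```

With the defaults, roughly three renewal intervals fit into one lease, so in this model a single delayed heartbeat does not yet expire the instance.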

EurekaServiceRegistry
    @Bean
    public EurekaServiceRegistry eurekaServiceRegistry() {
        return new EurekaServiceRegistry();
    }

EurekaServiceRegistry implements ServiceRegistry and is the concrete class that performs instance registration; internally, its register method is what triggers the service registration.

EurekaRegistration
    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
    public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient,
            CloudEurekaInstanceConfig instanceConfig, ApplicationInfoManager applicationInfoManager,
            @Autowired(required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) {
        return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager).with(eurekaClient)
                    .with(healthCheckHandler).build();
    }

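EurekaRegistration is the object that actually gets registered. It wraps all of the client's registration information and is the concrete implementation of org.springframework.cloud.client.serviceregistry.Registration, which in turn extends the standard ServiceInstance interface.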

ApplicationInfoManager
    @Bean
    @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
    public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
        InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
        return new ApplicationInfoManager(config, instanceInfo);
    }

InstanceInfoFactory.create()
    public InstanceInfo create(EurekaInstanceConfig config) {
        LeaseInfo.Builder leaseInfoBuilder = LeaseInfo.Builder.newBuilder()
                .setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds())
                .setDurationInSecs(config.getLeaseExpirationDurationInSeconds());

        // Builder the instance information to be registered with eureka
        // server
        InstanceInfo.Builder builder = InstanceInfo.Builder.newBuilder();

        String namespace = config.getNamespace();
        if (!namespace.endsWith(".")) {
            namespace = namespace + ".";
        }
        builder.setNamespace(namespace).setAppName(config.getAppname()).setInstanceId(config.getInstanceId())
                .setAppGroupName(config.getAppGroupName()).setDataCenterInfo(config.getDataCenterInfo())
                .setIPAddr(config.getIpAddress()).setHostName(config.getHostName(false))
                .setPort(config.getNonSecurePort())
                .enablePort(InstanceInfo.PortType.UNSECURE, config.isNonSecurePortEnabled())
                .setSecurePort(config.getSecurePort())
                .enablePort(InstanceInfo.PortType.SECURE, config.getSecurePortEnabled())
                .setVIPAddress(config.getVirtualHostName()).setSecureVIPAddress(config.getSecureVirtualHostName())
                .setHomePageUrl(config.getHomePageUrlPath(), config.getHomePageUrl())
                .setStatusPageUrl(config.getStatusPageUrlPath(), config.getStatusPageUrl())
                .setHealthCheckUrls(config.getHealthCheckUrlPath(), config.getHealthCheckUrl(),
                        config.getSecureHealthCheckUrl())
                .setASGName(config.getASGName());

        // Start off with the STARTING state to avoid traffic
        if (!config.isInstanceEnabledOnit()) {
            InstanceInfo.InstanceStatus initialStatus = InstanceInfo.InstanceStatus.STARTING;
            if (log.isInfoEnabled()) {
                log.info("Setting initial instance status as: " + initialStatus);
            }
            builder.setStatus(initialStatus);
        }
        else {
            if (log.isInfoEnabled()) {
                log.info("Setting initial instance status as: " + InstanceInfo.InstanceStatus.UP
                        + ". This may be too early for the instance to advertise itself as available. "
                        + "You would instead want to control this via a healthcheck handler.");
            }
        }

        // Add any user-specific metadata information
        for (Map.Entry<String, String> mapEntry : config.getMetadataMap().entrySet()) {
            String key = mapEntry.getKey();
            String value = mapEntry.getValue();
            // only add the metadata if the value is present
            if (value != null && !value.isEmpty()) {
                builder.add(key, value);
            }
        }

        InstanceInfo instanceInfo = builder.build();
        instanceInfo.setLeaseInfo(leaseInfoBuilder.build());
        return instanceInfo;
    }

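First an InstanceInfo in its initial state is created, and a large number of its properties are set from the configuration.
The ApplicationInfoManager built from it wraps the application information registered with Eureka, including the instance info, configuration, and instance status.
The InstanceInfo object is also stored in the ApplicationInfoManager.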
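
Three of the small decisions made inside create() are easy to isolate: normalizing the namespace, choosing the initial status, and dropping empty metadata values. The following is a minimal sketch of just those steps in plain Java; InstanceInfoSketch and its method names are illustrative and not part of Eureka.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-in for steps in InstanceInfoFactory.create();
// the class and method names here are illustrative, not part of Eureka.
final class InstanceInfoSketch {

    /** Ensures the namespace ends with a dot, as the factory does. */
    static String normalizeNamespace(String namespace) {
        return namespace.endsWith(".") ? namespace : namespace + ".";
    }

    /** Keeps only metadata entries whose value is present and non-empty. */
    static Map<String, String> nonEmptyMetadata(Map<String, String> configured) {
        Map<String, String> kept = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : configured.entrySet()) {
            String value = entry.getValue();
            if (value != null && !value.isEmpty()) {
                kept.put(entry.getKey(), value);
            }
        }
        return kept;
    }

    /** STARTING unless the instance is enabled on init, in which case it stays UP. */
    static String initialStatus(boolean instanceEnabledOnInit) {
        return instanceEnabledOnInit ? "UP" : "STARTING";
    }

    public static void main(String[] args) {
        Map<String, String> configured = new LinkedHashMap<>();
        configured.put("zone", "a");
        configured.put("empty", "");
        configured.put("missing", null);

        System.out.println(normalizeNamespace("eureka"));  // eureka.
        System.out.println(nonEmptyMetadata(configured));  // {zone=a}
        System.out.println(initialStatus(false));          // STARTING
    }
}
```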

CloudEurekaClient
        @Bean(destroyMethod = "shutdown")
        @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
        @org.springframework.cloud.context.config.annotation.RefreshScope
        @Lazy
        public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config,
                EurekaInstanceConfig instance, @Autowired(required = false) HealthCheckHandler healthCheckHandler) {
            // If we use the proxy of the ApplicationInfoManager we could run into a problem
            // when shutdown is called on the CloudEurekaClient where the ApplicationInfoManager bean is
            // requested but wont be allowed because we are shutting down. To avoid this
            // we use the object directly.
            ApplicationInfoManager appManager;
            if (AopUtils.isAopProxy(manager)) {
                appManager = ProxyUtils.getTargetObject(manager);
            }
            else {
                appManager = manager;
            }
            CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager, config, this.optionalArgs,
                    this.context);
            cloudEurekaClient.registerHealthCheck(healthCheckHandler);
            return cloudEurekaClient;
        }

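This bean is very important: it defines almost every client-side operation and is the client class that initiates them. Most of the work happens in its constructor, which is analyzed in detail below.

2. The SmartLifecycle entry point

The Eureka client's registration is triggered through SmartLifecycle; the implementing class is EurekaAutoServiceRegistration.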

public class EurekaAutoServiceRegistration implements AutoServiceRegistration, SmartLifecycle, Ordered, SmartApplicationListener {

    private static final Log log = LogFactory.getLog(EurekaAutoServiceRegistration.class);

    private AtomicBoolean running = new AtomicBoolean(false);

    private int order = 0;

    private AtomicInteger port = new AtomicInteger(0);

    private ApplicationContext context;

    private EurekaServiceRegistry serviceRegistry;

    private EurekaRegistration registration;

    public EurekaAutoServiceRegistration(ApplicationContext context, EurekaServiceRegistry serviceRegistry,
            EurekaRegistration registration) {
        this.context = context;
        this.serviceRegistry = serviceRegistry;
        this.registration = registration;
    }

    @Override
    public void start() {
        ...
        // only initialize if nonSecurePort is greater than 0 and it isn't already running
        // because of containerPortInitializer below
        // getNonSecurePort() is the port the application started on
        if (!this.running.get() && this.registration.getNonSecurePort() > 0) {

            this.serviceRegistry.register(this.registration);

            this.context.publishEvent(new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
            this.running.set(true);
        }
    }

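Look directly at the start method. With the initial parameter values, execution goes straight into the if block. There are two key calls: register(), which starts the registration, and publishEvent, which publishes an event announcing that the current instance has been registered.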
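
The guard in start() makes registration happen at most once, and only when a usable port is known. A minimal sketch of that pattern in plain Java follows; StartGuardSketch is a hypothetical class, and the counter stands in for the call to serviceRegistry.register(...).

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the guard in EurekaAutoServiceRegistration.start().
final class StartGuardSketch {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicInteger registerCalls = new AtomicInteger();
    private final int nonSecurePort;

    StartGuardSketch(int nonSecurePort) {
        this.nonSecurePort = nonSecurePort;
    }

    void start() {
        // Register only if not already running and the port is known (> 0).
        if (!running.get() && nonSecurePort > 0) {
            registerCalls.incrementAndGet(); // stands in for serviceRegistry.register(...)
            running.set(true);
        }
    }

    int registerCallCount() {
        return registerCalls.get();
    }

    boolean isRunning() {
        return running.get();
    }

    public static void main(String[] args) {
        StartGuardSketch fixedPort = new StartGuardSketch(8080);
        fixedPort.start();
        fixedPort.start(); // second call is a no-op
        System.out.println(fixedPort.registerCallCount()); // 1

        StartGuardSketch unknownPort = new StartGuardSketch(0);
        unknownPort.start(); // port not known yet, nothing happens
        System.out.println(unknownPort.isRunning()); // false
    }
}
```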

EurekaServiceRegistry

public class EurekaServiceRegistry implements ServiceRegistry<EurekaRegistration> {

    private static final Log log = LogFactory.getLog(EurekaServiceRegistry.class);

    @Override
    public void register(EurekaRegistration reg) {
        maybeInitializeClient(reg);

        if (log.isInfoEnabled()) {
            log.info("Registering application " + reg.getApplicationInfoManager().getInfo().getAppName()
                    + " with eureka with status " + reg.getInstanceConfig().getInitialStatus());
        }

        reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus());

        reg.getHealthCheckHandler()
                .ifAvailable(healthCheckHandler -> reg.getEurekaClient().registerHealthCheck(healthCheckHandler));
    }

    // What this method really does is touch ApplicationInfoManager and CloudEurekaClient so that the beans get created; both beans are lazy
    private void maybeInitializeClient(EurekaRegistration reg) {
        // force initialization of possibly scoped proxies
        reg.getApplicationInfoManager().getInfo();
        reg.getEurekaClient().getApplications();
    }

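maybeInitializeClient triggers the instantiation of two important beans: ApplicationInfoManager and EurekaClient.
ApplicationInfoManager holds the InstanceInfo and all of the InstanceConfig settings.
EurekaClient (CloudEurekaClient) does its work in the constructor: it sets up client registration, synchronizes data with the server, and defines and starts the scheduled tasks.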
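
The effect of maybeInitializeClient, touching a lazily created object so that it gets built, can be sketched in plain Java. LazySketch is a hypothetical stand-in for a lazy or scoped bean proxy; it is not a Spring class and does not reproduce Spring's proxy mechanics.

```java
import java.util.function.Supplier;

// Hypothetical lazy holder: the factory runs on first access only.
final class LazySketch<T> {
    private final Supplier<T> factory;
    private T value;
    private boolean initialized;

    LazySketch(Supplier<T> factory) {
        this.factory = factory;
    }

    synchronized T get() {
        if (!initialized) {
            value = factory.get(); // expensive construction happens here
            initialized = true;
        }
        return value;
    }

    synchronized boolean isInitialized() {
        return initialized;
    }

    public static void main(String[] args) {
        LazySketch<String> client = new LazySketch<>(() -> {
            System.out.println("constructing client");
            return "client";
        });
        System.out.println(client.isInitialized()); // false
        client.get(); // prints "constructing client"
        client.get(); // already built, no output
        System.out.println(client.isInitialized()); // true
    }
}
```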

The creation of these two classes is described in turn.

new ApplicationInfoManager()

The bean is defined in the AutoConfiguration shown above.

    @Inject
    public ApplicationInfoManager(EurekaInstanceConfig config, InstanceInfo instanceInfo, OptionalArgs optionalArgs) {
        this.config = config;
        this.instanceInfo = instanceInfo;
        this.listeners = new ConcurrentHashMap<String, StatusChangeListener>();
        if (optionalArgs != null) {
            this.instanceStatusMapper = optionalArgs.getInstanceStatusMapper();
        } else {
            this.instanceStatusMapper = NO_OP_MAPPER;
        }

        // Hack to allow for getInstance() to use the DI'd ApplicationInfoManager
        instance = this;
    }

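The bean definition first uses the factory to create an InstanceInfo object; the create method copies all the settings in EurekaInstanceConfig into InstanceInfo through the builder pattern.
The constructor then initializes two fields internally: listeners and instanceStatusMapper.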

new CloudEurekaClient()

The bean is defined in the AutoConfiguration shown above.

public CloudEurekaClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config,
            AbstractDiscoveryClientOptionalArgs<?> args, ApplicationEventPublisher publisher) {
        super(applicationInfoManager, config, args);
        this.applicationInfoManager = applicationInfoManager;
        this.publisher = publisher;
        this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class, "eurekaTransport");
        ReflectionUtils.makeAccessible(this.eurekaTransportField);
    }

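The parent class constructor is called first through super(...); that call ends up in the DiscoveryClient overload below, which takes an additional EndpointRandomizer.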

public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, EndpointRandomizer randomizer) {
        this(applicationInfoManager, config, args, new Provider<BackupRegistry>() {
            private volatile BackupRegistry backupRegistryInstance;

            @Override
            public synchronized BackupRegistry get() {
                if (backupRegistryInstance == null) {
                    String backupRegistryClassName = config.getBackupRegistryImpl();
                    if (null != backupRegistryClassName) {
                        try {
                            backupRegistryInstance = (BackupRegistry) Class.forName(backupRegistryClassName).newInstance();
                            logger.info("Enabled backup registry of type {}", backupRegistryInstance.getClass());
                        } catch (InstantiationException e) {
                            logger.error("Error instantiating BackupRegistry.", e);
                        } catch (IllegalAccessException e) {
                            logger.error("Error instantiating BackupRegistry.", e);
                        } catch (ClassNotFoundException e) {
                            logger.error("Error instantiating BackupRegistry.", e);
                        }
                    }

                    if (backupRegistryInstance == null) {
                        logger.warn("Using default backup registry implementation which does not do anything.");
                        backupRegistryInstance = new NotImplementedRegistryImpl();
                    }
                }

                return backupRegistryInstance;
            }
        }, randomizer);
    }

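Before the main constructor logic runs, a lazy Provider for the BackupRegistry is prepared. The backup registry is consulted when none of the Eureka URLs can be reached; it is implemented by the custom class named in backupRegistryImpl. The default is NotImplementedRegistryImpl, which does nothing.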
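
The provider pattern used here (instantiate a class by name on first use, fall back to a do-nothing default) can be sketched without any Eureka types. Everything in the block below is hypothetical: Backup, NoOpBackup, and StaticBackup are illustrative stand-ins for BackupRegistry, NotImplementedRegistryImpl, and a user-supplied implementation.

```java
import java.util.function.Supplier;

final class BackupProviderSketch {

    interface Backup {
        String describe();
    }

    /** Default that does nothing, playing the role of NotImplementedRegistryImpl. */
    static final class NoOpBackup implements Backup {
        public String describe() { return "no-op"; }
    }

    /** Hypothetical custom implementation selected by class name. */
    public static final class StaticBackup implements Backup {
        public String describe() { return "static"; }
    }

    /** Returns a supplier that creates the backup lazily and caches it. */
    static Supplier<Backup> provider(final String className) {
        return new Supplier<Backup>() {
            private volatile Backup instance;

            @Override
            public synchronized Backup get() {
                if (instance == null) {
                    if (className != null) {
                        try {
                            instance = (Backup) Class.forName(className)
                                    .getDeclaredConstructor().newInstance();
                        } catch (ReflectiveOperationException | ClassCastException e) {
                            // Instantiation failed; fall through to the default below.
                        }
                    }
                    if (instance == null) {
                        instance = new NoOpBackup();
                    }
                }
                return instance;
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(provider(StaticBackup.class.getName()).get().describe()); // static
        System.out.println(provider("no.such.Class").get().describe());              // no-op
        System.out.println(provider(null).get().describe());                         // no-op
    }
}
```

The sketch uses getDeclaredConstructor().newInstance() rather than the Class.newInstance() call seen in the source above, since the latter is deprecated in newer JDKs.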

@Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
        // Override the defaults with custom arguments, if any were supplied
        if (args != null) {
            this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
            this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
            this.eventListeners.addAll(args.getEventListeners());
            this.preRegistrationHandler = args.preRegistrationHandler;
        } else {
            this.healthCheckCallbackProvider = null;
            this.healthCheckHandlerProvider = null;
            this.preRegistrationHandler = null;
        }
        
        // -------------   Assign the constructor arguments to instance fields   -----------------
        this.applicationInfoManager = applicationInfoManager;
        InstanceInfo myInfo = applicationInfoManager.getInfo();

        clientConfig = config;
        staticClientConfig = clientConfig;
        transportConfig = config.getTransportConfig();
        instanceInfo = myInfo;
        if (myInfo != null) {
            appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
        } else {
            logger.warn("Setting instanceInfo to a passed in null value");
        }

        this.backupRegistryProvider = backupRegistryProvider;
        this.endpointRandomizer = endpointRandomizer;
        this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
        localRegionApps.set(new Applications());

        fetchRegistryGeneration = new AtomicLong(0);

        remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
        remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
        // -------------   Assign the constructor arguments to instance fields   -----------------

        // If registry fetching is enabled, register a ThresholdLevelsMetric (exposed through JMX) so the metric can be queried
        if (config.shouldFetchRegistry()) {
            this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }
        
        // If registering with Eureka is enabled, register a ThresholdLevelsMetric (exposed through JMX) so the metric can be queried
        if (config.shouldRegisterWithEureka()) {
            this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }

        logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

        // If the client neither fetches the registry nor registers itself, finish here
        if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
            logger.info("Client configured to neither register nor query for data.");
            scheduler = null;
            heartbeatExecutor = null;
            cacheRefreshExecutor = null;
            eurekaTransport = null;
            instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

            // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
            // to work with DI'd DiscoveryClient
            DiscoveryManager.getInstance().setDiscoveryClient(this);
            DiscoveryManager.getInstance().setEurekaClientConfig(config);

            initTimestampMs = System.currentTimeMillis();
            initRegistrySize = this.getApplications().size();
            registrySize = initRegistrySize;
            logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                    initTimestampMs, initRegistrySize);

            return;  // no need to setup up an network tasks and we are done
        }

        // Otherwise, carry out the full client initialization.
        try {
            // default size of 2 - 1 each for heartbeat and cacheRefresh
            // The scheduler pool gets two threads: one for heartbeats, one for cache refresh
            scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());

            // Thread pool that executes the heartbeat task
            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff
             
            // Thread pool that executes the cache refresh task
            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

            // Create eurekaTransport and initialize its internal fields
            eurekaTransport = new EurekaTransport();
            scheduleServerEndpointTask(eurekaTransport, args);

            // Initialize the default mapping between availability zones and regions
            AzToRegionMapper azToRegionMapper;
            if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
            } else {
                azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
            }
            if (null != remoteRegionsToFetch.get()) {
                azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
            }
            // Build instanceRegionChecker, used to look up the region of an InstanceInfo, check whether it is the local region, and so on
            instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
        } catch (Throwable e) {
            throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
        }

        if (clientConfig.shouldFetchRegistry()) {
            try {
                // Key method; see the fetchRegistry analysis below
                boolean primaryFetchRegistryResult = fetchRegistry(false);
                // If the fetch fails, the following message is logged
                if (!primaryFetchRegistryResult) {
                    logger.info("Initial registry fetch from primary servers failed");
                }
                boolean backupFetchRegistryResult = true;
                // If the primary fetch failed, fall back to the backup implementation
                if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {
                    backupFetchRegistryResult = false;
                    logger.info("Initial registry fetch from backup servers failed");
                }
                if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {
                    throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
                }
            } catch (Throwable th) {
                logger.error("Fetch registry error at startup: {}", th.getMessage());
                throw new IllegalStateException(th);
            }
        }

        // call and execute the pre registration handler before all background tasks (inc registration) is started
        // Extension point for work that must run before registration is initiated.
        if (this.preRegistrationHandler != null) {
            this.preRegistrationHandler.beforeRegistration();
        }

        // Force registration during initialization. shouldEnforceRegistrationAtInit() defaults to false; when it is enabled and registration fails, an exception is thrown
        if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
            try {
                if (!register() ) {
                    throw new IllegalStateException("Registration error at startup. Invalid server response.");
                }
            } catch (Throwable th) {
                logger.error("Registration error at startup: {}", th.getMessage());
                throw new IllegalStateException(th);
            }
        }

        // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
        // Key method: initializes the scheduled tasks (heartbeat, registry fetch, and so on); see the initScheduledTasks analysis below
        initScheduledTasks();

        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register timers", e);
        }

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();
        initRegistrySize = this.getApplications().size();
        registrySize = initRegistrySize;
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, initRegistrySize);
    }
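
The startup fetch logic in the constructor above (try the primary servers, fall back to the backup, and fail only when both fail and enforcement is on) can be condensed into a small sketch. StartupFetchSketch and its parameter names are hypothetical; the two suppliers stand in for fetchRegistry(false) and fetchRegistryFromBackup().

```java
import java.util.function.BooleanSupplier;

// Hypothetical condensation of the startup fetch fallback in the DiscoveryClient constructor.
final class StartupFetchSketch {

    /** Returns which source supplied the registry: "primary", "backup", or "none". */
    static String initialFetch(BooleanSupplier primaryFetch,
                               BooleanSupplier backupFetch,
                               boolean enforceFetchAtInit) {
        boolean primaryOk = primaryFetch.getAsBoolean();
        boolean backupOk = true;
        // The backup is consulted only when the primary fetch failed.
        if (!primaryOk && !backupFetch.getAsBoolean()) {
            backupOk = false;
        }
        if (!primaryOk && !backupOk && enforceFetchAtInit) {
            throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
        }
        if (primaryOk) {
            return "primary";
        }
        return backupOk ? "backup" : "none";
    }

    public static void main(String[] args) {
        System.out.println(initialFetch(() -> true, () -> false, true));   // primary
        System.out.println(initialFetch(() -> false, () -> true, true));   // backup
        System.out.println(initialFetch(() -> false, () -> false, false)); // none
    }
}
```

In the last call both fetches fail but enforcement is off, so startup continues with an empty registry; with enforcement on, the same inputs throw.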

fetchRegistry()
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

        try {
            // If the delta is disabled or if it is the first time, get all
            // applications
            // Applications wraps all the registry information returned by the Eureka server. It is empty on first startup and is updated by each full or delta fetch
            Applications applications = getApplications();

            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
                logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                logger.info("Application is null : {}", (applications == null));
                logger.info("Registered Applications size is zero : {}",
                        (applications.getRegisteredApplications().size() == 0));
                logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                // Fetch the full registry
                getAndStoreFullRegistry();
            } else {
                // Fetch only the delta
                getAndUpdateDelta(applications);
            }
            // This hash code is used to tell whether the registry changed since the previous fetch
            applications.setAppsHashCode(applications.getReconcileHashCode());
            logTotalInstances();
        } catch (Throwable e) {
            logger.info(PREFIX + "{} - was unable to refresh its cache! This periodic background refresh will be retried in {} seconds. status = {} stacktrace = {}",
                    appPathIdentifier, clientConfig.getRegistryFetchIntervalSeconds(), e.getMessage(), ExceptionUtils.getStackTrace(e));
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }

        // Notify about cache refresh before updating the instance remote status
        onCacheRefreshed();

        // Update remote status based on refreshed data held in the cache
        updateInstanceRemoteStatus();

        // registry was fetched successfully, so return true
        return true;
    }

fetchRegistry is called at startup and again each time the scheduled task refreshes the client cache. It does the following:

  1. At startup it fetches the full registry from Eureka by calling /eureka/apps.
  private void getAndStoreFullRegistry() throws Throwable {
        ...
        Applications apps = null;
        EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            apps = httpResponse.getEntity();
        }
        ...

Here apps is the full registry information fetched from the Eureka server, as shown in the figure below.


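  2. When run from the scheduled task it fetches incrementally by calling getAndUpdateDelta(), which issues an HTTP request to /eureka/apps/delta. Here delta is the incremental registry information fetched from the Eureka server; if no delta comes back, the method falls back to a full fetch.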
   private void getAndUpdateDelta(Applications applications) throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();

        Applications delta = null;
        EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            delta = httpResponse.getEntity();
        }
        if (delta == null) {
            getAndStoreFullRegistry();
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            String reconcileHashCode = "";
            if (fetchRegistryUpdateLock.tryLock()) {
                try {
                    // After receiving the delta, iterate over each application and apply it to the local cache
                    updateDelta(delta);
                    reconcileHashCode = getReconcileHashCode(applications);
                } finally {
                    fetchRegistryUpdateLock.unlock();
                }
            } 
    ...
  3. Calls onCacheRefreshed() to signal that the client cache has been refreshed. The subclass CloudEurekaClient overrides onCacheRefreshed: the eventListeners handle the CacheRefreshedEvent, and a HeartbeatEvent is published.
    @Override
    protected void onCacheRefreshed() {
        super.onCacheRefreshed();

        if (this.cacheRefreshedCount != null) { // might be called during construction and
            // will be null
            long newCount = this.cacheRefreshedCount.incrementAndGet();
            log.trace("onCacheRefreshed called with count: " + newCount);
            this.publisher.publishEvent(new HeartbeatEvent(this, newCount));
        }
    }

    // Parent implementation in DiscoveryClient
    protected void onCacheRefreshed() {
        fireEvent(new CacheRefreshedEvent());
    }

    protected void fireEvent(final EurekaEvent event) {
        for (EurekaEventListener listener : eventListeners) {
            try {
                listener.onEvent(event);
            } catch (Exception e) {
                logger.info("Event {} throw an exception for listener {}", event, listener, e.getMessage());
            }
        }
    }
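  1. Call updateInstanceRemoteStatus to refresh the client's record of this instance's status as the eureka server reports it in the registry that was just fetched.
    private synchronized void updateInstanceRemoteStatus() {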
        // Determine this instance's status for this app and set to UNKNOWN if not found
        InstanceInfo.InstanceStatus currentRemoteInstanceStatus = null;
        if (instanceInfo.getAppName() != null) {
            Application app = getApplication(instanceInfo.getAppName());
            if (app != null) {
                InstanceInfo remoteInstanceInfo = app.getByInstanceId(instanceInfo.getId());
                if (remoteInstanceInfo != null) {
                    // status reported by this fetch
                    currentRemoteInstanceStatus = remoteInstanceInfo.getStatus();
                }
            }
        }
        if (currentRemoteInstanceStatus == null) {
            currentRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;
        }

        // Notify if status changed
        // the status from this fetch differs from the status seen on the previous fetch
        if (lastRemoteInstanceStatus != currentRemoteInstanceStatus) {
            onRemoteStatusChanged(lastRemoteInstanceStatus, currentRemoteInstanceStatus);
            lastRemoteInstanceStatus = currentRemoteInstanceStatus;
        }
    }

    protected void onRemoteStatusChanged(InstanceInfo.InstanceStatus oldStatus, InstanceInfo.InstanceStatus newStatus) {
        fireEvent(new StatusChangeEvent(oldStatus, newStatus));
    }

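The status the server currently reports for this instance is compared with the status seen on the previous fetch. If they are equal, nothing happens; if they differ, a StatusChangeEvent is fired.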
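
The comparison above can be sketched in isolation. This is a minimal sketch, not the Eureka implementation: RemoteStatus, RemoteStatusTracker and the BiConsumer listener type are hypothetical names introduced for illustration, and the initial value UNKNOWN is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

/** Hypothetical stand-in for InstanceInfo.InstanceStatus. */
enum RemoteStatus { UP, DOWN, STARTING, OUT_OF_SERVICE, UNKNOWN }

/** Remembers the status seen on the previous fetch and reports transitions. */
final class RemoteStatusTracker {
    // Assumption: the tracker starts at UNKNOWN before the first fetch.
    private RemoteStatus last = RemoteStatus.UNKNOWN;
    private final List<BiConsumer<RemoteStatus, RemoteStatus>> listeners = new ArrayList<>();

    void addListener(BiConsumer<RemoteStatus, RemoteStatus> listener) {
        listeners.add(listener);
    }

    /**
     * @param fetched status found in the fetched registry, or null if this
     *                instance is not present in it
     * @return true if a change was detected and listeners were notified
     */
    synchronized boolean update(RemoteStatus fetched) {
        RemoteStatus current = (fetched == null) ? RemoteStatus.UNKNOWN : fetched;
        if (current == last) {
            return false; // same as the previous fetch: nothing to do
        }
        RemoteStatus previous = last;
        last = current;
        for (BiConsumer<RemoteStatus, RemoteStatus> listener : listeners) {
            listener.accept(previous, current);
        }
        return true;
    }
}
```

Calling update once per registry fetch mirrors the flow described above: an instance that is missing from the registry maps to UNKNOWN, and listeners only hear about real transitions.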

If the registry is fetched from the eureka server successfully, fetchRegistry returns true.

initScheduledTasks()
    // Initialize all scheduled tasks
    private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            // Define cacheRefreshTask; the scheduler refreshes the client cache every registryFetchIntervalSeconds
            cacheRefreshTask = new TimedSupervisorTask("cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread());
            scheduler.schedule(cacheRefreshTask, registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }

        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

            // Heartbeat timer
            // Define heartbeatTask; the scheduler sends a heartbeat (lease renewal) every renewalIntervalInSecs
            heartbeatTask = new TimedSupervisorTask("heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread());
            scheduler.schedule(heartbeatTask, renewalIntervalInSecs, TimeUnit.SECONDS);

            // InstanceInfo replicator
            // Replicator for instance info; it schedules itself internally and syncs changes to the eureka server
            instanceInfoReplicator = new InstanceInfoReplicator(this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize

            // Define the status change listener; it is registered with applicationInfoManager below
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    logger.info("Saw local status change event {}", statusChangeEvent);
                    instanceInfoReplicator.onDemandUpdate();
                }
            };

            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }
            // Start the scheduled replication task
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }
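  1. First, two TimedSupervisorTask instances are created, one for the cache refresh and one for the heartbeat. The scheduler runs each of them at its configured interval. The TimedSupervisorTask constructor and run() are shown below.
  2. instanceInfoReplicator is created. It replicates instance info changes: when it detects a change, it calls register internally to sync the new info to the eureka server.
  3. statusChangeListener is created. When a status change callback fires, it calls instanceInfoReplicator.onDemandUpdate(). The listener is added to applicationInfoManager only if shouldOnDemandUpdateStatusChange() returns true.
    public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
                               int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {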
        this.name = name;
        this.scheduler = scheduler;
        this.executor = executor;
        this.timeoutMillis = timeUnit.toMillis(timeout);
        this.task = task;
        this.delay = new AtomicLong(timeoutMillis);
        this.maxDelay = timeoutMillis * expBackOffBound;
        ...
    }
    @Override
    public void run() {
        Future<?> future = null;
        try {
            future = executor.submit(task);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
            delay.set(timeoutMillis);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            successCounter.increment();
        } catch (TimeoutException e) {
            logger.warn("task supervisor timed out", e);
            timeoutCounter.increment();

            long currentDelay = delay.get();
            long newDelay = Math.min(maxDelay, currentDelay * 2);
            delay.compareAndSet(currentDelay, newDelay);
        } finally {
            if (future != null) {
                future.cancel(true);
            }

            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }

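The implementation of run() deserves a closer look.
scheduler.schedule is called with a delay only and no period, so this is not a fixed-rate job. The effective period changes at runtime, because run() submits itself to scheduler.schedule again each time it finishes.

In the try block, delay is reset to timeoutMillis after a successful run. On a timeout, delay is doubled and capped at maxDelay (timeoutMillis * expBackOffBound). The finally block then schedules the next run with whatever value delay currently holds. This is one way to implement a schedule whose period adapts at runtime.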
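
The same idea can be reduced to a small standalone sketch. BackoffDelay and SelfReschedulingTask are hypothetical names, and the BooleanSupplier that returns false to signal a timeout is an assumption made to keep the example short; the real TimedSupervisorTask waits on a Future instead.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;

/** Delay that doubles on timeout, is capped at base * bound, and resets on success. */
final class BackoffDelay {
    private final long baseMillis;
    private final long maxMillis;
    private final AtomicLong delay;

    BackoffDelay(long baseMillis, int expBackOffBound) {
        this.baseMillis = baseMillis;
        this.maxMillis = baseMillis * expBackOffBound;
        this.delay = new AtomicLong(baseMillis);
    }

    /** Resets the delay to the base value and returns it. */
    long onSuccess() {
        delay.set(baseMillis);
        return baseMillis;
    }

    /** Doubles the delay up to the cap and returns the new value. */
    long onTimeout() {
        long current = delay.get();
        delay.compareAndSet(current, Math.min(maxMillis, current * 2));
        return delay.get();
    }

    long currentMillis() {
        return delay.get();
    }
}

/** Runs one attempt, then schedules itself again with the current delay. */
final class SelfReschedulingTask implements Runnable {
    private final ScheduledExecutorService scheduler;
    private final BooleanSupplier attempt; // hypothetical: returns false to signal a timeout
    private final BackoffDelay backoff;

    SelfReschedulingTask(ScheduledExecutorService scheduler, BooleanSupplier attempt, BackoffDelay backoff) {
        this.scheduler = scheduler;
        this.attempt = attempt;
        this.backoff = backoff;
    }

    @Override
    public void run() {
        try {
            if (attempt.getAsBoolean()) {
                backoff.onSuccess();
            } else {
                backoff.onTimeout();
            }
        } finally {
            // No fixed period: the next run is a fresh one-shot schedule.
            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, backoff.currentMillis(), TimeUnit.MILLISECONDS);
            }
        }
    }
}
```

To start it, schedule the task once with the base delay, for example scheduler.schedule(task, 1000, TimeUnit.MILLISECONDS); every later run is queued by the task itself. Keeping the delay arithmetic in its own class makes the doubling and the cap easy to check without a running scheduler.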

cacheRefreshTask
    class CacheRefreshThread implements Runnable {
        public void run() {
            refreshRegistry();
        }
    }

    @VisibleForTesting
    void refreshRegistry() {
        try {
            ...
            // remoteRegionsModified is computed in the elided code above: whether remoteRegions
            // changed decides between a full fetch and a delta fetch
            boolean success = fetchRegistry(remoteRegionsModified);
            if (success) {
                // localRegionApps is the local cache of the fetched registry
                registrySize = localRegionApps.get().size();
                lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
            }
            ...
        } catch (Throwable e) {
            logger.error("Cannot fetch registry from server", e);
        }
    }

This task simply calls fetchRegistry to pull the latest registry and refresh the local cache.

heartbeatTask
    private class HeartbeatThread implements Runnable {

        public void run() {
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }

    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            ... // if the response is 404, send a register request again to sync the instance info
            return httpResponse.getStatusCode() == Status.OK.getStatusCode();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }

This task issues an HTTP request through sendHeartBeat, which renews the lease with one heartbeat.
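
The elided 404 branch can be illustrated with a small sketch. It is not the DiscoveryClient code: HeartbeatSketch and the two supplier fields are hypothetical stand-ins for the transport calls, and the only behavior taken from the text above is "on 404, register again".

```java
import java.util.function.BooleanSupplier;
import java.util.function.IntSupplier;

/** Heartbeat that falls back to registration when the server does not know the instance. */
final class HeartbeatSketch {
    private static final int OK = 200;
    private static final int NOT_FOUND = 404;

    private final IntSupplier sendHeartbeat; // hypothetical transport call: returns the HTTP status
    private final BooleanSupplier register;  // hypothetical transport call: true if registration succeeded

    HeartbeatSketch(IntSupplier sendHeartbeat, BooleanSupplier register) {
        this.sendHeartbeat = sendHeartbeat;
        this.register = register;
    }

    /** @return true if the lease was renewed, or re-created through registration */
    boolean renew() {
        try {
            int status = sendHeartbeat.getAsInt();
            if (status == NOT_FOUND) {
                // The server has no lease for this instance: register again.
                return register.getAsBoolean();
            }
            return status == OK;
        } catch (RuntimeException e) {
            return false; // a failed heartbeat is simply retried on the next scheduled run
        }
    }
}
```

The fallback matters after a server restart or a lease expiry: the client recovers on its next heartbeat without any extra retry logic.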

InstanceInfoReplicator
   public void start(int initialDelayMs) {
        if (started.compareAndSet(false, true)) {
            instanceInfo.setIsDirty();  // for initial register
            Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

     public boolean onDemandUpdate() {
        if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
            if (!scheduler.isShutdown()) {
                scheduler.submit(new Runnable() {
                    @Override
                    public void run() {
                        logger.debug("Executing on-demand update of local InstanceInfo");
    
                        Future latestPeriodic = scheduledPeriodicRef.get();
                        if (latestPeriodic != null && !latestPeriodic.isDone()) {
                            logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                            latestPeriodic.cancel(false);
                        }
    
                        InstanceInfoReplicator.this.run();
                    }
                });
                return true;
            } else {
                logger.warn("Ignoring onDemand update due to stopped scheduler");
                return false;
            }
        } else {
            logger.warn("Ignoring onDemand update due to rate limiter");
            return false;
        }
    }

    public void run() {
        try {
            discoveryClient.refreshInstanceInfo();

            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

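InstanceInfoReplicator syncs instance info: when something changes (status, address, IP, configuration values and so on), it sends the new InstanceInfo to the eureka server.
It has three methods:
start(): called during initialization. It schedules the first run(); each run() then reschedules itself, so changes are checked periodically.
onDemandUpdate(): called by the status change listener when a status change fires. Subject to a rate limit, it cancels the pending periodic run and calls run() right away.
run(): refreshes the local InstanceInfo and, if the instance is marked dirty, calls discoveryClient.register() to sync the new info to the eureka server. The isDirty flag on instanceInfo decides whether to register.

The logic that decides when to set the flag lives in discoveryClient.refreshInstanceInfo().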

/**
     * Refresh the current local instanceInfo. Note that after a valid refresh where changes are observed, the
     * isDirty flag on the instanceInfo is set to true
     */
    void refreshInstanceInfo() {
        // check whether the address, IP, or the data center the instance is deployed in has changed
        applicationInfoManager.refreshDataCenterInfoIfRequired();
        // check whether any lease configuration value (LeaseInfo) has changed
        applicationInfoManager.refreshLeaseInfoIfRequired();

        InstanceStatus status;
        try {
            status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
        } catch (Exception e) {
            logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
            status = InstanceStatus.DOWN;
        }

        if (null != status) {
            applicationInfoManager.setInstanceStatus(status);
        }
    }

The two refresh methods check for changes first; if something changed, they call instanceInfo.setIsDirty() internally to mark the instance.

    public synchronized void setIsDirty() {
        isInstanceInfoDirty = true;
        lastDirtyTimestamp = System.currentTimeMillis();
    }
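
A short sketch may help explain why run() reads the dirty timestamp before calling register() and passes it back to unsetIsDirty afterwards. DirtyFlag is a hypothetical class, and the rule that the flag is cleared only when no newer mark exists is an assumption here, since the article does not show unsetIsDirty itself.

```java
/** Dirty flag that remembers when it was last set; hypothetical stand-in for the flag on InstanceInfo. */
final class DirtyFlag {
    private boolean dirty;
    private long lastDirtyTimestamp;

    /** Marks the state as changed at the given time. */
    synchronized void setDirty(long nowMillis) {
        dirty = true;
        lastDirtyTimestamp = nowMillis;
    }

    /** @return the time of the last mark, or null when clean */
    synchronized Long dirtySince() {
        return dirty ? Long.valueOf(lastDirtyTimestamp) : null;
    }

    /**
     * Clears the flag only if nothing was marked after observedTimestamp.
     * Assumption: a change made while a sync is in flight must not be lost.
     */
    synchronized void unsetDirty(long observedTimestamp) {
        if (lastDirtyTimestamp <= observedTimestamp) {
            dirty = false;
        }
    }
}
```

With this rule, a replication run that started at timestamp 100 cannot erase a mark made at timestamp 200; the next run still sees the instance as dirty and syncs again.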

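That completes the analysis of initScheduledTasks. It starts three scheduled jobs: one refreshes the local cache with the latest registry, one sends heartbeats to renew the lease, and one sends the client's latest instance info.

Back in the DiscoveryClient constructor: after initScheduledTasks finishes, it registers the JMX beans and sets a few global properties.

After CloudEurekaClient is constructed, if a healthCheckHandler is configured (the default EurekaHealthCheckHandler implementation requires Spring Boot Actuator), the health status is synced to the eureka server once.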

    @Override
    public void registerHealthCheck(HealthCheckHandler healthCheckHandler) {
        if (instanceInfo == null) {
            logger.error("Cannot register a healthcheck handler when instance info is null!");
        }
        if (healthCheckHandler != null) {
            this.healthCheckHandlerRef.set(healthCheckHandler);
            // schedule an onDemand update of the instanceInfo when a new healthcheck handler is registered
            if (instanceInfoReplicator != null) {
                instanceInfoReplicator.onDemandUpdate();
            }
        }
    }

That completes the analysis of everything done while CloudEurekaClient is constructed.

Now back to the register method of EurekaServiceRegistry.

    public void register(EurekaRegistration reg) {
        maybeInitializeClient(reg);

        if (log.isInfoEnabled()) {
            log.info("Registering application " + reg.getApplicationInfoManager().getInfo().getAppName()
                    + " with eureka with status " + reg.getInstanceConfig().getInitialStatus());
        }

        reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus());

        reg.getHealthCheckHandler()
                .ifAvailable(healthCheckHandler -> reg.getEurekaClient().registerHealthCheck(healthCheckHandler));
    }

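Once all of the above is set up, setInstanceStatus is called to set the instance status to the configured initial status; after a successful startup this is UP.
Then, if a health check handler is available, registerHealthCheck is called, which triggers an on-demand sync of the instance info.

    public synchronized void setInstanceStatus(InstanceStatus status) {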
        InstanceStatus next = instanceStatusMapper.map(status);
        if (next == null) {
            return;
        }

        InstanceStatus prev = instanceInfo.setStatus(next);
        if (prev != null) {
            for (StatusChangeListener listener : listeners.values()) {
                try {
                    listener.notify(new StatusChangeEvent(prev, next));
                } catch (Exception e) {
                    logger.warn("failed to notify listener: {}", listener.getId(), e);
                }
            }
        }
    }

When the status actually changes, a StatusChangeEvent is also sent to each registered listener, which then syncs the change to the eureka server.
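
The notify-on-change pattern in setInstanceStatus can be sketched on its own. InstanceState, StateListener and StateHolder are hypothetical names, and the initial state STARTING is an assumption; the sketch only illustrates that listeners are keyed by id, hear about real transitions, and cannot break each other.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for InstanceStatus. */
enum InstanceState { STARTING, UP, DOWN }

/** Hypothetical stand-in for ApplicationInfoManager.StatusChangeListener. */
interface StateListener {
    String id();
    void onChange(InstanceState prev, InstanceState next);
}

/** Holds the current state and notifies listeners when it really changes. */
final class StateHolder {
    // Assumption: the instance starts in STARTING.
    private InstanceState state = InstanceState.STARTING;
    private final Map<String, StateListener> listeners = new ConcurrentHashMap<>();

    void register(StateListener listener) {
        listeners.put(listener.id(), listener);
    }

    synchronized void setState(InstanceState next) {
        if (next == null || next == state) {
            return; // no transition, so no event
        }
        InstanceState prev = state;
        state = next;
        for (StateListener listener : listeners.values()) {
            try {
                listener.onChange(prev, next);
            } catch (RuntimeException e) {
                // One failing listener must not stop the others from being notified.
            }
        }
    }
}
```

In the flow above, the listener registered in initScheduledTasks plays the role of a StateListener whose onChange calls instanceInfoReplicator.onDemandUpdate().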

The analysis of register is now complete.

Now back to the start method of EurekaAutoServiceRegistration (a SmartLifecycle).

    @Override
    public void start() {
        // only set the port if the nonSecurePort or securePort is 0 and this.port != 0
        if (this.port.get() != 0) {
            if (this.registration.getNonSecurePort() == 0) {
                this.registration.setNonSecurePort(this.port.get());
            }

            if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
                this.registration.setSecurePort(this.port.get());
            }
        }

        // only initialize if nonSecurePort is greater than 0 and it isn't already running
        // because of containerPortInitializer below
        if (!this.running.get() && this.registration.getNonSecurePort() > 0) {

            this.serviceRegistry.register(this.registration);

            this.context.publishEvent(new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
            this.running.set(true);
        }
    }

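After register completes, the context publishes an InstanceRegisteredEvent. This is not a eureka event but a Spring ApplicationEvent. It serves as an extension point that signals the eureka instance has been registered; users can handle it with a custom listener.

That completes the analysis of the eureka client startup flow and of how registration works.

3. Shutdown analysis

SmartLifecycle defines a stop method that is called before the instance stops, and eureka uses it to trigger the shutdown flow.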

    @Override
    public void stop() {
        this.serviceRegistry.deregister(this.registration);
        this.running.set(false);
    }
    @Override
    public void deregister(EurekaRegistration reg) {
        if (reg.getApplicationInfoManager().getInfo() != null) {

            if (log.isInfoEnabled()) {
                log.info("Unregistering application " + reg.getApplicationInfoManager().getInfo().getAppName()
                        + " with eureka with status DOWN");
            }

            reg.getApplicationInfoManager().setInstanceStatus(InstanceInfo.InstanceStatus.DOWN);

            // shutdown of eureka client should happen with EurekaRegistration.close()
            // auto registration will create a bean which will be properly disposed
            // manual registrations will need to call close()
        }
    }

As you can see, the shutdown flow is very simple: it sets instanceStatus in ApplicationInfoManager to DOWN. Setting DOWN also sends a StatusChangeEvent, and statusChangeListener then notifies the eureka server of the new DOWN status.

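Summary

  1. The eureka client uses SmartLifecycle as its entry point to start the whole eureka bean wiring and registration flow;
  2. All of the core logic runs in the constructor of CloudEurekaClient's parent class (DiscoveryClient):
    a. cacheRefreshTask is defined to fetch the registry periodically
    b. heartbeatTask is defined to send lease renewal requests periodically
    c. instanceInfoReplicator is defined to send instance info to the eureka server
    d. statusChangeListener is defined to receive status change events and send register requests.
  3. Whenever the instance info changes, the change is synced to the eureka server: a status change sends a StatusChangeEvent, and a change to InstanceInfo properties or InstanceConfig values marks the instance dirty for the next replication run.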