一. 前言
eureka的client端主要完成幾件事情:
- 服務實例的注冊
- 服務實例的續(xù)約
- 拉取server端的注冊表
整個源碼有幾個重點類值得關注:
| 類名 | 說明 |
|---|---|
| EurekaClientConfigBean | 配置參數的bean,保存eureka.client.xxx的所有參數 |
| EurekaInstanceConfigBean | 配置參數的bean,保存eureka.instance.xxx的所有參數 |
| InstanceInfo | 服務實例信息的封裝類 |
| ApplicationInfoManager | 封裝注冊到eureka的應用信息,包括實例信息,參數配置,實例狀態(tài)等,并管理實例的各種操作 |
| CloudEurekaClient | 發(fā)起操作的客戶端類,如register、fetch、heartbeat |
二. 源碼分析
1. 啟動配置bean注入
EurekaClientAutoConfiguration
EurekaClientConfigBean
@Bean
@ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT)
public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
return new EurekaClientConfigBean();
}
EurekaClientConfigBean 對應的配置前綴是eureka.client,從配置文件讀取的配置會保存到此bean中
常用配置:
| 參數 | 說明 |
|---|---|
| defaultZone | 默認zone的地址 |
| transport | 各種通信配置 |
| registryFetchIntervalSeconds | 每次從eureka服務端拉取服務列表的時間間隔(默認30s) |
| instanceInfoReplicationIntervalSeconds | 復制實例變化信息到eureka服務端的時間間隔(默認30s) |
| initialInstanceInfoReplicationIntervalSeconds | 初始時復制實例信息到eureka服務端所需的時間(默認40s) |
| eurekaServiceUrlPollIntervalSeconds | 拉取eureka服務端地址更改的時間間隔(默認300s) |
| registerWithEureka | 本實例是否注冊到eureka服務端 |
EurekaInstanceConfigBean
@Bean
@ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT)
public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils,
ManagementMetadataProvider managementMetadataProvider) {
String hostname = getProperty("eureka.instance.hostname");
boolean preferIpAddress = Boolean.parseBoolean(getProperty("eureka.instance.prefer-ip-address"));
String ipAddress = getProperty("eureka.instance.ip-address");
boolean isSecurePortEnabled = Boolean.parseBoolean(getProperty("eureka.instance.secure-port-enabled"));
String serverContextPath = env.getProperty("server.servlet.context-path", "/");
int serverPort = Integer.parseInt(env.getProperty("server.port", env.getProperty("port", "8080")));
Integer managementPort = env.getProperty("management.server.port", Integer.class);
String managementContextPath = env.getProperty("management.server.servlet.context-path");
Integer jmxPort = env.getProperty("com.sun.management.jmxremote.port", Integer.class);
EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils);
instance.setNonSecurePort(serverPort);
instance.setInstanceId(getDefaultInstanceId(env));
instance.setPreferIpAddress(preferIpAddress);
instance.setSecurePortEnabled(isSecurePortEnabled);
if (StringUtils.hasText(ipAddress)) {
instance.setIpAddress(ipAddress);
}
if (isSecurePortEnabled) {
instance.setSecurePort(serverPort);
}
if (StringUtils.hasText(hostname)) {
instance.setHostname(hostname);
}
String statusPageUrlPath = getProperty("eureka.instance.status-page-url-path");
String healthCheckUrlPath = getProperty("eureka.instance.health-check-url-path");
if (StringUtils.hasText(statusPageUrlPath)) {
instance.setStatusPageUrlPath(statusPageUrlPath);
}
if (StringUtils.hasText(healthCheckUrlPath)) {
instance.setHealthCheckUrlPath(healthCheckUrlPath);
}
ManagementMetadata metadata = managementMetadataProvider.get(instance, serverPort, serverContextPath,
managementContextPath, managementPort);
if (metadata != null) {
instance.setStatusPageUrl(metadata.getStatusPageUrl());
instance.setHealthCheckUrl(metadata.getHealthCheckUrl());
if (instance.isSecurePortEnabled()) {
instance.setSecureHealthCheckUrl(metadata.getSecureHealthCheckUrl());
}
Map<String, String> metadataMap = instance.getMetadataMap();
metadataMap.computeIfAbsent("management.port", k -> String.valueOf(metadata.getManagementPort()));
}
else {
// without the metadata the status and health check URLs will not be set
// and the status page and health check url paths will not include the
// context path so set them here
if (StringUtils.hasText(managementContextPath)) {
instance.setHealthCheckUrlPath(managementContextPath + instance.getHealthCheckUrlPath());
instance.setStatusPageUrlPath(managementContextPath + instance.getStatusPageUrlPath());
}
}
setupJmxPort(instance, jmxPort);
return instance;
}
EurekaInstanceConfigBean 對應的配置前綴是eureka.instance,從配置文件讀取的實例信息配置會保存到此bean中
常用配置:
| 參數 | 說明 |
|---|---|
| instanceId | 注冊到eureka服務端實例id,默認是ip:服務名:端口 |
| hostname | 注冊的主機名, 默認使用主機名注冊 |
| preferIpAddress | 是否開啟ip注冊 |
| ipAddress | 注冊的ip地址 |
| leaseRenewalIntervalInSeconds | 向eureka服務端續(xù)約的時間間隔(默認30s) |
| leaseExpirationDurationInSeconds | 向eureka服務端設置續(xù)約到期的時間間隔(默認90s),超過此間隔服務端會下掉該服務 |
| initialStatus | 注冊到eureka服務端的初始化狀態(tài),默認UP |
EurekaServiceRegistry
@Bean
public EurekaServiceRegistry eurekaServiceRegistry() {
return new EurekaServiceRegistry();
}
EurekaServiceRegistry 實現(xiàn)了ServiceRegistry,是實例注冊的具體實現(xiàn)類,內部通過register完成服務注冊事件的發(fā)送動作。
EurekaRegistration
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient,
CloudEurekaInstanceConfig instanceConfig, ApplicationInfoManager applicationInfoManager,
@Autowired(required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) {
return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager).with(eurekaClient)
.with(healthCheckHandler).build();
}
EurekaRegistration 是真正被注冊的對象,內部封裝了客戶端所有的注冊信息,是org.springframework.cloud.client.serviceregistry.ServiceInstance此標準的具體實現(xiàn)類
ApplicationInfoManager
@Bean
@ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
return new ApplicationInfoManager(config, instanceInfo);
}
public InstanceInfo create(EurekaInstanceConfig config) {
LeaseInfo.Builder leaseInfoBuilder = LeaseInfo.Builder.newBuilder()
.setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds())
.setDurationInSecs(config.getLeaseExpirationDurationInSeconds());
// Builder the instance information to be registered with eureka
// server
InstanceInfo.Builder builder = InstanceInfo.Builder.newBuilder();
String namespace = config.getNamespace();
if (!namespace.endsWith(".")) {
namespace = namespace + ".";
}
builder.setNamespace(namespace).setAppName(config.getAppname()).setInstanceId(config.getInstanceId())
.setAppGroupName(config.getAppGroupName()).setDataCenterInfo(config.getDataCenterInfo())
.setIPAddr(config.getIpAddress()).setHostName(config.getHostName(false))
.setPort(config.getNonSecurePort())
.enablePort(InstanceInfo.PortType.UNSECURE, config.isNonSecurePortEnabled())
.setSecurePort(config.getSecurePort())
.enablePort(InstanceInfo.PortType.SECURE, config.getSecurePortEnabled())
.setVIPAddress(config.getVirtualHostName()).setSecureVIPAddress(config.getSecureVirtualHostName())
.setHomePageUrl(config.getHomePageUrlPath(), config.getHomePageUrl())
.setStatusPageUrl(config.getStatusPageUrlPath(), config.getStatusPageUrl())
.setHealthCheckUrls(config.getHealthCheckUrlPath(), config.getHealthCheckUrl(),
config.getSecureHealthCheckUrl())
.setASGName(config.getASGName());
// Start off with the STARTING state to avoid traffic
if (!config.isInstanceEnabledOnit()) {
InstanceInfo.InstanceStatus initialStatus = InstanceInfo.InstanceStatus.STARTING;
if (log.isInfoEnabled()) {
log.info("Setting initial instance status as: " + initialStatus);
}
builder.setStatus(initialStatus);
}
else {
if (log.isInfoEnabled()) {
log.info("Setting initial instance status as: " + InstanceInfo.InstanceStatus.UP
+ ". This may be too early for the instance to advertise itself as available. "
+ "You would instead want to control this via a healthcheck handler.");
}
}
// Add any user-specific metadata information
for (Map.Entry<String, String> mapEntry : config.getMetadataMap().entrySet()) {
String key = mapEntry.getKey();
String value = mapEntry.getValue();
// only add the metadata if the value is present
if (value != null && !value.isEmpty()) {
builder.add(key, value);
}
}
InstanceInfo instanceInfo = builder.build();
instanceInfo.setLeaseInfo(leaseInfoBuilder.build());
return instanceInfo;
}
首先創(chuàng)建了初始化狀態(tài)的InstanceInfo,根據配置對InstanceInfo的大量屬性賦值。
構建ApplicationInfoManager內部封裝了注冊到eureka的應用信息,包括實例信息,參數配置,實例狀態(tài)等。
InstanceInfo對象也會被設置到到ApplicationInfoManager中。
CloudEurekaClient
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config,
EurekaInstanceConfig instance, @Autowired(required = false) HealthCheckHandler healthCheckHandler) {
// If we use the proxy of the ApplicationInfoManager we could run into a problem
// when shutdown is called on the CloudEurekaClient where the ApplicationInfoManager bean is
// requested but wont be allowed because we are shutting down. To avoid this
// we use the object directly.
ApplicationInfoManager appManager;
if (AopUtils.isAopProxy(manager)) {
appManager = ProxyUtils.getTargetObject(manager);
}
else {
appManager = manager;
}
CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager, config, this.optionalArgs,
this.context);
cloudEurekaClient.registerHealthCheck(healthCheckHandler);
return cloudEurekaClient;
}
這個bean非常重要,在內部完成了幾乎所有客戶端操作的定義,是發(fā)起操作的客戶端類。重點在構造方法中實現(xiàn),下文會具體分析。
2. SmartLifecycle的啟動點
eureka客戶端的注冊動作,通過SmartLifecycle來觸發(fā),對應的實現(xiàn)類是EurekaAutoServiceRegistration
public class EurekaAutoServiceRegistration implements AutoServiceRegistration, SmartLifecycle, Ordered, SmartApplicationListener {
private static final Log log = LogFactory.getLog(EurekaAutoServiceRegistration.class);
private AtomicBoolean running = new AtomicBoolean(false);
private int order = 0;
private AtomicInteger port = new AtomicInteger(0);
private ApplicationContext context;
private EurekaServiceRegistry serviceRegistry;
private EurekaRegistration registration;
public EurekaAutoServiceRegistration(ApplicationContext context, EurekaServiceRegistry serviceRegistry,
EurekaRegistration registration) {
this.context = context;
this.serviceRegistry = serviceRegistry;
this.registration = registration;
}
@Override
public void start() {
...
// only initialize if nonSecurePort is greater than 0 and it isn't already running
// because of containerPortInitializer below
// getNonSecurePort() 就是項目的啟動端口
if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
this.serviceRegistry.register(this.registration);
this.context.publishEvent(new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
this.running.set(true);
}
}
直接看start方法,根據初始化參數,直接進入if方法。有兩個關鍵方法,分別是register()開始注冊, publishEvent 發(fā)布當前實例注冊的事件。
EurekaServiceRegistry
public class EurekaServiceRegistry implements ServiceRegistry<EurekaRegistration> {
private static final Log log = LogFactory.getLog(EurekaServiceRegistry.class);
@Override
public void register(EurekaRegistration reg) {
maybeInitializeClient(reg);
if (log.isInfoEnabled()) {
log.info("Registering application " + reg.getApplicationInfoManager().getInfo().getAppName()
+ " with eureka with status " + reg.getInstanceConfig().getInitialStatus());
}
reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
reg.getHealthCheckHandler()
.ifAvailable(healthCheckHandler -> reg.getEurekaClient().registerHealthCheck(healthCheckHandler));
}
// 這個方法的實際作用是讓ApplicationInfoManager和CloudEurekaClient被使用,來觸發(fā)bean注入,這兩個bean都是lazy的
private void maybeInitializeClient(EurekaRegistration reg) {
// force initialization of possibly scoped proxies
reg.getApplicationInfoManager().getInfo();
reg.getEurekaClient().getApplications();
}
maybeInitializeClient 觸發(fā)了后續(xù)兩個重要的bean:ApplicationInfoManager和EurekaClient的實例化過程。
ApplicationInfoManager存放了InstanceInfo的信息和InstanceConfig的所有配置信息。
EurekaClient(CloudEurekaClient) 在構造過程中,完成了整個客戶端的注冊、向服務端進行數據同步,schedule任務的定義和開啟。
依次介紹這兩個類的創(chuàng)建過程。
new ApplicationInfoManager()
bean的定義在上面的AutoConfiguration中
@Inject
public ApplicationInfoManager(EurekaInstanceConfig config, InstanceInfo instanceInfo, OptionalArgs optionalArgs) {
this.config = config;
this.instanceInfo = instanceInfo;
this.listeners = new ConcurrentHashMap<String, StatusChangeListener>();
if (optionalArgs != null) {
this.instanceStatusMapper = optionalArgs.getInstanceStatusMapper();
} else {
this.instanceStatusMapper = NO_OP_MAPPER;
}
// Hack to allow for getInstance() to use the DI'd ApplicationInfoManager
instance = this;
}
這里首先通過工廠創(chuàng)建了一個InstanceInfo的對象,create方法將EurekaInstanceConfig里的所有配置通過builder模式賦值到InstanceInfo中。
并在內部初始化了兩個屬性,listeners和instanceStatusMapper。
new CloudEurekaClient()
bean的定義在上面的AutoConfiguration中
public CloudEurekaClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config,
AbstractDiscoveryClientOptionalArgs<?> args, ApplicationEventPublisher publisher) {
super(applicationInfoManager, config, args);
this.applicationInfoManager = applicationInfoManager;
this.publisher = publisher;
this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class, "eurekaTransport");
ReflectionUtils.makeAccessible(this.eurekaTransportField);
}
先調用父類的super構造方法
public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, EndpointRandomizer randomizer) {
this(applicationInfoManager, config, args, new Provider<BackupRegistry>() {
private volatile BackupRegistry backupRegistryInstance;
@Override
public synchronized BackupRegistry get() {
if (backupRegistryInstance == null) {
String backupRegistryClassName = config.getBackupRegistryImpl();
if (null != backupRegistryClassName) {
try {
backupRegistryInstance = (BackupRegistry) Class.forName(backupRegistryClassName).newInstance();
logger.info("Enabled backup registry of type {}", backupRegistryInstance.getClass());
} catch (InstantiationException e) {
logger.error("Error instantiating BackupRegistry.", e);
} catch (IllegalAccessException e) {
logger.error("Error instantiating BackupRegistry.", e);
} catch (ClassNotFoundException e) {
logger.error("Error instantiating BackupRegistry.", e);
}
}
if (backupRegistryInstance == null) {
logger.warn("Using default backup registry implementation which does not do anything.");
backupRegistryInstance = new NotImplementedRegistryImpl();
}
}
return backupRegistryInstance;
}
}, randomizer);
}
在構造方法的初始邏輯前,先初始化了BackupRegistry的對象,這個是當eureka所有的url都無法連接時,會按照backupRegistryImpl指定的自定義類來實現(xiàn),默認使用NotImplementedRegistryImpl,內部什么都不處理。
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
// 是否用自定義參數來覆蓋
if (args != null) {
this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
this.eventListeners.addAll(args.getEventListeners());
this.preRegistrationHandler = args.preRegistrationHandler;
} else {
this.healthCheckCallbackProvider = null;
this.healthCheckHandlerProvider = null;
this.preRegistrationHandler = null;
}
// ------------- 將構造方法的參數 給全局參數賦值 -----------------
this.applicationInfoManager = applicationInfoManager;
InstanceInfo myInfo = applicationInfoManager.getInfo();
clientConfig = config;
staticClientConfig = clientConfig;
transportConfig = config.getTransportConfig();
instanceInfo = myInfo;
if (myInfo != null) {
appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
} else {
logger.warn("Setting instanceInfo to a passed in null value");
}
this.backupRegistryProvider = backupRegistryProvider;
this.endpointRandomizer = endpointRandomizer;
this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
localRegionApps.set(new Applications());
fetchRegistryGeneration = new AtomicLong(0);
remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
// ------------- 將構造方法的參數 給全局參數賦值 -----------------
// 如果開啟了拉取注冊表,就在ThresholdLevelsMetric中注冊JXM,提供metric的查詢
if (config.shouldFetchRegistry()) {
this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
// 如果開啟了注冊到eurkea,就在ThresholdLevelsMetric中注冊JXM,提供metric的查詢
if (config.shouldRegisterWithEureka()) {
this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
//如果既不需要拉取,又不需要注,則直接結束
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
logger.info("Client configured to neither register nor query for data.");
scheduler = null;
heartbeatExecutor = null;
cacheRefreshExecutor = null;
eurekaTransport = null;
instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
initRegistrySize = this.getApplications().size();
registrySize = initRegistrySize;
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, initRegistrySize);
return; // no need to setup up an network tasks and we are done
}
// 否則就完成整個client端初始化的動作。
try {
// default size of 2 - 1 each for heartbeat and cacheRefresh
// 線程池創(chuàng)建兩個線程,分別給心跳和刷新cache使用
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
// 用于執(zhí)行心跳任務的線程池
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
// 用于執(zhí)行刷新緩存任務的線程池
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
// 完成eurekaTransport的創(chuàng)建和內部屬性的初始化
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
// 初始化默認的az和region的對應關系
AzToRegionMapper azToRegionMapper;
if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
} else {
azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
}
if (null != remoteRegionsToFetch.get()) {
azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
}
// 構建instanceRegionChecker 用于查詢InstanceInfo對應的region,判斷是否localRegion等
instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
} catch (Throwable e) {
throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
}
if (clientConfig.shouldFetchRegistry()) {
try {
// 重要方法,具體說明看fetchRegistry的分析
boolean primaryFetchRegistryResult = fetchRegistry(false);
// 如果拉取失敗,則會輸出如下異常
if (!primaryFetchRegistryResult) {
logger.info("Initial registry fetch from primary servers failed");
}
boolean backupFetchRegistryResult = true;
// 拉取失敗則調用backup的實現(xiàn)類
if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {
backupFetchRegistryResult = false;
logger.info("Initial registry fetch from backup servers failed");
}
if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {
throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
}
} catch (Throwable th) {
logger.error("Fetch registry error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
// call and execute the pre registration handler before all background tasks (inc registration) is started
// 提供了一個擴展點,需要在發(fā)起注冊之前,進行一些操作。
if (this.preRegistrationHandler != null) {
this.preRegistrationHandler.beforeRegistration();
}
// 在初始化階段強制注冊,這里shouldEnforceRegistrationAtInit()默認值是false,如果注冊失敗會拋出異常
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {
if (!register() ) {
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {
logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
// 關鍵方法,初始化各種schedule任務,包括心跳、拉取注冊表等,具體說明看initScheduledTasks的分析
initScheduledTasks();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register timers", e);
}
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
initRegistrySize = this.getApplications().size();
registrySize = initRegistrySize;
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, initRegistrySize);
}
fetchRegistry()
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// If the delta is disabled or if it is the first time, get all
// applications
// Application 內部封裝了從eureka服務端返回的所有注冊表信息,第一次啟動時這里時空的,在拉取全量或增量時更新
Applications applications = getApplications();
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
// 全量拉取注冊表信息
getAndStoreFullRegistry();
} else {
// 增量拉取注冊表信息
getAndUpdateDelta(applications);
}
// 這里的hashCode用來判斷拉取跟上次相比是否發(fā)生過變更
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.info(PREFIX + "{} - was unable to refresh its cache! This periodic background refresh will be retried in {} seconds. status = {} stacktrace = {}",
appPathIdentifier, clientConfig.getRegistryFetchIntervalSeconds(), e.getMessage(), ExceptionUtils.getStackTrace(e));
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// Notify about cache refresh before updating the instance remote status
onCacheRefreshed();
// Update remote status based on refreshed data held in the cache
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
這個fetchRegistry方法會在啟動時調用,以及schedule任務執(zhí)行時,每次刷新客戶端緩存時調用。主要做了幾件事情。
- 如果是啟動時,會全量拉取eureka的注冊信息,調用/eureka/apps
private void getAndStoreFullRegistry() throws Throwable {
...
Applications apps = null;
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
...
這里的apps就是從eureka服務端拉取的全量注冊表信息,如下圖

- 如果是schedule任務會增量拉取,調用getAndUpdateDelta(),內部通過http請求/eureka/apps/delta,這里的delta就是從eureka服務端拉取的增量注冊表信息。
private void getAndUpdateDelta(Applications applications) throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
if (delta == null) {
getAndStoreFullRegistry();
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
String reconcileHashCode = "";
if (fetchRegistryUpdateLock.tryLock()) {
try {
// 獲取到增量列表后,依次遍歷每一個application,更新到本地cache中
updateDelta(delta);
reconcileHashCode = getReconcileHashCode(applications);
} finally {
fetchRegistryUpdateLock.unlock();
}
}
...
- 調用onCacheRefreshed(),刷新客戶端緩存。子類CloudEurekaClient重寫了onCacheRefresh,由eventListeners來處理CacheRefreshedEvent,并發(fā)出HeartbeatEvent事件。
@Override
protected void onCacheRefreshed() {
super.onCacheRefreshed();
if (this.cacheRefreshedCount != null) { // might be called during construction and
// will be null
long newCount = this.cacheRefreshedCount.incrementAndGet();
log.trace("onCacheRefreshed called with count: " + newCount);
this.publisher.publishEvent(new HeartbeatEvent(this, newCount));
}
}
protected void onCacheRefreshed() {
fireEvent(new CacheRefreshedEvent());
}
protected void fireEvent(final EurekaEvent event) {
for (EurekaEventListener listener : eventListeners) {
try {
listener.onEvent(event);
} catch (Exception e) {
logger.info("Event {} throw an exception for listener {}", event, listener, e.getMessage());
}
}
}
- 調用updateInstanceRemoteStatus將當前實例的狀態(tài)更新到eureka服務端。
private synchronized void updateInstanceRemoteStatus() {
// Determine this instance's status for this app and set to UNKNOWN if not found
InstanceInfo.InstanceStatus currentRemoteInstanceStatus = null;
if (instanceInfo.getAppName() != null) {
Application app = getApplication(instanceInfo.getAppName());
if (app != null) {
InstanceInfo remoteInstanceInfo = app.getByInstanceId(instanceInfo.getId());
if (remoteInstanceInfo != null) {
// 本次拉取的服務狀態(tài)
currentRemoteInstanceStatus = remoteInstanceInfo.getStatus();
}
}
}
if (currentRemoteInstanceStatus == null) {
currentRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;
}
// Notify if status changed
// 本次拉取的服務狀態(tài) 和 上次拉取的服務狀態(tài)不一致
if (lastRemoteInstanceStatus != currentRemoteInstanceStatus) {
onRemoteStatusChanged(lastRemoteInstanceStatus, currentRemoteInstanceStatus);
lastRemoteInstanceStatus = currentRemoteInstanceStatus;
}
}
protected void onRemoteStatusChanged(InstanceInfo.InstanceStatus oldStatus, InstanceInfo.InstanceStatus newStatus) {
fireEvent(new StatusChangeEvent(oldStatus, newStatus));
}
先將服務遠程的當前狀態(tài)跟上一次拉取的狀態(tài)進行比較,如果一致,則結束,不一致則發(fā)出一個StatusChangeEvent事件。
如果成功從eureka 服務端拉取注冊表成功,則fetchRegistry返回true。
initScheduledTasks()
// 初始化所有的schedule任務
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
// 定義cacheRefreshTask,schedule每隔registryFetchIntervalSeconds來刷新客戶端緩存
cacheRefreshTask = new TimedSupervisorTask("cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread());
scheduler.schedule(cacheRefreshTask, registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
// 定義heartbeatTask,schedule每隔renewalIntervalInSecs來發(fā)起心跳(續(xù)約)
heartbeatTask = new TimedSupervisorTask("heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread());
scheduler.schedule(heartbeatTask, renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
// 實例信息的復制器,內部包含schedule,同步到eureka服務端
instanceInfoReplicator = new InstanceInfoReplicator(this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize
// 定義了狀態(tài)變更的listener,并注冊到applicationInfoManager中
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
logger.info("Saw local status change event {}", statusChangeEvent);
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// 開啟schedule任務
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
- 首先給cache和heartbeat創(chuàng)建了兩個TimedSupervisorTask,定義如下,scheduler會每隔一定時間來執(zhí)行task。
- 創(chuàng)建了instanceInfoReplicator,是實例信息變化的復制器,當發(fā)現(xiàn)出現(xiàn)變化時,內部通過register方法將新的信息同步到eureka服務端。
- 創(chuàng)建了statusChangeListener,當觸發(fā)狀態(tài)變更回調時,調用instanceInfoReplicator.onDemandUpdate(); 并將其添加到applicationInfoManager中。
public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
this.name = name;
this.scheduler = scheduler;
this.executor = executor;
this.timeoutMillis = timeUnit.toMillis(timeout);
this.task = task;
this.delay = new AtomicLong(timeoutMillis);
this.maxDelay = timeoutMillis * expBackOffBound;
...
}
@Override
public void run() {
Future<?> future = null;
try {
future = executor.submit(task);
threadPoolLevelGauge.set((long) executor.getActiveCount());
future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout
delay.set(timeoutMillis);
threadPoolLevelGauge.set((long) executor.getActiveCount());
successCounter.increment();
} catch (TimeoutException e) {
logger.warn("task supervisor timed out", e);
timeoutCounter.increment();
long currentDelay = delay.get();
long newDelay = Math.min(maxDelay, currentDelay * 2);
delay.compareAndSet(currentDelay, newDelay);
} finally {
if (future != null) {
future.cancel(true);
}
if (!scheduler.isShutdown()) {
scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
}
}
}
這里還需要 重點說明下run的實現(xiàn):
scheduler.schedule在調度任務時,只傳了delay的時間,并沒有傳周期時間,而執(zhí)行周期其實是會動態(tài)變化的,這里的動態(tài)變化是通過scheduler.schedule的遞歸來完成。
可以看到在try塊中,delay的值會被重置,并且如果出現(xiàn)超時,則將newDelay可能設置成2倍的時間,再在finally中再次執(zhí)行schedule,延遲的時間就是2倍的delay,這是一種動態(tài)周期的schedule實現(xiàn)方案。
cacheRefreshTask
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
}
@VisibleForTesting
void refreshRegistry() {
try {
...
//remoteRegionsModified
// 前文判斷根據remoteRegions是否發(fā)生變化來決定是全量拉取還是增量拉取
boolean success = fetchRegistry(remoteRegionsModified);
if (success) {
// 這里的localRegionApps是拉取注冊表信息內容的本地緩存
registrySize = localRegionApps.get().size();
lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
}
...
} catch (Throwable e) {
logger.error("Cannot fetch registry from server", e);
}
}
這個task其實就是調用了fetchRegistry,拉取最新的注冊表,刷新本地cache。
heartbeatTask
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
... //這里判斷如果是404的請求,則再發(fā)起一次注冊請求,同步實例信息
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
這個task就是發(fā)起了一個http請求,調用了sendHearBeat方法,發(fā)起了一次心跳續(xù)約。
InstanceInfoReplicator
public void start(int initialDelayMs) {
if (started.compareAndSet(false, true)) {
instanceInfo.setIsDirty(); // for initial register
Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
public boolean onDemandUpdate() {
if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
if (!scheduler.isShutdown()) {
scheduler.submit(new Runnable() {
@Override
public void run() {
logger.debug("Executing on-demand update of local InstanceInfo");
Future latestPeriodic = scheduledPeriodicRef.get();
if (latestPeriodic != null && !latestPeriodic.isDone()) {
logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
latestPeriodic.cancel(false);
}
InstanceInfoReplicator.this.run();
}
});
return true;
} else {
logger.warn("Ignoring onDemand update due to stopped scheduler");
return false;
}
} else {
logger.warn("Ignoring onDemand update due to rate limiter");
return false;
}
}
public void run() {
try {
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
InstanceInfoReplicator 是實例信息的同步器,作用是當發(fā)生變更時(狀態(tài)、地址、ip、配置參數值等),將新的InstanceInfo同步給eureka服務端。
這里的三個方法:
start():在初始化時,調用start開啟schedule任務,每隔一段時間來調用run()檢測變更。
onDemandUpdate():在觸發(fā)狀態(tài)變更時,作為listener來調用,也會調用run()檢測變更。
run():檢測當前InstanceInfo是否跟上次相同,如果出現(xiàn)變更,則調用discoveryClient.register();將新的信息同步到eureka服務端。依據則是instanceInfo是否被標記為isDirty
而標記的依據在discoveryClient.refreshInstanceInfo();中實現(xiàn)。
/**
* Refresh the current local instanceInfo. Note that after a valid refresh where changes are observed, the
* isDirty flag on the instanceInfo is set to true
*/
void refreshInstanceInfo() {
// 判斷地址、ip、實例所在的部署節(jié)點是否變更
applicationInfoManager.refreshDataCenterInfoIfRequired();
// 判斷各種續(xù)約配置值(LeaseInfo) 是否變更
applicationInfoManager.refreshLeaseInfoIfRequired();
InstanceStatus status;
try {
status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
} catch (Exception e) {
logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
status = InstanceStatus.DOWN;
}
if (null != status) {
applicationInfoManager.setInstanceStatus(status);
}
}
先根據兩個refresh方法來判斷是否變更,如果變更,內部會調用instanceInfo.setIsDirty();來進行標記。
public synchronized void setIsDirty() {
isInstanceInfoDirty = true;
lastDirtyTimestamp = System.currentTimeMillis();
}
至此,initScheduledTasks分析完成。內部分別啟動了三個schedule,分別用于將新的注冊表刷新本地cache、發(fā)送心跳續(xù)約、發(fā)送客戶端最新的實例信息。
再回到DiscoveryClient的構造方法,在initScheduledTasks執(zhí)行完成后,就是注冊JMX bean,設置一些全局屬性。
CloudEurekaClient 在構造完成后,會根據是否配置了healthCheckHandler(默認的EurekaHealthCheckHandler實現(xiàn)需要配置springboot actuator),如果配置了會將健康狀態(tài)同步到eureka 服務端一次。
@Override
public void registerHealthCheck(HealthCheckHandler healthCheckHandler) {
if (instanceInfo == null) {
logger.error("Cannot register a healthcheck handler when instance info is null!");
}
if (healthCheckHandler != null) {
this.healthCheckHandlerRef.set(healthCheckHandler);
// schedule an onDemand update of the instanceInfo when a new healthcheck handler is registered
if (instanceInfoReplicator != null) {
instanceInfoReplicator.onDemandUpdate();
}
}
}
至此,CloudEurekaClient的整個構建過程完成的所有內容也分析完成。
再回到EurekaServiceRegistry的register方法
public void register(EurekaRegistration reg) {
maybeInitializeClient(reg);
if (log.isInfoEnabled()) {
log.info("Registering application " + reg.getApplicationInfoManager().getInfo().getAppName()
+ " with eureka with status " + reg.getInstanceConfig().getInitialStatus());
}
reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
reg.getHealthCheckHandler()
.ifAvailable(healthCheckHandler -> reg.getEurekaClient().registerHealthCheck(healthCheckHandler));
}
在完成整個過程的定義之后,先調用setInstanceStatus設置Instance的狀態(tài),啟動成功這里就是UP。
接著,如果配置了健康檢查,還會觸發(fā)一個發(fā)送健康檢查同步的事件。
public synchronized void setInstanceStatus(InstanceStatus status) {
InstanceStatus next = instanceStatusMapper.map(status);
if (next == null) {
return;
}
InstanceStatus prev = instanceInfo.setStatus(next);
if (prev != null) {
for (StatusChangeListener listener : listeners.values()) {
try {
listener.notify(new StatusChangeEvent(prev, next));
} catch (Exception e) {
logger.warn("failed to notify listener: {}", listener.getId(), e);
}
}
}
}
還會發(fā)出一個StatusChangeEvent,通知listener來向eureka服務端做同步。
register至此分析完成。
再回到EurekaAutoServiceRegistration(SmartLifecycle)的start方法。
@Override
public void start() {
// only set the port if the nonSecurePort or securePort is 0 and this.port != 0
if (this.port.get() != 0) {
if (this.registration.getNonSecurePort() == 0) {
this.registration.setNonSecurePort(this.port.get());
}
if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
this.registration.setSecurePort(this.port.get());
}
}
// only initialize if nonSecurePort is greater than 0 and it isn't already running
// because of containerPortInitializer below
if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
this.serviceRegistry.register(this.registration);
this.context.publishEvent(new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
this.running.set(true);
}
}
register完成后,context發(fā)出了一個InstanceRegisteredEvent事件,這個事件不是eureka的事件,而是一個ApplicationEvent。目的是提供一個擴展點,來通知eureka實例注冊完成。用戶可以自定義listener來處理。
至此,整個eureka客戶端的啟動流程,注冊原理分析完成。
3. 下線分析
在SmartLifecycle的定義中,當實例停止前,會調用stop方法,eureka也因此觸發(fā)下線流程。
@Override
public void stop() {
this.serviceRegistry.deregister(this.registration);
this.running.set(false);
}
@Override
public void deregister(EurekaRegistration reg) {
if (reg.getApplicationInfoManager().getInfo() != null) {
if (log.isInfoEnabled()) {
log.info("Unregistering application " + reg.getApplicationInfoManager().getInfo().getAppName()
+ " with eureka with status DOWN");
}
reg.getApplicationInfoManager().setInstanceStatus(InstanceInfo.InstanceStatus.DOWN);
// shutdown of eureka client should happen with EurekaRegistration.close()
// auto registration will create a bean which will be properly disposed
// manual registrations will need to call close()
}
}
可以看到下線的處理流程非常簡單,就是設置了ApplicationInfoManager中的instanceStatus為DOWN的狀態(tài),設置down的同時,會觸發(fā)StatusChangeEvent事件的發(fā)送,由statusChangeListener來向eureka服務端發(fā)送新的下線通知。
總結
- eureka的客戶端通過
SmartLifecycle作為入口點,開啟整個eureka的bean注入; - 所有的核心代碼都在
CloudEurekaClient的父類構造方法中完成。
a. 定義refreshCacheTask來定時拉取注冊表
b. 定義heartbeatTask來定時發(fā)送續(xù)約請求
c. 定義instanceInfoReplicator來向eureka服務端發(fā)送實例信息
d. 定義statusChangeListener 來接受實例變化事件,發(fā)送register請求。 - 每當實例信息發(fā)生變更時,就發(fā)送StatusChangeEvent,判定變更的條件包括InstanceInfo實例屬性變化、InstanceConfig配置發(fā)生變更。