前言
dubbo的Configuration負責內(nèi)部所有配置信息的匯總、管理,考慮到各種可能場景,dubbo提供基于Configuration接口的靜態(tài)配置和動態(tài)配置功能。通常情況下,靜態(tài)配置用于服務、服務提供方、服務消費方等在dubbo服務暴露、引用階段的初始化工作,初始化完成之后執(zhí)行服務的暴露、注冊、引用;動態(tài)配置則主要用于動態(tài)更新dubbo對外暴露的可覆蓋配置以及用戶自定義配置,無需重啟服務,通常借助三方配置工具或配置中間件實現(xiàn)(比如Apollo、Zookeeper等,2.7版本暫不支持Nacos)。本文會分別從靜態(tài)配置、動態(tài)配置兩種場景進行分析;其中靜態(tài)配置加載的觸發(fā)階段(包括與Spring生態(tài)的對接、服務暴露、服務引用)會在對應章節(jié)進行分析,本文僅解析靜態(tài)配置的設計與使用;動態(tài)配置的處理相對獨立,方便進行完整解析。
一、Configuration
頂級接口Configuration,定義dubbo中配置的基礎功能,可以分為兩類,基礎功能和擴展接口,其中擴展接口主要用于子類個性化實現(xiàn)?;A功能以getString()方法為例,內(nèi)部借助convert()實現(xiàn)
default String getString(String key, String defaultValue) {
return convert(String.class, key, defaultValue);
}
default <T> T convert(Class<T> cls, String key, T defaultValue) {
// 這里僅處理字符串類型屬性
String value = (String) getProperty(key);
if (value == null) {
return defaultValue;
}
// 可以看出,dubbo僅支持幾種類型配置:字符串、數(shù)字、枚舉.
Object obj = value;
if (cls.isInstance(value)) {
return cls.cast(value);
}
if (String.class.equals(cls)) {
return cls.cast(value);
}
if (Boolean.class.equals(cls) || Boolean.TYPE.equals(cls)) {
obj = Boolean.valueOf(value);
} else if (Number.class.isAssignableFrom(cls) || cls.isPrimitive()) {
if (Integer.class.equals(cls) || Integer.TYPE.equals(cls)) {
obj = Integer.valueOf(value);
} else if (Long.class.equals(cls) || Long.TYPE.equals(cls)) {
obj = Long.valueOf(value);
} else if (Byte.class.equals(cls) || Byte.TYPE.equals(cls)) {
obj = Byte.valueOf(value);
} else if (Short.class.equals(cls) || Short.TYPE.equals(cls)) {
obj = Short.valueOf(value);
} else if (Float.class.equals(cls) || Float.TYPE.equals(cls)) {
obj = Float.valueOf(value);
} else if (Double.class.equals(cls) || Double.TYPE.equals(cls)) {
obj = Double.valueOf(value);
}
} else if (cls.isEnum()) {
obj = Enum.valueOf(cls.asSubclass(Enum.class), value);
}
return cls.cast(obj);
}
default Object getProperty(String key) {
return getProperty(key, null);
}
然后是擴展接口,以getProperty方法為例,最終借助擴展方法getInternalProperty實現(xiàn)
default Object getProperty(String key, Object defaultValue) {
//這里調用擴展接口
Object value = getInternalProperty(key);
return value != null ? value : defaultValue;
}
//模板方法,由子類實現(xiàn)
Object getInternalProperty(String key);
二、靜態(tài)配置(AbstractPrefixConfiguration)
解析靜態(tài)配置之前,先來了解一下java中的系統(tǒng)環(huán)境變量和系統(tǒng)屬性。系統(tǒng)環(huán)境變量我們比較熟悉,入職第一天必做的事情就是配置各種環(huán)境變量;系統(tǒng)屬性其實我們也經(jīng)常用,比如最常用的System.getProperty("xxx"),或者配置jvm參數(shù)"-DXX:NewRatio=4",最終都會被解析至系統(tǒng)屬性,另外,你會發(fā)現(xiàn),在你沒有手動配置系統(tǒng)環(huán)境變量與系統(tǒng)屬性之前,就可以直接使用了(jvm內(nèi)置的初始化屬性,可以直接在jdk注釋查看)。jdk對于enviroment的解釋:
# 環(huán)境變量
The environment is a system-dependent mapping from names to values which is passed from parent to child processes,If the system does not support environment variables, an empty map is returned。
環(huán)境變量,依賴系統(tǒng)的name-value鍵值對,可以由父進程傳遞給子進程;若系統(tǒng)不支持環(huán)境變量,那么會返回一個空的map。
下面的demo,可以幫你區(qū)分系統(tǒng)環(huán)境變量與系統(tǒng)屬內(nèi)容的不同
// 系統(tǒng)環(huán)境變量
Map<String,String> envProperties = System.getenv();
System.out.println("env.size = " + envProperties.size());
envProperties.forEach((k,v) ->{
System.out.println(k + " : " + v);
});
// 系統(tǒng)屬性 -D參數(shù)或者setProperties
Properties sysProperties = System.getProperties();
sysProperties.setProperty("哈哈哈","hahaha");
System.out.println("sys.size = " + sysProperties.entrySet().size());
sysProperties.forEach((k,v) ->{
System.out.println(k + " : " + v);
});
了解完系統(tǒng)環(huán)境變量與系統(tǒng)屬性,下面來看dubbo中靜態(tài)配置的設計。靜態(tài)環(huán)境配置并不是通過直接實現(xiàn)Configuration接口實現(xiàn),先由基類AbstractPrefixConfiguration實現(xiàn)Configuration接口,然后靜態(tài)配置全部繼承AbstractPrefixConfiguration。我們先來看AbstractPrefixConfiguration,比較容易理解,直譯就是抽象前綴配置,核心參數(shù)只有前綴prefix和id,直接來看代碼:
public AbstractPrefixConfiguration(String prefix, String id) {
super();
//前綴要么為空,要么必須以"."結尾
if (StringUtils.isNotEmpty(prefix) && !prefix.endsWith(".")) {
this.prefix = prefix + ".";
} else {
this.prefix = prefix;
}
this.id = id;
}
// 核心方法,內(nèi)部調用具體子類的getInternalProperty方法
@Override
public Object getProperty(String key, Object defaultValue) {
Object value = null;
if (StringUtils.isNotEmpty(prefix) && StringUtils.isNotEmpty(id)) {
value = getInternalProperty(prefix + id + "." + key);
}
if (value == null && StringUtils.isNotEmpty(prefix)) {
value = getInternalProperty(prefix + key);
}
if (value == null) {
value = getInternalProperty(key);
}
return value != null ? value : defaultValue;
}
基于AbstractPrefixConfiguration靜態(tài)配置基類,dubbo擴展出EnvironmentConfiguration(系統(tǒng)環(huán)境變量配置)、InmemoryConfiguration(內(nèi)存配置)、PropertiesConfiguration(系統(tǒng)屬性配置)、SystemConfiguration(系統(tǒng)配置),后兩個配置其實有重疊也有區(qū)別,具體到代碼分析階段會詳細說明。下面按照順序,或者說叫優(yōu)先級來依次解析
先來看EnvironmentConfiguration ,非常簡單,僅借助System.getenv()方法實現(xiàn)了getInternalProperty。
public class EnvironmentConfiguration extends AbstractPrefixConfiguration {
public EnvironmentConfiguration(String prefix, String id) {
super(prefix, id);
}
public EnvironmentConfiguration() {
this(null, null);
}
@Override
public Object getInternalProperty(String key) {
// 借助系統(tǒng)環(huán)境變量實現(xiàn)
return System.getenv(key);
}
}
InmemoryConfiguration,內(nèi)存緩存配置,內(nèi)部多了一個map,用于緩存相關配置,同時對外暴露一個addProperties、setProperties方法,核心方法getInternalProperty則是直接借助map緩存實現(xiàn),比較簡單
// 省略add、set方法
public class InmemoryConfiguration extends AbstractPrefixConfiguration {
private Map<String, String> store = new LinkedHashMap<>();
public InmemoryConfiguration(String prefix, String id) {
super(prefix, id);
}
public InmemoryConfiguration() {
this(null, null);
}
@Override
public Object getInternalProperty(String key) {
return store.get(key);
}
}
重點來看后兩種配置,先來看SystemConfiguration,比較簡單,內(nèi)部主要借助System.getProperty實現(xiàn)。
// SystemConfiguration直接借助System.getProperty方法實現(xiàn)
public class SystemConfiguration extends AbstractPrefixConfiguration {
public SystemConfiguration(String prefix, String id) {
super(prefix, id);
}
public SystemConfiguration() {
this(null, null);
}
@Override
public Object getInternalProperty(String key) {
return System.getProperty(key);
}
}
最后來看PropertiesConfiguration,邏輯上與SystemConfiguration有一些重復,getInternalProperty方法借助ConfigUtils.getProperty實現(xiàn)。大致可以分為三步:
- 先從系統(tǒng)屬性獲取配置,配置值非空則直接返回,否則執(zhí)行2;
- 加載制定配置文件,加載的優(yōu)先級順序: 系統(tǒng)屬性中配置的"dubbo.properties.file"文件 > 系統(tǒng)環(huán)境變量中配置的"dubbo.properties.file" > classPath下的dubbo.properties文件。
- 配置值中有占位符,則使用2中的properties進行替換,并返回。
//重點關注getInternalProperty
public class PropertiesConfiguration extends AbstractPrefixConfiguration {
private static final Logger logger = LoggerFactory.getLogger(PropertiesConfiguration.class);
public PropertiesConfiguration(String prefix, String id) {
super(prefix, id);
}
public PropertiesConfiguration() {
this(null, null);
}
@Override
public Object getInternalProperty(String key) {
//借助ConfigUtils實現(xiàn)
return ConfigUtils.getProperty(key);
}
}
//再來看ConfigUtils.getProperty方法
public static String getProperty(String key, String defaultValue) {
//優(yōu)先從系統(tǒng)屬性取,這里與SystemConfiguration功能重疊
String value = System.getProperty(key);
if (value != null && value.length() > 0) {
return value;
}
// 否則加載指定配置文件,這里需要注意配置優(yōu)先級
Properties properties = getProperties();
// 支持嵌套替換,舉個例子:key = "abc",properties中有["abc","1${a.b.c}2${a.b.c}3"],["a.b.c","ABC"],最終 // 結果是 "1ABC2ABC3"
return replaceProperty(properties.getProperty(key, defaultValue), (Map) properties);
}
// 屬性文件加載順序,系統(tǒng)配置(-D參數(shù),-Ddubbo.properties.file) -> 系統(tǒng)環(huán)境變量 -> dubbo.properties保底
public static Properties getProperties() {
//DCL保證屬性文件一致性。
if (PROPERTIES == null) {
synchronized (ConfigUtils.class) {
if (PROPERTIES == null) {
// 優(yōu)先讀取系統(tǒng)屬性 "dubbo.properties.file" 指定的文件,
// 如: -Ddubbo.properties.file=dubbo.properties
String path = System.getProperty(Constants.DUBBO_PROPERTIES_KEY);
if (path == null || path.length() == 0) {
// 系統(tǒng)屬性未配置,則從環(huán)境變量中讀取屬性 "dubbo.properties.file"
path = System.getenv(Constants.DUBBO_PROPERTIES_KEY);
if (path == null || path.length() == 0) {
// 系統(tǒng)環(huán)境變量未配置,則默認讀取 dubbo.properties配置文件
path = Constants.DEFAULT_DUBBO_PROPERTIES;
}
}
PROPERTIES = ConfigUtils.loadProperties(path, false, true);
}
}
}
return PROPERTIES;
}
// 屬性替換例如:expression= "1${a.b.c}2${a.b.c}3" ,params = ["a.b.c","ABC"]
// 返回,替換后的字符串:"1ABC2ABC3"
public static String replaceProperty(String expression, Map<String, String> params) {
// 僅處理帶占位符的屬性"${xxx}"
if (expression == null || expression.length() == 0 || expression.indexOf('$') < 0) {
return expression;
}
// 根據(jù)正則進行匹配
Matcher matcher = VARIABLE_PATTERN.matcher(expression);
StringBuffer sb = new StringBuffer();
while (matcher.find()) {
String key = matcher.group(1);
String value = System.getProperty(key);
if (value == null && params != null) {
value = params.get(key);
}
if (value == null) {
value = "";
}
// 字符串拼接
matcher.appendReplacement(sb, Matcher.quoteReplacement(value));
}
matcher.appendTail(sb);
// 最終結果
return sb.toString();
}
以上就是靜態(tài)配置的全部內(nèi)容,相對于動態(tài)配置來說還是比較簡單的。除了靜態(tài)配置、動態(tài)配置之外,dubbo還提供了動態(tài)配置、靜態(tài)配置的復合配置CompositeConfiguration,在復合配置中內(nèi)置Configuration列表,沒有靜態(tài)、動態(tài)配置之分。核心方法getInternalProperty,邏輯也比較簡單,只要匹配Configuration列表中第一個包含查找屬性的配置即可,下面來看代碼
// 復合配置,不區(qū)分動態(tài)、靜態(tài)配置
public class CompositeConfiguration implements Configuration {
private Logger logger = LoggerFactory.getLogger(CompositeConfiguration.class);
private List<Configuration> configList = new LinkedList<Configuration>();
public CompositeConfiguration() {
}
// 帶參數(shù)的構造方法
public CompositeConfiguration(Configuration... configurations) {
if (configurations != null && configurations.length > 0) {
Arrays.stream(configurations).filter(config -> !configList.contains(config)).forEach(configList::add);
}
}
// 添加配置
public void addConfiguration(Configuration configuration) {
if (configList.contains(configuration)) {
return;
}
this.configList.add(configuration);
}
public void addConfigurationFirst(Configuration configuration) {
this.addConfiguration(0, configuration);
}
// 指定位置插入配置
public void addConfiguration(int pos, Configuration configuration) {
this.configList.add(pos, configuration);
}
@Override
public Object getInternalProperty(String key) {
Configuration firstMatchingConfiguration = null;
for (Configuration config : configList) {
try {
//若有配置包含當前要查找的key信息,直接返回
if (config.containsKey(key)) {
firstMatchingConfiguration = config;
break;
}
} catch (Exception e) {
logger.error("Error when trying to get value for key " + key + " from " + config + ", will continue to try the next one.");
}
}
if (firstMatchingConfiguration != null) {
// 第一個匹配的配置,讀取配置值
return firstMatchingConfiguration.getProperty(key);
} else {
return null;
}
}
@Override
public boolean containsKey(String key) {
return configList.stream().anyMatch(c -> c.containsKey(key));
}
}
好了,以上就是所有靜態(tài)配置實現(xiàn)。下面我們接著來看動態(tài)配置。
三、動態(tài)配置(DynamicConfiguration)
DynamicConfiguration接口繼承頂級接口Configuration,用于動態(tài)同步三方配置。提到動態(tài)更新,就不得不提對應的Listener,即ConfigurationLisenter。很容易想到,動態(tài)配置借助ConfigurationListener監(jiān)聽配置變更,然后同步更新本地配置。在介紹Directory一文中,我們提到過ConfigurationListener,本文我們會對ConfigurationListener重點做介紹。
1、配置變更監(jiān)聽器(ConfigurationListener)
先來看ConfigurationListener接口設計,同樣是頂級接口,內(nèi)部只有一個process方法,用于處理配置變更事件?;贑onfigurationListener有兩個主要擴展,分別是AbstractConfiguratorListener和ListenableRouter,其中ListenableRouter在Router一文已經(jīng)做了介紹,本文不再冗述,僅關注AbstractConfiguratorListener。在AbstractConfigratorListener中,內(nèi)置Configurator配置器列表,重點關注構造方法以及核心process()的邏輯,同時還額外定義了notifyOverrides()供子類實現(xiàn)(子類均在RegistryDirectory、RegistryProtocol)。
protected final void initWith(String key) {
//動態(tài)配置
DynamicConfiguration dynamicConfiguration = DynamicConfiguration.getDynamicConfiguration();
// 如果是nopDynamicConfiguration,那么這里什么都不會做
dynamicConfiguration.addListener(key, this);
String rawConfig = dynamicConfiguration.getConfig(key);
if (!StringUtils.isEmpty(rawConfig)) {
//直接根據(jù)字符串配置,賦值configurators.
process(new ConfigChangeEvent(key, rawConfig));
}
}
// 順便來看下DynamicConfiguration.getDynamicConfiguration方法的邏輯
static DynamicConfiguration getDynamicConfiguration() {
// 先從環(huán)境變量中取動態(tài)配置
Optional<Configuration> optional = Environment.getInstance().getDynamicConfiguration();
// 動態(tài)配置為空,則通過DynamicConfigurationFactory的SPI,獲取默認擴展實現(xiàn)。默認是NopDynamicConfiguration,即什 // 么都不做。
return (DynamicConfiguration) optional.orElseGet(() -> getExtensionLoader(DynamicConfigurationFactory.class)
.getDefaultExtension()
.getDynamicConfiguration(null));
}
//process邏輯比較核心
@Override
public void process(ConfigChangeEvent event) {
if (logger.isInfoEnabled()) {
logger.info("Notification of overriding rule, change type is: " + event.getChangeType() +
", raw config content is:\n " + event.getValue());
}
//配置刪除事件,則直接清空configurators
if (event.getChangeType().equals(ConfigChangeType.DELETED)) {
configurators.clear();
} else {
try {
// parseConfigurators will recognize app/service config automatically.
// 借助配置解析器,解析配置內(nèi)容至URL,最終再將URL轉換為Configurator,并存放于Configurator列表
configurators=Configurator.toConfigurators(ConfigParser.parseConfigurators(event.getValue()))
.orElse(configurators);
} catch (Exception e) {
logger.error("Failed to parse raw dynamic config and it will not take effect, the raw config is: " +event.getValue(), e);
return;
}
}
// 通知子類,同步變更配置,其實這里是直接刷新RegistryDirectory內(nèi)的Invoker。
notifyOverrides();
}
上面的代碼可以看到,配置的解析分為配置解析和配置緩存兩步。配置解析主要是將配置變更解析為URL列表,配置緩存則是把URL轉為Congurator配置列表緩存。先來看配置解析
// 解析入口
public static List<URL> parseConfigurators(String rawConfig) {
List<URL> urls = new ArrayList<>();
// 借助Yaml工具,將配置信息解析至ConfiguratorConfig,ConfiguratorConfig的數(shù)據(jù)結構這里就不再列出來了。
ConfiguratorConfig configuratorConfig = parseObject(rawConfig);
String scope = configuratorConfig.getScope();
// 具體配置項解析為URL,氛圍應用級配置和服務級配置,默認為服務級配置。
List<ConfigItem> items = configuratorConfig.getConfigs();
if (ConfiguratorConfig.SCOPE_APPLICATION.equals(scope)) {
//應用級配置
items.forEach(item -> urls.addAll(appItemToUrls(item, configuratorConfig)));
} else {
// service scope by default.
// 服務級配置
items.forEach(item -> urls.addAll(serviceItemToUrls(item, configuratorConfig)));
}
return urls;
}
// 應用級配置與服務級配置大同小異,直接來看服務級配置
private static List<URL> serviceItemToUrls(ConfigItem item, ConfiguratorConfig config) {
List<URL> urls = new ArrayList<>();
// 獲取配置地址,并根據(jù)配置地址解析為URL,這里有個細節(jié),如果地址列表size=0時會默認返回一個anyhost。
List<String> addresses = parseAddresses(item);
addresses.forEach(addr -> {
StringBuilder urlBuilder = new StringBuilder();
// URL中協(xié)議只能是override
urlBuilder.append("override://").append(addr).append("/");
// serviceKey轉URL信息,舉個例子 serviceKey = test/com.edu.nbu.scm.KeyService:1.0,
// appendService結果:com.edu.nbu.scm.KeyService?group=test&version=1.0
urlBuilder.append(appendService(config.getKey()));
// 參數(shù)拼接,比較簡單,這里會拼接category等參數(shù)信息
urlBuilder.append(toParameterString(item));
// 拼接enabled
parseEnabled(item, config, urlBuilder);
urlBuilder.append("&category=").append(Constants.DYNAMIC_CONFIGURATORS_CATEGORY);
urlBuilder.append("&configVersion=").append(config.getConfigVersion());
// 這里是與appItemToUrls的不同點,這里會根據(jù)applications值來決定是否需要拼接application
// 最終借助URL.valueOf方法,生成URL并放入列表。
List<String> apps = item.getApplications();
if (apps != null && apps.size() > 0) {
apps.forEach(app -> {
urls.add(URL.valueOf(urlBuilder.append("&application=").append(app).toString()));
});
} else {
urls.add(URL.valueOf(urlBuilder.toString()));
}
});
return urls;
}
配置的解析結果是URL列表,完成解析后,需要將URL轉為Configurator。思考為什么不是配置直接解析為Configurator,個人理解,dubbo中URL作為數(shù)據(jù)總線,整個RPC過程都會與URL進行交互,配置解析為URL更能保證數(shù)據(jù)的實時性,Congurator則不然,作用域僅限于配置變更階段,無法保證全局實時。下面來看URL->Configurator的邏輯
static Optional<List<Configurator>> toConfigurators(List<URL> urls) {
if (CollectionUtils.isEmpty(urls)) {
return Optional.empty();
}
ConfiguratorFactory configuratorFactory = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getAdaptiveExtension();
List<Configurator> configurators = new ArrayList<>(urls.size());
//empty協(xié)議,則直接清空configurator列表
for (URL url : urls) {
if (Constants.EMPTY_PROTOCOL.equals(url.getProtocol())) {
configurators.clear();
break;
}
Map<String, String> override = new HashMap<>(url.getParameters());
//The anyhost parameter of override may be added automatically, it can't change the judgement of changing url
override.remove(Constants.ANYHOST_KEY);
if (override.size() == 0) {
configurators.clear();
continue;
}
// 借助configuratorFactory,將URL轉為Configurator
configurators.add(configuratorFactory.getConfigurator(url));
}
// 排序規(guī)則需要注意,排序規(guī)則為: 地址ip -> 優(yōu)先級值
Collections.sort(configurators);
return Optional.of(configurators);
}
配置解析最終結果是,所有變更信息全部緩存于configruator列表,供子類使用。配置變更監(jiān)聽器的基類實現(xiàn)AbstractConfiguratorListener已經(jīng)全部介紹完畢了,下面來看他的子類實現(xiàn),上面也提到過,子類實現(xiàn)分別在RegidtryDirectory和RegistryProcotol中,主要包括:ConsumerConfigurationListener(消費者配置變更監(jiān)聽器)、ReferenceConfigurationListener(引用配置變更監(jiān)聽器)、ProviderConfigurationListener(服務提供者配置變更監(jiān)聽器)、ServiceConfigurationListener(服務配置變更監(jiān)聽器),下面依次進行解析
// 消費者配置變更監(jiān)聽器的邏輯比較簡單,直接刷新RegistryDirectory內(nèi)的Invoker即可,刷新邏輯參考Directory一文
private static class ConsumerConfigurationListener extends AbstractConfiguratorListener {
// 內(nèi)置RegistryDirectory列表。
List<RegistryDirectory> listeners = new ArrayList<>();
ConsumerConfigurationListener() {
// 僅根據(jù)application名進行初始化
this.initWith(ApplicationModel.getApplication() + Constants.CONFIGURATORS_SUFFIX);
}
void addNotifyListener(RegistryDirectory listener) {
this.listeners.add(listener);
}
@Override
protected void notifyOverrides() {
// 最終調用 RegistryDirectory的refreshInvoer,配置變更會同步刷新directory中的invoker
listeners.forEach(listener -> listener.refreshInvoker(Collections.emptyList()));
}
}
//引用配置變更監(jiān)聽器
private static class ReferenceConfigurationListener extends AbstractConfiguratorListener {
private RegistryDirectory directory;
private URL url;
ReferenceConfigurationListener(RegistryDirectory directory, URL url) {
this.directory = directory;
this.url = url;
// 需要借助url信息初始化
this.initWith(url.getEncodedServiceKey() + Constants.CONFIGURATORS_SUFFIX);
}
@Override
protected void notifyOverrides() {
// 最終調用 RegistryDirectory的refreshInvoer,配置變更會同步刷新directory中的invoker
directory.refreshInvoker(Collections.emptyList());
}
}
// 服務提供者配置變更監(jiān)聽器
private class ProviderConfigurationListener extends AbstractConfiguratorListener {
public ProviderConfigurationListener() {
this.initWith(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX);
}
private <T> URL overrideUrl(URL providerUrl) {
return RegistryProtocol.getConfigedInvokerUrl(configurators, providerUrl);
}
@Override
protected void notifyOverrides() {
// 最終借助NotifyListener實現(xiàn)
overrideListeners.values().forEach(listener -> ((OverrideListener) listener).doOverrideIfNecessary());
}
}
// 服務配置變更監(jiān)聽器
private class ServiceConfigurationListener extends AbstractConfiguratorListener {
private URL providerUrl;
private OverrideListener notifyListener;
public ServiceConfigurationListener(URL providerUrl, OverrideListener notifyListener) {
this.providerUrl = providerUrl;
this.notifyListener = notifyListener;
this.initWith(providerUrl.getEncodedServiceKey() + CONFIGURATORS_SUFFIX);
}
private <T> URL overrideUrl(URL providerUrl) {
return RegistryProtocol.getConfigedInvokerUrl(configurators, providerUrl);
}
@Override
protected void notifyOverrides() {
// 同樣,最終借助notifyListener實現(xiàn)
notifyListener.doOverrideIfNecessary();
}
}
四個子類的實現(xiàn)也比較簡單,前兩個位于RegistroyDirectory,核心邏輯都是借助RegistroyDirectory的refreshInvoker方法法實現(xiàn);后兩個則位于RegistryProtocol,核心邏輯都是借助OverrideListener(NotifyListener的子類)的doOverrideIfNecessary方法實現(xiàn)。
2、動態(tài)配置(DynamicConfiguration)
了解完配置變更監(jiān)聽器的邏輯,再來看動態(tài)配置,就比較容易了。DynamicConfiguration接口擴展了Configuration,新增配置變更監(jiān)聽器管理以及配置獲取功能。dubbo內(nèi)部對DynamicConfiguration的實現(xiàn)主要有ApolloDynamicConfiguration(Apollo作為三方配置中心)、MockDynamicConfiguration(降級配置)、NopDynamicConfiguration(終極降級配置)、ZookeeperDynamicConfiguration(Zookeeper作為三方配置中心)。初次之外,DynamicConfiguration的所有實現(xiàn)均由DynamicConfigurationFactory創(chuàng)建,DynamicConfigurationFactory接口支持SPI擴展,默認實現(xiàn)是NopDynamicConfigurationFactory,也就是說默認的DynamicConfiguration擴展是NopDynamicConfiguration。DynamicConfigurationFactory比較簡單,就不做過多介紹了,下面直接來看DynamicConfiguration的四種實現(xiàn)。
ApolloDynamicConfiguration中,需要關注構造方法和配置變更監(jiān)聽器的管理以及Configuration接口方法的實現(xiàn)。構造方法中,初始化了大量與Apollo相關的附加屬性,比如env、apollo.meta、apollo.cluster等;配置變更監(jiān)聽器的管理和Configuration接口方法則是借助Apollo的Config以及ConfigChangeListener實現(xiàn)。下面直接來看相關代碼
// 構造方法,初始化附加屬性以及Apollo的config
ApolloDynamicConfiguration(URL url) {
this.url = url;
// Instead of using Dubbo's configuration, I would suggest use the original configuration method Apollo provides.
String configEnv = url.getParameter(APOLLO_ENV_KEY);
String configAddr = getAddressWithProtocolPrefix(url);
String configCluster = url.getParameter(Constants.CONFIG_CLUSTER_KEY);
if (configEnv != null) {
System.setProperty(APOLLO_ENV_KEY, configEnv);
}
if (StringUtils.isEmpty(System.getProperty(APOLLO_ENV_KEY)) && !Constants.ANYHOST_VALUE.equals(configAddr)) {
System.setProperty(APOLLO_ADDR_KEY, configAddr);
}
if (configCluster != null) {
System.setProperty(APOLLO_CLUSTER_KEY, configCluster);
}
// 初始化Apollo的Config
dubboConfig = ConfigService.getConfig(url.getParameter(Constants.CONFIG_NAMESPACE_KEY, DEFAULT_GROUP));
// Decide to fail or to continue when failed to connect to remote server.
boolean check = url.getParameter(Constants.CONFIG_CHECK_KEY, true);
if (dubboConfig.getSourceType() != ConfigSourceType.REMOTE) {
//如果需要檢查,則直接拋異常
if (check) {
throw new IllegalStateException("Failed to connect to config center, the config center is Apollo, " +"the address is: " + (StringUtils.isNotEmpty(configAddr) ? configAddr : configEnv));
} else {
logger.warn("Failed to connect to config center, the config center is Apollo, " +
"the address is: " + (StringUtils.isNotEmpty(configAddr) ? configAddr : configEnv) +
", will use the local cache value instead before eventually the connection is established.");
}
}
}
對于配置變更監(jiān)聽器的管理,這里以添加監(jiān)聽器為例來說明,其他操作大同小異
@Override
public void addListener(String key, String group, ConfigurationListener listener) {
// listeners->ApolloListener的緩存,
ApolloListener apolloListener = listeners.computeIfAbsent(group + key, k -> createTargetListener(key, group));
// 借助apolloListener添加dubbo的ConfigurationListener
apolloListener.addListener(listener);
// apolloListener添加至Apollo的config
dubboConfig.addChangeListener(apolloListener, Collections.singleton(key));
}
// 關注一下ApolloListener的實現(xiàn)
public class ApolloListener implements ConfigChangeListener {
// listeners,dubbo的ConfigurationListener列表,管理dubbo的ConfigurationListener
private Set<ConfigurationListener> listeners = new CopyOnWriteArraySet<>();
ApolloListener() {}
@Override
// 配置變更事件處理
public void onChange(com.ctrip.framework.apollo.model.ConfigChangeEvent changeEvent) {
for (String key : changeEvent.changedKeys()) {
ConfigChange change = changeEvent.getChange(key);
if ("".equals(change.getNewValue())) {
logger.warn("an empty rule is received for " + key + ", the current working rule is " +
change.getOldValue() + ", the empty rule will not take effect.");
return;
}
// apllo配置變更事件的處理
ConfigChangeEvent event = new ConfigChangeEvent(key, change.getNewValue(),getChangeType(change));
listeners.forEach(listener -> listener.process(event));
}
}
// 輔助方法,配置變更事件轉換
private ConfigChangeType getChangeType(ConfigChange change) {
if (change.getChangeType() == PropertyChangeType.DELETED) {
return ConfigChangeType.DELETED;
}
return ConfigChangeType.MODIFIED;
}
// 添加配置變更監(jiān)聽器
void addListener(ConfigurationListener configurationListener) {
this.listeners.add(configurationListener);
}
// 刪除配置變更監(jiān)聽器
void removeListener(ConfigurationListener configurationListener) {
this.listeners.remove(configurationListener);
}
boolean hasInternalListener() {
return listeners != null && listeners.size() > 0;
}
}
最后是Configuration接口方法的實現(xiàn),比較簡單
@Override
public String getInternalProperty(String key) {
return dubboConfig.getProperty(key, null);
}
ZookeeperDynamicConfiguration則是利用Zookeeper實現(xiàn)配置動態(tài)變更。dubbo內(nèi)部對Zookeeper的使用,全部借助Curator框架(有興趣的同學可以研究一下,這里不做展開)。同樣的,需要重點關注構造方法、配置變更監(jiān)聽器的管理以及Configuration接口方法,這里主要借助Curator的TreeCache(用作配置數(shù)據(jù)的緩存以及配置變更事件的響應)和TreeCacheListener實現(xiàn)。下面直接來看代碼
ZookeeperDynamicConfiguration(URL url) {
// 基礎參數(shù)初始化
this.url = url;
rootPath = "/" + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + "/config";
// 線程池參數(shù),用于newFixedThreadPool
RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
int sessionTimeout = url.getParameter("config.session.timeout", 60 * 1000);
int connectTimeout = url.getParameter("config.connect.timeout", 10 * 1000);
String connectString = url.getBackupAddress();
client = newClient(connectString, sessionTimeout, connectTimeout, policy);
client.start();
try {
// 連接檢測,默認等待時間30s
boolean connected = client.blockUntilConnected(3 * connectTimeout, TimeUnit.MILLISECONDS);
if (!connected) {
if (url.getParameter(Constants.CONFIG_CHECK_KEY, true)) {
throw new IllegalStateException("Failed to connect to config center (zookeeper): "
+ connectString + " in " + 3 * connectTimeout + "ms.");
} else {
logger.warn("The config center (zookeeper) is not fully initialized in " + 3 * connectTimeout + "ms, address is: " + connectString);
}
}
} catch (InterruptedException e) {
throw new IllegalStateException("The thread was interrupted unexpectedly when trying connecting to zookeeper "+ connectString + " config center, ", e);
}
// 閉鎖,防止數(shù)據(jù)不一致
initializedLatch = new CountDownLatch(1);
// 緩存變更監(jiān)聽器
this.cacheListener = new CacheListener(rootPath, initializedLatch);
this.executor = Executors.newFixedThreadPool(1, new NamedThreadFactory(this.getClass().getSimpleName(), true));
// build local cache,構建本地緩存
try {
this.buildCache();
} catch (Exception e) {
logger.warn("Failed to build local cache for config center (zookeeper), address is ." + connectString);
}
}
private void buildCache() throws Exception {
// 初始化treeCode
this.treeCache = new TreeCache(client, rootPath);
// create the watcher for future configuration updates
treeCache.getListenable().addListener(cacheListener, executor);
// it's not blocking, so we use an extra latch 'initializedLatch' to make sure cache fully initialized before use.
treeCache.start();
//開啟閉鎖
initializedLatch.await();
}
構造方法中使用了CacheListener,我們直接來看CacheListener,而且CacheListener也是實現(xiàn)對配置變更監(jiān)聽器管理的核心類。CacheListener基于Curator的TreeCacheListener擴展,對dubbo的ConfigurationListener進行管理(添加、刪除)。CacheListenter的其他邏輯不用關注,直接來看對配置變更事件的處理
@Override
public void childEvent(CuratorFramework aClient, TreeCacheEvent event) throws Exception {
TreeCacheEvent.Type type = event.getType();
ChildData data = event.getData();
//初始化事件,則關閉比閉鎖
if (type == TreeCacheEvent.Type.INITIALIZED) {
initializedLatch.countDown();
return;
}
// TODO, ignore other event types,代碼省略
// TODO We limit the notification of config changes to a specific path level, for example
// /dubbo/config/service/configurators, other config changes not in this level will not get notified,
// say /dubbo/config/dubbo.properties
if (data.getPath().split("/").length >= 5) {
byte[] value = data.getData();
String key = pathToKey(data.getPath());
ConfigChangeType changeType;
switch (type) {
case NODE_ADDED:
changeType = ConfigChangeType.ADDED;
break;
case NODE_REMOVED:
changeType = ConfigChangeType.DELETED;
break;
case NODE_UPDATED:
changeType = ConfigChangeType.MODIFIED;
break;
default:
return;
}
ConfigChangeEvent configChangeEvent = new ConfigChangeEvent(key, new String(value, StandardCharsets.UTF_8), changeType);
Set<ConfigurationListener> listeners = keyListeners.get(key);
// 真實處理邏輯,調用dubbo的ConfigurationListener處理邏輯
if (CollectionUtils.isNotEmpty(listeners)) {
listeners.forEach(listener -> listener.process(configChangeEvent));
}
}
}
@Override
// dubbo的Configuration接口方法實現(xiàn)
public Object getInternalProperty(String key) {
//直接借助treeCache實現(xiàn)。
ChildData childData = treeCache.getCurrentData(key);
if (childData != null) {
return new String(childData.getData(), StandardCharsets.UTF_8);
}
return null;
}
MockDynamicConfiguration和NopDynamicConfiguration主要用于動態(tài)配置的降級和保底,內(nèi)部沒有實際邏輯,不再做解析。
關于dubbo的Configuration就介紹到這里了,做個小結。dubbo內(nèi)部通過靜態(tài)配置與動態(tài)配置相結合,對外統(tǒng)一表現(xiàn)為CompositeConfiguration,在dubbo服務暴露、引用過程中提供屬性初始化;然后,服務運行、消費期間,監(jiān)聽配置變更,并同步至URL、Invoker。
注:dubbo服務版本2.7.1,歡迎指正。