前言
Registry是dubbo對注冊中心的抽象,提供服務(wù)的注冊、注銷、查找、訂閱、取消訂閱等功能。本文按照dubbo中Registry的組織形式,分析Registry的核心邏輯。首先來看注冊中心的創(chuàng)建,RegistryFactory接口定義注冊中心的創(chuàng)建(工廠模式實現(xiàn)),支持SPI擴(kuò)展,默認(rèn)SPI實現(xiàn)是DubboRegistryFactory。注冊中心Registry接口繼承Node、RegistryService,抽象基類AbstractRegistry直接實現(xiàn)Registry,F(xiàn)ailbackRegistry繼承自基類AbstractRegistry,所有注冊中心實現(xiàn)均繼承自FailbackRegistry,這里可以看出,所有注冊中心均支持失敗重試(Failback)。
一、RegistryFactory(注冊中心工廠)
RegistryFactory的UML類圖如下:

先來看RegistryFactory接口定義,比較簡單:
@SPI("dubbo")
public interface RegistryFactory {
/**
* Connect to the registry
* 1、check=false時,連接無需check,否則連接斷開時直接拋異常
* 2、支持URL的用戶名、密碼權(quán)限校驗
* 3、支持注冊中心族備份地址:10.20.153.10
* 4、支持注冊中心本地緩存文件
* 5、支持超時設(shè)置
* 6、支持session 60s過期
*/
@Adaptive({"protocol"})
Registry getRegistry(URL url);
1.1 AbstractRegistryFactory
接著是抽象基類AbstractRegistryFactory,實現(xiàn)RegistryFactory接口的getRegistry方法,同時定義模板方法createRegistry供子類實現(xiàn);AbstractRegistryFactory的getRegistry方法先從Map緩存查詢注冊中心,查不到則執(zhí)行模板方法createRegistry創(chuàng)建注冊中心,并放入緩存,然后返回該registry
@Override
public Registry getRegistry(URL url) {
url = url.setPath(RegistryService.class.getName())
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
String key = url.toServiceStringWithoutResolving();
//加鎖,保證單例
LOCK.lock();
try {
// 緩存查詢注冊中心
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
//create registry by spi/ioc
// 不存在則通過spi、ioc方式創(chuàng)建registry
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
// 鎖釋放
LOCK.unlock();
}
}
1.2 其他實現(xiàn)
基類AbstractRegistryFactory的子類實現(xiàn)中,比較重要的是DubboRegistryFactory(介紹Protocol時已經(jīng)做了解析),其他實現(xiàn)比如RedisRegistryFactory、ZookeeperRegistryFactory、MulticastRegistryFactory的邏輯非常簡單,直接返回對應(yīng)的注冊中心實現(xiàn),代碼就省略了。
二、Registry(注冊中心)
來看dubbo中Registry實現(xiàn)。UML類圖如下:

方便理解起見,這里把注冊中實現(xiàn)分為三個層次,分別是Registry接口、FailbackRegistry實現(xiàn)、注冊中心實現(xiàn)。
2.1、Registry接口
上面UML類圖中可以看出,Registry繼承Node和RegistryService接口(Registry接口內(nèi)部無新增方法),重點關(guān)注RegistryService接口。RegistryService抽象了服務(wù)的注冊、注銷、訂閱、取消訂閱、查找等核心功能,來看接口定義:
public interface RegistryService {
/**
* 注冊數(shù)據(jù),比如提供者服務(wù)、消費者地址、路由規(guī)則、override規(guī)則以及其他數(shù)據(jù)
* 1、若URL中check=false,那么注冊失敗會進(jìn)行重試且不會拋出異常,否則直接拋異常
* 2、若URL中dynamic=false,那么URL中信息會被持久化存儲,否則注冊過程異常退出,URL信息應(yīng)當(dāng)被刪除
* 3、若URL中category=routers,那么意味著分類存儲,默認(rèn)catetory=providers,且會根據(jù)分類區(qū)域進(jìn)行通知更新
* 4、當(dāng)注冊中心重啟,網(wǎng)絡(luò)波動,數(shù)據(jù)不能被丟棄,包括刪除損壞的流水線中數(shù)據(jù)的刪除
* 5、參數(shù)不同的URL可以共存,不能相互覆蓋
* @param url Registration information , is not allowed to be empty, e.g: dubbo://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin
*/
void register(URL url);
/**
* 注銷
* 1、若是dynamic=false的持久化存儲,如果找不到注冊數(shù)據(jù),那么會拋出非法狀態(tài)異常,否則會忽略
* 2、根據(jù)完整URL進(jìn)行匹配注銷
* @param url Registration information , is not allowed to be empty, e.g: dubbo://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin
*/
void unregister(URL url);
/**
* 訂閱注冊數(shù)據(jù),并在注冊數(shù)據(jù)更新的時候自動推送
* 服務(wù)訂閱
* 1、若URL中check=false,那么當(dāng)注冊失敗時,會直接在后臺重試不會拋異常
* 2、若URL中category=routers,那么只會通知特定分類數(shù)據(jù);多個分類用逗號隔開;允許使用*全部匹配
* 3、允許接口、組、版本以及分類作為查詢條件
* 4、查詢條件允許使用*進(jìn)行匹配,意味著訂閱接口的所有版本
* 5、若注冊中心重啟、網(wǎng)絡(luò)波動,必須自動保存訂閱請求
* 6、參數(shù)不同的URL可以共存,不能相互覆蓋
* 7、訂閱過程必須是阻塞的
*/
void subscribe(URL url, NotifyListener listener);
/**
* 1、沒有訂閱,則直接忽略
* 2、根據(jù)URL完全匹配
**/
void unsubscribe(URL url, NotifyListener listener);
/**
* 根據(jù)條件匹配,查詢注冊數(shù)據(jù);對應(yīng)訂閱的推模式,提供拉模式,且僅返回一個結(jié)果
*/
List<URL> lookup(URL url);
從接口定義可以看出,RegsitryService對數(shù)據(jù)的注冊、注銷、訂閱、取消訂閱、查找等功能做了定義約束,所有對RegistryService的實現(xiàn)都必須滿足這個約束。
2.2、AbstractRegistry & FailbackRegistry
接下來看Registry接口的基類實現(xiàn)AbstractRegistry、FailbackRegistry。
2.2.1 AbstractRegistry
先來看AbstractRegistry,重點關(guān)注構(gòu)造方法,AbstractRegistry基類的構(gòu)造過程的核心邏輯分為三部分:1、創(chuàng)建注冊中心緩存文件;2、cache文件加載至properties;3、同步backUpUrl信息。我們先來看構(gòu)造方法的定義,然后再分步來看:
public AbstractRegistry(URL url) {
// 注冊中心URL負(fù)載
setUrl(url);
// Start file save timer
// 1、注冊中心文件同步保存開關(guān),默認(rèn)關(guān)閉,即默認(rèn)異步保存
syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
// 默認(rèn)文件路徑: user.home/.dubbo/dubbo-registry-applicationName-ip.cache
String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");
File file = null;
if (ConfigUtils.isNotEmpty(filename)) {
// 文件目錄創(chuàng)建失敗,直接拋異常
file = new File(filename);
if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
if (!file.getParentFile().mkdirs()) {
throw new IllegalArgumentException("Invalid registry cache file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
}
}
}
this.file = file;
// When starting the subscription center,we need to read the local cache file for future Registry fault tolerance processing.
// 2、加載注冊中心cache文件到內(nèi)存,用于容錯
loadProperties();
// 3、backUpUrl數(shù)據(jù)同步,同步所有訂閱者、更新properties緩存。
notify(url.getBackupUrls());
}
2.2.1.1、創(chuàng)建注冊中心緩存
這一步比較簡單,首先根據(jù)URL信息,確定緩存文件保存方式(異步、同步);然后,拼接緩存文件名稱,根據(jù)文件名創(chuàng)建緩存文件,創(chuàng)建失敗則直接拋異常,否則初始化緩存文件file。
2.2.1.2、注冊中心cache加載
加載本地cache文件到內(nèi)存緩存properties,將上一步中的cache文件內(nèi)容,加載到properties。比較容易理解,啟動時先讀本地緩存,用于容錯。
private void loadProperties() {
if (file != null && file.exists()) {
InputStream in = null;
// 省略try-catch
in = new FileInputStream(file);
properties.load(in);
if (logger.isInfoEnabled()) {
logger.info("Load registry cache file " + file + ", data: " + properties);
}
}
}
2.2.1.3、backupUrl數(shù)據(jù)同步
重點來看backupUrl的數(shù)據(jù)同步,backupUrl的生成方式前面我們已經(jīng)講過了,這一步主要是將生成的backupUrl列表同步給各訂閱者以及內(nèi)存緩存Properties,來看代碼:
protected void notify(List<URL> urls) {
if (CollectionUtils.isEmpty(urls)) {
return;
}
// 根據(jù)已訂閱的URL,以及訂閱者listener進(jìn)行同步
for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
URL url = entry.getKey();
//過濾掉不匹配的URL
if (!UrlUtils.isMatch(url, urls.get(0))) {
continue;
}
Set<NotifyListener> listeners = entry.getValue();
if (listeners != null) {
for (NotifyListener listener : listeners) {
try {
// 核心邏輯,執(zhí)行URL信息同步
notify(url, listener, filterEmpty(url, urls));
} catch (Throwable t) {
logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
}
}
}
}
}
繼續(xù)來看notify方法:
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((CollectionUtils.isEmpty(urls))
&& !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
}
// keep every provider's category.
// 按照category分組
Map<String, List<URL>> result = new HashMap<>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
//緩存分組后的Notified結(jié)果
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
// 同步URL信息,這里有兩種實現(xiàn),分別是RegistryDirectory和RegistryProtocol$OverrideListener,listener通過subscribe方法被注冊到subscribed緩存,
listener.notify(categoryList);
// 每次notify都會更新cache緩存文件(同步或者異步),保證注冊中心properties內(nèi)容與各訂閱者拿到的信息一致。
saveProperties(url);
}
}
同步URL信息到訂閱者的邏輯主要在RegistryDirectory和RegistryProtocol$OverrideListener,這里就不再做解析了。來看同步到內(nèi)存properties的邏輯,把所有待同步的URL序列化為字符串(每個url中間用空格隔開),然后將URL數(shù)據(jù)保存(同步或異步)至緩存文件(即2.2.1.1中創(chuàng)建的緩存文件),注意,這里異步保存實際執(zhí)行的邏輯與同步保存完全一致,核心邏輯在doSaveProperties方法。
private void saveProperties(URL url) {
if (file == null) {
return;
}
try {
StringBuilder buf = new StringBuilder();
// url緩存信息組裝,每個url中間用空格隔開
Map<String, List<URL>> categoryNotified = notified.get(url);
if (categoryNotified != null) {
for (List<URL> us : categoryNotified.values()) {
for (URL u : us) {
if (buf.length() > 0) {
buf.append(URL_SEPARATOR);
}
buf.append(u.toFullString());
}
}
}
// 更新內(nèi)存緩存
properties.setProperty(url.getServiceKey(), buf.toString());
// 版本控制,可以看出,MVCC并非DB專有
long version = lastCacheChanged.incrementAndGet();
// 同步保存文件則直接執(zhí)行save,將properties緩存內(nèi)容同步至緩存文件,否則放入線程池異步調(diào)度
if (syncSaveFile) {
doSaveProperties(version);
} else {
registryCacheExecutor.execute(new SaveProperties(version));
}
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
繼續(xù)來看doSaveProperties方法,方法參數(shù)是當(dāng)前文件的版本號,可以看到,防止并發(fā)操作,版本號用于版本控制
// 保存配置到本地緩存文件,文件版本
public void doSaveProperties(long version) {
// 防止版本回退
if (version < lastCacheChanged.get()) {
return;
}
if (file == null) {
return;
}
// Save
try {
// 創(chuàng)建本地緩存文件鎖,不存在則直接創(chuàng)建;
File lockfile = new File(file.getAbsolutePath() + ".lock");
if (!lockfile.exists()) {
lockfile.createNewFile();
}
try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
FileChannel channel = raf.getChannel()) {
FileLock lock = channel.tryLock();
//拿到鎖才可以操作
if (lock == null) {
throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties");
}
// Save
try {
if (!file.exists()) {
file.createNewFile();
}
try (FileOutputStream outputFile = new FileOutputStream(file)) {
properties.store(outputFile, "Dubbo Registry Cache");
}
} finally {
lock.release();
}
}
} catch (Throwable e) {
if (version < lastCacheChanged.get()) {
return;
} else {
registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));
}
logger.warn("Failed to save registry cache file, cause: " + e.getMessage(), e);
}
}
為了進(jìn)一步加深理解,我們把AbstractRegistry構(gòu)建過程中的數(shù)據(jù)流圖示如下:

2.2.1.4、數(shù)據(jù)注冊 - register
基類中的注冊等方法邏輯非常簡單,只是將URL放入緩存,非常簡單,不做過多說明。
2.2.1.5、數(shù)據(jù)注銷 - unRegister
同樣的,注銷方法邏輯也非常簡單,將URL從緩存中刪除。
2.2.1.6、數(shù)據(jù)訂閱 - subscribe
數(shù)據(jù)訂閱需要注意,訂閱的邏輯核心是注冊監(jiān)聽器,以便數(shù)據(jù)變更時,同步更新;subscribed緩存的結(jié)構(gòu)比較特殊,來看代碼:
@Override
public void subscribe(URL url, NotifyListener listener) {
if (url == null) {
throw new IllegalArgumentException("subscribe url == null");
}
if (listener == null) {
throw new IllegalArgumentException("subscribe listener == null");
}
if (logger.isInfoEnabled()) {
logger.info("Subscribe: " + url);
}
// subscribed緩存結(jié)構(gòu):ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<>()
Set<NotifyListener> listeners = subscribed.computeIfAbsent(url, n -> new ConcurrentHashSet<>());
listeners.add(listener);
}
2.2.1.7、取消數(shù)據(jù)訂閱 - unSubscribe
取消訂閱即刪除監(jiān)聽器,從subscribed緩存中刪除對應(yīng)監(jiān)聽器。
2.2.1.8、數(shù)據(jù)查找 - lookup
查找邏輯即,從已同步過的URL列表(notified緩存)中,過濾滿足要求的URL;若當(dāng)前已同步過的URL集合為空,則
@Override
public List<URL> lookup(URL url) {
List<URL> result = new ArrayList<>();
Map<String, List<URL>> notifiedUrls = getNotified().get(url);
if (notifiedUrls != null && notifiedUrls.size() > 0) {
for (List<URL> urls : notifiedUrls.values()) {
for (URL u : urls) {
if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) {
result.add(u);
}
}
}
} else {
final AtomicReference<List<URL>> reference = new AtomicReference<>();
NotifyListener listener = reference::set;
// 即注冊監(jiān)聽器,保證首次notify有數(shù)據(jù)返回
subscribe(url, listener); // Subscribe logic guarantees the first notify to return
List<URL> urls = reference.get();
if (CollectionUtils.isNotEmpty(urls)) {
for (URL u : urls) {
if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) {
result.add(u);
}
}
}
}
return result;
}
2.2.1.9、注冊中心銷毀- destory
銷毀的邏輯比較簡單,分為注銷數(shù)據(jù)和注銷監(jiān)聽器兩部分,來看代碼:
@Override
// 主要做兩件事情:1、注銷URL,從registered列表中剔除所有URL;2、移除所有NotifyListener,也就是說不再接受配置變更同步消息。
public void destroy() {
if (logger.isInfoEnabled()) {
logger.info("Destroy registry:" + getUrl());
}
// 將所有URL從registered移除,即注銷全部數(shù)據(jù)
Set<URL> destroyRegistered = new HashSet<>(getRegistered());
if (!destroyRegistered.isEmpty()) {
for (URL url : new HashSet<>(getRegistered())) {
if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
try {
unregister(url);
if (logger.isInfoEnabled()) {
logger.info("Destroy unregister url " + url);
}
} catch (Throwable t) {
logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
}
}
}
}
// 注銷監(jiān)聽器,即從subscribed移除所有訂閱URL的listener
Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<>(getSubscribed());
if (!destroySubscribed.isEmpty()) {
for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
try {
unsubscribe(url, listener);
if (logger.isInfoEnabled()) {
logger.info("Destroy unsubscribe url " + url);
}
} catch (Throwable t) {
logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
}
}
}
}
}
AbstractRegistry基類的邏輯就分析到這里,接著來看FailbackRegistry。
2.2.2、FailbackRegistry
FailbackRegistry 顧名思義,支持失敗恢復(fù)的注冊中心,繼承自基類AbstractRegistry,可以看到在基類基礎(chǔ)上擴(kuò)展了失敗恢復(fù)功能,先來看幾個緩存變量:
// 重試任務(wù)map
private final ConcurrentMap<URL, FailedRegisteredTask> failedRegistered = new ConcurrentHashMap<URL, FailedRegisteredTask>();
private final ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered = new ConcurrentHashMap<URL, FailedUnregisteredTask>();
private final ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed = new ConcurrentHashMap<Holder, FailedSubscribedTask>();
private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<Holder, FailedUnsubscribedTask>();
private final ConcurrentMap<Holder, FailedNotifiedTask> failedNotified = new ConcurrentHashMap<Holder, FailedNotifiedTask>();
幾個重試任務(wù)map,用于緩存失敗的任務(wù)以便于重試;接著來看構(gòu)造方法,在父類構(gòu)造方法的基礎(chǔ)上,新增了HashedWheelTimer實例,用于定時重試失敗任務(wù),默認(rèn)重試時間間隔5s。
public FailbackRegistry(URL url) {
super(url);
// 默認(rèn)重試時間間隔 5s
this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
// 利用hashTimer實現(xiàn)定時重試,
retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);
}
除此之外,F(xiàn)ailbackRegistry還定義了幾個關(guān)鍵的模板方法,由子類實現(xiàn):
public abstract void doRegister(URL url);
public abstract void doUnregister(URL url);
public abstract void doSubscribe(URL url, NotifyListener listener);
public abstract void doUnsubscribe(URL url, NotifyListener listener);
2.2.2.1、數(shù)據(jù)注冊-register
注冊邏輯在父類基礎(chǔ)上新增了失敗以后的操作,邏輯比較簡單,直接來看代碼:
@Override
public void register(URL url) {
// 父類注冊方法,保存url到已注冊列表
super.register(url);
// 將url從注冊失敗列表中剔除
removeFailedRegistered(url);
// 將url從注銷失敗列表中剔除
removeFailedUnregistered(url);
try {
// 執(zhí)行模板方法邏輯
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// 如果啟動檢測開關(guān)開啟(默認(rèn)開啟),失敗會直接拋異常,否則加入失敗列表,用于重試
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// 加入注冊失敗列表,用于重試
addFailedRegistered(url);
}
}
2.2.2.2、數(shù)據(jù)注銷-unRegister
注銷邏輯與注冊類似,直接來看代碼:
@Override
public void unregister(URL url) {
// 父類注銷邏輯
super.unregister(url);
// 從注冊失敗緩存中刪除
removeFailedRegistered(url);
// 從注銷失敗列表中刪除
removeFailedUnregistered(url);
try {
// 執(zhí)行模板方法
doUnregister(url);
} catch (Exception e) {
Throwable t = e;
// 如果啟動檢測開關(guān)開啟(默認(rèn)開啟),失敗會直接拋異常,否則加入失敗列表,用于重試
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to unregister " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to unregister " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
//加入失敗重試緩存
addFailedUnregistered(url);
}
}
2.2.2.3、數(shù)據(jù)訂閱-subscribe
與注冊、注銷邏輯類似,數(shù)據(jù)訂閱邏輯也非常簡單,代碼就不再展示了
2.2.2.4、取消數(shù)據(jù)訂閱-unSubscribe
與注冊、注銷邏輯類似,不做過多解析。
2.3、注冊中心實現(xiàn)
好了,前面的鋪墊結(jié)束了,本節(jié)來看具體的注冊中心實現(xiàn),下面按照DubboRegistry、MulticastRegistry、RedisRegistry、ZookeeperRegistry的順序依次分析。
2.3.1、DubboRegistry
DubboRegistry作為dubbo的默認(rèn)注冊中心實現(xiàn)(RegistryFactory默認(rèn)SPI實現(xiàn)是DubboRegistryFactory),嚴(yán)格意義上來講,DubboRegistry實際上是一個Registry代理,核心邏輯全部由代理也即registryService實現(xiàn)。DubboRegistry的創(chuàng)建在DubboRegistryFactory中已經(jīng)做了解析,這里不做過多說明。重點關(guān)注DubboRegistry的構(gòu)造方法
public DubboRegistry(Invoker<RegistryService> registryInvoker, RegistryService registryService) {
super(registryInvoker.getUrl());
this.registryInvoker = registryInvoker;
// Registry代理,核心邏輯借助registryService實現(xiàn)
this.registryService = registryService;
// 重連定時器,默認(rèn)重連間隔時間3s
this.reconnectPeriod = registryInvoker.getUrl().getParameter(Constants.REGISTRY_RECONNECT_PERIOD_KEY, RECONNECT_PERIOD_DEFAULT);
// 初始化調(diào)度任務(wù)邏輯,具體邏輯參考recover方法
reconnectFuture = reconnectTimer.scheduleWithFixedDelay(() -> {
try {
// 重連邏輯
connect();
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected error occur at reconnect, cause: " + t.getMessage(), t);
}
}, reconnectPeriod, reconnectPeriod, TimeUnit.MILLISECONDS);
}
介紹FailbackRegistry時說過,所有的注冊中心實現(xiàn)都支持自動恢復(fù),DubboRegistry的自動恢復(fù)實現(xiàn)原理是若當(dāng)前注冊中心不可用,則直接將該URL注冊信息加入到注冊失敗、訂閱失敗緩存(借助父類FailbackRegistry的addFailedRegistered、addFailedSubscribed方法實現(xiàn)),由父類的HashedWheelTimer重新調(diào)度,進(jìn)行恢復(fù),來看失敗后加入緩存的邏輯(外層方法是connect,內(nèi)部實際上執(zhí)行的recover方法):
// 核心邏輯:注冊失敗、訂閱失敗的url分別放入對應(yīng)列表,用于hashedWheelTimer調(diào)度
@Override
protected void recover() throws Exception {
// register
Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
if (!recoverRegistered.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover register url " + recoverRegistered);
}
// 放入注冊失敗列表
for (URL url : recoverRegistered) {
addFailedRegistered(url);
}
}
// subscribe
Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
if (!recoverSubscribed.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover subscribe url " + recoverSubscribed.keySet());
}
for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
addFailedSubscribed(url, listener);
}
}
}
}
2.3.2、MulticastRegistry
MulticastRegistry即多播注冊中心,顧名思義,采用多播實現(xiàn);注冊、注銷、訂閱、取消訂閱等均通過多播方式實現(xiàn)。核心邏輯在構(gòu)造方法,包括多播組的創(chuàng)建以及多播消息的處理;創(chuàng)建多播組比較容易理解,重點關(guān)注多播消息的處理,在構(gòu)造方法中創(chuàng)建并啟動一個守護(hù)線程,用于接收并處理多播消息(注冊消息、注銷消息、訂閱消息),處理多播消息的入口是receive方法。
2.3.2.1、構(gòu)造方法
先來看構(gòu)造方法:
// 創(chuàng)建并啟動daemon線程,用于接收廣播消息,對接收到的消息處理邏輯在receive方法
public MulticastRegistry(URL url) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
try {
// 創(chuàng)建并加入多播組
multicastAddress = InetAddress.getByName(url.getHost());
checkMulticastAddress(multicastAddress);
multicastPort = url.getPort() <= 0 ? DEFAULT_MULTICAST_PORT : url.getPort();
multicastSocket = new MulticastSocket(multicastPort);
// 注冊中心URL地址加入多播組
NetUtils.joinMulticastGroup(multicastSocket, multicastAddress);
// 啟動daemon線程,用于接收廣播socket消息
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
byte[] buf = new byte[2048];
// UDP包封裝
DatagramPacket recv = new DatagramPacket(buf, buf.length);
while (!multicastSocket.isClosed()) {
try {
// 接收UDP報文
multicastSocket.receive(recv);
String msg = new String(recv.getData()).trim();
int i = msg.indexOf('\n');
if (i > 0) {
msg = msg.substring(0, i).trim();
}
// 多播消息接收處理
MulticastRegistry.this.receive(msg, (InetSocketAddress) recv.getSocketAddress());
Arrays.fill(buf, (byte) 0);
} catch (Throwable e) {
if (!multicastSocket.isClosed()) {
logger.error(e.getMessage(), e);
}
}
}
}
}, "DubboMulticastRegistryReceiver");
thread.setDaemon(true);
thread.start();
} catch (IOException e) {
throw new IllegalStateException(e.getMessage(), e);
}
// 這里利用定時調(diào)度線程池,定時清理received緩存中不可用socket;默認(rèn)清理時間間隔60s;
this.cleanPeriod = url.getParameter(Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_SESSION_TIMEOUT);
if (url.getParameter("clean", true)) {
this.cleanFuture = cleanExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
clean(); // Remove the expired
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected exception occur at clean expired provider, cause: " + t.getMessage(), t);
}
}
}, cleanPeriod, cleanPeriod, TimeUnit.MILLISECONDS);
} else {
this.cleanFuture = null;
}
}
receive方法實現(xiàn)比較簡單,根據(jù)msg類型(通過消息前綴判斷)執(zhí)行相應(yīng)的注冊、注銷、訂閱操作;重點關(guān)注registered、unregistered、multicast三個方法(MulticastRegistry的注冊、注銷、訂閱邏輯均通過這三個方法實現(xiàn))。
private void receive(String msg, InetSocketAddress remoteAddress) {
if (logger.isInfoEnabled()) {
logger.info("Receive multicast message: " + msg + " from " + remoteAddress);
}
// 廣播注冊消息
if (msg.startsWith(Constants.REGISTER)) {
URL url = URL.valueOf(msg.substring(Constants.REGISTER.length()).trim());
registered(url);
// 廣播注銷消息
} else if (msg.startsWith(Constants.UNREGISTER)) {
URL url = URL.valueOf(msg.substring(Constants.UNREGISTER.length()).trim());
unregistered(url);
// 廣播訂閱消息
} else if (msg.startsWith(Constants.SUBSCRIBE)) {
URL url = URL.valueOf(msg.substring(Constants.SUBSCRIBE.length()).trim());
// 根據(jù)注冊的地址,發(fā)送單播、多播消息
Set<URL> urls = getRegistered();
if (CollectionUtils.isNotEmpty(urls)) {
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String host = remoteAddress != null && remoteAddress.getAddress() != null ? remoteAddress.getAddress().getHostAddress() : url.getIp();
// 發(fā)送單播,多播消息
if (url.getParameter("unicast", true) // Whether the consumer's machine has only one process
&& !NetUtils.getLocalHost().equals(host)) { // Multiple processes in the same machine cannot be unicast with unicast or there will be only one process receiving information
unicast(Constants.REGISTER + " " + u.toFullString(), host);
} else {
multicast(Constants.REGISTER + " " + u.toFullString());
}
}
}
}
}/* else if (msg.startsWith(UNSUBSCRIBE)) {
}*/
}
先來看registered方法,核心邏輯是將URL與subscried緩存中的URL進(jìn)行匹配,匹配成功則加入received,同時同步給訂閱者(通過NotifyListener的notify方法),然后通知當(dāng)前l(fā)istener(在subscribe時wait*)
protected void registered(URL url) {
for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
URL key = entry.getKey();
if (UrlUtils.isMatch(key, url)) {
Set<URL> urls = received.get(key);
if (urls == null) {
received.putIfAbsent(key, new ConcurrentHashSet<URL>());
urls = received.get(key);
}
urls.add(url);
List<URL> list = toList(urls);
for (NotifyListener listener : entry.getValue()) {
notify(key, listener, list);
synchronized (listener) {
listener.notify();
}
}
}
}
}
再來看unRegistered,邏輯上大體與registered類似,將多播消息中的URL與subscried中URL匹配,匹配成功則將該URL從received中剔除;若當(dāng)前received中該URL對應(yīng)URL列表為空,則重置該URL的protocol為empty;最后同步變更給訂閱者。
protected void unregistered(URL url) {
for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
URL key = entry.getKey();
if (UrlUtils.isMatch(key, url)) {
Set<URL> urls = received.get(key);
if (urls != null) {
urls.remove(url);
}
// received中url對應(yīng)URL列表為空,則直接重置該url協(xié)議為empty
if (urls == null || urls.isEmpty()) {
if (urls == null) {
urls = new ConcurrentHashSet<URL>();
}
URL empty = url.setProtocol(Constants.EMPTY_PROTOCOL);
urls.add(empty);
}
List<URL> list = toList(urls);
// 同步變更消息到各訂閱者
for (NotifyListener listener : entry.getValue()) {
notify(key, listener, list);
}
}
}
}
最后來看multicast方法,邏輯比較簡單,即為發(fā)送多播消息到多播消息組(多播消息由構(gòu)造方法中創(chuàng)建的線程異步處理)
private void multicast(String msg) {
if (logger.isInfoEnabled()) {
logger.info("Send multicast message: " + msg + " to " + multicastAddress + ":" + multicastPort);
}
try {
byte[] data = (msg + "\n").getBytes();
DatagramPacket hi = new DatagramPacket(data, data.length, multicastAddress, multicastPort);
multicastSocket.send(hi);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
2.3.2.2、其他邏輯
MulticastRegistry的其他邏輯包括doRegister、doUnregister、doSubscribe、doUnsubscribe、destroy,前面四個方法的實現(xiàn)方式完全一致,即拼接并發(fā)送多播消息到多播組,這里以doRegister為例,代碼如下:
@Override
public void doRegister(URL url) {
multicast(Constants.REGISTER + " " + url.toFullString());
}
最后來看destroy,銷毀需要處理多播組、關(guān)閉所有線程池、關(guān)閉所有socket,來看代碼:
@Override
public void destroy() {
super.destroy();
try {
// 取消定時清理任務(wù)
ExecutorUtil.cancelScheduledFuture(cleanFuture);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
// 退出多播組,并關(guān)閉socket
multicastSocket.leaveGroup(multicastAddress);
multicastSocket.close();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
// 關(guān)閉定時清理線程池,這里關(guān)注下線程池的優(yōu)雅關(guān)閉
ExecutorUtil.gracefulShutdown(cleanExecutor, cleanPeriod);
}
來看下dubbo線程池的優(yōu)雅關(guān)閉:
public static void gracefulShutdown(Executor executor, int timeout) {
if (!(executor instanceof ExecutorService) || isTerminated(executor)) {
return;
}
final ExecutorService es = (ExecutorService) executor;
try {
// 新任務(wù)不能再提交
es.shutdown();
} catch (SecurityException ex2) {
return;
} catch (NullPointerException ex2) {
return;
}
try {
// 等待隊列內(nèi)剩余任務(wù)結(jié)束,立即關(guān)閉線程池
if (!es.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
es.shutdownNow();
}
} catch (InterruptedException ex) {
es.shutdownNow();
Thread.currentThread().interrupt();
}
// 若線程池仍未關(guān)閉,則新建線程用于線程池的關(guān)閉
if (!isTerminated(es)) {
newThreadToCloseExecutor(es);
}
}
// 專門用于關(guān)閉線程池的線程池shutdownExecutor
private static void newThreadToCloseExecutor(final ExecutorService es) {
if (!isTerminated(es)) {
shutdownExecutor.execute(new Runnable() {
@Override
public void run() {
try {
for (int i = 0; i < 1000; i++) {
es.shutdownNow();
if (es.awaitTermination(10, TimeUnit.MILLISECONDS)) {
break;
}
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
});
}
}
了解完MulticastRegistry的所有方法,我們來看整個MulticastRegistry中的數(shù)據(jù)流,執(zhí)行注冊操作(這里以register為例,其他操作類似)將數(shù)據(jù)多播至多播組,然后由deamon線程異步將數(shù)據(jù)同步至訂閱者,如下圖所示:

2.3.3、RedisRegistry
RedisRegistry,即使用Redis存儲URL數(shù)據(jù)的注冊中心,這里從初始化、數(shù)據(jù)流以及核心方法等幾個方面進(jìn)行解析。RedisRegisty除了使用Redis緩存之外,還使用了Redis的消息隊列,用于doSubscribe過程中的URL數(shù)據(jù)變更消息處理。
2.3.3.1、初始化
先來看RedisRegistry的初始化,大致可以分為父類構(gòu)造方法、注冊中心參數(shù)初始化、RedisPool連接池創(chuàng)建、過期調(diào)度線程池啟動。父類構(gòu)造方法主要是AbstractRegistry構(gòu)造方法的調(diào)用;參數(shù)初始化主要包括RedisPool連接池參數(shù)初始化、注冊中心重連時間間隔初始化、過期線程池調(diào)度時間間隔初始化等;RedisPool的創(chuàng)建比較簡單,即直接取上一步中的參數(shù)創(chuàng)建RedisPool連接池(這里需要注意,會對同一個URL的多個backup地址單獨創(chuàng)建RedisPool,多個地址之間進(jìn)行了隔離,保證URL的可用性);過期調(diào)度線程池的調(diào)度比較容易理解,也就是說在注冊中心構(gòu)造過程中已經(jīng)啟動了對過期URL數(shù)據(jù)的定時清理,默認(rèn)調(diào)度間隔30s。

然后來看構(gòu)造方法:
public RedisRegistry(URL url) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
// 借用對象池管理配置
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setTestOnBorrow(url.getParameter("test.on.borrow", true));
config.setTestOnReturn(url.getParameter("test.on.return", false));
config.setTestWhileIdle(url.getParameter("test.while.idle", false));
// config參數(shù)配置,略去
// 支持的redis集群模式,failover和replicate
String cluster = url.getParameter("cluster", "failover");
if (!"failover".equals(cluster) && !"replicate".equals(cluster)) {
throw new IllegalArgumentException("Unsupported redis cluster: " + cluster + ". The redis cluster only supported failover or replicate.");
}
replicate = "replicate".equals(cluster);
List<String> addresses = new ArrayList<>();
addresses.add(url.getAddress());
String[] backups = url.getParameter(Constants.BACKUP_KEY, new String[0]);
if (ArrayUtils.isNotEmpty(backups)) {
addresses.addAll(Arrays.asList(backups));
}
for (String address : addresses) {
int i = address.indexOf(':');
String host;
int port;
if (i > 0) {
host = address.substring(0, i);
port = Integer.parseInt(address.substring(i + 1));
} else {
host = address;
port = DEFAULT_REDIS_PORT;
}
// 每個地址對應(yīng)一個jedis連接池,URL的各地址之間互不影響,保證可用性
this.jedisPools.put(address, new JedisPool(config, host, port,
url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT), StringUtils.isEmpty(url.getPassword()) ? null : url.getPassword(),
url.getParameter("db.index", 0)));
}
this.reconnectPeriod = url.getParameter(Constants.REGISTRY_RECONNECT_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD);
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(Constants.PATH_SEPARATOR)) {
group = Constants.PATH_SEPARATOR + group;
}
if (!group.endsWith(Constants.PATH_SEPARATOR)) {
group = group + Constants.PATH_SEPARATOR;
}
// group = "/group/"
this.root = group;
// 配置過期定時調(diào)度
this.expirePeriod = url.getParameter(Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_SESSION_TIMEOUT);
this.expireFuture = expireExecutor.scheduleWithFixedDelay(() -> {
try {
// 延遲過期
deferExpired(); // Extend the expiration time
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected exception occur at defer expire time, cause: " + t.getMessage(), t);
}
}, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS);
}
// 延遲過期
private void deferExpired() {
for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
JedisPool jedisPool = entry.getValue();
try {
try (Jedis jedis = jedisPool.getResource()) {
for (URL url : new HashSet<>(getRegistered())) {
if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
String key = toCategoryPath(url);
// 延長緩存過期時間,并發(fā)布隊列消息
if (jedis.hset(key, url.toFullString(), String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {
jedis.publish(key, Constants.REGISTER);
}
}
}
// 如果開啟了強(qiáng)制清理開關(guān);則直接清理redis中數(shù)據(jù),并發(fā)布注銷消息
if (admin) {
clean(jedis);
}
// 無需創(chuàng)建副本,則只寫單臺redis,直接break;
if (!replicate) {
break;// If the server side has synchronized data, just write a single machine
}
}
} catch (Throwable t) {
logger.warn("Failed to write provider heartbeat to redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
}
}
}
再來看整個RedisRegistry中的數(shù)據(jù)流,可以發(fā)現(xiàn),RedisRegistry通過Redis緩存與Redis消息隊列,異步+同步的方式將數(shù)據(jù)同步至本地cache文件、內(nèi)存緩存Properties以及具體的數(shù)據(jù)訂閱者,如RegistryDirectory:

2.3.3.2、注冊(doRegister)
了解完RedisRegistry中的數(shù)據(jù)流,再來看注冊過程就比較容易理解了。主要包括兩個核心操作:將當(dāng)前URL緩存至Redis;然后將URL信息發(fā)布至Redis消息隊列。直接來看代碼:
public void doRegister(URL url) {
String key = toCategoryPath(url);
String value = url.toFullString();
String expire = String.valueOf(System.currentTimeMillis() + expirePeriod);
boolean success = false;
RpcException exception = null;
// 當(dāng)前URL的所有可用地址,遍歷,存入redis,并發(fā)布注冊消息
for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
JedisPool jedisPool = entry.getValue();
try {
try (Jedis jedis = jedisPool.getResource()) {
jedis.hset(key, value, expire);
jedis.publish(key, Constants.REGISTER);
success = true;
if (!replicate) {
break; // If the server side has synchronized data, just write a single machine
}
}
} catch (Throwable t) {
exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
}
}
if (exception != null) {
if (success) {
logger.warn(exception.getMessage(), exception);
} else {
throw exception;
}
}
}
2.3.3.3、注銷(doUnregister)
注銷邏輯即注冊邏輯的反向操作,先刪除redis緩存,再發(fā)布注銷消息:
public void doUnregister(URL url) {
String key = toCategoryPath(url);
String value = url.toFullString();
RpcException exception = null;
boolean success = false;
// 遍歷當(dāng)前URL的所有可用節(jié)點地址,遍歷從redis中刪除,并發(fā)布注銷消息
for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
JedisPool jedisPool = entry.getValue();
try {
try (Jedis jedis = jedisPool.getResource()) {
jedis.hdel(key, value);
jedis.publish(key, Constants.UNREGISTER);
success = true;
if (!replicate) {
break; // If the server side has synchronized data, just write a single machine
}
}
} catch (Throwable t) {
exception = new RpcException("Failed to unregister service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
}
}
if (exception != null) {
if (success) {
logger.warn(exception.getMessage(), exception);
} else {
throw exception;
}
}
}
2.3.3.4、訂閱(doSubscribe)
訂閱邏輯稍微復(fù)雜,支持同步和異步方式將URL信息同步至訂閱者,如RegistryDirectory;同步邏輯比較簡單,將被訂閱的URL與redis中緩存URL進(jìn)行交叉過濾,最終通過父類notify方法(AbstractRegistry的notify方法),完成URL數(shù)據(jù)同步;異步邏輯由Notifier線程實現(xiàn),Notifier依照線性退避規(guī)則執(zhí)行,執(zhí)行邏輯除了同步URL信息之外,還會訂閱redis消息隊列并消費隊列中消息,當(dāng)然,消費的核心邏輯也是執(zhí)行doNotify方法。
public void doSubscribe(final URL url, final NotifyListener listener) {
String service = toServicePath(url);
// 每個service對應(yīng)一個notifier線程
Notifier notifier = notifiers.get(service);
// 異步方式,創(chuàng)建Notifier線程,并啟動,線性回避執(zhí)行,數(shù)據(jù)流 : redis緩存、redis消息隊列 -> 訂閱者
if (notifier == null) {
Notifier newNotifier = new Notifier(service);
notifiers.putIfAbsent(service, newNotifier);
notifier = notifiers.get(service);
// notifier線程創(chuàng)建后即啟動
if (notifier == newNotifier) {
notifier.start();
}
}
boolean success = false;
RpcException exception = null;
// 同步方式訂閱 數(shù)據(jù)流 redis緩存 -> 訂閱者,如RegistryDirector
for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
JedisPool jedisPool = entry.getValue();
try {
try (Jedis jedis = jedisPool.getResource()) {
// 區(qū)分所有服務(wù)與單個服務(wù)。
if (service.endsWith(Constants.ANY_VALUE)) {
admin = true;
Set<String> keys = jedis.keys(service);
if (CollectionUtils.isNotEmpty(keys)) {
Map<String, Set<String>> serviceKeys = new HashMap<>();
for (String key : keys) {
String serviceKey = toServicePath(key);
Set<String> sk = serviceKeys.computeIfAbsent(serviceKey, k -> new HashSet<>());
sk.add(key);
}
for (Set<String> sk : serviceKeys.values()) {
doNotify(jedis, sk, url, Collections.singletonList(listener));
}
}
} else {
doNotify(jedis, jedis.keys(service + Constants.PATH_SEPARATOR + Constants.ANY_VALUE), url, Collections.singletonList(listener));
}
success = true;
break; // Just read one server's data
}
} catch (Throwable t) { // Try the next server
exception = new RpcException("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
}
}
// 異常處理邏輯略去
}
來看doNotify實現(xiàn)
private void doNotify(Jedis jedis, String key) {
for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<>(getSubscribed()).entrySet()) {
doNotify(jedis, Collections.singletonList(key), entry.getKey(), new HashSet<>(entry.getValue()));
}
}
private void doNotify(Jedis jedis, Collection<String> keys, URL url, Collection<NotifyListener> listeners) {
if (keys == null || keys.isEmpty()
|| listeners == null || listeners.isEmpty()) {
return;
}
long now = System.currentTimeMillis();
List<URL> result = new ArrayList<>();
List<String> categories = Arrays.asList(url.getParameter(Constants.CATEGORY_KEY, new String[0]));
String consumerService = url.getServiceInterface();
for (String key : keys) {
if (!Constants.ANY_VALUE.equals(consumerService)) {
String providerService = toServiceName(key);
if (!providerService.equals(consumerService)) {
continue;
}
}
String category = toCategoryName(key);
if (!categories.contains(Constants.ANY_VALUE) && !categories.contains(category)) {
continue;
}
List<URL> urls = new ArrayList<>();
// redis緩存URL信息與內(nèi)存中被訂閱的URL信息進(jìn)行交叉過濾
Map<String, String> values = jedis.hgetAll(key);
if (CollectionUtils.isNotEmptyMap(values)) {
for (Map.Entry<String, String> entry : values.entrySet()) {
URL u = URL.valueOf(entry.getKey());
if (!u.getParameter(Constants.DYNAMIC_KEY, true)
|| Long.parseLong(entry.getValue()) >= now) {
if (UrlUtils.isMatch(url, u)) {
// 與url匹配的緩存URL放入待通知列表
urls.add(u);
}
}
}
}
// 若無有效URL,則填充urls地址為任意值*
if (urls.isEmpty()) {
urls.add(url.setProtocol(Constants.EMPTY_PROTOCOL)
.setAddress(Constants.ANYHOST_VALUE)
.setPath(toServiceName(key))
.addParameter(Constants.CATEGORY_KEY, category));
}
result.addAll(urls);
if (logger.isInfoEnabled()) {
logger.info("redis notify: " + key + " = " + urls);
}
}
if (CollectionUtils.isEmpty(result)) {
return;
}
// 由父類nofity方法完成最終URL數(shù)據(jù)的同步
for (NotifyListener listener : listeners) {
notify(url, listener, result);
}
}
再來看Notifier實現(xiàn)
private class Notifier extends Thread {
private final String service;
private final AtomicInteger connectSkip = new AtomicInteger();
private final AtomicInteger connectSkipped = new AtomicInteger();
private volatile Jedis jedis;
private volatile boolean first = true;
private volatile boolean running = true;
private volatile int connectRandom;
public Notifier(String service) {
super.setDaemon(true);
super.setName("DubboRedisSubscribe");
this.service = service;
}
private void resetSkip() {
connectSkip.set(0);
connectSkipped.set(0);
connectRandom = 0;
}
// 首次結(jié)果為false;線性退避算法
private boolean isSkip() {
// 初始值均為0
int skip = connectSkip.get(); // Growth of skipping times
if (skip >= 10) {
if (connectRandom == 0) {
connectRandom = ThreadLocalRandom.current().nextInt(10);
}
skip = 10 + connectRandom;
}
// 初始值 false,
// 第一次:false,0-1
// 第二次:true,1-1
// 第三次:false,0-2
// 第四次:true,1-2
// 第五次:true,2-2
// 第五次:false,0-3
if (connectSkipped.getAndIncrement() < skip) { // Check the number of skipping times
return true;
}
// skip 自增
connectSkip.incrementAndGet();
// skiped重置0
connectSkipped.set(0);
connectRandom = 0;
return false;
}
@Override
public void run() {
while (running) {
try {
// 線性回避,是否跳過
if (!isSkip()) {
try {
for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
JedisPool jedisPool = entry.getValue();
try {
jedis = jedisPool.getResource();
try {
// service是具體類型還是所有服務(wù)
if (service.endsWith(Constants.ANY_VALUE)) {
if (!first) {
first = false;
Set<String> keys = jedis.keys(service);
if (CollectionUtils.isNotEmpty(keys)) {
for (String s : keys) {
// 同步redis中緩存數(shù)據(jù)至各訂閱者以及本地cache文件
doNotify(jedis, s);
}
}
// 同步完之后,重置skip
resetSkip();
}
// 處理Redis消息隊列中所有與service匹配的消息
jedis.psubscribe(new NotifySub(jedisPool), service); // blocking
} else {
if (!first) {
first = false;
// 同步redis中緩存數(shù)據(jù)至各訂閱者以及本地cache文件
doNotify(jedis, service);
resetSkip();
}
// 處理Redis消息隊列中所有與service匹配的消息,同樣的同步至各數(shù)據(jù)訂閱者以及本地cache文件。
jedis.psubscribe(new NotifySub(jedisPool), service + Constants.PATH_SEPARATOR + Constants.ANY_VALUE); // blocking
}
break;
} finally {
jedis.close();
}
} catch (Throwable t) { // Retry another server
logger.warn("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
// If you only have a single redis, you need to take a rest to avoid overtaking a lot of CPU resources
sleep(reconnectPeriod);
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
sleep(reconnectPeriod);
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
}
}
public void shutdown() {
try {
running = false;
jedis.disconnect();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
這里順便提一下NotifySub,繼承自JedisPubSub,用于redis消息隊列中消息的消費,核心邏輯在onMessage方法:
@Override
public void onMessage(String key, String msg) {
if (logger.isInfoEnabled()) {
logger.info("redis event: " + key + " = " + msg);
}
// 只處理注冊、注銷類消息
if (msg.equals(Constants.REGISTER)
|| msg.equals(Constants.UNREGISTER)) {
try {
Jedis jedis = jedisPool.getResource();
try {
doNotify(jedis, key);
} finally {
jedis.close();
}
} catch (Throwable t) { // TODO Notification failure does not restore mechanism guarantee
logger.error(t.getMessage(), t);
}
}
}
2.3.4、ZookeeperRegistry
ZookeeperRegistry,即zk注冊中心,也是dubbo官方默認(rèn)采用的注冊中心。先來看構(gòu)造方法:
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(Constants.PATH_SEPARATOR)) {
group = Constants.PATH_SEPARATOR + group;
}
this.root = group;
// 初始化zkClient
zkClient = zookeeperTransporter.connect(url);
// 若是重連狀態(tài),執(zhí)行恢復(fù)邏輯,即將已注冊過的URL,放入retry列表,重新注冊;
zkClient.addStateListener(state -> {
if (state == StateListener.RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
});
}
構(gòu)造方法里有一個參數(shù),ZookeeperTransporter,先來看一下這個ZookeeperTransporter。
2.3.4.1、ZookeeperTransporter
ZookeeperTransporter是dubbo對zk客戶端的適配接口,支持SPI(方法級),內(nèi)部只有一個connect方法,返回ZookeeperClient實例。我們知道,zk客戶端有常用的兩個實現(xiàn),分別是:
-
Curator,Netflix公司開源的一套zookeeper客戶端框架,jar包gav如下:
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.1</version> </dependency> -
Zkclient,Github上一個開源的Zookeeper客戶端,在Zookeeper原生 API接口之上進(jìn)行了包裝,是一個更加易用的Zookeeper客戶端,jar包gav如下:
<dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.2</version> </dependency>dubbo中借助這兩種客戶端組件,ZookeeperClient的實現(xiàn)有CuratorZookeeperClient、ZkclientZookeeperClient,內(nèi)部通過客戶端組件完成zk的連接、監(jiān)聽等動作,具體邏輯這里不做詳細(xì)解析。繼續(xù)來看ZookeeperTransporter,基于接口的基類實現(xiàn)AbstractZookeeperTransporter,實現(xiàn)了connect方法,同時定義模板方法createZookeeperClient由具體的Transporter(CuratorZookeeperTransporter、ZkclientZookeeperTransporter,邏輯比較簡單,省略)實現(xiàn)。直接來看基類的connect方法:
@Override public ZookeeperClient connect(URL url) { ZookeeperClient zookeeperClient; // backUrl解析 List<String> addressList = getURLBackupAddress(url); // The field define the zookeeper server , including protocol, host, port, username, password // fetchAndUpdateZookeeperClientCache 邏輯比較簡單,從緩存里取,取不到則返回null; if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) { logger.info("find valid zookeeper client from the cache for address: " + url); return zookeeperClient; } // avoid creating too many connections, so add lock,加鎖,防并發(fā)。 synchronized (zookeeperClientMap) { if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) { logger.info("find valid zookeeper client from the cache for address: " + url); return zookeeperClient; } // 緩存沒取到,創(chuàng)建zkClient,并緩存到map zookeeperClient = createZookeeperClient(toClientURL(url)); logger.info("No valid zookeeper client found from cache, therefore create a new client for url. " + url); writeToClientMap(addressList, zookeeperClient); } return zookeeperClient; }
2.3.4.2、doRegister方法
基類FailbackRegistry的模板方法實現(xiàn),邏輯非常簡單:
@Override
public void doRegister(URL url) {
try {
// 創(chuàng)建zk臨時節(jié)點,節(jié)點目錄類似 /dubbo/xxx/xxx/xxx.xxx.xxxService
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
2.3.4.3、doUnregister方法
基類FailbackRegistry的模板方法實現(xiàn),邏輯同樣非常簡單:
@Override
public void doUnregister(URL url) {
try {
// 刪除zk節(jié)點
zkClient.delete(toUrlPath(url));
} catch (Throwable e) {
throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
2.3.4.4、doSubscribe方法
基類FailbackRegistry的模板方法實現(xiàn),邏輯相對復(fù)雜,主要分為幾部分1)zkListener的初始化,2)遞歸訂閱url變更信息,創(chuàng)建zk永久節(jié)點,3)執(zhí)行父類notify(邏輯參考AbstractRegistry解析部分),代碼如下:
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
// 無指定服務(wù)
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
// 根節(jié)點/dubbo
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
listeners = zkListeners.get(url);
}
// 初始化zkListener
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
// 初始化anyServices 注冊 ChildListener到listener內(nèi)存緩存,訂閱childChange變更,即當(dāng)執(zhí)行childChanged時,完成對所有child的訂閱。
listeners.putIfAbsent(listener, (parentPath, currentChilds) -> {
for (String child : currentChilds) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
// 遞歸訂閱url變更消息,最終會走到else,結(jié)束
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
});
zkListener = listeners.get(listener);
}
// 創(chuàng)建永久節(jié)點/dubbo
zkClient.create(root, false);
// 訂閱URL變更信息
List<String> services = zkClient.addChildListener(root, zkListener);
if (CollectionUtils.isNotEmpty(services)) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
// 遞歸訂閱url變更消息,最終會走到else,結(jié)束
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {
//指定URL變更處理
List<URL> urls = new ArrayList<>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
// childChange事件,只做notify到指定listener
listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
zkListener = listeners.get(listener);
}
zkClient.create(path, false);
// 監(jiān)聽該path
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 父類AbstractRegistry的notify邏輯,略去
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
2.3.4.5、doUnsubscribe方法
基類FailbackRegistry的模板方法實現(xiàn),直接上代碼:
@Override
public void doUnsubscribe(URL url, NotifyListener listener) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners != null) {
ChildListener zkListener = listeners.get(listener);
if (zkListener != null) {
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
// 邏輯比較簡單,即不再訂閱對應(yīng)的listener
zkClient.removeChildListener(root, zkListener);
} else {
for (String path : toCategoriesPath(url)) {
zkClient.removeChildListener(path, zkListener);
}
}
}
}
}
2.3.4.6、lookup方法
基類AbstractRegistry的模板方法實現(xiàn),核心邏輯是從zk中查詢指定URL對應(yīng)地址的所有可用URL(不同Category),直接來看代碼:
@Override
public List<URL> lookup(URL url) {
if (url == null) {
throw new IllegalArgumentException("lookup url == null");
}
try {
List<String> providers = new ArrayList<>();
for (String path : toCategoriesPath(url)) {
List<String> children = zkClient.getChildren(path);
if (children != null) {
providers.addAll(children);
}
}
return toUrlsWithoutEmpty(url, providers);
} catch (Throwable e) {
throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
小結(jié)
本文重點解析了dubbo中Registry的相關(guān)核心實現(xiàn),從RegistryFactory到Registry,dubbo抽象了一系列注冊中心,為用戶自定義擴(kuò)展提供了非常優(yōu)良的入口;同時,dubbo為我們提供了基于緩存、多播、zk等的注冊中心實現(xiàn),非常方便,不得不感嘆設(shè)計的非常好。
注:源碼版本 2.7.1。8月以來,接連經(jīng)歷了迎接小生命的驚喜,跳槽后的適應(yīng)期,中間耽誤了挺長一段時間,后面我會努力持續(xù)更新的,come on。