dubbo之Registry(注冊中心)

前言

Registry是dubbo對注冊中心的抽象,提供服務(wù)的注冊、注銷、查找、訂閱、取消訂閱等功能。本文按照dubbo中Registry的組織形式,分析Registry的核心邏輯。首先來看注冊中心的創(chuàng)建,RegistryFactory接口定義注冊中心的創(chuàng)建(工廠模式實現(xiàn)),支持SPI擴(kuò)展,默認(rèn)SPI實現(xiàn)是DubboRegistryFactory。注冊中心Registry接口繼承Node、RegistryService,抽象基類AbstractRegistry直接實現(xiàn)Registry,F(xiàn)ailbackRegistry繼承自基類AbstractRegistry,所有注冊中心實現(xiàn)均繼承自FailbackRegistry,這里可以看出,所有注冊中心均支持失敗重試(Failback)。

一、RegistryFactory(注冊中心工廠)

RegistryFactory的UML類圖如下:


注冊中心工廠UML (1).jpg

先來看RegistryFactory接口定義,比較簡單:

@SPI("dubbo")
public interface RegistryFactory {
    /**
     * Connect to the registry
     *  1、check=false時,連接無需check,否則連接斷開時直接拋異常
     *  2、支持URL的用戶名、密碼權(quán)限校驗
     *  3、支持注冊中心族備份地址:10.20.153.10
     *  4、支持注冊中心本地緩存文件
     *  5、支持超時設(shè)置
     *  6、支持session 60s過期
     */
    @Adaptive({"protocol"})
    Registry getRegistry(URL url);

1.1 AbstractRegistryFactory

接著是抽象基類AbstractRegistryFactory,實現(xiàn)RegistryFactory接口的getRegistry方法,同時定義模板方法createRegistry供子類實現(xiàn);AbstractRegistryFactory的getRegistry方法先從Map緩存查詢注冊中心,查不到則執(zhí)行模板方法createRegistry創(chuàng)建注冊中心,并放入緩存,然后返回該registry

@Override
public Registry getRegistry(URL url) {
    url = url.setPath(RegistryService.class.getName())
            .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
            .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
    String key = url.toServiceStringWithoutResolving();
    //加鎖,保證單例
    LOCK.lock();
    try {
        // 緩存查詢注冊中心
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        }
        //create registry by spi/ioc
        // 不存在則通過spi、ioc方式創(chuàng)建registry
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry " + url);
        }
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        // 鎖釋放
        LOCK.unlock();
    }
}

1.2 其他實現(xiàn)

基類AbstractRegistryFactory的子類實現(xiàn)中,比較重要的是DubboRegistryFactory(介紹Protocol時已經(jīng)做了解析),其他實現(xiàn)比如RedisRegistryFactory、ZookeeperRegistryFactory、MulticastRegistryFactory的邏輯非常簡單,直接返回對應(yīng)的注冊中心實現(xiàn),代碼就省略了。

二、Registry(注冊中心)

來看dubbo中Registry實現(xiàn)。UML類圖如下:


注冊中心UML.jpg

方便理解起見,這里把注冊中實現(xiàn)分為三個層次,分別是Registry接口、FailbackRegistry實現(xiàn)、注冊中心實現(xiàn)。

2.1、Registry接口

上面UML類圖中可以看出,Registry繼承Node和RegistryService接口(Registry接口內(nèi)部無新增方法),重點關(guān)注RegistryService接口。RegistryService抽象了服務(wù)的注冊、注銷、訂閱、取消訂閱、查找等核心功能,來看接口定義:

public interface RegistryService {
    /**
     * 注冊數(shù)據(jù),比如提供者服務(wù)、消費者地址、路由規(guī)則、override規(guī)則以及其他數(shù)據(jù)
     * 1、若URL中check=false,那么注冊失敗會進(jìn)行重試且不會拋出異常,否則直接拋異常
     * 2、若URL中dynamic=false,那么URL中信息會被持久化存儲,否則注冊過程異常退出,URL信息應(yīng)當(dāng)被刪除
     * 3、若URL中category=routers,那么意味著分類存儲,默認(rèn)catetory=providers,且會根據(jù)分類區(qū)域進(jìn)行通知更新
     * 4、當(dāng)注冊中心重啟,網(wǎng)絡(luò)波動,數(shù)據(jù)不能被丟棄,包括刪除損壞的流水線中數(shù)據(jù)的刪除
     * 5、參數(shù)不同的URL可以共存,不能相互覆蓋
     * @param url  Registration information , is not allowed to be empty, e.g: dubbo://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin
     */
    void register(URL url);

    /**
     * 注銷
     * 1、若是dynamic=false的持久化存儲,如果找不到注冊數(shù)據(jù),那么會拋出非法狀態(tài)異常,否則會忽略
     * 2、根據(jù)完整URL進(jìn)行匹配注銷
     * @param url  Registration information , is not allowed to be empty, e.g: dubbo://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin
     */
    void unregister(URL url);

    /**
     * 訂閱注冊數(shù)據(jù),并在注冊數(shù)據(jù)更新的時候自動推送
     * 服務(wù)訂閱
     * 1、若URL中check=false,那么當(dāng)注冊失敗時,會直接在后臺重試不會拋異常
     * 2、若URL中category=routers,那么只會通知特定分類數(shù)據(jù);多個分類用逗號隔開;允許使用*全部匹配
     * 3、允許接口、組、版本以及分類作為查詢條件
     * 4、查詢條件允許使用*進(jìn)行匹配,意味著訂閱接口的所有版本
     * 5、若注冊中心重啟、網(wǎng)絡(luò)波動,必須自動保存訂閱請求
     * 6、參數(shù)不同的URL可以共存,不能相互覆蓋
     * 7、訂閱過程必須是阻塞的
     */
    void subscribe(URL url, NotifyListener listener);

    /**
    * 1、沒有訂閱,則直接忽略
     * 2、根據(jù)URL完全匹配
     **/
    void unsubscribe(URL url, NotifyListener listener);

    /**
     * 根據(jù)條件匹配,查詢注冊數(shù)據(jù);對應(yīng)訂閱的推模式,提供拉模式,且僅返回一個結(jié)果
    */
    List<URL> lookup(URL url);

從接口定義可以看出,RegsitryService對數(shù)據(jù)的注冊、注銷、訂閱、取消訂閱、查找等功能做了定義約束,所有對RegistryService的實現(xiàn)都必須滿足這個約束。

2.2、AbstractRegistry & FailbackRegistry

接下來看Registry接口的基類實現(xiàn)AbstractRegistry、FailbackRegistry。

2.2.1 AbstractRegistry

先來看AbstractRegistry,重點關(guān)注構(gòu)造方法,AbstractRegistry基類的構(gòu)造過程的核心邏輯分為三部分:1、創(chuàng)建注冊中心緩存文件;2、cache文件加載至properties;3、同步backUpUrl信息。我們先來看構(gòu)造方法的定義,然后再分步來看:

public AbstractRegistry(URL url) {
    // 注冊中心URL負(fù)載
    setUrl(url);
    // Start file save timer
    // 1、注冊中心文件同步保存開關(guān),默認(rèn)關(guān)閉,即默認(rèn)異步保存
    syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
    // 默認(rèn)文件路徑: user.home/.dubbo/dubbo-registry-applicationName-ip.cache
    String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");
    File file = null;
    if (ConfigUtils.isNotEmpty(filename)) {
        // 文件目錄創(chuàng)建失敗,直接拋異常
        file = new File(filename);
        if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
            if (!file.getParentFile().mkdirs()) {
                throw new IllegalArgumentException("Invalid registry cache file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
            }
        }
    }
    this.file = file;
    // When starting the subscription center,we need to read the local cache file for future Registry fault tolerance processing.
    // 2、加載注冊中心cache文件到內(nèi)存,用于容錯
    loadProperties();
    // 3、backUpUrl數(shù)據(jù)同步,同步所有訂閱者、更新properties緩存。
    notify(url.getBackupUrls());
}
2.2.1.1、創(chuàng)建注冊中心緩存

這一步比較簡單,首先根據(jù)URL信息,確定緩存文件保存方式(異步、同步);然后,拼接緩存文件名稱,根據(jù)文件名創(chuàng)建緩存文件,創(chuàng)建失敗則直接拋異常,否則初始化緩存文件file。

2.2.1.2、注冊中心cache加載

加載本地cache文件到內(nèi)存緩存properties,將上一步中的cache文件內(nèi)容,加載到properties。比較容易理解,啟動時先讀本地緩存,用于容錯。

private void loadProperties() {
    if (file != null && file.exists()) {
        InputStream in = null;
        // 省略try-catch
            in = new FileInputStream(file);
            properties.load(in);
            if (logger.isInfoEnabled()) {
                logger.info("Load registry cache file " + file + ", data: " + properties);
            }
    }
}
2.2.1.3、backupUrl數(shù)據(jù)同步

重點來看backupUrl的數(shù)據(jù)同步,backupUrl的生成方式前面我們已經(jīng)講過了,這一步主要是將生成的backupUrl列表同步給各訂閱者以及內(nèi)存緩存Properties,來看代碼:

protected void notify(List<URL> urls) {
    if (CollectionUtils.isEmpty(urls)) {
        return;
    }

    // 根據(jù)已訂閱的URL,以及訂閱者listener進(jìn)行同步
    for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
        URL url = entry.getKey();
                
        //過濾掉不匹配的URL
        if (!UrlUtils.isMatch(url, urls.get(0))) {
            continue;
        }

        Set<NotifyListener> listeners = entry.getValue();
        if (listeners != null) {
            for (NotifyListener listener : listeners) {
                try {
                    // 核心邏輯,執(zhí)行URL信息同步
                    notify(url, listener, filterEmpty(url, urls));
                } catch (Throwable t) {
                    logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
                }
            }
        }
    }
}

繼續(xù)來看notify方法:

protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    if (url == null) {
        throw new IllegalArgumentException("notify url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("notify listener == null");
    }
    if ((CollectionUtils.isEmpty(urls))
            && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
        logger.warn("Ignore empty notify urls for subscribe url " + url);
        return;
    }
    if (logger.isInfoEnabled()) {
        logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
    }
    // keep every provider's category.
    // 按照category分組
    Map<String, List<URL>> result = new HashMap<>();
    for (URL u : urls) {
        if (UrlUtils.isMatch(url, u)) {
            String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
            categoryList.add(u);
        }
    }
    if (result.size() == 0) {
        return;
    }
  
    //緩存分組后的Notified結(jié)果
    Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        // 同步URL信息,這里有兩種實現(xiàn),分別是RegistryDirectory和RegistryProtocol$OverrideListener,listener通過subscribe方法被注冊到subscribed緩存,
        listener.notify(categoryList);
        // 每次notify都會更新cache緩存文件(同步或者異步),保證注冊中心properties內(nèi)容與各訂閱者拿到的信息一致。
        saveProperties(url);
    }
}

同步URL信息到訂閱者的邏輯主要在RegistryDirectory和RegistryProtocol$OverrideListener,這里就不再做解析了。來看同步到內(nèi)存properties的邏輯,把所有待同步的URL序列化為字符串(每個url中間用空格隔開),然后將URL數(shù)據(jù)保存(同步或異步)至緩存文件(即2.2.1.1中創(chuàng)建的緩存文件),注意,這里異步保存實際執(zhí)行的邏輯與同步保存完全一致,核心邏輯在doSaveProperties方法。

private void saveProperties(URL url) {
    if (file == null) {
        return;
    }

    try {
        StringBuilder buf = new StringBuilder();
        //  url緩存信息組裝,每個url中間用空格隔開
        Map<String, List<URL>> categoryNotified = notified.get(url);
        if (categoryNotified != null) {
            for (List<URL> us : categoryNotified.values()) {
                for (URL u : us) {
                    if (buf.length() > 0) {
                        buf.append(URL_SEPARATOR);
                    }
                    buf.append(u.toFullString());
                }
            }
        }
        // 更新內(nèi)存緩存
        properties.setProperty(url.getServiceKey(), buf.toString());
        // 版本控制,可以看出,MVCC并非DB專有
        long version = lastCacheChanged.incrementAndGet();
        // 同步保存文件則直接執(zhí)行save,將properties緩存內(nèi)容同步至緩存文件,否則放入線程池異步調(diào)度
        if (syncSaveFile) {
            doSaveProperties(version);
        } else {
            registryCacheExecutor.execute(new SaveProperties(version));
        }
    } catch (Throwable t) {
        logger.warn(t.getMessage(), t);
    }
}

繼續(xù)來看doSaveProperties方法,方法參數(shù)是當(dāng)前文件的版本號,可以看到,防止并發(fā)操作,版本號用于版本控制

// 保存配置到本地緩存文件,文件版本
public void doSaveProperties(long version) {
    // 防止版本回退
    if (version < lastCacheChanged.get()) {
        return;
    }
    if (file == null) {
        return;
    }
    // Save
    try {
        // 創(chuàng)建本地緩存文件鎖,不存在則直接創(chuàng)建;
        File lockfile = new File(file.getAbsolutePath() + ".lock");
        if (!lockfile.exists()) {
            lockfile.createNewFile();
        }
        try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
             FileChannel channel = raf.getChannel()) {
            FileLock lock = channel.tryLock();
            //拿到鎖才可以操作
            if (lock == null) {
                throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties");
            }
            // Save
            try {
                if (!file.exists()) {
                    file.createNewFile();
                }
                try (FileOutputStream outputFile = new FileOutputStream(file)) {
                    properties.store(outputFile, "Dubbo Registry Cache");
                }
            } finally {
                lock.release();
            }
        }
    } catch (Throwable e) {
        if (version < lastCacheChanged.get()) {
            return;
        } else {
            registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));
        }
        logger.warn("Failed to save registry cache file, cause: " + e.getMessage(), e);
    }
}

為了進(jìn)一步加深理解,我們把AbstractRegistry構(gòu)建過程中的數(shù)據(jù)流圖示如下:


AbstractRegistry init.jpg
2.2.1.4、數(shù)據(jù)注冊 - register

基類中的注冊等方法邏輯非常簡單,只是將URL放入緩存,非常簡單,不做過多說明。

2.2.1.5、數(shù)據(jù)注銷 - unRegister

同樣的,注銷方法邏輯也非常簡單,將URL從緩存中刪除。

2.2.1.6、數(shù)據(jù)訂閱 - subscribe

數(shù)據(jù)訂閱需要注意,訂閱的邏輯核心是注冊監(jiān)聽器,以便數(shù)據(jù)變更時,同步更新;subscribed緩存的結(jié)構(gòu)比較特殊,來看代碼:

@Override
public void subscribe(URL url, NotifyListener listener) {
    if (url == null) {
        throw new IllegalArgumentException("subscribe url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("subscribe listener == null");
    }
    if (logger.isInfoEnabled()) {
        logger.info("Subscribe: " + url);
    }
    // subscribed緩存結(jié)構(gòu):ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<>()
    Set<NotifyListener> listeners = subscribed.computeIfAbsent(url, n -> new ConcurrentHashSet<>());
    listeners.add(listener);
}
2.2.1.7、取消數(shù)據(jù)訂閱 - unSubscribe

取消訂閱即刪除監(jiān)聽器,從subscribed緩存中刪除對應(yīng)監(jiān)聽器。

2.2.1.8、數(shù)據(jù)查找 - lookup

查找邏輯即,從已同步過的URL列表(notified緩存)中,過濾滿足要求的URL;若當(dāng)前已同步過的URL集合為空,則

@Override
public List<URL> lookup(URL url) {
    List<URL> result = new ArrayList<>();
    Map<String, List<URL>> notifiedUrls = getNotified().get(url);
    if (notifiedUrls != null && notifiedUrls.size() > 0) {
        for (List<URL> urls : notifiedUrls.values()) {
            for (URL u : urls) {
                if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) {
                    result.add(u);
                }
            }
        }
    } else {
        final AtomicReference<List<URL>> reference = new AtomicReference<>();
        NotifyListener listener = reference::set;
        // 即注冊監(jiān)聽器,保證首次notify有數(shù)據(jù)返回
        subscribe(url, listener); // Subscribe logic guarantees the first notify to return
        List<URL> urls = reference.get();
        if (CollectionUtils.isNotEmpty(urls)) {
            for (URL u : urls) {
                if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) {
                    result.add(u);
                }
            }
        }
    }
    return result;
}
2.2.1.9、注冊中心銷毀- destory

銷毀的邏輯比較簡單,分為注銷數(shù)據(jù)和注銷監(jiān)聽器兩部分,來看代碼:

@Override
// 主要做兩件事情:1、注銷URL,從registered列表中剔除所有URL;2、移除所有NotifyListener,也就是說不再接受配置變更同步消息。
public void destroy() {
    if (logger.isInfoEnabled()) {
        logger.info("Destroy registry:" + getUrl());
    }
    // 將所有URL從registered移除,即注銷全部數(shù)據(jù)
    Set<URL> destroyRegistered = new HashSet<>(getRegistered());
    if (!destroyRegistered.isEmpty()) {
        for (URL url : new HashSet<>(getRegistered())) {
            if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
                try {
                    unregister(url);
                    if (logger.isInfoEnabled()) {
                        logger.info("Destroy unregister url " + url);
                    }
                } catch (Throwable t) {
                    logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                }
            }
        }
    }

    // 注銷監(jiān)聽器,即從subscribed移除所有訂閱URL的listener
    Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<>(getSubscribed());
    if (!destroySubscribed.isEmpty()) {
        for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
            URL url = entry.getKey();
            for (NotifyListener listener : entry.getValue()) {
                try {
                    unsubscribe(url, listener);
                    if (logger.isInfoEnabled()) {
                        logger.info("Destroy unsubscribe url " + url);
                    }
                } catch (Throwable t) {
                    logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                }
            }
        }
    }
}

AbstractRegistry基類的邏輯就分析到這里,接著來看FailbackRegistry。

2.2.2、FailbackRegistry

FailbackRegistry 顧名思義,支持失敗恢復(fù)的注冊中心,繼承自基類AbstractRegistry,可以看到在基類基礎(chǔ)上擴(kuò)展了失敗恢復(fù)功能,先來看幾個緩存變量:

// 重試任務(wù)map
private final ConcurrentMap<URL, FailedRegisteredTask> failedRegistered = new ConcurrentHashMap<URL, FailedRegisteredTask>();

private final ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered = new ConcurrentHashMap<URL, FailedUnregisteredTask>();

private final ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed = new ConcurrentHashMap<Holder, FailedSubscribedTask>();

private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<Holder, FailedUnsubscribedTask>();

private final ConcurrentMap<Holder, FailedNotifiedTask> failedNotified = new ConcurrentHashMap<Holder, FailedNotifiedTask>();

幾個重試任務(wù)map,用于緩存失敗的任務(wù)以便于重試;接著來看構(gòu)造方法,在父類構(gòu)造方法的基礎(chǔ)上,新增了HashedWheelTimer實例,用于定時重試失敗任務(wù),默認(rèn)重試時間間隔5s。

public FailbackRegistry(URL url) {
    super(url);
    // 默認(rèn)重試時間間隔 5s
    this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);

    // 利用hashTimer實現(xiàn)定時重試,
    retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);
}

除此之外,F(xiàn)ailbackRegistry還定義了幾個關(guān)鍵的模板方法,由子類實現(xiàn):

public abstract void doRegister(URL url);
public abstract void doUnregister(URL url);
public abstract void doSubscribe(URL url, NotifyListener listener);
public abstract void doUnsubscribe(URL url, NotifyListener listener);
2.2.2.1、數(shù)據(jù)注冊-register

注冊邏輯在父類基礎(chǔ)上新增了失敗以后的操作,邏輯比較簡單,直接來看代碼:

@Override
public void register(URL url) {
    // 父類注冊方法,保存url到已注冊列表
    super.register(url);
    // 將url從注冊失敗列表中剔除
    removeFailedRegistered(url);
    // 將url從注銷失敗列表中剔除
    removeFailedUnregistered(url);
    try {
        // 執(zhí)行模板方法邏輯
        doRegister(url);
    } catch (Exception e) {
        Throwable t = e;
                // 如果啟動檢測開關(guān)開啟(默認(rèn)開啟),失敗會直接拋異常,否則加入失敗列表,用于重試
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        // 加入注冊失敗列表,用于重試
        addFailedRegistered(url);
    }
}
2.2.2.2、數(shù)據(jù)注銷-unRegister

注銷邏輯與注冊類似,直接來看代碼:

@Override
public void unregister(URL url) {
    // 父類注銷邏輯
    super.unregister(url);
    // 從注冊失敗緩存中刪除
    removeFailedRegistered(url);
    // 從注銷失敗列表中刪除
    removeFailedUnregistered(url);
    try {
        // 執(zhí)行模板方法
        doUnregister(url);
    } catch (Exception e) {
        Throwable t = e;
                // 如果啟動檢測開關(guān)開啟(默認(rèn)開啟),失敗會直接拋異常,否則加入失敗列表,用于重試
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to unregister " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to unregister " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        //加入失敗重試緩存
        addFailedUnregistered(url);
    }
}
2.2.2.3、數(shù)據(jù)訂閱-subscribe

與注冊、注銷邏輯類似,數(shù)據(jù)訂閱邏輯也非常簡單,代碼就不再展示了

2.2.2.4、取消數(shù)據(jù)訂閱-unSubscribe

與注冊、注銷邏輯類似,不做過多解析。

2.3、注冊中心實現(xiàn)

好了,前面的鋪墊結(jié)束了,本節(jié)來看具體的注冊中心實現(xiàn),下面按照DubboRegistry、MulticastRegistry、RedisRegistry、ZookeeperRegistry的順序依次分析。

2.3.1、DubboRegistry

DubboRegistry作為dubbo的默認(rèn)注冊中心實現(xiàn)(RegistryFactory默認(rèn)SPI實現(xiàn)是DubboRegistryFactory),嚴(yán)格意義上來講,DubboRegistry實際上是一個Registry代理,核心邏輯全部由代理也即registryService實現(xiàn)。DubboRegistry的創(chuàng)建在DubboRegistryFactory中已經(jīng)做了解析,這里不做過多說明。重點關(guān)注DubboRegistry的構(gòu)造方法

public DubboRegistry(Invoker<RegistryService> registryInvoker, RegistryService registryService) {
    super(registryInvoker.getUrl());
    this.registryInvoker = registryInvoker;
    // Registry代理,核心邏輯借助registryService實現(xiàn)
    this.registryService = registryService;
    // 重連定時器,默認(rèn)重連間隔時間3s
    this.reconnectPeriod = registryInvoker.getUrl().getParameter(Constants.REGISTRY_RECONNECT_PERIOD_KEY, RECONNECT_PERIOD_DEFAULT);
    // 初始化調(diào)度任務(wù)邏輯,具體邏輯參考recover方法
    reconnectFuture = reconnectTimer.scheduleWithFixedDelay(() -> {
        try {
            // 重連邏輯
            connect();
        } catch (Throwable t) { // Defensive fault tolerance
            logger.error("Unexpected error occur at reconnect, cause: " + t.getMessage(), t);
        }
    }, reconnectPeriod, reconnectPeriod, TimeUnit.MILLISECONDS);
}

介紹FailbackRegistry時說過,所有的注冊中心實現(xiàn)都支持自動恢復(fù),DubboRegistry的自動恢復(fù)實現(xiàn)原理是若當(dāng)前注冊中心不可用,則直接將該URL注冊信息加入到注冊失敗、訂閱失敗緩存(借助父類FailbackRegistry的addFailedRegistered、addFailedSubscribed方法實現(xiàn)),由父類的HashedWheelTimer重新調(diào)度,進(jìn)行恢復(fù),來看失敗后加入緩存的邏輯(外層方法是connect,內(nèi)部實際上執(zhí)行的recover方法):

// 核心邏輯:注冊失敗、訂閱失敗的url分別放入對應(yīng)列表,用于hashedWheelTimer調(diào)度
@Override
protected void recover() throws Exception {
    // register
    Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
    if (!recoverRegistered.isEmpty()) {
        if (logger.isInfoEnabled()) {
            logger.info("Recover register url " + recoverRegistered);
        }
        // 放入注冊失敗列表
        for (URL url : recoverRegistered) {
            addFailedRegistered(url);
        }
    }
    // subscribe
    Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
    if (!recoverSubscribed.isEmpty()) {
        if (logger.isInfoEnabled()) {
            logger.info("Recover subscribe url " + recoverSubscribed.keySet());
        }
        for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
            URL url = entry.getKey();
            for (NotifyListener listener : entry.getValue()) {
                addFailedSubscribed(url, listener);
            }
        }
    }
}

2.3.2、MulticastRegistry

MulticastRegistry即多播注冊中心,顧名思義,采用多播實現(xiàn);注冊、注銷、訂閱、取消訂閱等均通過多播方式實現(xiàn)。核心邏輯在構(gòu)造方法,包括多播組的創(chuàng)建以及多播消息的處理;創(chuàng)建多播組比較容易理解,重點關(guān)注多播消息的處理,在構(gòu)造方法中創(chuàng)建并啟動一個守護(hù)線程,用于接收并處理多播消息(注冊消息、注銷消息、訂閱消息),處理多播消息的入口是receive方法。

2.3.2.1、構(gòu)造方法

先來看構(gòu)造方法:

// 創(chuàng)建并啟動daemon線程,用于接收廣播消息,對接收到的消息處理邏輯在receive方法
public MulticastRegistry(URL url) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    try {
        // 創(chuàng)建并加入多播組
        multicastAddress = InetAddress.getByName(url.getHost());
        checkMulticastAddress(multicastAddress);
        multicastPort = url.getPort() <= 0 ? DEFAULT_MULTICAST_PORT : url.getPort();
        multicastSocket = new MulticastSocket(multicastPort);
        // 注冊中心URL地址加入多播組
        NetUtils.joinMulticastGroup(multicastSocket, multicastAddress);
        // 啟動daemon線程,用于接收廣播socket消息
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                byte[] buf = new byte[2048];
                // UDP包封裝
                DatagramPacket recv = new DatagramPacket(buf, buf.length);
                while (!multicastSocket.isClosed()) {
                    try {
                        // 接收UDP報文
                        multicastSocket.receive(recv);
                        String msg = new String(recv.getData()).trim();
                        int i = msg.indexOf('\n');
                        if (i > 0) {
                            msg = msg.substring(0, i).trim();
                        }
                        // 多播消息接收處理
                        MulticastRegistry.this.receive(msg, (InetSocketAddress) recv.getSocketAddress());
                        Arrays.fill(buf, (byte) 0);
                    } catch (Throwable e) {
                        if (!multicastSocket.isClosed()) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }
            }
        }, "DubboMulticastRegistryReceiver");
        thread.setDaemon(true);
        thread.start();
    } catch (IOException e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
    // 這里利用定時調(diào)度線程池,定時清理received緩存中不可用socket;默認(rèn)清理時間間隔60s;
    this.cleanPeriod = url.getParameter(Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_SESSION_TIMEOUT);
    if (url.getParameter("clean", true)) {
        this.cleanFuture = cleanExecutor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    clean(); // Remove the expired
                } catch (Throwable t) { // Defensive fault tolerance
                    logger.error("Unexpected exception occur at clean expired provider, cause: " + t.getMessage(), t);
                }
            }
        }, cleanPeriod, cleanPeriod, TimeUnit.MILLISECONDS);
    } else {
        this.cleanFuture = null;
    }
}

receive方法實現(xiàn)比較簡單,根據(jù)msg類型(通過消息前綴判斷)執(zhí)行相應(yīng)的注冊、注銷、訂閱操作;重點關(guān)注registered、unregistered、multicast三個方法(MulticastRegistry的注冊、注銷、訂閱邏輯均通過這三個方法實現(xiàn))。

private void receive(String msg, InetSocketAddress remoteAddress) {
    if (logger.isInfoEnabled()) {
        logger.info("Receive multicast message: " + msg + " from " + remoteAddress);
    }
    // 廣播注冊消息
    if (msg.startsWith(Constants.REGISTER)) {
        URL url = URL.valueOf(msg.substring(Constants.REGISTER.length()).trim());
        registered(url);
    // 廣播注銷消息
    } else if (msg.startsWith(Constants.UNREGISTER)) {
        URL url = URL.valueOf(msg.substring(Constants.UNREGISTER.length()).trim());
        unregistered(url);
    // 廣播訂閱消息
    } else if (msg.startsWith(Constants.SUBSCRIBE)) {
        URL url = URL.valueOf(msg.substring(Constants.SUBSCRIBE.length()).trim());
        // 根據(jù)注冊的地址,發(fā)送單播、多播消息
        Set<URL> urls = getRegistered();
        if (CollectionUtils.isNotEmpty(urls)) {
            for (URL u : urls) {
                if (UrlUtils.isMatch(url, u)) {
                    String host = remoteAddress != null && remoteAddress.getAddress() != null ? remoteAddress.getAddress().getHostAddress() : url.getIp();
                    // 發(fā)送單播,多播消息
                    if (url.getParameter("unicast", true) // Whether the consumer's machine has only one process
                            && !NetUtils.getLocalHost().equals(host)) { // Multiple processes in the same machine cannot be unicast with unicast or there will be only one process receiving information
                        unicast(Constants.REGISTER + " " + u.toFullString(), host);
                    } else {
                        multicast(Constants.REGISTER + " " + u.toFullString());
                    }
                }
            }
        }
    }/* else if (msg.startsWith(UNSUBSCRIBE)) {
    }*/
}

先來看registered方法,核心邏輯是將URL與subscried緩存中的URL進(jìn)行匹配,匹配成功則加入received,同時同步給訂閱者(通過NotifyListener的notify方法),然后通知當(dāng)前l(fā)istener(在subscribe時wait*)

protected void registered(URL url) {
    for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
        URL key = entry.getKey();
        if (UrlUtils.isMatch(key, url)) {
            Set<URL> urls = received.get(key);
            if (urls == null) {
                received.putIfAbsent(key, new ConcurrentHashSet<URL>());
                urls = received.get(key);
            }
            urls.add(url);
            List<URL> list = toList(urls);
            for (NotifyListener listener : entry.getValue()) {
                notify(key, listener, list);
                synchronized (listener) {
                    listener.notify();
                }
            }
        }
    }
}

再來看unRegistered,邏輯上大體與registered類似,將多播消息中的URL與subscried中URL匹配,匹配成功則將該URL從received中剔除;若當(dāng)前received中該URL對應(yīng)URL列表為空,則重置該URL的protocol為empty;最后同步變更給訂閱者。

protected void unregistered(URL url) {
    for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
        URL key = entry.getKey();
        if (UrlUtils.isMatch(key, url)) {
            Set<URL> urls = received.get(key);
            if (urls != null) {
                urls.remove(url);
            }
            // received中url對應(yīng)URL列表為空,則直接重置該url協(xié)議為empty
            if (urls == null || urls.isEmpty()) {
                if (urls == null) {
                    urls = new ConcurrentHashSet<URL>();
                }
                URL empty = url.setProtocol(Constants.EMPTY_PROTOCOL);
                urls.add(empty);
            }
            List<URL> list = toList(urls);
            // 同步變更消息到各訂閱者
            for (NotifyListener listener : entry.getValue()) {
                notify(key, listener, list);
            }
        }
    }
}

最后來看multicast方法,邏輯比較簡單,即為發(fā)送多播消息到多播消息組(多播消息由構(gòu)造方法中創(chuàng)建的線程異步處理

private void multicast(String msg) {
    if (logger.isInfoEnabled()) {
        logger.info("Send multicast message: " + msg + " to " + multicastAddress + ":" + multicastPort);
    }
    try {
        byte[] data = (msg + "\n").getBytes();
        DatagramPacket hi = new DatagramPacket(data, data.length, multicastAddress, multicastPort);
        multicastSocket.send(hi);
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}
2.3.2.2、其他邏輯

MulticastRegistry的其他邏輯包括doRegister、doUnregister、doSubscribe、doUnsubscribe、destroy,前面四個方法的實現(xiàn)方式完全一致,即拼接并發(fā)送多播消息到多播組,這里以doRegister為例,代碼如下:

@Override
public void doRegister(URL url) {
    multicast(Constants.REGISTER + " " + url.toFullString());
}

最后來看destroy,銷毀需要處理多播組、關(guān)閉所有線程池、關(guān)閉所有socket,來看代碼:

@Override
public void destroy() {
    super.destroy();
    try {
        // 取消定時清理任務(wù)
        ExecutorUtil.cancelScheduledFuture(cleanFuture);
    } catch (Throwable t) {
        logger.warn(t.getMessage(), t);
    }
    try {
        // 退出多播組,并關(guān)閉socket
        multicastSocket.leaveGroup(multicastAddress);
        multicastSocket.close();
    } catch (Throwable t) {
        logger.warn(t.getMessage(), t);
    }
    // 關(guān)閉定時清理線程池,這里關(guān)注下線程池的優(yōu)雅關(guān)閉
    ExecutorUtil.gracefulShutdown(cleanExecutor, cleanPeriod);
}

來看下dubbo線程池的優(yōu)雅關(guān)閉:

public static void gracefulShutdown(Executor executor, int timeout) {
    if (!(executor instanceof ExecutorService) || isTerminated(executor)) {
        return;
    }
    final ExecutorService es = (ExecutorService) executor;
    try {
        // 新任務(wù)不能再提交
        es.shutdown();
    } catch (SecurityException ex2) {
        return;
    } catch (NullPointerException ex2) {
        return;
    }
    try {
        // 等待隊列內(nèi)剩余任務(wù)結(jié)束,立即關(guān)閉線程池
        if (!es.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
            es.shutdownNow();
        }
    } catch (InterruptedException ex) {
        es.shutdownNow();
        Thread.currentThread().interrupt();
    }
    // 若線程池仍未關(guān)閉,則新建線程用于線程池的關(guān)閉
    if (!isTerminated(es)) {
        newThreadToCloseExecutor(es);
    }
}
// 專門用于關(guān)閉線程池的線程池shutdownExecutor
private static void newThreadToCloseExecutor(final ExecutorService es) {
        if (!isTerminated(es)) {
            shutdownExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        for (int i = 0; i < 1000; i++) {
                            es.shutdownNow();
                            if (es.awaitTermination(10, TimeUnit.MILLISECONDS)) {
                                break;
                            }
                        }
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    } catch (Throwable e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            });
        }
    }

了解完MulticastRegistry的所有方法,我們來看整個MulticastRegistry中的數(shù)據(jù)流,執(zhí)行注冊操作(這里以register為例,其他操作類似)將數(shù)據(jù)多播至多播組,然后由deamon線程異步將數(shù)據(jù)同步至訂閱者,如下圖所示:

MulticastRegistry-dataflow (1).jpg

2.3.3、RedisRegistry

RedisRegistry,即使用Redis存儲URL數(shù)據(jù)的注冊中心,這里從初始化、數(shù)據(jù)流以及核心方法等幾個方面進(jìn)行解析。RedisRegisty除了使用Redis緩存之外,還使用了Redis的消息隊列,用于doSubscribe過程中的URL數(shù)據(jù)變更消息處理。

2.3.3.1、初始化

先來看RedisRegistry的初始化,大致可以分為父類構(gòu)造方法、注冊中心參數(shù)初始化、RedisPool連接池創(chuàng)建、過期調(diào)度線程池啟動。父類構(gòu)造方法主要是AbstractRegistry構(gòu)造方法的調(diào)用;參數(shù)初始化主要包括RedisPool連接池參數(shù)初始化、注冊中心重連時間間隔初始化、過期線程池調(diào)度時間間隔初始化等;RedisPool的創(chuàng)建比較簡單,即直接取上一步中的參數(shù)創(chuàng)建RedisPool連接池(這里需要注意,會對同一個URL的多個backup地址單獨創(chuàng)建RedisPool,多個地址之間進(jìn)行了隔離,保證URL的可用性);過期調(diào)度線程池的調(diào)度比較容易理解,也就是說在注冊中心構(gòu)造過程中已經(jīng)啟動了對過期URL數(shù)據(jù)的定時清理,默認(rèn)調(diào)度間隔30s。

RedisRegistry-init.jpg

然后來看構(gòu)造方法:

public RedisRegistry(URL url) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    // 借用對象池管理配置
    GenericObjectPoolConfig config = new GenericObjectPoolConfig();
    config.setTestOnBorrow(url.getParameter("test.on.borrow", true));
    config.setTestOnReturn(url.getParameter("test.on.return", false));
    config.setTestWhileIdle(url.getParameter("test.while.idle", false));
    // config參數(shù)配置,略去

    // 支持的redis集群模式,failover和replicate
    String cluster = url.getParameter("cluster", "failover");
    if (!"failover".equals(cluster) && !"replicate".equals(cluster)) {
        throw new IllegalArgumentException("Unsupported redis cluster: " + cluster + ". The redis cluster only supported failover or replicate.");
    }
    replicate = "replicate".equals(cluster);

    List<String> addresses = new ArrayList<>();
    addresses.add(url.getAddress());
    String[] backups = url.getParameter(Constants.BACKUP_KEY, new String[0]);
    if (ArrayUtils.isNotEmpty(backups)) {
        addresses.addAll(Arrays.asList(backups));
    }

    for (String address : addresses) {
        int i = address.indexOf(':');
        String host;
        int port;
        if (i > 0) {
            host = address.substring(0, i);
            port = Integer.parseInt(address.substring(i + 1));
        } else {
            host = address;
            port = DEFAULT_REDIS_PORT;
        }
        // 每個地址對應(yīng)一個jedis連接池,URL的各地址之間互不影響,保證可用性
        this.jedisPools.put(address, new JedisPool(config, host, port,
                url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT), StringUtils.isEmpty(url.getPassword()) ? null : url.getPassword(),
                url.getParameter("db.index", 0)));
    }

    this.reconnectPeriod = url.getParameter(Constants.REGISTRY_RECONNECT_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD);
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(Constants.PATH_SEPARATOR)) {
        group = Constants.PATH_SEPARATOR + group;
    }
    if (!group.endsWith(Constants.PATH_SEPARATOR)) {
        group = group + Constants.PATH_SEPARATOR;
    }
    // group = "/group/"
    this.root = group;

    // 配置過期定時調(diào)度
    this.expirePeriod = url.getParameter(Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_SESSION_TIMEOUT);
    this.expireFuture = expireExecutor.scheduleWithFixedDelay(() -> {
        try {
            // 延遲過期
            deferExpired(); // Extend the expiration time
        } catch (Throwable t) { // Defensive fault tolerance
            logger.error("Unexpected exception occur at defer expire time, cause: " + t.getMessage(), t);
        }
    }, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS);
}

// 延遲過期
private void deferExpired() {
        for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
            JedisPool jedisPool = entry.getValue();
            try {
                try (Jedis jedis = jedisPool.getResource()) {
                    for (URL url : new HashSet<>(getRegistered())) {
                        if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
                            String key = toCategoryPath(url);
                            // 延長緩存過期時間,并發(fā)布隊列消息
                            if (jedis.hset(key, url.toFullString(), String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {
                                jedis.publish(key, Constants.REGISTER);
                            }
                        }
                    }
                    // 如果開啟了強(qiáng)制清理開關(guān);則直接清理redis中數(shù)據(jù),并發(fā)布注銷消息
                    if (admin) {
                        clean(jedis);
                    }
                    // 無需創(chuàng)建副本,則只寫單臺redis,直接break;
                    if (!replicate) {
                        break;//  If the server side has synchronized data, just write a single machine
                    }
                }
            } catch (Throwable t) {
                logger.warn("Failed to write provider heartbeat to redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
            }
        }
    }

再來看整個RedisRegistry中的數(shù)據(jù)流,可以發(fā)現(xiàn),RedisRegistry通過Redis緩存與Redis消息隊列,異步+同步的方式將數(shù)據(jù)同步至本地cache文件、內(nèi)存緩存Properties以及具體的數(shù)據(jù)訂閱者,如RegistryDirectory:


RedisRegistry-dataflow (1).jpg
2.3.3.2、注冊(doRegister)

了解完RedisRegistry中的數(shù)據(jù)流,再來看注冊過程就比較容易理解了。主要包括兩個核心操作:將當(dāng)前URL緩存至Redis;然后將URL信息發(fā)布至Redis消息隊列。直接來看代碼:

public void doRegister(URL url) {
    String key = toCategoryPath(url);
    String value = url.toFullString();
    String expire = String.valueOf(System.currentTimeMillis() + expirePeriod);
    boolean success = false;
    RpcException exception = null;
    // 當(dāng)前URL的所有可用地址,遍歷,存入redis,并發(fā)布注冊消息
    for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
        JedisPool jedisPool = entry.getValue();
        try {
            try (Jedis jedis = jedisPool.getResource()) {
                jedis.hset(key, value, expire);
                jedis.publish(key, Constants.REGISTER);
                success = true;
                if (!replicate) {
                    break; //  If the server side has synchronized data, just write a single machine
                }
            }
        } catch (Throwable t) {
            exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
        }
    }
    if (exception != null) {
        if (success) {
            logger.warn(exception.getMessage(), exception);
        } else {
            throw exception;
        }
    }
}
2.3.3.3、注銷(doUnregister)

注銷邏輯即注冊邏輯的反向操作,先刪除redis緩存,再發(fā)布注銷消息:

public void doUnregister(URL url) {
    String key = toCategoryPath(url);
    String value = url.toFullString();
    RpcException exception = null;
    boolean success = false;
    // 遍歷當(dāng)前URL的所有可用節(jié)點地址,遍歷從redis中刪除,并發(fā)布注銷消息
    for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
        JedisPool jedisPool = entry.getValue();
        try {
            try (Jedis jedis = jedisPool.getResource()) {
                jedis.hdel(key, value);
                jedis.publish(key, Constants.UNREGISTER);
                success = true;
                if (!replicate) {
                    break; //  If the server side has synchronized data, just write a single machine
                }
            }
        } catch (Throwable t) {
            exception = new RpcException("Failed to unregister service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
        }
    }
    if (exception != null) {
        if (success) {
            logger.warn(exception.getMessage(), exception);
        } else {
            throw exception;
        }
    }
}
2.3.3.4、訂閱(doSubscribe)

訂閱邏輯稍微復(fù)雜,支持同步和異步方式將URL信息同步至訂閱者,如RegistryDirectory;同步邏輯比較簡單,將被訂閱的URL與redis中緩存URL進(jìn)行交叉過濾,最終通過父類notify方法(AbstractRegistry的notify方法),完成URL數(shù)據(jù)同步;異步邏輯由Notifier線程實現(xiàn),Notifier依照線性退避規(guī)則執(zhí)行,執(zhí)行邏輯除了同步URL信息之外,還會訂閱redis消息隊列并消費隊列中消息,當(dāng)然,消費的核心邏輯也是執(zhí)行doNotify方法。

public void doSubscribe(final URL url, final NotifyListener listener) {
    String service = toServicePath(url);
    // 每個service對應(yīng)一個notifier線程
    Notifier notifier = notifiers.get(service);
    // 異步方式,創(chuàng)建Notifier線程,并啟動,線性回避執(zhí)行,數(shù)據(jù)流 : redis緩存、redis消息隊列 -> 訂閱者
    if (notifier == null) {
        Notifier newNotifier = new Notifier(service);
        notifiers.putIfAbsent(service, newNotifier);
        notifier = notifiers.get(service);
        // notifier線程創(chuàng)建后即啟動
        if (notifier == newNotifier) {
            notifier.start();
        }
    }
    boolean success = false;
    RpcException exception = null;
    // 同步方式訂閱 數(shù)據(jù)流 redis緩存 -> 訂閱者,如RegistryDirector
    for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
        JedisPool jedisPool = entry.getValue();
        try {
            try (Jedis jedis = jedisPool.getResource()) {
                // 區(qū)分所有服務(wù)與單個服務(wù)。
                if (service.endsWith(Constants.ANY_VALUE)) {
                    admin = true;
                    Set<String> keys = jedis.keys(service);
                    if (CollectionUtils.isNotEmpty(keys)) {
                        Map<String, Set<String>> serviceKeys = new HashMap<>();
                        for (String key : keys) {
                            String serviceKey = toServicePath(key);
                            Set<String> sk = serviceKeys.computeIfAbsent(serviceKey, k -> new HashSet<>());
                            sk.add(key);
                        }
                        for (Set<String> sk : serviceKeys.values()) {
                            doNotify(jedis, sk, url, Collections.singletonList(listener));
                        }
                    }
                } else {
                    doNotify(jedis, jedis.keys(service + Constants.PATH_SEPARATOR + Constants.ANY_VALUE), url, Collections.singletonList(listener));
                }
                success = true;
                break; // Just read one server's data
            }
        } catch (Throwable t) { // Try the next server
            exception = new RpcException("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
        }
    }
    // 異常處理邏輯略去
}

來看doNotify實現(xiàn)

private void doNotify(Jedis jedis, String key) {
    for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<>(getSubscribed()).entrySet()) {
        doNotify(jedis, Collections.singletonList(key), entry.getKey(), new HashSet<>(entry.getValue()));
    }
}

private void doNotify(Jedis jedis, Collection<String> keys, URL url, Collection<NotifyListener> listeners) {
        if (keys == null || keys.isEmpty()
                || listeners == null || listeners.isEmpty()) {
            return;
        }
        long now = System.currentTimeMillis();
        List<URL> result = new ArrayList<>();
        List<String> categories = Arrays.asList(url.getParameter(Constants.CATEGORY_KEY, new String[0]));
        String consumerService = url.getServiceInterface();
        for (String key : keys) {
            if (!Constants.ANY_VALUE.equals(consumerService)) {
                String providerService = toServiceName(key);
                if (!providerService.equals(consumerService)) {
                    continue;
                }
            }
            String category = toCategoryName(key);
            if (!categories.contains(Constants.ANY_VALUE) && !categories.contains(category)) {
                continue;
            }
            List<URL> urls = new ArrayList<>();
            // redis緩存URL信息與內(nèi)存中被訂閱的URL信息進(jìn)行交叉過濾
            Map<String, String> values = jedis.hgetAll(key);
            if (CollectionUtils.isNotEmptyMap(values)) {
                for (Map.Entry<String, String> entry : values.entrySet()) {
                    URL u = URL.valueOf(entry.getKey());
                    if (!u.getParameter(Constants.DYNAMIC_KEY, true)
                            || Long.parseLong(entry.getValue()) >= now) {
                        if (UrlUtils.isMatch(url, u)) {
                            // 與url匹配的緩存URL放入待通知列表
                            urls.add(u);
                        }
                    }
                }
            }
            // 若無有效URL,則填充urls地址為任意值*
            if (urls.isEmpty()) {
                urls.add(url.setProtocol(Constants.EMPTY_PROTOCOL)
                        .setAddress(Constants.ANYHOST_VALUE)
                        .setPath(toServiceName(key))
                        .addParameter(Constants.CATEGORY_KEY, category));
            }
            result.addAll(urls);
            if (logger.isInfoEnabled()) {
                logger.info("redis notify: " + key + " = " + urls);
            }
        }
        if (CollectionUtils.isEmpty(result)) {
            return;
        }
            // 由父類nofity方法完成最終URL數(shù)據(jù)的同步
        for (NotifyListener listener : listeners) {
            notify(url, listener, result);
        }
    }

再來看Notifier實現(xiàn)

private class Notifier extends Thread {

    private final String service;
    private final AtomicInteger connectSkip = new AtomicInteger();
    private final AtomicInteger connectSkipped = new AtomicInteger();
    private volatile Jedis jedis;
    private volatile boolean first = true;
    private volatile boolean running = true;
    private volatile int connectRandom;

    public Notifier(String service) {
        super.setDaemon(true);
        super.setName("DubboRedisSubscribe");
        this.service = service;
    }

    private void resetSkip() {
        connectSkip.set(0);
        connectSkipped.set(0);
        connectRandom = 0;
    }

    // 首次結(jié)果為false;線性退避算法
    private boolean isSkip() {
        // 初始值均為0
        int skip = connectSkip.get(); // Growth of skipping times
        if (skip >= 10) { 
            if (connectRandom == 0) {
                connectRandom = ThreadLocalRandom.current().nextInt(10);
            }
            skip = 10 + connectRandom;
        }
        // 初始值 false,
        // 第一次:false,0-1
        // 第二次:true,1-1
        // 第三次:false,0-2
        // 第四次:true,1-2
        // 第五次:true,2-2
        // 第五次:false,0-3
        if (connectSkipped.getAndIncrement() < skip) { // Check the number of skipping times
            return true;
        }

        // skip 自增
        connectSkip.incrementAndGet();
        // skiped重置0
        connectSkipped.set(0);
        connectRandom = 0;
        return false;
    }

    @Override
    public void run() {
        while (running) {
            try {
                // 線性回避,是否跳過
                if (!isSkip()) {
                    try {
                        for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
                            JedisPool jedisPool = entry.getValue();
                            try {
                                jedis = jedisPool.getResource();
                                try {
                                    // service是具體類型還是所有服務(wù)
                                    if (service.endsWith(Constants.ANY_VALUE)) {
                                        if (!first) {
                                            first = false;
                                            Set<String> keys = jedis.keys(service);
                                            if (CollectionUtils.isNotEmpty(keys)) {
                                                for (String s : keys) {
                                                    //  同步redis中緩存數(shù)據(jù)至各訂閱者以及本地cache文件
                                                    doNotify(jedis, s);
                                                }
                                            }
                                            // 同步完之后,重置skip
                                            resetSkip();
                                        }
                                        // 處理Redis消息隊列中所有與service匹配的消息
                                        jedis.psubscribe(new NotifySub(jedisPool), service); // blocking
                                    } else {
                                        if (!first) {
                                            first = false;
                                            //  同步redis中緩存數(shù)據(jù)至各訂閱者以及本地cache文件
                                            doNotify(jedis, service);
                                            resetSkip();
                                        }
                                        // 處理Redis消息隊列中所有與service匹配的消息,同樣的同步至各數(shù)據(jù)訂閱者以及本地cache文件。
                                        jedis.psubscribe(new NotifySub(jedisPool), service + Constants.PATH_SEPARATOR + Constants.ANY_VALUE); // blocking
                                    }
                                    break;
                                } finally {
                                    jedis.close();
                                }
                            } catch (Throwable t) { // Retry another server
                                logger.warn("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
                                // If you only have a single redis, you need to take a rest to avoid overtaking a lot of CPU resources
                                sleep(reconnectPeriod);
                            }
                        }
                    } catch (Throwable t) {
                        logger.error(t.getMessage(), t);
                        sleep(reconnectPeriod);
                    }
                }
            } catch (Throwable t) {
                logger.error(t.getMessage(), t);
            }
        }
    }

    public void shutdown() {
        try {
            running = false;
            jedis.disconnect();
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
    }
}

這里順便提一下NotifySub,繼承自JedisPubSub,用于redis消息隊列中消息的消費,核心邏輯在onMessage方法:

@Override
public void onMessage(String key, String msg) {
    if (logger.isInfoEnabled()) {
        logger.info("redis event: " + key + " = " + msg);
    }
    // 只處理注冊、注銷類消息
    if (msg.equals(Constants.REGISTER)
            || msg.equals(Constants.UNREGISTER)) {
        try {
            Jedis jedis = jedisPool.getResource();
            try {
                doNotify(jedis, key);
            } finally {
                jedis.close();
            }
        } catch (Throwable t) { // TODO Notification failure does not restore mechanism guarantee
            logger.error(t.getMessage(), t);
        }
    }
}

2.3.4、ZookeeperRegistry

ZookeeperRegistry,即zk注冊中心,也是dubbo官方默認(rèn)采用的注冊中心。先來看構(gòu)造方法:

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(Constants.PATH_SEPARATOR)) {
        group = Constants.PATH_SEPARATOR + group;
    }
    this.root = group;
    // 初始化zkClient
    zkClient = zookeeperTransporter.connect(url);
    // 若是重連狀態(tài),執(zhí)行恢復(fù)邏輯,即將已注冊過的URL,放入retry列表,重新注冊;
    zkClient.addStateListener(state -> {
        if (state == StateListener.RECONNECTED) {
            try {
                recover();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    });
}

構(gòu)造方法里有一個參數(shù),ZookeeperTransporter,先來看一下這個ZookeeperTransporter。

2.3.4.1、ZookeeperTransporter

ZookeeperTransporter是dubbo對zk客戶端的適配接口,支持SPI(方法級),內(nèi)部只有一個connect方法,返回ZookeeperClient實例。我們知道,zk客戶端有常用的兩個實現(xiàn),分別是:

  1. Curator,Netflix公司開源的一套zookeeper客戶端框架,jar包gav如下:

    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>4.0.1</version>
    </dependency>
    
  2. Zkclient,Github上一個開源的Zookeeper客戶端,在Zookeeper原生 API接口之上進(jìn)行了包裝,是一個更加易用的Zookeeper客戶端,jar包gav如下:

    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.2</version>
    </dependency>
    

    dubbo中借助這兩種客戶端組件,ZookeeperClient的實現(xiàn)有CuratorZookeeperClient、ZkclientZookeeperClient,內(nèi)部通過客戶端組件完成zk的連接、監(jiān)聽等動作,具體邏輯這里不做詳細(xì)解析。繼續(xù)來看ZookeeperTransporter,基于接口的基類實現(xiàn)AbstractZookeeperTransporter,實現(xiàn)了connect方法,同時定義模板方法createZookeeperClient由具體的Transporter(CuratorZookeeperTransporter、ZkclientZookeeperTransporter,邏輯比較簡單,省略)實現(xiàn)。直接來看基類的connect方法:

    @Override
    public ZookeeperClient connect(URL url) {
        ZookeeperClient zookeeperClient;
        // backUrl解析
        List<String> addressList = getURLBackupAddress(url);
        // The field define the zookeeper server , including protocol, host, port, username, password
        // fetchAndUpdateZookeeperClientCache 邏輯比較簡單,從緩存里取,取不到則返回null;
        if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
            logger.info("find valid zookeeper client from the cache for address: " + url);
            return zookeeperClient;
        }
        // avoid creating too many connections, so add lock,加鎖,防并發(fā)。
        synchronized (zookeeperClientMap) {
            if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
                logger.info("find valid zookeeper client from the cache for address: " + url);
                return zookeeperClient;
            }
            // 緩存沒取到,創(chuàng)建zkClient,并緩存到map
            zookeeperClient = createZookeeperClient(toClientURL(url));
            logger.info("No valid zookeeper client found from cache, therefore create a new client for url. " + url);
            writeToClientMap(addressList, zookeeperClient);
        }
        return zookeeperClient;
    }
    
2.3.4.2、doRegister方法

基類FailbackRegistry的模板方法實現(xiàn),邏輯非常簡單:

@Override
public void doRegister(URL url) {
    try {
        // 創(chuàng)建zk臨時節(jié)點,節(jié)點目錄類似 /dubbo/xxx/xxx/xxx.xxx.xxxService
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
2.3.4.3、doUnregister方法

基類FailbackRegistry的模板方法實現(xiàn),邏輯同樣非常簡單:

@Override
public void doUnregister(URL url) {
    try {
        // 刪除zk節(jié)點
        zkClient.delete(toUrlPath(url));
    } catch (Throwable e) {
        throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
2.3.4.4、doSubscribe方法

基類FailbackRegistry的模板方法實現(xiàn),邏輯相對復(fù)雜,主要分為幾部分1)zkListener的初始化,2)遞歸訂閱url變更信息,創(chuàng)建zk永久節(jié)點,3)執(zhí)行父類notify(邏輯參考AbstractRegistry解析部分),代碼如下:

@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        // 無指定服務(wù)
        if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            // 根節(jié)點/dubbo
            String root = toRootPath();
            ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
            if (listeners == null) {
                zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                listeners = zkListeners.get(url);
            }
            // 初始化zkListener
            ChildListener zkListener = listeners.get(listener);
            if (zkListener == null) {
                // 初始化anyServices 注冊 ChildListener到listener內(nèi)存緩存,訂閱childChange變更,即當(dāng)執(zhí)行childChanged時,完成對所有child的訂閱。
                listeners.putIfAbsent(listener, (parentPath, currentChilds) -> {
                    for (String child : currentChilds) {
                        child = URL.decode(child);
                        if (!anyServices.contains(child)) {
                            anyServices.add(child);
                            // 遞歸訂閱url變更消息,最終會走到else,結(jié)束
                            subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
                                    Constants.CHECK_KEY, String.valueOf(false)), listener);
                        }
                    }
                });
                zkListener = listeners.get(listener);
            }
            // 創(chuàng)建永久節(jié)點/dubbo
            zkClient.create(root, false);
            // 訂閱URL變更信息
            List<String> services = zkClient.addChildListener(root, zkListener);
            if (CollectionUtils.isNotEmpty(services)) {
                for (String service : services) {
                    service = URL.decode(service);
                    anyServices.add(service);
                    // 遞歸訂閱url變更消息,最終會走到else,結(jié)束
                    subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
                            Constants.CHECK_KEY, String.valueOf(false)), listener);
                }
            }
        } else {
            //指定URL變更處理
            List<URL> urls = new ArrayList<>();
            for (String path : toCategoriesPath(url)) {
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                    listeners = zkListeners.get(url);
                }
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    // childChange事件,只做notify到指定listener
                    listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
                    zkListener = listeners.get(listener);
                }
                zkClient.create(path, false);
                // 監(jiān)聽該path
                List<String> children = zkClient.addChildListener(path, zkListener);
                if (children != null) {
                    urls.addAll(toUrlsWithEmpty(url, path, children));
                }
            }
            // 父類AbstractRegistry的notify邏輯,略去
            notify(url, listener, urls);
        }
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
2.3.4.5、doUnsubscribe方法

基類FailbackRegistry的模板方法實現(xiàn),直接上代碼:

@Override
public void doUnsubscribe(URL url, NotifyListener listener) {
    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
    if (listeners != null) {
        ChildListener zkListener = listeners.get(listener);
        if (zkListener != null) {
            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                String root = toRootPath();
                // 邏輯比較簡單,即不再訂閱對應(yīng)的listener
                zkClient.removeChildListener(root, zkListener);
            } else {
                for (String path : toCategoriesPath(url)) {
                    zkClient.removeChildListener(path, zkListener);
                }
            }
        }
    }
}
2.3.4.6、lookup方法

基類AbstractRegistry的模板方法實現(xiàn),核心邏輯是從zk中查詢指定URL對應(yīng)地址的所有可用URL(不同Category),直接來看代碼:

@Override
public List<URL> lookup(URL url) {
    if (url == null) {
        throw new IllegalArgumentException("lookup url == null");
    }
    try {
        List<String> providers = new ArrayList<>();
        for (String path : toCategoriesPath(url)) {
            List<String> children = zkClient.getChildren(path);
            if (children != null) {
                providers.addAll(children);
            }
        }
        return toUrlsWithoutEmpty(url, providers);
    } catch (Throwable e) {
        throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

小結(jié)

本文重點解析了dubbo中Registry的相關(guān)核心實現(xiàn),從RegistryFactory到Registry,dubbo抽象了一系列注冊中心,為用戶自定義擴(kuò)展提供了非常優(yōu)良的入口;同時,dubbo為我們提供了基于緩存、多播、zk等的注冊中心實現(xiàn),非常方便,不得不感嘆設(shè)計的非常好。

注:源碼版本 2.7.1。8月以來,接連經(jīng)歷了迎接小生命的驚喜,跳槽后的適應(yīng)期,中間耽誤了挺長一段時間,后面我會努力持續(xù)更新的,come on。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容