源碼閱讀:分布式服務(wù)注冊中心XXL-REGISTRY(基于1.0.2)

前言
因為公司最近項目原因正好用到了《分布式任務(wù)調(diào)度平臺XXL-JOB》,項目結(jié)束打算看看他的源碼,發(fā)現(xiàn)他還依賴于 《分布式服務(wù)框架XXL-RPC》,于是我決定先看XXL-RPC。但當(dāng)我正準(zhǔn)備看的時候,我發(fā)現(xiàn)XXL-RPC依賴于 《分布式服務(wù)注冊中心XXL-REGISTRY》,于是我決定先看XXL-REGISTRY

讓我們先看看它自己怎么吹自己的

[1.1 概述]

XXL-REGISTRY 是一個輕量級分布式服務(wù)注冊中心,擁有"輕量級、秒級注冊上線、多環(huán)境、跨語言、跨機房"等特性?,F(xiàn)已開放源代碼,開箱即用。

[1.2 特性]

  • 1、輕量級:基于DB與磁盤文件,只需要提供一個DB實例即可,無第三方依賴;
  • 2、實時性:借助內(nèi)部廣播機制,新服務(wù)上線、下線,可以在1s內(nèi)推送給客戶端;
  • 3、數(shù)據(jù)同步:注冊中心會定期全量同步數(shù)據(jù)至磁盤文件,清理無效服務(wù),確保服務(wù)數(shù)據(jù)實時可用;
  • 4、性能:服務(wù)發(fā)現(xiàn)時僅讀磁盤文件,性能非常高;服務(wù)注冊、摘除時通過磁盤文件校驗,防止重復(fù)注冊操作;
  • 5、擴展性:可方便、快速的橫向擴展,只需保證服務(wù)注冊中心配置一致即可,可借助負(fù)載均衡組件如Nginx快速集群部署;
  • 6、多狀態(tài):服務(wù)內(nèi)置三種狀態(tài):
    • 正常狀態(tài)=支持動態(tài)注冊、發(fā)現(xiàn),服務(wù)注冊信息實時更新;
    • 鎖定狀態(tài)=人工維護注冊信息,服務(wù)注冊信息固定不變;
    • 禁用狀態(tài)=禁止使用,服務(wù)注冊信息固定為空;
  • 7、跨語言:注冊中心提供HTTP接口(RESTFUL 格式)供客戶端實用,語言無關(guān),通用性更強;
  • 8、兼容性:項目立項之初是為XXL-RPC量身設(shè)計,但是不限于XXL-RPC使用。兼容支持任何服務(wù)框架服務(wù)注冊實用,如dubbo、springboot等;
  • 9、跨機房:得益于服務(wù)注冊中心集群關(guān)系對等特性,集群各節(jié)點提供冪等的配置服務(wù);因此,異地跨機房部署時,只需要請求本機房服務(wù)注冊中心即可,實現(xiàn)異地多活;
  • 10、容器化:提供官方docker鏡像,并實時更新推送dockerhub,進一步實現(xiàn) "服務(wù)注冊中心" 產(chǎn)品開箱即用;
  • 11、訪問令牌(accessToken):為提升系統(tǒng)安全性,注冊中心和客戶端進行安全性校驗,雙方AccessToken匹配才允許通訊;

屁話不多,先把源碼下下來,創(chuàng)建數(shù)據(jù)庫,跑起來看看



界面簡介,貌似可以手動注冊服務(wù),先注冊一個玩一下。隨便填寫一下信息,保存

可以查看剛剛的注冊信息

點擊運行報表tab看到我剛剛注冊的服務(wù)信息。

回到服務(wù)注冊,再點擊查看,發(fā)現(xiàn)我剛剛注冊的地址沒了,做下猜測:

因為我的注冊服務(wù)之后沒有和注冊中心有任何互動,被注冊中心自動下線了

界面大概就是這樣,我們看看官方的使用說明

客戶端maven依賴地址:

<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-registry-client</artifactId>
    <version>${最新穩(wěn)定版}</version>
</dependency>

客戶端API實用示例代碼如下:

// 注冊中心客戶端(基礎(chǔ)類)
XxlRegistryBaseClient registryClient = new XxlRegistryBaseClient("http://localhost:8080/xxl-registry-admin/", null, "xxl-rpc", "test");

// 注冊中心客戶端(增強類)
XxlRegistryClient registryClient = new XxlRegistryClient("http://localhost:8080/xxl-registry-admin/", null, "xxl-rpc", "test");


// 服務(wù)注冊 & 續(xù)約:
List<XxlRegistryDataParamVO> registryDataList = new ArrayList<>();
registryDataList.add(new XxlRegistryDataParamVO("service01", "address01"));
registryDataList.add(new XxlRegistryDataParamVO("service02", "address02"));

registryClient.registry(registryDataList);


// 服務(wù)摘除:
List<XxlRegistryDataParamVO> registryDataList = new ArrayList<>();
registryDataList.add(new XxlRegistryDataParamVO("service01", "address01"));
registryDataList.add(new XxlRegistryDataParamVO("service02", "address02"));

registryClient.remove(registryDataList);


// 服務(wù)發(fā)現(xiàn):
Set<String> keys = new TreeSet<>();
keys.add("service01");
keys.add("service02");

Map<String, TreeSet<String>> serviceData = registryClient.discovery(keys);


// 服務(wù)監(jiān)控:
Set<String> keys = new TreeSet<>();
keys.add("service01");
keys.add("service02");

registryClient.monitor(keys);

ok,那就從客戶端代碼入手。
首先需要創(chuàng)建一個registryClient ,先從XxlRegistryBaseClient 開始。看看他的構(gòu)造函數(shù)。

public XxlRegistryBaseClient(String adminAddress, String accessToken, String biz, String env) {
        this.adminAddress = adminAddress;
        this.accessToken = accessToken;
        this.biz = biz;
        this.env = env;

        // valid
        if (adminAddress==null || adminAddress.trim().length()==0) {
            throw new RuntimeException("xxl-registry adminAddress empty");
        }
        if (biz==null || biz.trim().length()<4 || biz.trim().length()>255) {
            throw new RuntimeException("xxl-registry biz empty Invalid[4~255]");
        }
        if (env==null || env.trim().length()<2 || env.trim().length()>255) {
            throw new RuntimeException("xxl-registry biz env Invalid[2~255]");
        }

        // parse
        adminAddressArr = new ArrayList<>();
        if (adminAddress.contains(",")) {
            adminAddressArr.addAll(Arrays.asList(adminAddress.split(",")));
        } else {
            adminAddressArr.add(adminAddress);
        }

    }

就是一些簡單的校驗和設(shè)值,沒什么看頭。接下來看看服務(wù)的注冊和續(xù)約。

 /**
     * registry
     *
     * @param registryDataList
     * @return
     */
    public boolean registry(List<XxlRegistryDataParamVO> registryDataList){

        // valid
        if (registryDataList==null || registryDataList.size()==0) {
            throw new RuntimeException("xxl-registry registryDataList empty");
        }
        for (XxlRegistryDataParamVO registryParam: registryDataList) {
            if (registryParam.getKey()==null || registryParam.getKey().trim().length()<4 || registryParam.getKey().trim().length()>255) {
                throw new RuntimeException("xxl-registry registryDataList#key Invalid[4~255]");
            }
            if (registryParam.getValue()==null || registryParam.getValue().trim().length()<4 || registryParam.getValue().trim().length()>255) {
                throw new RuntimeException("xxl-registry registryDataList#value Invalid[4~255]");
            }
        }

        // pathUrl
        String pathUrl = "/api/registry";

        // param
        XxlRegistryParamVO registryParamVO = new XxlRegistryParamVO();
        registryParamVO.setAccessToken(this.accessToken);
        registryParamVO.setBiz(this.biz);
        registryParamVO.setEnv(this.env);
        registryParamVO.setRegistryDataList(registryDataList);

        String paramsJson = BasicJson.toJson(registryParamVO);

        // result
        Map<String, Object> respObj = requestAndValid(pathUrl, paramsJson, 5);
        return respObj!=null?true:false;
    }

根據(jù)下方requestAndValid代碼可知,這是一個發(fā)送http請求的代碼,那么registry方法,除了一些校驗,其實就是向服務(wù)器發(fā)送了一個httppost請求。

private Map<String, Object> requestAndValid(String pathUrl, String requestBody, int timeout){

        for (String adminAddressUrl: adminAddressArr) {
            String finalUrl = adminAddressUrl + pathUrl;

            // request
            String responseData = BasicHttpUtil.postBody(finalUrl, requestBody, timeout);
            if (responseData == null) {
                return null;
            }

            // parse resopnse
            Map<String, Object> resopnseMap = null;
            try {
                resopnseMap = BasicJson.parseMap(responseData);
            } catch (Exception e) { }


            // valid resopnse
            if (resopnseMap==null
                    || !resopnseMap.containsKey("code")
                    || !"200".equals(String.valueOf(resopnseMap.get("code")))
                    ) {
                logger.warn("XxlRegistryBaseClient response fail, responseData={}", responseData);
                return null;
            }

            return resopnseMap;
        }


        return null;
    }

那么直接看看注冊中心源碼的這個 /api/registry是干什么的吧
跳過controller的一些校驗,直接進入service

@Override
public ReturnT<String> registry(String accessToken, String biz, String env, List<XxlRegistryData> registryDataList) {

       // valid
       if (this.accessToken!=null && this.accessToken.trim().length()>0 && !this.accessToken.equals(accessToken)) {
           return new ReturnT<String>(ReturnT.FAIL_CODE, "AccessToken Invalid");
       }
       if (biz==null || biz.trim().length()<4 || biz.trim().length()>255) {
           return new ReturnT<String>(ReturnT.FAIL_CODE, "Biz Invalid[4~255]");
       }
       if (env==null || env.trim().length()<2 || env.trim().length()>255) {
           return new ReturnT<String>(ReturnT.FAIL_CODE, "Env Invalid[2~255]");
       }
       if (registryDataList==null || registryDataList.size()==0) {
           return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry DataList Invalid");
       }
       for (XxlRegistryData registryData: registryDataList) {
           if (registryData.getKey()==null || registryData.getKey().trim().length()<4 || registryData.getKey().trim().length()>255) {
               return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry Key Invalid[4~255]");
           }
           if (registryData.getValue()==null || registryData.getValue().trim().length()<4 || registryData.getValue().trim().length()>255) {
               return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry Value Invalid[4~255]");
           }
       }

       // fill + add queue
       for (XxlRegistryData registryData: registryDataList) {
           registryData.setBiz(biz);
           registryData.setEnv(env);
       }
       registryQueue.addAll(registryDataList);

       return ReturnT.SUCCESS;
   }

除了一些校驗,最關(guān)鍵的出現(xiàn)了!

registryQueue.addAll(registryDataList);

注冊數(shù)據(jù)被塞進了一個隊列里面??纯催@個隊列的定義:

private volatile LinkedBlockingQueue<XxlRegistryData> registryQueue = new LinkedBlockingQueue<XxlRegistryData>();

也就是說,我們注冊(續(xù)約)服務(wù)的時候,只是把信息放入了一個隊列。那么稍微動動 腦子就知道,當(dāng)出隊的時候,就是真正處理數(shù)據(jù)的時候。我們用IDE搜一搜這個隊列的相關(guān)操作代碼看看。
搜索結(jié)果激動人心,只有一個地方用了take,一個地方用了addAll(就是上面提到的),其他沒有任何調(diào)用。
那就直接看看take出來數(shù)據(jù)做了什么事情吧。

ps:為了方便理解代碼,先分看下數(shù)據(jù)庫
在xxl_registry表中,biz,env,key一起構(gòu)成了唯一主鍵,所有服務(wù)的值在data中維護成數(shù)組的形式存在

xxl_registry

在xxl_registry_data表中,biz, env, key, value構(gòu)成了唯一主鍵,每條數(shù)據(jù)代表了一個服務(wù)的注冊信息,updateTime表示服務(wù)注冊的時間。
xxl_registry_data

消息表,記錄服務(wù)變化信息
xxl_registry_message

經(jīng)驗老道的程序員已經(jīng)發(fā)現(xiàn)afterPropertiesSet這個方法。
afterPropertiesSet中開了10個線程,每個線程做的事情就是:

  1. registryQueue里面取出一條服務(wù)注冊數(shù)據(jù)
  2. 向數(shù)據(jù)庫更新或新增一條xxl_registry_data數(shù)據(jù)(這個表主要用來記錄某個服務(wù)最后的注冊(續(xù)約)時間)
  3. 從磁盤讀取記錄該服務(wù)的文件數(shù)據(jù)
    3.1. 如果磁盤讀取不到相應(yīng)文件,則進入checkRegistryDataAndSendMessage方法,把xxl_registry_data中所有的value組成json array,對比 xxl_registry中的address看是否一致。
    3.1.1. xxl_registry中不存在相應(yīng)服務(wù)信息,則新增一條服務(wù)數(shù)據(jù)
    3.1.2. xxl_registry中存在相應(yīng)服務(wù),但服務(wù)address和jsonArray不一致,則更新xxl_registry數(shù)據(jù)使其和xxl_registry_data中的數(shù)據(jù)一致
    3.1.3. 當(dāng)數(shù)據(jù)不一致時,會向xxl_registry_message中插入一條消息數(shù)據(jù)
    3.2. 讀取到了相應(yīng)服務(wù)的信息,但status不是0,則不做任何操作
    3.3. 讀取到了相應(yīng)服務(wù)的信息,且包含了剛剛?cè)〕鰜淼姆?wù)注冊數(shù)據(jù),則不做任何操作
    @Override
    public void afterPropertiesSet() throws Exception {

        // valid
        if (registryDataFilePath==null || registryDataFilePath.trim().length()==0) {
            throw new RuntimeException("xxl-registry, registryDataFilePath empty.");
        }

        /**
         * registry registry data         (client-num/10 s)
         */
        for (int i = 0; i < 10; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    while (!executorStoped) {
                        try {
                            XxlRegistryData xxlRegistryData = registryQueue.take();
                            if (xxlRegistryData !=null) {

                                // refresh or add
                                int ret = xxlRegistryDataDao.refresh(xxlRegistryData);
                                if (ret == 0) {
                                    xxlRegistryDataDao.add(xxlRegistryData);
                                }

                                // valid file status
                                XxlRegistry fileXxlRegistry = getFileRegistryData(xxlRegistryData);
                                if (fileXxlRegistry == null) {
                                    // go on
                                } else if (fileXxlRegistry.getStatus() != 0) {
                                    continue;     // "Status limited."
                                } else {
                                    if (fileXxlRegistry.getDataList().contains(xxlRegistryData.getValue())) {
                                        continue;     // "Repeated limited."
                                    }
                                }

                                // checkRegistryDataAndSendMessage
                                checkRegistryDataAndSendMessage(xxlRegistryData);
                            }
                        } catch (Exception e) {
                            if (!executorStoped) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }
                }
            });
        }
    }
     /**
     * update Registry And Message
     */
    private void checkRegistryDataAndSendMessage(XxlRegistryData xxlRegistryData){
        // data json
        List<XxlRegistryData> xxlRegistryDataList = xxlRegistryDataDao.findData(xxlRegistryData.getBiz(), xxlRegistryData.getEnv(), xxlRegistryData.getKey());
        List<String> valueList = new ArrayList<>();
        if (xxlRegistryDataList!=null && xxlRegistryDataList.size()>0) {
            for (XxlRegistryData dataItem: xxlRegistryDataList) {
                valueList.add(dataItem.getValue());
            }
        }
        String dataJson = JacksonUtil.writeValueAsString(valueList);

        // update registry and message
        XxlRegistry xxlRegistry = xxlRegistryDao.load(xxlRegistryData.getBiz(), xxlRegistryData.getEnv(), xxlRegistryData.getKey());
        boolean needMessage = false;
        if (xxlRegistry == null) {
            xxlRegistry = new XxlRegistry();
            xxlRegistry.setBiz(xxlRegistryData.getBiz());
            xxlRegistry.setEnv(xxlRegistryData.getEnv());
            xxlRegistry.setKey(xxlRegistryData.getKey());
            xxlRegistry.setData(dataJson);
            xxlRegistryDao.add(xxlRegistry);
            needMessage = true;
        } else {

            // check status, locked and disabled not use
            if (xxlRegistry.getStatus() != 0) {
                return;
            }

            if (!xxlRegistry.getData().equals(dataJson)) {
                xxlRegistry.setData(dataJson);
                xxlRegistryDao.update(xxlRegistry);
                needMessage = true;
            }
        }

        if (needMessage) {
            // sendRegistryDataUpdateMessage (registry update)
            sendRegistryDataUpdateMessage(xxlRegistry);
        }

    }

總結(jié)一下,服務(wù)注冊的大致流程:

  1. 通過httppost請求,客戶端把服務(wù)注冊信息塞入注冊中心的注冊隊列
  2. 注冊中心后臺有10個線程會從注冊隊列取出注冊數(shù)據(jù)
  3. 同步注冊信息到xxl_registry
  4. 發(fā)消息

嗯?是不是覺得很奇怪。注冊只干這些事?不是磁盤操作嗎?xxl_registry_data中的數(shù)據(jù)一直留著嗎?消息什么時候處理?太久服務(wù)沒有發(fā)出服務(wù)續(xù)約/心跳,服務(wù)不會自動下線嗎?

這些問題先放一放,先看看其他幾個接口吧
既然有服務(wù)注冊,那就有服務(wù)移除,先看移除接口。
ps:重復(fù)的邏輯就略過了

    @Override
    public ReturnT<String> remove(String accessToken, String biz, String env, List<XxlRegistryData> registryDataList) {

        // valid
        if (this.accessToken!=null && this.accessToken.trim().length()>0 && !this.accessToken.equals(accessToken)) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "AccessToken Invalid");
        }
        if (biz==null || biz.trim().length()<4 || biz.trim().length()>255) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "Biz Invalid[4~255]");
        }
        if (env==null || env.trim().length()<2 || env.trim().length()>255) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "Env Invalid[2~255]");
        }
        if (registryDataList==null || registryDataList.size()==0) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry DataList Invalid");
        }
        for (XxlRegistryData registryData: registryDataList) {
            if (registryData.getKey()==null || registryData.getKey().trim().length()<4 || registryData.getKey().trim().length()>255) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry Key Invalid[4~255]");
            }
            if (registryData.getValue()==null || registryData.getValue().trim().length()<4 || registryData.getValue().trim().length()>255) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry Value Invalid[4~255]");
            }
        }

        // fill + add queue
        for (XxlRegistryData registryData: registryDataList) {
            registryData.setBiz(biz);
            registryData.setEnv(env);
        }
        removeQueue.addAll(registryDataList);

        return ReturnT.SUCCESS;
    }

這是一段似曾相識的代碼,只是把服務(wù)信息放進了另一個隊列,removeQueue
private volatile LinkedBlockingQueue<XxlRegistryData> registryQueue = new LinkedBlockingQueue<XxlRegistryData>();
ok,按照注冊的思路,我們看看針對removeQueue是不是也有10個線程從中反復(fù)取數(shù)據(jù)呢?

        /**
         * remove registry data         (client-num/start-interval s)
         */
        for (int i = 0; i < 10; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    while (!executorStoped) {
                        try {
                            XxlRegistryData xxlRegistryData = removeQueue.take();
                            if (xxlRegistryData != null) {

                                // delete
                                xxlRegistryDataDao.deleteDataValue(xxlRegistryData.getBiz(), xxlRegistryData.getEnv(), xxlRegistryData.getKey(), xxlRegistryData.getValue());

                                // valid file status
                                XxlRegistry fileXxlRegistry = getFileRegistryData(xxlRegistryData);
                                if (fileXxlRegistry == null) {
                                    // go on
                                } else if (fileXxlRegistry.getStatus() != 0) {
                                    continue;   // "Status limited."
                                } else {
                                    if (!fileXxlRegistry.getDataList().contains(xxlRegistryData.getValue())) {
                                        continue;   // "Repeated limited."
                                    }
                                }

                                // checkRegistryDataAndSendMessage
                                checkRegistryDataAndSendMessage(xxlRegistryData);
                            }
                        } catch (Exception e) {
                            if (!executorStoped) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }
                }
            });
        }

果不其然,代碼幾乎和注冊時候的一一對應(yīng),只是把向xxl_registry_data中插入數(shù)據(jù)變成了刪除數(shù)據(jù)。

那么,服務(wù)注冊相關(guān)的接口已經(jīng)看完了。(沒錯,只有兩個)

既然有服務(wù)注冊,就有服務(wù)發(fā)現(xiàn),有提供方,有消費方才是個正常的系統(tǒng)!
那接下來就看看怎么發(fā)現(xiàn)已經(jīng)注冊的服務(wù)!
先看客戶端代碼

// 服務(wù)發(fā)現(xiàn):
Set<String> keys = new TreeSet<>();
keys.add("service01");
keys.add("service02");

Map<String, TreeSet<String>> serviceData = registryClient.discovery(keys);

很簡單,就是把想要發(fā)現(xiàn)的服務(wù)的key加進了參數(shù)里面
那再看看注冊中心的代碼

@Override
    public ReturnT<Map<String, List<String>>> discovery(String accessToken, String biz, String env, List<String> keys) {

        // valid
        if (this.accessToken!=null && this.accessToken.trim().length()>0 && !this.accessToken.equals(accessToken)) {
            return new ReturnT<>(ReturnT.FAIL_CODE, "AccessToken Invalid");
        }
        if (biz==null || biz.trim().length()<2 || biz.trim().length()>255) {
            return new ReturnT<>(ReturnT.FAIL_CODE, "Biz Invalid[2~255]");
        }
        if (env==null || env.trim().length()<2 || env.trim().length()>255) {
            return new ReturnT<>(ReturnT.FAIL_CODE, "Env Invalid[2~255]");
        }
        if (keys==null || keys.size()==0) {
            return new ReturnT<>(ReturnT.FAIL_CODE, "keys Invalid.");
        }
        for (String key: keys) {
            if (key==null || key.trim().length()<4 || key.trim().length()>255) {
                return new ReturnT<>(ReturnT.FAIL_CODE, "Key Invalid[4~255]");
            }
        }

        Map<String, List<String>> result = new HashMap<String, List<String>>();
        for (String key: keys) {
            XxlRegistryData xxlRegistryData = new XxlRegistryData();
            xxlRegistryData.setBiz(biz);
            xxlRegistryData.setEnv(env);
            xxlRegistryData.setKey(key);

            List<String> dataList = new ArrayList<String>();
            XxlRegistry fileXxlRegistry = getFileRegistryData(xxlRegistryData);
            if (fileXxlRegistry!=null) {
                dataList = fileXxlRegistry.getDataList();
            }

            result.put(key, dataList);
        }

        return new ReturnT<Map<String, List<String>>>(result);
    }
// get
    public XxlRegistry getFileRegistryData(XxlRegistryData xxlRegistryData){

        // fileName
        String fileName = parseRegistryDataFileName(xxlRegistryData.getBiz(), xxlRegistryData.getEnv(), xxlRegistryData.getKey());

        // read
        Properties prop = PropUtil.loadProp(fileName);
        if (prop!=null) {
            XxlRegistry fileXxlRegistry = new XxlRegistry();
            fileXxlRegistry.setData(prop.getProperty("data"));
            fileXxlRegistry.setStatus(Integer.valueOf(prop.getProperty("status")));
            fileXxlRegistry.setDataList(JacksonUtil.readValue(fileXxlRegistry.getData(), List.class));
            return fileXxlRegistry;
        }
        return null;
    }

仔細看看代碼非常簡單,就是從注冊中心的本地磁盤讀取服務(wù)信息,然后直接返回。也就是說,服務(wù)發(fā)現(xiàn)只會和磁盤產(chǎn)生交互,和數(shù)據(jù)庫無關(guān)。和前面它自己吹噓的一樣。
那么問題又來了,注冊中心是怎么保證磁盤文件的實時性和一直性的呢?還是老樣子,這個問題放一放,先把最后一個客戶端接口看完!

// 服務(wù)監(jiān)控:
Set<String> keys = new TreeSet<>();
keys.add("service01");
keys.add("service02");

registryClient.monitor(keys);

去注冊中心看看

@Override
    public DeferredResult<ReturnT<String>> monitor(String accessToken, String biz, String env, List<String> keys) {

        // init
        DeferredResult deferredResult = new DeferredResult(30 * 1000L, new ReturnT<>(ReturnT.SUCCESS_CODE, "Monitor timeout, no key updated."));

        // valid
        if (this.accessToken!=null && this.accessToken.trim().length()>0 && !this.accessToken.equals(accessToken)) {
            deferredResult.setResult(new ReturnT<>(ReturnT.FAIL_CODE, "AccessToken Invalid"));
            return deferredResult;
        }
        if (biz==null || biz.trim().length()<4 || biz.trim().length()>255) {
            deferredResult.setResult(new ReturnT<>(ReturnT.FAIL_CODE, "Biz Invalid[4~255]"));
            return deferredResult;
        }
        if (env==null || env.trim().length()<2 || env.trim().length()>255) {
            deferredResult.setResult(new ReturnT<>(ReturnT.FAIL_CODE, "Env Invalid[2~255]"));
            return deferredResult;
        }
        if (keys==null || keys.size()==0) {
            deferredResult.setResult(new ReturnT<>(ReturnT.FAIL_CODE, "keys Invalid."));
            return deferredResult;
        }
        for (String key: keys) {
            if (key==null || key.trim().length()<4 || key.trim().length()>255) {
                deferredResult.setResult(new ReturnT<>(ReturnT.FAIL_CODE, "Key Invalid[4~255]"));
                return deferredResult;
            }
        }

        // monitor by client
        for (String key: keys) {
            String fileName = parseRegistryDataFileName(biz, env, key);

            List<DeferredResult> deferredResultList = registryDeferredResultMap.get(fileName);
            if (deferredResultList == null) {
                deferredResultList = new ArrayList<>();
                registryDeferredResultMap.put(fileName, deferredResultList);
            }

            deferredResultList.add(deferredResult);
        }

        return deferredResult;
    }

看看代碼,初始化了一個DeferredResult

DeferredResult deferredResult = new DeferredResult(30 * 1000L, new ReturnT<>(ReturnT.SUCCESS_CODE, "Monitor timeout, no key updated."));

DeferredResult
是spring-web包提供的一個異步響應(yīng)對象,下面給出一些點單的sample

@GetMapping("deferredResultTest")
    @ResponseBody
    public DeferredResult<String> test(@RequestParam("timeout") final Long timeout) throws InterruptedException {
        final DeferredResult<String> result = new DeferredResult<>(5L * 1000, "timeout");

       new Thread(new Runnable() {
           @Override
           public void run() {
               System.out.println("process start.....");
               try {
                   TimeUnit.SECONDS.sleep(timeout);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
               result.setResult("done");
               System.out.println("process dnoe......");
           }
       }).start();


       result.onTimeout(new Runnable() {
           @Override
           public void run() {
               System.out.println("timeout");
               result.setResult("timeout");
           }
       });

        System.out.println("return result");
        return result;
    }

請求 http://localhost:8080/xxl-registry-admin/registry/deferredResultTest?timeout=3 時,返回done,控制臺打印
process start.....
return result
process dnoe......

請求 http://localhost:8080/xxl-registry-admin/registry/deferredResultTest?timeout=8 時,返回timeout,控制臺打印
return result
process start.....
timeout
process dnoe......

回到原來話題,monitor初始化了一個超時時間30秒的DeferredResult,超時就返回一個no key update的response對象
private Map<String, List<DeferredResult>> registryDeferredResultMap = new ConcurrentHashMap<>();
然后去registryDeferredResultMap這個map里找注冊信息,把剛剛初始化的DeferredResult加入這個map中,然后就直接return了這個DeferredResult對象。
根據(jù)我們對DeferredResult對象的理解,這個對象在30秒內(nèi)沒處理完則會返回一個success對象,那么30秒內(nèi)處理玩會發(fā)生什么呢?讓我們搜一搜registryDeferredResultMap,肯定在其他地方做了處理!
搜索結(jié)果表示只有一處

 // set
    public String setFileRegistryData(XxlRegistry xxlRegistry){
        // fileName
        String fileName = parseRegistryDataFileName(xxlRegistry.getBiz(), xxlRegistry.getEnv(), xxlRegistry.getKey());

        // valid repeat update
        Properties existProp = PropUtil.loadProp(fileName);
        if (existProp != null
                && existProp.getProperty("data").equals(xxlRegistry.getData())
                && existProp.getProperty("status").equals(String.valueOf(xxlRegistry.getStatus()))
                ) {
            return new File(fileName).getPath();
        }

        // write
        Properties prop = new Properties();
        prop.setProperty("data", xxlRegistry.getData());
        prop.setProperty("status", String.valueOf(xxlRegistry.getStatus()));

        PropUtil.writeProp(prop, fileName);

        logger.info(">>>>>>>>>>> xxl-registry, setFileRegistryData: biz={}, env={}, key={}, data={}"
                , xxlRegistry.getBiz(), xxlRegistry.getEnv(), xxlRegistry.getKey(), xxlRegistry.getData());


        // brocast monitor client
        List<DeferredResult> deferredResultList = registryDeferredResultMap.get(fileName);
        if (deferredResultList != null) {
            registryDeferredResultMap.remove(fileName);
            for (DeferredResult deferredResult: deferredResultList) {
                deferredResult.setResult(new ReturnT<>(ReturnT.SUCCESS_CODE, "Monitor key update."));
            }
        }

        return new File(fileName).getPath();
    }

看完代碼可知,在調(diào)用setFileRegistryData方法之后,通過和磁盤里注冊數(shù)據(jù)的對比,如果有變化,則DeferredResult就會被setResult。那問題又來了,setFileRegistryData這個方法是什么時候被調(diào)用呢?讓我們再搜一搜!又是他!afterPropertiesSet!總過有兩個地方用到了setFileRegistryData,先看第一個線程。這個線程每秒鐘會運行一次while里面的代碼。

        /**
         * broadcase new one registry-data-file     (1/1s)
         *
         * clean old message   (1/10s)
         */
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                while (!executorStoped) {
                    try {
                        // new message, filter readed
                        List<XxlRegistryMessage> messageList = xxlRegistryMessageDao.findMessage(readedMessageIds);
                        if (messageList!=null && messageList.size()>0) {
                            for (XxlRegistryMessage message: messageList) {
                                readedMessageIds.add(message.getId());

                                if (message.getType() == 0) {   // from registry、add、update、deelete,ne need sync from db, only write

                                    XxlRegistry xxlRegistry = JacksonUtil.readValue(message.getData(), XxlRegistry.class);

                                    // process data by status
                                    if (xxlRegistry.getStatus() == 1) {
                                        // locked, not updated
                                    } else if (xxlRegistry.getStatus() == 2) {
                                        // disabled, write empty
                                        xxlRegistry.setData(JacksonUtil.writeValueAsString(new ArrayList<String>()));
                                    } else {
                                        // default, sync from db (aready sync before message, only write)
                                    }

                                    // sync file
                                    setFileRegistryData(xxlRegistry);
                                }
                            }
                        }

                        // clean old message;
                        if ( (System.currentTimeMillis()/1000) % registryBeatTime ==0) {
                            xxlRegistryMessageDao.cleanMessage(registryBeatTime);
                            readedMessageIds.clear();
                        }
                    } catch (Exception e) {
                        if (!executorStoped) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (Exception e) {
                        if (!executorStoped) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }
            }
        });

先去xxl_registry_message表里找沒被消費的消息

注意,這里代碼有個坑,這個readedMessageIds這個參數(shù)最后在sql里面的體現(xiàn)是not in,所以是,找到?jīng)]有被消費過的message

如果有沒有被消費,則做下面處理:

  1. 把消息id加入到已消費list readedMessageIds
  2. 如果服務(wù)被禁用,把服務(wù)的注冊data數(shù)據(jù)清空
  3. 調(diào)用setFileRegistryData方法同步數(shù)據(jù)到磁盤,并響應(yīng)給監(jiān)聽中的客戶端
  4. 每隔BeatTime(10秒),從xxl_registry_message刪除BeatTime(10秒)之前的數(shù)據(jù)。清空readedMessageIds列表

那么在看第二個線程,第二個線程是BeatTime(10秒)運行一次while里面的代碼

        /**
         *  clean old registry-data     (1/10s)
         *
         *  sync total registry-data db + file      (1+N/10s)
         *
         *  clean old registry-data file
         */
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                while (!executorStoped) {

                    // align to beattime
                    try {
                        long sleepSecond = registryBeatTime - (System.currentTimeMillis()/1000)%registryBeatTime;
                        if (sleepSecond>0 && sleepSecond<registryBeatTime) {
                            TimeUnit.SECONDS.sleep(sleepSecond);
                        }
                    } catch (Exception e) {
                        if (!executorStoped) {
                            logger.error(e.getMessage(), e);
                        }
                    }

                    try {
                        // clean old registry-data in db
                        xxlRegistryDataDao.cleanData(registryBeatTime * 3);

                        // sync registry-data, db + file
                        int offset = 0;
                        int pagesize = 1000;
                        List<String> registryDataFileList = new ArrayList<>();

                        List<XxlRegistry> registryList = xxlRegistryDao.pageList(offset, pagesize, null, null, null);
                        while (registryList!=null && registryList.size()>0) {

                            for (XxlRegistry registryItem: registryList) {

                                // process data by status
                                if (registryItem.getStatus() == 1) {
                                    // locked, not updated
                                } else if (registryItem.getStatus() == 2) {
                                    // disabled, write empty
                                    String dataJson = JacksonUtil.writeValueAsString(new ArrayList<String>());
                                    registryItem.setData(dataJson);
                                } else {
                                    // default, sync from db
                                    List<XxlRegistryData> xxlRegistryDataList = xxlRegistryDataDao.findData(registryItem.getBiz(), registryItem.getEnv(), registryItem.getKey());
                                    List<String> valueList = new ArrayList<String>();
                                    if (xxlRegistryDataList!=null && xxlRegistryDataList.size()>0) {
                                        for (XxlRegistryData dataItem: xxlRegistryDataList) {
                                            valueList.add(dataItem.getValue());
                                        }
                                    }
                                    String dataJson = JacksonUtil.writeValueAsString(valueList);

                                    // check update, sync db
                                    if (!registryItem.getData().equals(dataJson)) {
                                        registryItem.setData(dataJson);
                                        xxlRegistryDao.update(registryItem);
                                    }
                                }

                                // sync file
                                String registryDataFile = setFileRegistryData(registryItem);

                                // collect registryDataFile
                                registryDataFileList.add(registryDataFile);
                            }


                            offset += 1000;
                            registryList = xxlRegistryDao.pageList(offset, pagesize, null, null, null);
                        }

                        // clean old registry-data file
                        cleanFileRegistryData(registryDataFileList);

                    } catch (Exception e) {
                        if (!executorStoped) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                    try {
                        TimeUnit.SECONDS.sleep(registryBeatTime);
                    } catch (Exception e) {
                        if (!executorStoped) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }
            }
        });
    }
  1. 一開始做了一下時間對其到BeatTime(10秒)的操作(比如現(xiàn)在是10:10:03,那么會休眠7秒,對其到10:10:10)
  2. 清除xxl_registry_data表中30秒前的數(shù)據(jù)
  3. xxl_registry中循環(huán)取1000條數(shù)據(jù),
    3.1. 如果服務(wù)被鎖定,則什么都不做
    3.2. 如果服務(wù)被禁用,則設(shè)置data為空數(shù)組
    3.3. 如果正常,則從xxl_registry_data表取出相對服務(wù)的所有value組成list,更新回xxl_registry
    3.4. 同步數(shù)據(jù)到磁盤,并把文件路徑添加到registryDataFileList里面
  4. 清除不在registryDataFileList里面的文件

看完這段代碼就知道,這就是前面所說的,10秒鐘全量同步數(shù)據(jù)。

看到這里,monitor的大致流程已經(jīng)比較清晰

  1. 設(shè)置DeferredResult,
  2. 等待30秒,30秒內(nèi)服務(wù)沒有變化,則在30秒的時候返回success。
  3. 30秒內(nèi),如果【每秒檢查message消息的線程】發(fā)現(xiàn)有變化,則會立即返回success
  4. 30秒內(nèi),如果【每10秒全量同步線程】發(fā)現(xiàn)有變化,則會立即返回success

所以,客戶端的視角就是,服務(wù)信息有變化,則離開拿到monitor方法的返回,沒變化則30秒的時候拿到返回值。

既然服務(wù)消費者的代碼已經(jīng)比較清楚,那再回頭看看剛剛服務(wù)生產(chǎn)者的代碼留下了的疑問。

注冊只干這些事?不是磁盤操作嗎?

消費者確實是磁盤操作,服務(wù)生產(chǎn)者會同事維護db和磁盤數(shù)據(jù)

xxl_registry_data中的數(shù)據(jù)一直留著嗎?

3倍心跳之前的數(shù)據(jù),會在【每10秒全量同步線程】中,被刪除

消息什么時候處理?

【每秒檢查message消息的線程】

太久服務(wù)沒有發(fā)出服務(wù)續(xù)約/心跳,服務(wù)不會自動下線嗎?

會在【每10秒全量同步線程】中data被清空,所以需要服務(wù)生產(chǎn)者每隔一段時間就注冊(續(xù)約)一次(就是重新調(diào)一次registry方法)。XxlRegistryClient提供了后臺線程自動注冊(續(xù)約)

是個比較簡單的生產(chǎn)者消費者模型,而且大多數(shù)的疑問都被解決了。
那么我們重新梳理一遍這個服務(wù)中心的工作流程:

  • 服務(wù)生產(chǎn)者通過registry方法向注冊中心注冊(向registryQueue隊列添加數(shù)據(jù))
  • 后臺線程會定時掃描registryQueue,并更新數(shù)據(jù)到db,發(fā)送消息(消息會被【每秒檢查message消息的線程】消費,然后同步數(shù)據(jù)到磁盤)
  • 服務(wù)生產(chǎn)者通過remove方法向注冊中心移除服務(wù)(邏輯和注冊一致)
  • 服務(wù)消費者從磁盤讀取服務(wù)生產(chǎn)者數(shù)據(jù)
  • 10秒鐘會有一次全量數(shù)據(jù)同步

附上官方架構(gòu)圖一張


官方架構(gòu)圖

讓我們在回頭看看他吹噓的功能

1、輕量級:基于DB與磁盤文件,只需要提供一個DB實例即可,無第三方依賴;
2、實時性:借助內(nèi)部廣播機制,新服務(wù)上線、下線,可以在1s內(nèi)推送給客戶端;
3、數(shù)據(jù)同步:注冊中心會定期全量同步數(shù)據(jù)至磁盤文件,清理無效服務(wù),確保服務(wù)數(shù)據(jù)實時可用;
4、性能:服務(wù)發(fā)現(xiàn)時僅讀磁盤文件,性能非常高;服務(wù)注冊、摘除時通過磁盤文件校驗,防止重復(fù)注冊操作;
5、擴展性:可方便、快速的橫向擴展,只需保證服務(wù)注冊中心配置一致即可,可借助負(fù)載均衡組件如Nginx快速集群部署;
6、多狀態(tài):服務(wù)內(nèi)置三種狀態(tài):
正常狀態(tài)=支持動態(tài)注冊、發(fā)現(xiàn),服務(wù)注冊信息實時更新;
鎖定狀態(tài)=人工維護注冊信息,服務(wù)注冊信息固定不變;
禁用狀態(tài)=禁止使用,服務(wù)注冊信息固定為空;
7、跨語言:注冊中心提供HTTP接口(RESTFUL 格式)供客戶端實用,語言無關(guān),通用性更強;
8、兼容性:項目立項之初是為XXL-RPC量身設(shè)計,但是不限于XXL-RPC使用。兼容支持任何服務(wù)框架服務(wù)注冊實用,如dubbo、springboot等;
9、跨機房:得益于服務(wù)注冊中心集群關(guān)系對等特性,集群各節(jié)點提供冪等的配置服務(wù);因此,異地跨機房部署時,只需要請求本機房服務(wù)注冊中心即可,實現(xiàn)異地多活;
10、容器化:提供官方docker鏡像,并實時更新推送dockerhub,進一步實現(xiàn) "服務(wù)注冊中心" 產(chǎn)品開箱即用;
11、訪問令牌(accessToken):為提升系統(tǒng)安全性,注冊中心和客戶端進行安全性校驗,雙方AccessToken匹配才允許通訊;

除了5,8,9,10,11,其他的已經(jīng)在剛才的代碼閱讀中得到驗證。
8,10,11略過,不是這次閱讀源碼的重點。
我們還剩下最后一個疑惑,也就是多服務(wù)中心的時候怎么所有服務(wù)中心的數(shù)據(jù)一致。
先看看官方怎么說的

服務(wù)注冊中心集群(可選)

服務(wù)注冊中心支持集群部署,提升消息系統(tǒng)容災(zāi)和可用性。
集群部署時,幾點要求和建議:

  • DB配置保持一致;
  • 登陸賬號配置保持一致;
  • 建議:推薦通過nginx為集群做負(fù)載均衡,分配域名。訪問、客戶端使用等操作均通過該域名進行。

4.3 跨機房(異地多活)

得益于服務(wù)注冊中心集群關(guān)系對等特性,集群各節(jié)點提供冪等的服務(wù)注冊服務(wù);因此,異地跨機房部署時,> 只需要請求本機房服務(wù)注冊中心即可,實現(xiàn)異地多活;
舉個例子:比如機房A、B 內(nèi)分別部署服務(wù)注冊中心集群節(jié)點。即機房A部署 a1、a2 兩個服務(wù)注冊中心服務(wù)節(jié)點,機房B部署 b1、b2 兩個服務(wù)注冊中心服務(wù)節(jié)點;
那么各機房內(nèi)應(yīng)用只需要請求本機房內(nèi)部署的服務(wù)注冊中心節(jié)點即可,不需要跨機房調(diào)用。即機房A內(nèi)業(yè)務(wù)應(yīng)用請求 a1、a2 獲取配置、機房B內(nèi)業(yè)務(wù)應(yīng)用 b1、b2 獲取配置。

這種跨機房部署方式實現(xiàn)了配置服務(wù)的 "異地多活",擁有以下幾點好處:

  • 1、注冊服務(wù)響應(yīng)更快:注冊請求本機房內(nèi)搞定;
  • 2、注冊服務(wù)更穩(wěn)定:注冊請求不需要跨機房,不需要考慮復(fù)雜的網(wǎng)絡(luò)情況,更加穩(wěn)定;
  • 2、容災(zāi)性:即使一個機房內(nèi)服務(wù)注冊中心全部宕機,僅會影響到本機房內(nèi)應(yīng)用加載服務(wù),其他機房不會受到影響。

4.4 一致性

類似 Raft 方案,更輕量級、穩(wěn)定;

  • Raft:Leader統(tǒng)一處理變更操作請求,一致性協(xié)議的作用具化為保證節(jié)點間操作日志副本(log replication)一致,以term作為邏輯時鐘(logical clock)保證時序,節(jié)點運行相同狀態(tài)機(state machine)得到一致結(jié)果。
  • xxl-registry:
    • Leader(統(tǒng)一處理分發(fā)變更請求):DB消息表(僅變更時產(chǎn)生消息,消息量較小,而且消息輪訓(xùn)存在間隔,因此消息表壓力不會太大;);
    • state machine(順序操作日志副本并保證結(jié)果一直):順序消費消息,保證本地數(shù)據(jù)一致,并通過周期全量同步進一步保證一致性;

嚇得我趕緊看了下Raft方案??赐曛螅揖拖?,這系統(tǒng)用到Raft方案了嗎?
看起來非常高大上,異地多活,跨機房,容災(zāi),其實通過代碼閱讀,就可以很明顯看出來:這個系統(tǒng)是通過DB實例以及10秒一次的全量同步來保證一致性的。不管數(shù)據(jù)怎么變,數(shù)據(jù)庫中的xxl_registry_data表才是真正的注冊數(shù)據(jù)。只要隔一段時間同步這個表中的數(shù)據(jù),就行了。當(dāng)然,前提是只有一個DB實例(其實不是也可以)
那么,這個注冊中心的大致編程思想通過源碼閱讀結(jié)合官方文檔,已經(jīng)基本了解。

下一篇,我們來看看《分布式服務(wù)框架XXL-RPC》吧

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容