前言
因為公司最近項目原因正好用到了《分布式任務(wù)調(diào)度平臺XXL-JOB》,項目結(jié)束打算看看他的源碼,發(fā)現(xiàn)他還依賴于 《分布式服務(wù)框架XXL-RPC》,于是我決定先看XXL-RPC。但當(dāng)我正準(zhǔn)備看的時候,我發(fā)現(xiàn)XXL-RPC依賴于 《分布式服務(wù)注冊中心XXL-REGISTRY》,于是我決定先看XXL-REGISTRY
讓我們先看看它自己怎么吹自己的
[1.1 概述]
XXL-REGISTRY 是一個輕量級分布式服務(wù)注冊中心,擁有"輕量級、秒級注冊上線、多環(huán)境、跨語言、跨機房"等特性?,F(xiàn)已開放源代碼,開箱即用。
[1.2 特性]
- 1、輕量級:基于DB與磁盤文件,只需要提供一個DB實例即可,無第三方依賴;
- 2、實時性:借助內(nèi)部廣播機制,新服務(wù)上線、下線,可以在1s內(nèi)推送給客戶端;
- 3、數(shù)據(jù)同步:注冊中心會定期全量同步數(shù)據(jù)至磁盤文件,清理無效服務(wù),確保服務(wù)數(shù)據(jù)實時可用;
- 4、性能:服務(wù)發(fā)現(xiàn)時僅讀磁盤文件,性能非常高;服務(wù)注冊、摘除時通過磁盤文件校驗,防止重復(fù)注冊操作;
- 5、擴展性:可方便、快速的橫向擴展,只需保證服務(wù)注冊中心配置一致即可,可借助負(fù)載均衡組件如Nginx快速集群部署;
- 6、多狀態(tài):服務(wù)內(nèi)置三種狀態(tài):
- 正常狀態(tài)=支持動態(tài)注冊、發(fā)現(xiàn),服務(wù)注冊信息實時更新;
- 鎖定狀態(tài)=人工維護注冊信息,服務(wù)注冊信息固定不變;
- 禁用狀態(tài)=禁止使用,服務(wù)注冊信息固定為空;
- 7、跨語言:注冊中心提供HTTP接口(RESTFUL 格式)供客戶端實用,語言無關(guān),通用性更強;
- 8、兼容性:項目立項之初是為XXL-RPC量身設(shè)計,但是不限于XXL-RPC使用。兼容支持任何服務(wù)框架服務(wù)注冊實用,如dubbo、springboot等;
- 9、跨機房:得益于服務(wù)注冊中心集群關(guān)系對等特性,集群各節(jié)點提供冪等的配置服務(wù);因此,異地跨機房部署時,只需要請求本機房服務(wù)注冊中心即可,實現(xiàn)異地多活;
- 10、容器化:提供官方docker鏡像,并實時更新推送dockerhub,進一步實現(xiàn) "服務(wù)注冊中心" 產(chǎn)品開箱即用;
- 11、訪問令牌(accessToken):為提升系統(tǒng)安全性,注冊中心和客戶端進行安全性校驗,雙方AccessToken匹配才允許通訊;
屁話不多,先把源碼下下來,創(chuàng)建數(shù)據(jù)庫,跑起來看看


界面簡介,貌似可以手動注冊服務(wù),先注冊一個玩一下。隨便填寫一下信息,保存

可以查看剛剛的注冊信息

點擊運行報表tab看到我剛剛注冊的服務(wù)信息。

回到服務(wù)注冊,再點擊查看,發(fā)現(xiàn)我剛剛注冊的地址沒了,做下猜測:
因為我的注冊服務(wù)之后沒有和注冊中心有任何互動,被注冊中心自動下線了

界面大概就是這樣,我們看看官方的使用說明
客戶端maven依賴地址:
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-registry-client</artifactId>
<version>${最新穩(wěn)定版}</version>
</dependency>
客戶端API實用示例代碼如下:
// 注冊中心客戶端(基礎(chǔ)類)
XxlRegistryBaseClient registryClient = new XxlRegistryBaseClient("http://localhost:8080/xxl-registry-admin/", null, "xxl-rpc", "test");
// 注冊中心客戶端(增強類)
XxlRegistryClient registryClient = new XxlRegistryClient("http://localhost:8080/xxl-registry-admin/", null, "xxl-rpc", "test");
// 服務(wù)注冊 & 續(xù)約:
List<XxlRegistryDataParamVO> registryDataList = new ArrayList<>();
registryDataList.add(new XxlRegistryDataParamVO("service01", "address01"));
registryDataList.add(new XxlRegistryDataParamVO("service02", "address02"));
registryClient.registry(registryDataList);
// 服務(wù)摘除:
List<XxlRegistryDataParamVO> registryDataList = new ArrayList<>();
registryDataList.add(new XxlRegistryDataParamVO("service01", "address01"));
registryDataList.add(new XxlRegistryDataParamVO("service02", "address02"));
registryClient.remove(registryDataList);
// 服務(wù)發(fā)現(xiàn):
Set<String> keys = new TreeSet<>();
keys.add("service01");
keys.add("service02");
Map<String, TreeSet<String>> serviceData = registryClient.discovery(keys);
// 服務(wù)監(jiān)控:
Set<String> keys = new TreeSet<>();
keys.add("service01");
keys.add("service02");
registryClient.monitor(keys);
ok,那就從客戶端代碼入手。
首先需要創(chuàng)建一個registryClient ,先從XxlRegistryBaseClient 開始。看看他的構(gòu)造函數(shù)。
public XxlRegistryBaseClient(String adminAddress, String accessToken, String biz, String env) {
this.adminAddress = adminAddress;
this.accessToken = accessToken;
this.biz = biz;
this.env = env;
// valid
if (adminAddress==null || adminAddress.trim().length()==0) {
throw new RuntimeException("xxl-registry adminAddress empty");
}
if (biz==null || biz.trim().length()<4 || biz.trim().length()>255) {
throw new RuntimeException("xxl-registry biz empty Invalid[4~255]");
}
if (env==null || env.trim().length()<2 || env.trim().length()>255) {
throw new RuntimeException("xxl-registry biz env Invalid[2~255]");
}
// parse
adminAddressArr = new ArrayList<>();
if (adminAddress.contains(",")) {
adminAddressArr.addAll(Arrays.asList(adminAddress.split(",")));
} else {
adminAddressArr.add(adminAddress);
}
}
就是一些簡單的校驗和設(shè)值,沒什么看頭。接下來看看服務(wù)的注冊和續(xù)約。
/**
* registry
*
* @param registryDataList
* @return
*/
public boolean registry(List<XxlRegistryDataParamVO> registryDataList){
// valid
if (registryDataList==null || registryDataList.size()==0) {
throw new RuntimeException("xxl-registry registryDataList empty");
}
for (XxlRegistryDataParamVO registryParam: registryDataList) {
if (registryParam.getKey()==null || registryParam.getKey().trim().length()<4 || registryParam.getKey().trim().length()>255) {
throw new RuntimeException("xxl-registry registryDataList#key Invalid[4~255]");
}
if (registryParam.getValue()==null || registryParam.getValue().trim().length()<4 || registryParam.getValue().trim().length()>255) {
throw new RuntimeException("xxl-registry registryDataList#value Invalid[4~255]");
}
}
// pathUrl
String pathUrl = "/api/registry";
// param
XxlRegistryParamVO registryParamVO = new XxlRegistryParamVO();
registryParamVO.setAccessToken(this.accessToken);
registryParamVO.setBiz(this.biz);
registryParamVO.setEnv(this.env);
registryParamVO.setRegistryDataList(registryDataList);
String paramsJson = BasicJson.toJson(registryParamVO);
// result
Map<String, Object> respObj = requestAndValid(pathUrl, paramsJson, 5);
return respObj!=null?true:false;
}
根據(jù)下方requestAndValid代碼可知,這是一個發(fā)送http請求的代碼,那么registry方法,除了一些校驗,其實就是向服務(wù)器發(fā)送了一個httppost請求。
private Map<String, Object> requestAndValid(String pathUrl, String requestBody, int timeout){
for (String adminAddressUrl: adminAddressArr) {
String finalUrl = adminAddressUrl + pathUrl;
// request
String responseData = BasicHttpUtil.postBody(finalUrl, requestBody, timeout);
if (responseData == null) {
return null;
}
// parse resopnse
Map<String, Object> resopnseMap = null;
try {
resopnseMap = BasicJson.parseMap(responseData);
} catch (Exception e) { }
// valid resopnse
if (resopnseMap==null
|| !resopnseMap.containsKey("code")
|| !"200".equals(String.valueOf(resopnseMap.get("code")))
) {
logger.warn("XxlRegistryBaseClient response fail, responseData={}", responseData);
return null;
}
return resopnseMap;
}
return null;
}
那么直接看看注冊中心源碼的這個 /api/registry是干什么的吧
跳過controller的一些校驗,直接進入service
@Override
public ReturnT<String> registry(String accessToken, String biz, String env, List<XxlRegistryData> registryDataList) {
// valid
if (this.accessToken!=null && this.accessToken.trim().length()>0 && !this.accessToken.equals(accessToken)) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "AccessToken Invalid");
}
if (biz==null || biz.trim().length()<4 || biz.trim().length()>255) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Biz Invalid[4~255]");
}
if (env==null || env.trim().length()<2 || env.trim().length()>255) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Env Invalid[2~255]");
}
if (registryDataList==null || registryDataList.size()==0) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry DataList Invalid");
}
for (XxlRegistryData registryData: registryDataList) {
if (registryData.getKey()==null || registryData.getKey().trim().length()<4 || registryData.getKey().trim().length()>255) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry Key Invalid[4~255]");
}
if (registryData.getValue()==null || registryData.getValue().trim().length()<4 || registryData.getValue().trim().length()>255) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry Value Invalid[4~255]");
}
}
// fill + add queue
for (XxlRegistryData registryData: registryDataList) {
registryData.setBiz(biz);
registryData.setEnv(env);
}
registryQueue.addAll(registryDataList);
return ReturnT.SUCCESS;
}
除了一些校驗,最關(guān)鍵的出現(xiàn)了!
registryQueue.addAll(registryDataList);
注冊數(shù)據(jù)被塞進了一個隊列里面??纯催@個隊列的定義:
private volatile LinkedBlockingQueue<XxlRegistryData> registryQueue = new LinkedBlockingQueue<XxlRegistryData>();
也就是說,我們注冊(續(xù)約)服務(wù)的時候,只是把信息放入了一個隊列。那么稍微動動 腦子就知道,當(dāng)出隊的時候,就是真正處理數(shù)據(jù)的時候。我們用IDE搜一搜這個隊列的相關(guān)操作代碼看看。
搜索結(jié)果激動人心,只有一個地方用了take,一個地方用了addAll(就是上面提到的),其他沒有任何調(diào)用。
那就直接看看take出來數(shù)據(jù)做了什么事情吧。
ps:為了方便理解代碼,先分看下數(shù)據(jù)庫
在xxl_registry表中,biz,env,key一起構(gòu)成了唯一主鍵,所有服務(wù)的值在data中維護成數(shù)組的形式存在
xxl_registry
在xxl_registry_data表中,biz,env,key,value構(gòu)成了唯一主鍵,每條數(shù)據(jù)代表了一個服務(wù)的注冊信息,updateTime表示服務(wù)注冊的時間。
xxl_registry_data
消息表,記錄服務(wù)變化信息
xxl_registry_message
經(jīng)驗老道的程序員已經(jīng)發(fā)現(xiàn)afterPropertiesSet這個方法。
在afterPropertiesSet中開了10個線程,每個線程做的事情就是:
- 從
registryQueue里面取出一條服務(wù)注冊數(shù)據(jù) - 向數(shù)據(jù)庫更新或新增一條
xxl_registry_data數(shù)據(jù)(這個表主要用來記錄某個服務(wù)最后的注冊(續(xù)約)時間) - 從磁盤讀取記錄該服務(wù)的文件數(shù)據(jù)
3.1. 如果磁盤讀取不到相應(yīng)文件,則進入checkRegistryDataAndSendMessage方法,把xxl_registry_data中所有的value組成json array,對比xxl_registry中的address看是否一致。
3.1.1.xxl_registry中不存在相應(yīng)服務(wù)信息,則新增一條服務(wù)數(shù)據(jù)
3.1.2.xxl_registry中存在相應(yīng)服務(wù),但服務(wù)address和jsonArray不一致,則更新xxl_registry數(shù)據(jù)使其和xxl_registry_data中的數(shù)據(jù)一致
3.1.3. 當(dāng)數(shù)據(jù)不一致時,會向xxl_registry_message中插入一條消息數(shù)據(jù)
3.2. 讀取到了相應(yīng)服務(wù)的信息,但status不是0,則不做任何操作
3.3. 讀取到了相應(yīng)服務(wù)的信息,且包含了剛剛?cè)〕鰜淼姆?wù)注冊數(shù)據(jù),則不做任何操作
@Override
public void afterPropertiesSet() throws Exception {
// valid
if (registryDataFilePath==null || registryDataFilePath.trim().length()==0) {
throw new RuntimeException("xxl-registry, registryDataFilePath empty.");
}
/**
* registry registry data (client-num/10 s)
*/
for (int i = 0; i < 10; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
while (!executorStoped) {
try {
XxlRegistryData xxlRegistryData = registryQueue.take();
if (xxlRegistryData !=null) {
// refresh or add
int ret = xxlRegistryDataDao.refresh(xxlRegistryData);
if (ret == 0) {
xxlRegistryDataDao.add(xxlRegistryData);
}
// valid file status
XxlRegistry fileXxlRegistry = getFileRegistryData(xxlRegistryData);
if (fileXxlRegistry == null) {
// go on
} else if (fileXxlRegistry.getStatus() != 0) {
continue; // "Status limited."
} else {
if (fileXxlRegistry.getDataList().contains(xxlRegistryData.getValue())) {
continue; // "Repeated limited."
}
}
// checkRegistryDataAndSendMessage
checkRegistryDataAndSendMessage(xxlRegistryData);
}
} catch (Exception e) {
if (!executorStoped) {
logger.error(e.getMessage(), e);
}
}
}
}
});
}
}
/**
* update Registry And Message
*/
private void checkRegistryDataAndSendMessage(XxlRegistryData xxlRegistryData){
// data json
List<XxlRegistryData> xxlRegistryDataList = xxlRegistryDataDao.findData(xxlRegistryData.getBiz(), xxlRegistryData.getEnv(), xxlRegistryData.getKey());
List<String> valueList = new ArrayList<>();
if (xxlRegistryDataList!=null && xxlRegistryDataList.size()>0) {
for (XxlRegistryData dataItem: xxlRegistryDataList) {
valueList.add(dataItem.getValue());
}
}
String dataJson = JacksonUtil.writeValueAsString(valueList);
// update registry and message
XxlRegistry xxlRegistry = xxlRegistryDao.load(xxlRegistryData.getBiz(), xxlRegistryData.getEnv(), xxlRegistryData.getKey());
boolean needMessage = false;
if (xxlRegistry == null) {
xxlRegistry = new XxlRegistry();
xxlRegistry.setBiz(xxlRegistryData.getBiz());
xxlRegistry.setEnv(xxlRegistryData.getEnv());
xxlRegistry.setKey(xxlRegistryData.getKey());
xxlRegistry.setData(dataJson);
xxlRegistryDao.add(xxlRegistry);
needMessage = true;
} else {
// check status, locked and disabled not use
if (xxlRegistry.getStatus() != 0) {
return;
}
if (!xxlRegistry.getData().equals(dataJson)) {
xxlRegistry.setData(dataJson);
xxlRegistryDao.update(xxlRegistry);
needMessage = true;
}
}
if (needMessage) {
// sendRegistryDataUpdateMessage (registry update)
sendRegistryDataUpdateMessage(xxlRegistry);
}
}
總結(jié)一下,服務(wù)注冊的大致流程:
- 通過httppost請求,客戶端把服務(wù)注冊信息塞入注冊中心的注冊隊列
- 注冊中心后臺有10個線程會從注冊隊列取出注冊數(shù)據(jù)
- 同步注冊信息到xxl_registry
- 發(fā)消息
嗯?是不是覺得很奇怪。注冊只干這些事?不是磁盤操作嗎?xxl_registry_data中的數(shù)據(jù)一直留著嗎?消息什么時候處理?太久服務(wù)沒有發(fā)出服務(wù)續(xù)約/心跳,服務(wù)不會自動下線嗎?
這些問題先放一放,先看看其他幾個接口吧
既然有服務(wù)注冊,那就有服務(wù)移除,先看移除接口。
ps:重復(fù)的邏輯就略過了
@Override
public ReturnT<String> remove(String accessToken, String biz, String env, List<XxlRegistryData> registryDataList) {
// valid
if (this.accessToken!=null && this.accessToken.trim().length()>0 && !this.accessToken.equals(accessToken)) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "AccessToken Invalid");
}
if (biz==null || biz.trim().length()<4 || biz.trim().length()>255) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Biz Invalid[4~255]");
}
if (env==null || env.trim().length()<2 || env.trim().length()>255) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Env Invalid[2~255]");
}
if (registryDataList==null || registryDataList.size()==0) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry DataList Invalid");
}
for (XxlRegistryData registryData: registryDataList) {
if (registryData.getKey()==null || registryData.getKey().trim().length()<4 || registryData.getKey().trim().length()>255) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry Key Invalid[4~255]");
}
if (registryData.getValue()==null || registryData.getValue().trim().length()<4 || registryData.getValue().trim().length()>255) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry Value Invalid[4~255]");
}
}
// fill + add queue
for (XxlRegistryData registryData: registryDataList) {
registryData.setBiz(biz);
registryData.setEnv(env);
}
removeQueue.addAll(registryDataList);
return ReturnT.SUCCESS;
}
這是一段似曾相識的代碼,只是把服務(wù)信息放進了另一個隊列,removeQueue
private volatile LinkedBlockingQueue<XxlRegistryData> registryQueue = new LinkedBlockingQueue<XxlRegistryData>();
ok,按照注冊的思路,我們看看針對removeQueue是不是也有10個線程從中反復(fù)取數(shù)據(jù)呢?
/**
* remove registry data (client-num/start-interval s)
*/
for (int i = 0; i < 10; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
while (!executorStoped) {
try {
XxlRegistryData xxlRegistryData = removeQueue.take();
if (xxlRegistryData != null) {
// delete
xxlRegistryDataDao.deleteDataValue(xxlRegistryData.getBiz(), xxlRegistryData.getEnv(), xxlRegistryData.getKey(), xxlRegistryData.getValue());
// valid file status
XxlRegistry fileXxlRegistry = getFileRegistryData(xxlRegistryData);
if (fileXxlRegistry == null) {
// go on
} else if (fileXxlRegistry.getStatus() != 0) {
continue; // "Status limited."
} else {
if (!fileXxlRegistry.getDataList().contains(xxlRegistryData.getValue())) {
continue; // "Repeated limited."
}
}
// checkRegistryDataAndSendMessage
checkRegistryDataAndSendMessage(xxlRegistryData);
}
} catch (Exception e) {
if (!executorStoped) {
logger.error(e.getMessage(), e);
}
}
}
}
});
}
果不其然,代碼幾乎和注冊時候的一一對應(yīng),只是把向xxl_registry_data中插入數(shù)據(jù)變成了刪除數(shù)據(jù)。
那么,服務(wù)注冊相關(guān)的接口已經(jīng)看完了。(沒錯,只有兩個)
既然有服務(wù)注冊,就有服務(wù)發(fā)現(xiàn),有提供方,有消費方才是個正常的系統(tǒng)!
那接下來就看看怎么發(fā)現(xiàn)已經(jīng)注冊的服務(wù)!
先看客戶端代碼
// 服務(wù)發(fā)現(xiàn):
Set<String> keys = new TreeSet<>();
keys.add("service01");
keys.add("service02");
Map<String, TreeSet<String>> serviceData = registryClient.discovery(keys);
很簡單,就是把想要發(fā)現(xiàn)的服務(wù)的key加進了參數(shù)里面
那再看看注冊中心的代碼
@Override
public ReturnT<Map<String, List<String>>> discovery(String accessToken, String biz, String env, List<String> keys) {
// valid
if (this.accessToken!=null && this.accessToken.trim().length()>0 && !this.accessToken.equals(accessToken)) {
return new ReturnT<>(ReturnT.FAIL_CODE, "AccessToken Invalid");
}
if (biz==null || biz.trim().length()<2 || biz.trim().length()>255) {
return new ReturnT<>(ReturnT.FAIL_CODE, "Biz Invalid[2~255]");
}
if (env==null || env.trim().length()<2 || env.trim().length()>255) {
return new ReturnT<>(ReturnT.FAIL_CODE, "Env Invalid[2~255]");
}
if (keys==null || keys.size()==0) {
return new ReturnT<>(ReturnT.FAIL_CODE, "keys Invalid.");
}
for (String key: keys) {
if (key==null || key.trim().length()<4 || key.trim().length()>255) {
return new ReturnT<>(ReturnT.FAIL_CODE, "Key Invalid[4~255]");
}
}
Map<String, List<String>> result = new HashMap<String, List<String>>();
for (String key: keys) {
XxlRegistryData xxlRegistryData = new XxlRegistryData();
xxlRegistryData.setBiz(biz);
xxlRegistryData.setEnv(env);
xxlRegistryData.setKey(key);
List<String> dataList = new ArrayList<String>();
XxlRegistry fileXxlRegistry = getFileRegistryData(xxlRegistryData);
if (fileXxlRegistry!=null) {
dataList = fileXxlRegistry.getDataList();
}
result.put(key, dataList);
}
return new ReturnT<Map<String, List<String>>>(result);
}
// get
public XxlRegistry getFileRegistryData(XxlRegistryData xxlRegistryData){
// fileName
String fileName = parseRegistryDataFileName(xxlRegistryData.getBiz(), xxlRegistryData.getEnv(), xxlRegistryData.getKey());
// read
Properties prop = PropUtil.loadProp(fileName);
if (prop!=null) {
XxlRegistry fileXxlRegistry = new XxlRegistry();
fileXxlRegistry.setData(prop.getProperty("data"));
fileXxlRegistry.setStatus(Integer.valueOf(prop.getProperty("status")));
fileXxlRegistry.setDataList(JacksonUtil.readValue(fileXxlRegistry.getData(), List.class));
return fileXxlRegistry;
}
return null;
}
仔細看看代碼非常簡單,就是從注冊中心的本地磁盤讀取服務(wù)信息,然后直接返回。也就是說,服務(wù)發(fā)現(xiàn)只會和磁盤產(chǎn)生交互,和數(shù)據(jù)庫無關(guān)。和前面它自己吹噓的一樣。
那么問題又來了,注冊中心是怎么保證磁盤文件的實時性和一直性的呢?還是老樣子,這個問題放一放,先把最后一個客戶端接口看完!
// 服務(wù)監(jiān)控:
Set<String> keys = new TreeSet<>();
keys.add("service01");
keys.add("service02");
registryClient.monitor(keys);
去注冊中心看看
@Override
public DeferredResult<ReturnT<String>> monitor(String accessToken, String biz, String env, List<String> keys) {
// init
DeferredResult deferredResult = new DeferredResult(30 * 1000L, new ReturnT<>(ReturnT.SUCCESS_CODE, "Monitor timeout, no key updated."));
// valid
if (this.accessToken!=null && this.accessToken.trim().length()>0 && !this.accessToken.equals(accessToken)) {
deferredResult.setResult(new ReturnT<>(ReturnT.FAIL_CODE, "AccessToken Invalid"));
return deferredResult;
}
if (biz==null || biz.trim().length()<4 || biz.trim().length()>255) {
deferredResult.setResult(new ReturnT<>(ReturnT.FAIL_CODE, "Biz Invalid[4~255]"));
return deferredResult;
}
if (env==null || env.trim().length()<2 || env.trim().length()>255) {
deferredResult.setResult(new ReturnT<>(ReturnT.FAIL_CODE, "Env Invalid[2~255]"));
return deferredResult;
}
if (keys==null || keys.size()==0) {
deferredResult.setResult(new ReturnT<>(ReturnT.FAIL_CODE, "keys Invalid."));
return deferredResult;
}
for (String key: keys) {
if (key==null || key.trim().length()<4 || key.trim().length()>255) {
deferredResult.setResult(new ReturnT<>(ReturnT.FAIL_CODE, "Key Invalid[4~255]"));
return deferredResult;
}
}
// monitor by client
for (String key: keys) {
String fileName = parseRegistryDataFileName(biz, env, key);
List<DeferredResult> deferredResultList = registryDeferredResultMap.get(fileName);
if (deferredResultList == null) {
deferredResultList = new ArrayList<>();
registryDeferredResultMap.put(fileName, deferredResultList);
}
deferredResultList.add(deferredResult);
}
return deferredResult;
}
看看代碼,初始化了一個DeferredResult
DeferredResult deferredResult = new DeferredResult(30 * 1000L, new ReturnT<>(ReturnT.SUCCESS_CODE, "Monitor timeout, no key updated."));
DeferredResult
是spring-web包提供的一個異步響應(yīng)對象,下面給出一些點單的sample
@GetMapping("deferredResultTest")
@ResponseBody
public DeferredResult<String> test(@RequestParam("timeout") final Long timeout) throws InterruptedException {
final DeferredResult<String> result = new DeferredResult<>(5L * 1000, "timeout");
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("process start.....");
try {
TimeUnit.SECONDS.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
result.setResult("done");
System.out.println("process dnoe......");
}
}).start();
result.onTimeout(new Runnable() {
@Override
public void run() {
System.out.println("timeout");
result.setResult("timeout");
}
});
System.out.println("return result");
return result;
}
請求 http://localhost:8080/xxl-registry-admin/registry/deferredResultTest?timeout=3 時,返回done,控制臺打印
process start.....
return result
process dnoe......
請求 http://localhost:8080/xxl-registry-admin/registry/deferredResultTest?timeout=8 時,返回timeout,控制臺打印
return result
process start.....
timeout
process dnoe......
回到原來話題,monitor初始化了一個超時時間30秒的DeferredResult,超時就返回一個no key update的response對象
private Map<String, List<DeferredResult>> registryDeferredResultMap = new ConcurrentHashMap<>();
然后去registryDeferredResultMap這個map里找注冊信息,把剛剛初始化的DeferredResult加入這個map中,然后就直接return了這個DeferredResult對象。
根據(jù)我們對DeferredResult對象的理解,這個對象在30秒內(nèi)沒處理完則會返回一個success對象,那么30秒內(nèi)處理玩會發(fā)生什么呢?讓我們搜一搜registryDeferredResultMap,肯定在其他地方做了處理!
搜索結(jié)果表示只有一處
// set
public String setFileRegistryData(XxlRegistry xxlRegistry){
// fileName
String fileName = parseRegistryDataFileName(xxlRegistry.getBiz(), xxlRegistry.getEnv(), xxlRegistry.getKey());
// valid repeat update
Properties existProp = PropUtil.loadProp(fileName);
if (existProp != null
&& existProp.getProperty("data").equals(xxlRegistry.getData())
&& existProp.getProperty("status").equals(String.valueOf(xxlRegistry.getStatus()))
) {
return new File(fileName).getPath();
}
// write
Properties prop = new Properties();
prop.setProperty("data", xxlRegistry.getData());
prop.setProperty("status", String.valueOf(xxlRegistry.getStatus()));
PropUtil.writeProp(prop, fileName);
logger.info(">>>>>>>>>>> xxl-registry, setFileRegistryData: biz={}, env={}, key={}, data={}"
, xxlRegistry.getBiz(), xxlRegistry.getEnv(), xxlRegistry.getKey(), xxlRegistry.getData());
// brocast monitor client
List<DeferredResult> deferredResultList = registryDeferredResultMap.get(fileName);
if (deferredResultList != null) {
registryDeferredResultMap.remove(fileName);
for (DeferredResult deferredResult: deferredResultList) {
deferredResult.setResult(new ReturnT<>(ReturnT.SUCCESS_CODE, "Monitor key update."));
}
}
return new File(fileName).getPath();
}
看完代碼可知,在調(diào)用setFileRegistryData方法之后,通過和磁盤里注冊數(shù)據(jù)的對比,如果有變化,則DeferredResult就會被setResult。那問題又來了,setFileRegistryData這個方法是什么時候被調(diào)用呢?讓我們再搜一搜!又是他!afterPropertiesSet!總過有兩個地方用到了setFileRegistryData,先看第一個線程。這個線程每秒鐘會運行一次while里面的代碼。
/**
* broadcase new one registry-data-file (1/1s)
*
* clean old message (1/10s)
*/
executorService.execute(new Runnable() {
@Override
public void run() {
while (!executorStoped) {
try {
// new message, filter readed
List<XxlRegistryMessage> messageList = xxlRegistryMessageDao.findMessage(readedMessageIds);
if (messageList!=null && messageList.size()>0) {
for (XxlRegistryMessage message: messageList) {
readedMessageIds.add(message.getId());
if (message.getType() == 0) { // from registry、add、update、deelete,ne need sync from db, only write
XxlRegistry xxlRegistry = JacksonUtil.readValue(message.getData(), XxlRegistry.class);
// process data by status
if (xxlRegistry.getStatus() == 1) {
// locked, not updated
} else if (xxlRegistry.getStatus() == 2) {
// disabled, write empty
xxlRegistry.setData(JacksonUtil.writeValueAsString(new ArrayList<String>()));
} else {
// default, sync from db (aready sync before message, only write)
}
// sync file
setFileRegistryData(xxlRegistry);
}
}
}
// clean old message;
if ( (System.currentTimeMillis()/1000) % registryBeatTime ==0) {
xxlRegistryMessageDao.cleanMessage(registryBeatTime);
readedMessageIds.clear();
}
} catch (Exception e) {
if (!executorStoped) {
logger.error(e.getMessage(), e);
}
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
if (!executorStoped) {
logger.error(e.getMessage(), e);
}
}
}
}
});
先去xxl_registry_message表里找沒被消費的消息
注意,這里代碼有個坑,這個
readedMessageIds這個參數(shù)最后在sql里面的體現(xiàn)是not in,所以是,找到?jīng)]有被消費過的message
如果有沒有被消費,則做下面處理:
- 把消息id加入到已消費list
readedMessageIds里 - 如果服務(wù)被禁用,把服務(wù)的注冊data數(shù)據(jù)清空
- 調(diào)用
setFileRegistryData方法同步數(shù)據(jù)到磁盤,并響應(yīng)給監(jiān)聽中的客戶端 - 每隔BeatTime(10秒),從xxl_registry_message刪除BeatTime(10秒)之前的數(shù)據(jù)。清空readedMessageIds列表
那么在看第二個線程,第二個線程是BeatTime(10秒)運行一次while里面的代碼
/**
* clean old registry-data (1/10s)
*
* sync total registry-data db + file (1+N/10s)
*
* clean old registry-data file
*/
executorService.execute(new Runnable() {
@Override
public void run() {
while (!executorStoped) {
// align to beattime
try {
long sleepSecond = registryBeatTime - (System.currentTimeMillis()/1000)%registryBeatTime;
if (sleepSecond>0 && sleepSecond<registryBeatTime) {
TimeUnit.SECONDS.sleep(sleepSecond);
}
} catch (Exception e) {
if (!executorStoped) {
logger.error(e.getMessage(), e);
}
}
try {
// clean old registry-data in db
xxlRegistryDataDao.cleanData(registryBeatTime * 3);
// sync registry-data, db + file
int offset = 0;
int pagesize = 1000;
List<String> registryDataFileList = new ArrayList<>();
List<XxlRegistry> registryList = xxlRegistryDao.pageList(offset, pagesize, null, null, null);
while (registryList!=null && registryList.size()>0) {
for (XxlRegistry registryItem: registryList) {
// process data by status
if (registryItem.getStatus() == 1) {
// locked, not updated
} else if (registryItem.getStatus() == 2) {
// disabled, write empty
String dataJson = JacksonUtil.writeValueAsString(new ArrayList<String>());
registryItem.setData(dataJson);
} else {
// default, sync from db
List<XxlRegistryData> xxlRegistryDataList = xxlRegistryDataDao.findData(registryItem.getBiz(), registryItem.getEnv(), registryItem.getKey());
List<String> valueList = new ArrayList<String>();
if (xxlRegistryDataList!=null && xxlRegistryDataList.size()>0) {
for (XxlRegistryData dataItem: xxlRegistryDataList) {
valueList.add(dataItem.getValue());
}
}
String dataJson = JacksonUtil.writeValueAsString(valueList);
// check update, sync db
if (!registryItem.getData().equals(dataJson)) {
registryItem.setData(dataJson);
xxlRegistryDao.update(registryItem);
}
}
// sync file
String registryDataFile = setFileRegistryData(registryItem);
// collect registryDataFile
registryDataFileList.add(registryDataFile);
}
offset += 1000;
registryList = xxlRegistryDao.pageList(offset, pagesize, null, null, null);
}
// clean old registry-data file
cleanFileRegistryData(registryDataFileList);
} catch (Exception e) {
if (!executorStoped) {
logger.error(e.getMessage(), e);
}
}
try {
TimeUnit.SECONDS.sleep(registryBeatTime);
} catch (Exception e) {
if (!executorStoped) {
logger.error(e.getMessage(), e);
}
}
}
}
});
}
- 一開始做了一下時間對其到BeatTime(10秒)的操作(比如現(xiàn)在是10:10:03,那么會休眠7秒,對其到10:10:10)
- 清除
xxl_registry_data表中30秒前的數(shù)據(jù) - 從
xxl_registry中循環(huán)取1000條數(shù)據(jù),
3.1. 如果服務(wù)被鎖定,則什么都不做
3.2. 如果服務(wù)被禁用,則設(shè)置data為空數(shù)組
3.3. 如果正常,則從xxl_registry_data表取出相對服務(wù)的所有value組成list,更新回xxl_registry表
3.4. 同步數(shù)據(jù)到磁盤,并把文件路徑添加到registryDataFileList里面 - 清除不在
registryDataFileList里面的文件
看完這段代碼就知道,這就是前面所說的,10秒鐘全量同步數(shù)據(jù)。
看到這里,monitor的大致流程已經(jīng)比較清晰
- 設(shè)置DeferredResult,
- 等待30秒,30秒內(nèi)服務(wù)沒有變化,則在30秒的時候返回success。
- 30秒內(nèi),如果【每秒檢查message消息的線程】發(fā)現(xiàn)有變化,則會立即返回success
- 30秒內(nèi),如果【每10秒全量同步線程】發(fā)現(xiàn)有變化,則會立即返回success
所以,客戶端的視角就是,服務(wù)信息有變化,則離開拿到monitor方法的返回,沒變化則30秒的時候拿到返回值。
既然服務(wù)消費者的代碼已經(jīng)比較清楚,那再回頭看看剛剛服務(wù)生產(chǎn)者的代碼留下了的疑問。
注冊只干這些事?不是磁盤操作嗎?
消費者確實是磁盤操作,服務(wù)生產(chǎn)者會同事維護db和磁盤數(shù)據(jù)
xxl_registry_data中的數(shù)據(jù)一直留著嗎?
3倍心跳之前的數(shù)據(jù),會在【每10秒全量同步線程】中,被刪除
消息什么時候處理?
【每秒檢查message消息的線程】
太久服務(wù)沒有發(fā)出服務(wù)續(xù)約/心跳,服務(wù)不會自動下線嗎?
會在【每10秒全量同步線程】中data被清空,所以需要服務(wù)生產(chǎn)者每隔一段時間就注冊(續(xù)約)一次(就是重新調(diào)一次registry方法)。XxlRegistryClient提供了后臺線程自動注冊(續(xù)約)
是個比較簡單的生產(chǎn)者消費者模型,而且大多數(shù)的疑問都被解決了。
那么我們重新梳理一遍這個服務(wù)中心的工作流程:
- 服務(wù)生產(chǎn)者通過registry方法向注冊中心注冊(向
registryQueue隊列添加數(shù)據(jù))- 后臺線程會定時掃描
registryQueue,并更新數(shù)據(jù)到db,發(fā)送消息(消息會被【每秒檢查message消息的線程】消費,然后同步數(shù)據(jù)到磁盤)- 服務(wù)生產(chǎn)者通過remove方法向注冊中心移除服務(wù)(邏輯和注冊一致)
- 服務(wù)消費者從磁盤讀取服務(wù)生產(chǎn)者數(shù)據(jù)
- 10秒鐘會有一次全量數(shù)據(jù)同步
附上官方架構(gòu)圖一張

讓我們在回頭看看他吹噓的功能
1、輕量級:基于DB與磁盤文件,只需要提供一個DB實例即可,無第三方依賴;
2、實時性:借助內(nèi)部廣播機制,新服務(wù)上線、下線,可以在1s內(nèi)推送給客戶端;
3、數(shù)據(jù)同步:注冊中心會定期全量同步數(shù)據(jù)至磁盤文件,清理無效服務(wù),確保服務(wù)數(shù)據(jù)實時可用;
4、性能:服務(wù)發(fā)現(xiàn)時僅讀磁盤文件,性能非常高;服務(wù)注冊、摘除時通過磁盤文件校驗,防止重復(fù)注冊操作;
5、擴展性:可方便、快速的橫向擴展,只需保證服務(wù)注冊中心配置一致即可,可借助負(fù)載均衡組件如Nginx快速集群部署;
6、多狀態(tài):服務(wù)內(nèi)置三種狀態(tài):
正常狀態(tài)=支持動態(tài)注冊、發(fā)現(xiàn),服務(wù)注冊信息實時更新;
鎖定狀態(tài)=人工維護注冊信息,服務(wù)注冊信息固定不變;
禁用狀態(tài)=禁止使用,服務(wù)注冊信息固定為空;
7、跨語言:注冊中心提供HTTP接口(RESTFUL 格式)供客戶端實用,語言無關(guān),通用性更強;
8、兼容性:項目立項之初是為XXL-RPC量身設(shè)計,但是不限于XXL-RPC使用。兼容支持任何服務(wù)框架服務(wù)注冊實用,如dubbo、springboot等;
9、跨機房:得益于服務(wù)注冊中心集群關(guān)系對等特性,集群各節(jié)點提供冪等的配置服務(wù);因此,異地跨機房部署時,只需要請求本機房服務(wù)注冊中心即可,實現(xiàn)異地多活;
10、容器化:提供官方docker鏡像,并實時更新推送dockerhub,進一步實現(xiàn) "服務(wù)注冊中心" 產(chǎn)品開箱即用;
11、訪問令牌(accessToken):為提升系統(tǒng)安全性,注冊中心和客戶端進行安全性校驗,雙方AccessToken匹配才允許通訊;
除了5,8,9,10,11,其他的已經(jīng)在剛才的代碼閱讀中得到驗證。
8,10,11略過,不是這次閱讀源碼的重點。
我們還剩下最后一個疑惑,也就是多服務(wù)中心的時候怎么所有服務(wù)中心的數(shù)據(jù)一致。
先看看官方怎么說的
服務(wù)注冊中心集群(可選)
服務(wù)注冊中心支持集群部署,提升消息系統(tǒng)容災(zāi)和可用性。
集群部署時,幾點要求和建議:
- DB配置保持一致;
- 登陸賬號配置保持一致;
- 建議:推薦通過nginx為集群做負(fù)載均衡,分配域名。訪問、客戶端使用等操作均通過該域名進行。
4.3 跨機房(異地多活)
得益于服務(wù)注冊中心集群關(guān)系對等特性,集群各節(jié)點提供冪等的服務(wù)注冊服務(wù);因此,異地跨機房部署時,> 只需要請求本機房服務(wù)注冊中心即可,實現(xiàn)異地多活;
舉個例子:比如機房A、B 內(nèi)分別部署服務(wù)注冊中心集群節(jié)點。即機房A部署 a1、a2 兩個服務(wù)注冊中心服務(wù)節(jié)點,機房B部署 b1、b2 兩個服務(wù)注冊中心服務(wù)節(jié)點;
那么各機房內(nèi)應(yīng)用只需要請求本機房內(nèi)部署的服務(wù)注冊中心節(jié)點即可,不需要跨機房調(diào)用。即機房A內(nèi)業(yè)務(wù)應(yīng)用請求 a1、a2 獲取配置、機房B內(nèi)業(yè)務(wù)應(yīng)用 b1、b2 獲取配置。
這種跨機房部署方式實現(xiàn)了配置服務(wù)的 "異地多活",擁有以下幾點好處:
- 1、注冊服務(wù)響應(yīng)更快:注冊請求本機房內(nèi)搞定;
- 2、注冊服務(wù)更穩(wěn)定:注冊請求不需要跨機房,不需要考慮復(fù)雜的網(wǎng)絡(luò)情況,更加穩(wěn)定;
- 2、容災(zāi)性:即使一個機房內(nèi)服務(wù)注冊中心全部宕機,僅會影響到本機房內(nèi)應(yīng)用加載服務(wù),其他機房不會受到影響。
4.4 一致性
類似 Raft 方案,更輕量級、穩(wěn)定;
- Raft:Leader統(tǒng)一處理變更操作請求,一致性協(xié)議的作用具化為保證節(jié)點間操作日志副本(log replication)一致,以term作為邏輯時鐘(logical clock)保證時序,節(jié)點運行相同狀態(tài)機(state machine)得到一致結(jié)果。
- xxl-registry:
- Leader(統(tǒng)一處理分發(fā)變更請求):DB消息表(僅變更時產(chǎn)生消息,消息量較小,而且消息輪訓(xùn)存在間隔,因此消息表壓力不會太大;);
- state machine(順序操作日志副本并保證結(jié)果一直):順序消費消息,保證本地數(shù)據(jù)一致,并通過周期全量同步進一步保證一致性;
嚇得我趕緊看了下Raft方案??赐曛螅揖拖?,這系統(tǒng)用到Raft方案了嗎?
看起來非常高大上,異地多活,跨機房,容災(zāi),其實通過代碼閱讀,就可以很明顯看出來:這個系統(tǒng)是通過DB實例以及10秒一次的全量同步來保證一致性的。不管數(shù)據(jù)怎么變,數(shù)據(jù)庫中的xxl_registry_data表才是真正的注冊數(shù)據(jù)。只要隔一段時間同步這個表中的數(shù)據(jù),就行了。當(dāng)然,前提是只有一個DB實例(其實不是也可以)
那么,這個注冊中心的大致編程思想通過源碼閱讀結(jié)合官方文檔,已經(jīng)基本了解。
下一篇,我們來看看《分布式服務(wù)框架XXL-RPC》吧


