According to the official documentation, registering a service amounts to sending an HTTP request to the Nacos server.
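As a rough illustration of what such a request looks like, the sketch below builds (but does not send) a registration request for the Nacos v1 Open API path `/nacos/v1/ns/instance`. The server address `127.0.0.1:8848`, the service name, the IP and port, and the class and method names are all placeholders chosen for this example; they do not come from the article.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class RegisterRequestSketch {

    // Encode the parameters as key=value pairs joined by '&'.
    static String toQuery(Map<String, String> params) {
        return params.entrySet().stream()
                .map(e -> URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8)
                        + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
    }

    // Build a POST request for the instance registration endpoint.
    static HttpRequest buildRegisterRequest(String serverAddr, Map<String, String> params) {
        URI uri = URI.create("http://" + serverAddr + "/nacos/v1/ns/instance?" + toQuery(params));
        return HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("serviceName", "demo-service"); // placeholder service name
        params.put("ip", "192.168.0.10");          // placeholder instance address
        params.put("port", "8080");
        params.put("ephemeral", "true");
        HttpRequest request = buildRegisterRequest("127.0.0.1:8848", params);
        System.out.println(request.method() + " " + request.uri());
        // To actually send it:
        // HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    }
}
```

The client code shown later does the same thing in spirit: it collects the instance fields into a parameter map and posts them to the instance endpoint.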

The corresponding controller (InstanceController) is as follows:
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + UtilsAndCommons.NACOS_NAMING_INSTANCE_CONTEXT)
public class InstanceController {
// ...
}
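1. Client-side service registration flow
- On startup, nacos-discovery-spring-boot-starter wires up the Nacos client through Spring Boot auto-configuration.
- The Nacos auto-configuration implements a Spring application listener that is responsible for registering the service with Nacos.
- When that listener receives Spring's WebServerInitializedEvent, it registers the Spring Boot service with the Nacos registry.
- The registration itself is carried out by com.alibaba.nacos.client.naming.net.NamingProxy#registerService in the nacos-client jar.
These steps rely on Spring Boot auto-configuration and on the listener mechanism of the Spring container at startup, neither of which is explained further here.
The code of registerService is as follows: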
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
instance);
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
if (instance.isEphemeral()) {
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
final Map<String, String> params = new HashMap<>(32);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, groupedServiceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put(IP_PARAM, instance.getIp());
params.put(PORT_PARAM, String.valueOf(instance.getPort()));
params.put(WEIGHT_PARAM, String.valueOf(instance.getWeight()));
params.put(REGISTER_ENABLE_PARAM, String.valueOf(instance.isEnabled()));
params.put(HEALTHY_PARAM, String.valueOf(instance.isHealthy()));
params.put(EPHEMERAL_PARAM, String.valueOf(instance.isEphemeral()));
params.put(META_PARAM, JacksonUtils.toJson(instance.getMetadata()));
reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST); // send the HTTP request that registers the instance with the server
}
2. Server-side registration logic
The call chain is as follows:
com.alibaba.nacos.naming.controllers.InstanceController#register
->com.alibaba.nacos.naming.core.ServiceManager#registerInstance
->com.alibaba.nacos.naming.core.ServiceManager#addInstance
->com.alibaba.nacos.naming.consistency.DelegateConsistencyServiceImpl#put
->com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#put
->com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#onPut
->com.alibaba.nacos.naming.consistency.ephemeral.distro.DataStore#put
->com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl.Notifier#addTask
@CanDistro
@PostMapping
@Secured(action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
final Instance instance = HttpRequestInstanceBuilder.newBuilder()
.setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();
// registration logic
getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(), "",
false, namespaceId, NamingUtils.getGroupName(serviceName), NamingUtils.getServiceName(serviceName),
instance.getIp(), instance.getPort()));
return "ok";
}
Next, execution enters the InstanceOperatorServiceImpl class:

@Override
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
com.alibaba.nacos.naming.core.Instance coreInstance = parseInstance(instance);
serviceManager.registerInstance(namespaceId, serviceName, coreInstance);
}
From there it enters the ServiceManager class:
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
Service service = getService(namespaceId, serviceName);
checkServiceIsNull(service, namespaceId, serviceName);
// add the instance to the registry
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
Following on into the addInstance method:
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
Service service = getService(namespaceId, serviceName);
synchronized (service) {
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
instances.setInstanceList(instanceList);
// this is where the instance list is written into the registry
consistencyService.put(key, instances);
}
}
Following the call chain, we arrive at DistroConsistencyServiceImpl#put:
@Override
public void put(String key, Record value) throws NacosException {
// write the data into the registry
onPut(key, value);
if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
return;
}
// data synchronization for cluster deployments; this branch can be skipped for now
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
DistroConfig.getInstance().getSyncDelayMillis());
}
public void onPut(String key, Record value) {
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
// the instances are stored in a map here
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
// put a change task for this key on the blocking queue
notifier.addTask(key, DataOperation.CHANGE);
}
public class Notifier implements Runnable {
private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
// blocking queue
private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
/**
* Add new notify task to queue.
*
* @param datumKey data key
* @param action action for data
*/
public void addTask(String datumKey, DataOperation action) {
if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
return;
}
if (action == DataOperation.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
tasks.offer(Pair.with(datumKey, action));
}
public int getTaskSize() {
return tasks.size();
}
// Notifier is a Runnable, so start by reading run(). When DistroConsistencyServiceImpl is initialized,
// it submits the Notifier to a thread pool that has only one thread.
@Override
public void run() {
Loggers.DISTRO.info("distro notifier started");
for (; ; ) {
try {
Pair<String, DataOperation> pair = tasks.take();
// take a task off the blocking queue and handle it
handle(pair);
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
private void handle(Pair<String, DataOperation> pair) {
try {
String datumKey = pair.getValue0();
DataOperation action = pair.getValue1();
services.remove(datumKey);
int count = 0;
if (!listeners.containsKey(datumKey)) {
return;
}
// iterate over all listeners registered for this key
for (RecordListener listener : listeners.get(datumKey)) {
count++;
try {
// the instance data has changed
if (action == DataOperation.CHANGE) {
// onPut has already stored the datum in the dataStore map; on CHANGE it is fetched and handed to the listener, which updates the IP list
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
if (action == DataOperation.DELETE) {
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
}
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO
.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
datumKey, count, action.name());
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
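Nacos combines a blocking queue with the Notifier to make registration asynchronous. The benefit is higher registration concurrency. The cost falls on the client only as a delay: it may have to wait and start up somewhat more slowly, but normal functionality is not affected. In most projects the number of services will also never be large enough to fill the blocking queue.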
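The queue-plus-consumer pattern described above can be sketched in plain Java as follows. This is a simplified illustration, not the Nacos implementation: the class name `NotifierSketch`, the `pending` and `handled` fields, and the queue capacity of 1024 are assumptions made for the example, and the "work" done per task is reduced to recording the key.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NotifierSketch implements Runnable {
    // Keys that are queued but not yet handled; used to drop duplicate notifications.
    private final ConcurrentHashMap<String, Boolean> pending = new ConcurrentHashMap<>();
    // Bounded queue between the request threads and the single consumer thread.
    private final BlockingQueue<String> tasks = new ArrayBlockingQueue<>(1024);
    // Keys in the order the consumer thread handled them (stands in for the real work).
    final List<String> handled = new CopyOnWriteArrayList<>();

    // Called on the request thread: returns immediately and never waits for the consumer.
    boolean addTask(String key) {
        if (pending.putIfAbsent(key, Boolean.TRUE) != null) {
            return false; // a task for this key is already queued
        }
        boolean queued = tasks.offer(key); // offer() does not block; false means the queue is full
        if (!queued) {
            pending.remove(key);
        }
        return queued;
    }

    @Override
    public void run() {
        try {
            while (true) {
                String key = tasks.take(); // blocks until a task is available
                pending.remove(key);
                handled.add(key);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // exit when the executor is shut down
        }
    }

    public static void main(String[] args) throws InterruptedException {
        NotifierSketch notifier = new NotifierSketch();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(notifier);
        notifier.addTask("service-a");
        notifier.addTask("service-b");
        Thread.sleep(200); // give the consumer time to drain the queue
        executor.shutdownNow();
        System.out.println("handled: " + notifier.handled);
    }
}
```

Because only one thread ever takes from the queue, tasks are handled strictly one after another in arrival order, while any number of request threads can enqueue concurrently.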
Subsequent call chain:
->com.alibaba.nacos.naming.core.Service#onChange
->com.alibaba.nacos.naming.core.Service#updateIPs
->com.alibaba.nacos.naming.core.Cluster#updateIps
public void updateIps(List<Instance> ips, boolean ephemeral) {
// pick the ephemeral or the persistent instance set, depending on the flag; a copy is made from it below
Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
// copy the current instances into oldIpMap
for (Instance ip : toUpdateInstances) {
oldIpMap.put(ip.getDatumKey(), ip);
}
// the registration works on oldIpMap, i.e. the copy; only the set of instances is copied, not the whole registry
List<Instance> updatedIps = updatedIps(ips, oldIpMap.values());
// ...
// finally, assign toUpdateInstances to ephemeralInstances or persistentInstances
toUpdateInstances = new HashSet<>(ips);
if (ephemeral) {
// Set<Instance> ephemeralInstances = new HashSet<>(); this is where the instances are actually stored
ephemeralInstances = toUpdateInstances;
} else {
persistentInstances = toUpdateInstances;
}
}
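Copying into a new map here improves concurrent read/write throughput: following the copy-on-write (COW) idea, it avoids the cost of locking and also keeps consumers from reading dirty data from the registry. Because the Notifier is initialized only once, a single thread processes the tasks in the queue, so updates cannot overwrite one another.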
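The copy-on-write idea can be illustrated with a minimal sketch. It is not the Nacos Cluster class: instances are reduced to plain "ip:port" strings, the class and method names are made up for the example, and the `volatile` modifier and the unmodifiable wrapper are additions of this sketch to make the hand-over between threads explicit.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class CopyOnWriteClusterSketch {
    // volatile so that readers on other threads see the most recently published set.
    private volatile Set<String> instances = Collections.emptySet();

    // Readers get whatever set is currently published; it is never modified in place.
    Set<String> allInstances() {
        return instances;
    }

    // Writer: build a fresh set off to the side, then publish it with one reference assignment.
    void updateIps(Collection<String> latest) {
        Set<String> fresh = new HashSet<>(latest);
        instances = Collections.unmodifiableSet(fresh);
    }

    public static void main(String[] args) {
        CopyOnWriteClusterSketch cluster = new CopyOnWriteClusterSketch();
        cluster.updateIps(Set.of("10.0.0.1:8080"));

        // A reader that grabbed the set before the update keeps a consistent view.
        Set<String> snapshot = cluster.allInstances();
        cluster.updateIps(Set.of("10.0.0.1:8080", "10.0.0.2:8080"));

        System.out.println("old snapshot size: " + snapshot.size());
        System.out.println("current size: " + cluster.allInstances().size());
    }
}
```

A reader never observes a half-updated set, because the only shared mutation is the single reference swap at the end of the write.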
3. Nacos registry structure

An example:
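The pieces that appear in the code above (Service, Cluster, and the ephemeral and persistent instance sets) nest roughly as in the sketch below. This is a simplified model, not the Nacos classes: instances are plain "ip:port" strings, and the outer map from namespace to grouped service name is an assumption about ServiceManager that the listings in this article do not show. The sample namespace, group, cluster, and service names are placeholders.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class RegistrySketch {

    static class Cluster {
        // The two sets that Cluster#updateIps swaps in the listing above.
        final Set<String> ephemeralInstances = new HashSet<>();
        final Set<String> persistentInstances = new HashSet<>();
    }

    static class Service {
        // cluster name -> Cluster
        final Map<String, Cluster> clusterMap = new ConcurrentHashMap<>();
    }

    // namespace -> (group@@serviceName -> Service); assumed layout, see the note above
    final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    // Walk down namespace -> service -> cluster, creating levels as needed, then add the instance.
    void register(String namespace, String groupedServiceName, String cluster,
                  String ipPort, boolean ephemeral) {
        Cluster c = serviceMap
                .computeIfAbsent(namespace, k -> new ConcurrentHashMap<>())
                .computeIfAbsent(groupedServiceName, k -> new Service())
                .clusterMap.computeIfAbsent(cluster, k -> new Cluster());
        (ephemeral ? c.ephemeralInstances : c.persistentInstances).add(ipPort);
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        registry.register("public", "DEFAULT_GROUP@@order-service", "DEFAULT", "10.0.0.1:8080", true);
        registry.register("public", "DEFAULT_GROUP@@order-service", "DEFAULT", "10.0.0.2:8080", true);

        Cluster cluster = registry.serviceMap.get("public")
                .get("DEFAULT_GROUP@@order-service")
                .clusterMap.get("DEFAULT");
        System.out.println("ephemeral instances: " + cluster.ephemeralInstances);
    }
}
```

Looking up the instances of a service is then a matter of walking namespace, grouped service name, and cluster name in that order.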
