按照dubbo官網(wǎng)的介紹,如下
Apache Dubbo 是一款高性能、輕量級(jí)的開(kāi)源 Java 服務(wù)框架
記得最開(kāi)始,dubbo是把自己定位成一款高性能的rpc框架,我們現(xiàn)在還是按照rpc的定位進(jìn)行分析,dubbo的整個(gè)對(duì)外的框架非常的簡(jiǎn)單對(duì)稱(chēng)美,如下

但是內(nèi)部的實(shí)現(xiàn)非常的復(fù)雜,如下

優(yōu)秀的框架都是相似的,整體對(duì)外的框架非常的簡(jiǎn)單,但是內(nèi)部設(shè)計(jì)非常的復(fù)雜,將簡(jiǎn)單留給客戶(hù),將復(fù)雜封裝給自己。
Make It Simple
按照上圖的介紹,除了service和config層,其它的層都是spi,所謂spi就是支持客戶(hù)自定義的替換,等于說(shuō)整個(gè)duubo可以認(rèn)為是個(gè)戴高樂(lè)積木,而每一層的實(shí)現(xiàn)都是可以替換成用戶(hù)自己的技術(shù)棧。這樣也方便各個(gè)公司在引入dubbo的時(shí)候進(jìn)行定制化的改造。
整個(gè)調(diào)用鏈從上往下分為十層,下面依次簡(jiǎn)單的介紹
1 config層,如下圖,dubbo提供了對(duì)模塊的配置能力,最重要的是ServiceConfig(provide端)與ReferenceConfig(consumer端)的配置能力,當(dāng)然還有MonitorConfig(監(jiān)控中心的配置),ApplicationConfig(全局的應(yīng)用配置),RegistryConfig(注冊(cè)中心的配置),ProtocolConfig(協(xié)議配置)等,dubbo中提供的配置類(lèi)圖依賴(lài)如下

2 proxy代理層,主要是為了服務(wù)接口的透明代理,對(duì)外提供方便和透明的引用,生成服務(wù)的客戶(hù)端的stub和服務(wù)端的Skeleton,dubbo中提供的類(lèi)圖依賴(lài)如下

3 registry 注冊(cè)中心層:封裝服務(wù)地址的注冊(cè)與發(fā)現(xiàn),以服務(wù) URL 為中心,擴(kuò)展接口為 RegistryFactory, Registry, RegistryService,dubbo支持多注冊(cè)中心,dubbo中針對(duì)RegistryFactory的類(lèi)依賴(lài)圖如下

4 cluster 路由層:封裝多個(gè)提供者的路由及負(fù)載均衡,并橋接注冊(cè)中心,以 Invoker 為中心,擴(kuò)展接口為 Cluster, Directory, Router, LoadBalance,針對(duì)Cluster的實(shí)現(xiàn)類(lèi)圖如下

默認(rèn)為FailOverCluster(失敗重試),當(dāng)然我們可以配置自己的策略
5monitor 監(jiān)控層 略
6 protocol 遠(yuǎn)程調(diào)用層:封裝 RPC 調(diào)用,以 Invocation, Result 為中心,擴(kuò)展接口為 Protocol, Invoker, Exporter,我們代碼的大部分都在這一層進(jìn)行分析,Protocol的類(lèi)圖依賴(lài)關(guān)系如下

7 exchange 信息交換層:封裝請(qǐng)求響應(yīng)模式,同步轉(zhuǎn)異步,以 Request, Response 為中心,擴(kuò)展接口為 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer,主要是對(duì)transport層的request和response的封裝。
8 transport 網(wǎng)絡(luò)傳輸層:抽象 mina 和 netty 為統(tǒng)一接口,以 Message 為中心,擴(kuò)展接口為 Channel, Transporter, Client, Server, Codec
9 serialize 數(shù)據(jù)序列化層
針對(duì)上面的類(lèi)圖的依賴(lài),我們可以發(fā)現(xiàn)依賴(lài)圖大多都是矮胖的,等于說(shuō)dubbo給我們默認(rèn)的實(shí)現(xiàn)了很多的策略,我們只需要按需取用即可(策略模式)
整個(gè)源碼包的模塊如下

。
dubbo主要是服務(wù)的暴露和發(fā)現(xiàn)調(diào)用,整個(gè)服務(wù)的暴露時(shí)序圖如下

而服務(wù)的發(fā)現(xiàn)引用調(diào)用時(shí)序圖如下

DDD
在 Dubbo 的核心領(lǐng)域模型中:
- Protocol 是服務(wù)域,它是 Invoker 暴露和引用的主功能入口,它負(fù)責(zé) Invoker 的生命周期管理??梢哉J(rèn)為Protocol實(shí)現(xiàn)了對(duì)Invoke的封裝
- Invoker 是實(shí)體域,它是 Dubbo 的核心模型,其它模型都向它靠擾,或轉(zhuǎn)換成它,它代表一個(gè)可執(zhí)行體,可向它發(fā)起 invoke 調(diào)用,它有可能是一個(gè)本地的實(shí)現(xiàn),也可能是一個(gè)遠(yuǎn)程的實(shí)現(xiàn),也可能一個(gè)集群實(shí)現(xiàn)。
- Invocation 是會(huì)話(huà)域,它持有調(diào)用過(guò)程中的變量,比如方法名,參數(shù)等。
我們的代碼的分析也主要集中在Protocol和Invoker上面。
當(dāng)我們希望將某個(gè)service暴露成dubbo接口的時(shí)候,只需要使用dubbo的注解@service即可,然后該service會(huì)被注冊(cè)到Spring里面成為一個(gè)bean,同時(shí)也會(huì)生成一個(gè)ServiceBean,其refer這個(gè)bean,所以我們的分析也重點(diǎn)在ServiceBean,ServiceBean的繼承關(guān)系如下

由于ServiceBean實(shí)現(xiàn)了InitializingBean,所以在其屬性填充完畢之后,執(zhí)行afterPropertiesSet(),在afterPropertiesSet主要是對(duì)各種配置進(jìn)行檢查填充和校驗(yàn)。
而又由于ServiceBean實(shí)現(xiàn)了ApplicationListener<ContextRefreshedEvent>,所以在監(jiān)聽(tīng)到ContextRefreshedEvent之后,如下
Class ServiceBean
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (isDelay() && !isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();//暴露和注冊(cè)服務(wù)
}
}
可以看到export方法是其最核心的方法。我們接下來(lái)對(duì)export方法進(jìn)行進(jìn)一步的分析。
export是在ServiceBean的父類(lèi)ServiceConfig里面實(shí)現(xiàn)的,如下
Class ServiceConfig
public synchronized void export() {
if (provider != null) {
if (export == null) {
export = provider.getExport();
}
if (delay == null) {
delay = provider.getDelay();
}
}
if (export != null && !export) {
return;
}
//是否需要延遲暴露,如果需要的話(huà)使用線(xiàn)程池異步線(xiàn)程實(shí)現(xiàn)暴露
//如果在啟動(dòng)的時(shí)候比較慢,可以設(shè)置延遲暴露的方式
if (delay != null && delay > 0) {
delayExportExecutor.schedule(new Runnable() {
@Override
public void run() {
doExport();
}
}, delay, TimeUnit.MILLISECONDS);
} else {
doExport();
}
}
protected synchronized void doExport() {
****
checkApplication();
checkRegistry();
checkProtocol();
appendProperties(this);
checkStub(interfaceClass);
checkMock(interfaceClass);
if (path == null || path.length() == 0) {
path = interfaceName;
}
//核心
doExportUrls();
ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
}
在doExport方法里面又做了很多的校驗(yàn),而最重要的方法就是doExportUrls。
private void doExportUrls() {
List<URL> registryURLs = loadRegistries(true);
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
如上我們可以看到,dubbo是支持多注冊(cè)中心和多protocol的,針對(duì)每一個(gè)注冊(cè)地址,每一個(gè)protocol,都會(huì)調(diào)用一次 doExportUrlsFor1Protocol(protocolConfig, registryURLs)。
一般來(lái)來(lái)說(shuō),不是非常特殊的場(chǎng)景,我們一般都是單注冊(cè)中心(默認(rèn)是zk),單Protocol(dubbo,注意這里是rpc協(xié)議),如下如,我的電腦就是配置的單注冊(cè)中心,單rpc協(xié)議(dubbo)

而接下來(lái)的doExportUrlsFor1Protocol方法寫(xiě)的比較混亂,代碼比較長(zhǎng),我們一行行看一下。
Class ServiceConfig
doExportUrlsFor1Protocol 分段分析如下
String name = protocolConfig.getName();
if (name == null || name.length() == 0) {
name = "dubbo";
}
Map<String, String> map = new HashMap<String, String>();
map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
appendParameters(map, application); //收集application配置信息
appendParameters(map, module); //收集module配置信息
appendParameters(map, provider, Constants.DEFAULT_KEY); //收集provider配置信息
appendParameters(map, protocolConfig); //收集protocolConfig配置信息
appendParameters(map, this); //收集ServiceConfig配置信息
可以看到依次將 application --->module--->provider--->protocolConfig--->ServiceConfig
的信息收集到map里面來(lái),如果存在重復(fù)的配置信息,后面的配置會(huì)覆蓋前面的配置,所以針對(duì)一些全局的缺省配置我們可以配置在前面,而一些很細(xì)節(jié)的配置,我們配置在后面即可。
在ServiceConfig里面可以針對(duì)具體的method進(jìn)行進(jìn)一步的配置,如使用注解配置,樣例如下
@Service(methods = [@Method(name = "orderCancel",retries = 2)])
在進(jìn)行服務(wù)暴露的過(guò)程中,有一段代碼如下
exportLocal(url);
private void exportLocal(URL url) {
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
URL local = URL.valueOf(url.toFullString())
.setProtocol(Constants.LOCAL_PROTOCOL)
.setHost(LOCALHOST)
.setPort(0);
StaticContext.getContext(Constants.SERVICE_IMPL_CLASS).put(url.getServiceKey(), getServiceClass(ref));
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
}
}
其中
protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
會(huì)將ref就是真實(shí)處理的bean,stub interfaceClass 和local url進(jìn)行封裝成一個(gè)Invoker就行暴露,而在dubbo中最重要對(duì)象就是這個(gè)Invoker。
而在暴露Invoker的時(shí)候,首先要拿到proxyFactory,其中proxyFactory的初始化語(yǔ)句如下
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
這個(gè)proxyFactory是動(dòng)態(tài)生成的(也是真牛)手動(dòng)的生成java文件,然后編譯加載,這種使用代碼的方式來(lái)動(dòng)態(tài)生成代碼的方式更加的靈活,在我的電腦上,動(dòng)態(tài)生成的這個(gè)proxyFactory的代碼如下
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class ProxyFactory$Adaptive implements com.alibaba.dubbo.rpc.ProxyFactory {
public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0);
}
public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0, boolean arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0, arg1);
}
public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {
if (arg2 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg2;
String extName = url.getParameter("proxy", "javassist");
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getInvoker(arg0, arg1, arg2);
}
}
從上面的代碼我們可以認(rèn)為 ProxyFactory$Adaptive就是對(duì)JavassistProxyFactory的簡(jiǎn)單的代理。
而在拿到Invoke之后,使用protocol對(duì)其生命周期進(jìn)行管理,我們使用同樣的方法,看protocol動(dòng)態(tài)生成的代碼如下
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
public void destroy() {throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
}
有上面的代碼我們可以知道,最終會(huì)調(diào)用InjvmProtocol的export方法,分析到最后我們發(fā)現(xiàn)其實(shí)這個(gè)方法將該service在當(dāng)前的jvm中暴露出來(lái)。
最重要的是接下的在注冊(cè)中心的暴露,源碼如下
Class ServiceConfig
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
在我 的本機(jī)上registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())的結(jié)果是
registry://***:2181/com.alibaba.dubbo.registry.RegistryService?application=contract-logistics-wms&backup=****:2181,***:2181&dubbo=2.0.2&export=dubbo%3A%2F%2F10.10.131.127%3A20880%2Fcom.**.wms.dubbo.service.OrderCancelService%3Fanyhost%3Dtrue%26application%3Dcontract-logistics-wms%26bean.name%3DServiceBean%3Acom.zto.wms.dubbo.service.OrderCancelService%26bind.ip%3D10.10.131.127%26bind.port%3D20880%26default.delay%3D-1%26default.retries%3D2%26default.service.filter%3DCatTransaction%26delay%3D-1%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dcom.zto.wms.dubbo.service.OrderCancelService%26logger%3Dslf4j%26methods%3DorderCancel%26orderCancel.retries%3D2%26orderCancel.return%3Dtrue%26pid%3D24140%26side%3Dprovider%26timestamp%3D1618222873762&logger=slf4j&pid=24140®ister=true®istry=zookeeper&subscribe=true×tamp=1618222873751
最終拿到的invoker如下圖

第二句
DelegateProviderMetaDataInvoker(invoker, this);
其實(shí)就是將當(dāng)前的serviceConfig最為metadata跟invoke一起封裝起來(lái)(典型的裝飾模式)
第三句
Exporter<?> exporter = protocol.export(wrapperInvoker)
根據(jù)前面的分析,最終會(huì)調(diào)到RegistryProtocol的export方法,其代碼如下
Class RegistryProtocol
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
URL registryUrl = getRegistryUrl(originInvoker);
//registry provider
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
//to judge to delay publish whether or not
boolean register = registeredProviderUrl.getParameter("register", true);
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
if (register) {
register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}
在該方法中主要分兩步
第一步 將服務(wù)在本地暴露出來(lái)(如果不暴露出來(lái),別人無(wú)法調(diào)用)
第二步 將服務(wù)注冊(cè)到zk(如果不注冊(cè),別人不知道有這個(gè)服務(wù))
先看第一步
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
//這一步是關(guān)鍵,由于invokerDelegete.getUrl.getProtol = "dubbo"
//所以最終的調(diào)用了DubboProtocol的export方法
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
我們跟到DubboProtocol的export的方法里面去,代碼如下
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
//最重要的是這句
openServer(url);
optimizeSerialization(url);
return exporter;
}
我們跟到openServer(url)里面去
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
首先看當(dāng)前ip:port下是否開(kāi)啟了本地服務(wù),如果沒(méi)有那么調(diào)用createServer,
在dubbo中默認(rèn)使用的是netty server,代碼如下
private ExchangeServer createServer(URL url) {
// send readonly event when server closes, it's enabled by default
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
// enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
//這句綁定url(包含ip和port)和請(qǐng)求處理器
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
Class Exchangers
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).bind(url, handler);
}
Class HeaderExchanger
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().bind(url, handler);
}
而getTransporter()最終會(huì)返回netty4 的NettyTransporter
如下
Class NettyTransporter
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
Class NettyServer
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
Class AbstractServer
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = NetUtils.ANYHOST;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
try {
//這句最重要
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
//fixme replace this with better method
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
如果你能看到這里,真的很佩服你,終于看到了我們熟悉的netty代碼,后面我們會(huì)專(zhuān)門(mén)的針對(duì)netty做介紹(現(xiàn)在我還了解的不深入)
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
通過(guò)如上的操作之后,在本地起了個(gè)netty服務(wù),理論上我們現(xiàn)在就可以接受rpc請(qǐng)求了,但是我們還需要回到開(kāi)頭,將服務(wù)注冊(cè)到zk上面去,回到開(kāi)頭。
@Class RegistryProtocol
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker 前面講的一大堆就是講的這個(gè)
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
//找到注冊(cè)的zk地址
URL registryUrl = getRegistryUrl(originInvoker);
//registry provider
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
//to judge to delay publish whether or not
boolean register = registeredProviderUrl.getParameter("register", true);
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
if (register) {
register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}
最終在ZookeeperRegistry調(diào)用doRegister進(jìn)行注冊(cè)
代碼如下
protected void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
經(jīng)過(guò)上面的操作之后,最終會(huì)在
/dubbo/com.***.wms.dubbo.service.OrderCancelService/providers 永久節(jié)點(diǎn)下注冊(cè)一個(gè)臨時(shí)節(jié)點(diǎn)信息,巧妙的利用了zk的永久節(jié)點(diǎn)和臨時(shí)節(jié)點(diǎn)的特征,可以動(dòng)態(tài)的增減注冊(cè)信息。
然后在此path下注冊(cè)監(jiān)聽(tīng)器防止暴露的url被重寫(xiě)(這個(gè)邏輯可以先忽略,我也沒(méi)搞清楚)
最終返回一個(gè)DestroyableExporter,可以在返回的時(shí)候取消所有注冊(cè)信息。
如上就是一個(gè)dubbo service暴露的全過(guò)程,后面我們接著介紹一個(gè)service unexport的全過(guò)程。