Dubbo的服務(wù)發(fā)布邏輯是比較復(fù)雜的,我還是以Dubbo自帶的示例講解,這樣更方便和容易理解。
Provider配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
<!-- 提供方應(yīng)用信息,用于計(jì)算依賴關(guān)系 -->
<dubbo:application name="hello-world-app" />
<!-- 使用multicast廣播注冊(cè)中心暴露服務(wù)地址 -->
<dubbo:registry address="multicast://224.5.6.7:1234" />
<!-- 用dubbo協(xié)議在20880端口暴露服務(wù) -->
<dubbo:protocol name="dubbo" port="20880" />
<!-- 聲明需要暴露的服務(wù)接口 -->
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService" />
<!-- 和本地bean一樣實(shí)現(xiàn)服務(wù) -->
<bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl" />
</beans>
ApplicationContext
ClassPathXmlApplicationContext父類AbstractApplicationContext的方法refresh()在實(shí)例化bean之后的最后一步finishRefresh()中,此方法作用是發(fā)布相應(yīng)的事件。
protected void finishRefresh() {
//省略LifeCycleProcessor刷新代碼
// Publish the final event.
publishEvent(new ContextRefreshedEvent(this));
// 省略注冊(cè)到 LiveBeansView MBean代碼
}
可以看到發(fā)布了一個(gè)ContextRefreshedEvent事件。
protected void publishEvent(Object event, ResolvableType eventType) {
//省略部分代碼
getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
//省略部分代碼
首先獲取ApplicationEvent事件廣播對(duì)象,然后廣播事件。
ApplicationEvent事件廣播對(duì)象默認(rèn)是SimpleApplicationEventMulticaster,這個(gè)對(duì)象是在AbstractApplicationContext的方法initApplicationEventMulticaster()初始化的,如果需要自定義,可以實(shí)現(xiàn)接口ApplicationEventMulticaster,并將bean的名字命名為applicationEventMulticaster。
接下來看看SimpleApplicationEventMulticaster類的multicastEvent方法。
@Override
public void multicastEvent(final ApplicationEvent event, ResolvableType eventType) {
//事件類型
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
//applicationListener
for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
//異常執(zhí)行
Executor executor = getTaskExecutor();
if (executor != null) {
executor.execute(new Runnable() {
@Override
public void run() {
invokeListener(listener, event);
}
});
}
else {
invokeListener(listener, event);
}
}
}
可以看到此方法會(huì)調(diào)用applicationListener的方法,對(duì)于Dubbo而言,就是ServiceBean.
怎么樣獲取到ServiceBean的呢?
ServiceBean實(shí)現(xiàn)了好幾個(gè)接口,其中有兩個(gè)接口ApplicationContextAware和ApplicationListener<ContextRefreshedEvent>,其中ApplicationContextAware使ServiceBean具有獲取ApplicationContext的能力(了解bean的生命周期),而ApplicationListener使ServiceBean具有響應(yīng)事件響應(yīng)的能力。dubbo實(shí)現(xiàn)ApplicationContextAware的目的是通過反射把自己添加到ApplicationContext的ApplicationListener列表中,即使不實(shí)現(xiàn)ApplicationContextAware接口,spring也會(huì)將實(shí)現(xiàn)了ApplicationListener接口的bean添加到其listener列表中的,dubbo這樣做估計(jì)是向后兼容。
接著看invokeListener(listener, event);方法
protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
ErrorHandler errorHandler = getErrorHandler();
if (errorHandler != null) {
try {
doInvokeListener(listener, event);
}
catch (Throwable err) {
errorHandler.handleError(err);
}
}
else {
doInvokeListener(listener, event);
}
}
private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
try {
listener.onApplicationEvent(event);
}
catch (ClassCastException ex) {
//省略異常處理
}
}
invokeListener方法內(nèi)部調(diào)用了doInvokeListener方法,而doInvokeListener方法調(diào)用了listener(ServiceBean)的onApplicationEvent方法.
ServiceBean
public void onApplicationEvent(ContextRefreshedEvent event) {
if (isDelay() && !isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();
}
}
onApplicationEvent方法調(diào)用了export方法,export方法首先判斷是否已經(jīng)發(fā)布了服務(wù),發(fā)布了則直接返回,沒有發(fā)布則會(huì)判斷是否需要延遲發(fā)布,如果需要延遲,則將發(fā)布服務(wù)做為一個(gè)任務(wù)添加到ScheduledThreadPoolExecutor線程池中,如果不延遲,則調(diào)用doExport方法立即發(fā)布服務(wù)。
doExport方法中會(huì)獲取application/registries/monitor/module/protocols,并做一些檢查和屬性填充,然后調(diào)用doExportUrls();發(fā)布服務(wù)。doExportUrls()首先調(diào)用loadRegistries方法得到要注冊(cè)的url,然后發(fā)布相關(guān)Protocol的服務(wù)。
簡單敘述一下獲取url的過程,url通過map組裝參數(shù)和對(duì)應(yīng)的值,參數(shù)有ApplicationConfig和RegistryConfig對(duì)象的屬性以及path、dubbo、timestamp、pid、protocol、registry。
本示例applicationConfig是:
<dubbo:application name="demo-provider" qosPort="22222" id="demo-provider" />
registryURL
registryConfig是:
最終map組裝結(jié)果是:<dubbo:registry address="multicast://224.5.6.7:1234" id="org.apache.dubbo.config.RegistryConfig" />

最后得到registryURL是:
然后調(diào)用doExportUrlsFor1Protocol方法發(fā)布服務(wù),此方法開始部分是構(gòu)造發(fā)布的服務(wù)URL,然后再發(fā)布url。
服務(wù)URL
URL包括以下幾部分:服務(wù)端還是客戶端標(biāo)識(shí),Dubbo版本,時(shí)間戳,Pid,服務(wù)的方法名,token、ApplicationConfig,MoudleConfig,ProviderConfig,ProtocolConfig,*MethodConfig對(duì)象的相關(guān)屬性等。
例如本示例的url:
我們來著重看一下在構(gòu)造URL過程中port的獲取過程。
//protocolConfig是配置的<dubbo:protocol />生成的對(duì)象
//name是protocol的name,本示例為"dubbo"
//map保存了url的鍵值對(duì)
Integer port = this.findConfigedPorts(protocolConfig, name, map);
findConfigedPorts顧名思義是查找配置的port,從哪查呢,先從系統(tǒng)環(huán)境變量查,如果沒找到,再查找名字為name的protocol協(xié)義。
private Integer findConfigedPorts(ProtocolConfig protocolConfig, String name, Map<String, String> map) {
Integer portToBind = null;
// 從環(huán)境變量從查找綁定的port
String port = getValueFromConfig(protocolConfig, Constants.DUBBO_PORT_TO_BIND);
portToBind = parsePort(port);
// 如果沒有從環(huán)境變量從查到,則從名稱為name的protocol查找
if (portToBind == null) {
portToBind = protocolConfig.getPort();
if (provider != null && (portToBind == null || portToBind == 0)) {
portToBind = provider.getPort();
}
//這一句是關(guān)鍵,示例中name值是"dubbo",所以會(huì)實(shí)例化DubboProtocol,得到默認(rèn)的port:20880
final int defaultPort = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort();
if (portToBind == null || portToBind == 0) {
portToBind = defaultPort;
}
if (portToBind == null || portToBind <= 0) {
portToBind = getRandomPort(name);
if (portToBind == null || portToBind < 0) {
portToBind = getAvailablePort(defaultPort);
putRandomPort(name, portToBind);
}
logger.warn("Use random available port(" + portToBind + ") for protocol " + name);
}
}
//保存port到map中,以便后面url使用
map.put(Constants.BIND_PORT_KEY, String.valueOf(portToBind));
// 從環(huán)境變量中查找注冊(cè)的port,如果沒有找到,則等于綁定的Port.
String portToRegistryStr = getValueFromConfig(protocolConfig, Constants.DUBBO_PORT_TO_REGISTRY);
Integer portToRegistry = parsePort(portToRegistryStr);
if (portToRegistry == null) {
portToRegistry = portToBind;
}
return portToRegistry;
}
有人或許有疑問,ServiceConfig在實(shí)例化時(shí),不是已經(jīng)加載過Protocol了嗎?為什么還要使用ExtensionLoader加載一次呢?
final int defaultPort =ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort();答: ServiceConfig實(shí)例化時(shí),加載的Protocol是自適應(yīng)的Protocol,是動(dòng)態(tài)生成的,類名是Protocol$Adaptive(見Dubbo源碼分析-SPI的應(yīng)用中有分析)。而這里獲取Port時(shí)加載的也是Protocol類,但指名了具體加載的是哪個(gè)Protocol(本示例是名稱為dubbo的Protocol,即DubboProtocol,此類默認(rèn)的端口是20880)。
發(fā)布URL
發(fā)布本地服務(wù)
調(diào)用ServiceConfig類的exportLocal(URL url)發(fā)布本地服務(wù)。
private void exportLocal(URL url) {
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
//本地服務(wù)url
URL local = URL.valueOf(url.toFullString())
.setProtocol(Constants.LOCAL_PROTOCOL)
.setHost(LOCALHOST)
.setPort(0);
ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
}
}
本示例的本地服務(wù) url是:
重點(diǎn)看這一句:
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
其中涉及到ProxyFactory和Protocol,下面分別來看一看。
ProxyFactory
proxyFactory也是通過SPI加載的自適應(yīng)類對(duì)象,類名為ProxyFactory$Adaptive,我們來看一下其class文件反編譯后的源碼。
package org.apache.dubbo.rpc;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
public class ProxyFactory$Adaptive implements ProxyFactory {
public ProxyFactory$Adaptive() {
}
public Invoker getInvoker(Object var1, Class var2, URL var3) throws RpcException {
if (var3 == null) {
throw new IllegalArgumentException("url == null");
} else {
String var5 = var3.getParameter("proxy", "javassist");
if (var5 == null) {
throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.ProxyFactory) name from url(" + var3.toString() + ") use keys([proxy])");
} else {
ProxyFactory var6 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(var5);
return var6.getInvoker(var1, var2, var3);
}
}
}
public Object getProxy(Invoker var1, boolean var2) throws RpcException {
if (var1 == null) {
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
} else if (var1.getUrl() == null) {
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
} else {
URL var3 = var1.getUrl();
String var4 = var3.getParameter("proxy", "javassist");
if (var4 == null) {
throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.ProxyFactory) name from url(" + var3.toString() + ") use keys([proxy])");
} else {
ProxyFactory var5 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(var4);
return var5.getProxy(var1, var2);
}
}
}
public Object getProxy(Invoker var1) throws RpcException {
if (var1 == null) {
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
} else if (var1.getUrl() == null) {
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
} else {
URL var2 = var1.getUrl();
String var3 = var2.getParameter("proxy", "javassist");
if (var3 == null) {
throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.ProxyFactory) name from url(" + var2.toString() + ") use keys([proxy])");
} else {
ProxyFactory var4 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(var3);
return var4.getProxy(var1);
}
}
}
}
其中有三個(gè)方法,兩個(gè)獲取代理,一個(gè)獲取Invoker。我們來看其中的getInvoker方法,默認(rèn)獲取名稱為javassist的ProxyFactory。
由于本地服務(wù)URL中沒有proxy參數(shù),所以會(huì)調(diào)用JavassistProxyFactory的getInvoker(T proxy, Class<T> type, URL url)方法,返回AbstractProxyInvoker的匿名類對(duì)象,此對(duì)象代理了服務(wù)對(duì)象(本示例中為DemoServiceImpl對(duì)象)。
其實(shí)(ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension("javassist");獲取到的并不是JavassistProxyFactory對(duì)象,而是StubProxyFactoryWrapper對(duì)象,為什么呢?我們可以看下ExtensionLoader的getExtension(String name)方法
public T getExtension(String name) { //檢查name是否合法 if (name == null || name.length() == 0) throw new IllegalArgumentException("Extension name == null"); //如果name等于true,則加載SPI的默認(rèn)插件 if ("true".equals(name)) { return getDefaultExtension(); } //從當(dāng)前插件類的緩存實(shí)例對(duì)象中獲取 Holder<Object> holder = cachedInstances.get(name); if (holder == null) { cachedInstances.putIfAbsent(name, new Holder<Object>()); holder = cachedInstances.get(name); } Object instance = holder.get(); if (instance == null) { synchronized (holder) { instance = holder.get(); if (instance == null) { //創(chuàng)建插件實(shí)例 instance = createExtension(name); holder.set(instance); } } } return (T) instance; } private T createExtension(String name) { //從文件目錄中加載插件類 Class<?> clazz = getExtensionClasses().get(name); if (clazz == null) { throw findException(name); } //從已加載的所有插件實(shí)例集合中獲取 try { T instance = (T) EXTENSION_INSTANCES.get(clazz); if (instance == null) { //實(shí)例化插件實(shí)例,并放入集合 EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance()); instance = (T) EXTENSION_INSTANCES.get(clazz); } //注入屬性 injectExtension(instance); //插件的包裹類 Set<Class<?>> wrapperClasses = cachedWrapperClasses; if (wrapperClasses != null && !wrapperClasses.isEmpty()) { for (Class<?> wrapperClass : wrapperClasses) { instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)); } } return instance; } catch (Throwable t) { throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type + ") could not be instantiated: " + t.getMessage(), t); } }重點(diǎn)的地方就在于插件的包裹類,StubProxyFactoryWrapper就是JavassistProxyFactory的包裹類,為什么這么說呢,因?yàn)镾tubProxyFactoryWrapper有一個(gè)帶ProxyFactory參數(shù)的構(gòu)造函數(shù)而且實(shí)現(xiàn)了ProxyFactory接口,具體可以看Extension的loadExtensionClasses方法源碼(裝飾者模式)。
Protocol
protocol對(duì)象也是一個(gè)自適應(yīng)插件類,類名為Protocol$Adaptive,在上一篇文章中已有講解。這個(gè)類會(huì)根據(jù)url的協(xié)義取得對(duì)應(yīng)轉(zhuǎn)義的插件類,沒有的話,默認(rèn)為dubbo協(xié)義,本地服務(wù)url協(xié)義為injvm,所以會(huì)加載InjvmProtocol,但是在加載InjvmProtocol并實(shí)例化后,發(fā)現(xiàn)InjvmProtocol還有對(duì)應(yīng)的包裹類即(其實(shí)是所有Protocol的包裹類):ProtocolFilterWrapper和ProtocolListenerWrapper。ProtocolFilterWrapper類的作用是添加一些過濾器,ProtocolListenerWrapper的作用是添加ExporterListener。InjvmProtocol的export方法僅僅創(chuàng)建一個(gè)InjvmExporter實(shí)例,沒有開啟服務(wù)。
發(fā)布遠(yuǎn)程服務(wù)
如果注冊(cè)u(píng)rl不為空,調(diào)用proxyFactory得到服務(wù)對(duì)象的代理類,然后使用protocol發(fā)布服務(wù)。由于注冊(cè)u(píng)rl的協(xié)義是registry,在使用ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension("registry");會(huì)加載RegistryProtocol類并實(shí)例化,而且會(huì)添加其包裹類:ProtocolFilterWrapper和ProtocolListenerWrapper。而在這兩個(gè)包裹類的export方法的首行,都會(huì)對(duì)registry協(xié)義進(jìn)行單獨(dú)處理。
RegistryProtocol
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
經(jīng)過這兩個(gè)包裹類后,最終會(huì)調(diào)用RegistryProtocol的export方法。
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//發(fā)布服務(wù)
//originInvoker中包含了代理服務(wù)對(duì)象的代理類
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
//注冊(cè)相關(guān)代碼省略
//訂閱相關(guān)代碼省略
}
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
//key為發(fā)布的服務(wù)url
String key = getCacheKey(originInvoker);
//從map緩存中獲取
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
//double check
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
最重要的是這一句:
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
其中protocol也是Protocol$Adaptive對(duì)象,而invokerDelegete的URL是服務(wù)的url.
本示例中為:
DubboProtocol
Protocol$Adaptive在解析URL的時(shí)得到dubbo,所以會(huì)加載DubboProtocol并實(shí)例化(DubboProtocol實(shí)際在前面獲取默認(rèn)接口時(shí)已經(jīng)實(shí)例化并緩存起來了,此處取的是緩存的實(shí)例),并調(diào)用了DubboProtocol的export方法(與上面一樣,在得到DubboProtocol實(shí)例后,仍然會(huì)在外面包裹一下)。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// 服務(wù)名:本例中為org.apache.dubbo.demo.DemoService:20880
String key = serviceKey(url);
//exporter 控制服務(wù)打開與關(guān)閉
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//省略發(fā)布子服務(wù)的相關(guān)代碼
//打開服務(wù)
openServer(url);
//優(yōu)化序列化處理
optimizeSerialization(url);
return exporter;
}
經(jīng)過層層探索,曲折迂回,終于到openServer了,進(jìn)去看看。
private void openServer(URL url) {
// 服務(wù)ip:端口號(hào)
String key = url.getAddress();
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
}
}
} else {
// 服務(wù)支持重置
server.reset(url);
}
}
}
可以看到其中有一個(gè)重要方法createServer(url)。
private ExchangeServer createServer(URL url) {
// 當(dāng)服務(wù)關(guān)閉時(shí),默認(rèn)啟動(dòng)發(fā)送只讀事件
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
// 默認(rèn)啟動(dòng)心跳
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
//str默認(rèn)為netty
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
//添加編解碼器
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
//啟動(dòng)服務(wù),并傳入請(qǐng)求處理器
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
//判斷客戶端使用的是網(wǎng)絡(luò)傳輸層框架是否支持服務(wù)端的網(wǎng)絡(luò)傳輸層。
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
Exchangers
進(jìn)入Exchangers.bind方法一探究竟。
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
//如果編碼碼器沒有,則添加參數(shù)exchange
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).bind(url, handler);
}
getExchanger(url)默認(rèn)得到的是HeaderExchanger,可通過exchanger參數(shù)配置。
到HeaderExchanger中看看bind方法
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
Transporter
看下Transporters的bind方法。
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().bind(url, handler);
}
通過getTransporter方法獲取一個(gè)自適應(yīng)的Transporter,類名為Transporter$Adaptive,我們來看一下其源碼:
package org.apache.dubbo.remoting;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
public class Transporter$Adaptive implements Transporter {
public Transporter$Adaptive() {
}
public Client connect(URL var1, ChannelHandler var2) throws RemotingException {
if (var1 == null) {
throw new IllegalArgumentException("url == null");
} else {
String var4 = var1.getParameter("client", var1.getParameter("transporter", "netty"));
if (var4 == null) {
throw new IllegalStateException("Fail to get extension(org.apache.dubbo.remoting.Transporter) name from url(" + var1.toString() + ") use keys([client, transporter])");
} else {
Transporter var5 = (Transporter)ExtensionLoader.getExtensionLoader(Transporter.class).getExtension(var4);
return var5.connect(var1, var2);
}
}
}
public Server bind(URL var1, ChannelHandler var2) throws RemotingException {
if (var1 == null) {
throw new IllegalArgumentException("url == null");
} else {
String var4 = var1.getParameter("server", var1.getParameter("transporter", "netty"));
if (var4 == null) {
throw new IllegalStateException("Fail to get extension(org.apache.dubbo.remoting.Transporter) name from url(" + var1.toString() + ") use keys([server, transporter])");
} else {
Transporter var5 = (Transporter)ExtensionLoader.getExtensionLoader(Transporter.class).getExtension(var4);
return var5.bind(var1, var2);
}
}
}
}
可以看到Transporter$Adaptive通過判斷URL中是否有transporter參數(shù),如果沒有,就默認(rèn)為netty。
示例中服務(wù)的URL為
其中沒有transporter參數(shù),所以就使用netty。然后dubbo就去查找netty對(duì)應(yīng)的是哪個(gè)Transporter,結(jié)果找到是NettyTransporter。
package org.apache.dubbo.remoting.transport.netty4;
//省略導(dǎo)入部分
public class NettyTransporter implements Transporter {
public static final String NAME = "netty";
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
@Override
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
}
NettyTransporter很簡單,只有兩個(gè)方法,一個(gè)用于開啟服務(wù),一個(gè)用于連接服務(wù)。到這里已經(jīng)明白了Dubbo是如何發(fā)布一個(gè)服務(wù)的。
我們?cè)龠M(jìn)一步看下NettyServer的構(gòu)造函數(shù)
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
可以看出其調(diào)用父類的構(gòu)造函數(shù),并傳入url和handler的包裹類。handler的包裹類有哪些呢,進(jìn)去看一看。
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
注意到有一個(gè)接口Dispatcher,其自適應(yīng)插件類是AllDispatcher,AllDispatcher的dispatch方法返回AllChannelHandler實(shí)例(此實(shí)例會(huì)將所有請(qǐng)求做為任務(wù)放入線程池中處理),在此實(shí)例基礎(chǔ)上又包裹了HeartbeatHandler和MultiMessageHandler。
NettyServer會(huì)將MultiMessageHandler層層往上傳到其父類AbstractPeer。
我們來回憶一下正向流程:
從ServiceConfig發(fā)布registryURL開始(見doExportUrlsFor1Protocol方法)
1.ServiceConfig生成服務(wù)實(shí)例的代理工廠類JavassistProxyFactory(ProxyFactory SPI默認(rèn)代理工廠類)并包裹到DelegateProviderMetaDataInvoker(此類記錄代理工廠類和服務(wù)信息ServiceBean(<dubbo:service />標(biāo)簽對(duì)應(yīng)的類))
2.由于registryURL的protocol協(xié)義是registry,所以會(huì)加載RegistryProtocol(Protocol類的外面都包裹了ProtocolFilterWrapper和ProtocolListenerWrapper,下面不再特殊說明),并傳入上一步的invoker。
3.RegistryProtocol又找到DubboProtocol,也會(huì)帶上Invoker(此時(shí)的Invoker包含上一次的Invoker并帶有服務(wù)地址(dubbo://IP:端口/服務(wù)接口全稱?參數(shù)=xxx))。
所以requestHandler又會(huì)調(diào)用正向傳過來的Invoker,經(jīng)過ProtocolFilterWrapper和ProtocolListenerWrapper,最終調(diào)用到服務(wù)實(shí)現(xiàn)類相應(yīng)的方法。
最后以一張圖總結(jié):

標(biāo)識(shí)為SPI的類,是可以動(dòng)態(tài)加載的。圖片看不清楚的話,請(qǐng)查看原圖。
再簡單說下接收到請(qǐng)求后的處理流程:NettyServer接收到請(qǐng)求后,交給NettyServerHandler處理,NettyServerHandler轉(zhuǎn)交給NettyServer的父類AbstractPeer處理,AbstractPeer又交給MultiMessageHandler處理,這樣就開始了handler鏈的處理,handler的終點(diǎn)是HeaderExchangerHandler,HeaderExchangerHandler調(diào)用DubboProtocol傳過來的成員變量requestHandler調(diào)用相應(yīng)的服務(wù)類方法,然后得到結(jié)果,調(diào)用NettyServerHandler傳過來的NettyChannel發(fā)送結(jié)果到Client。
用力不如用心!用心寫好每一篇文章!