引子:
- 在 dubbo剖析:一 服務發(fā)布 中,我們講到了
RegistryProtocol.export過程中有一個關鍵步驟,即調(diào)用doLocalExport(final Invoker<T> originInvoker)生成Exporter,其最終調(diào)用了DubboProtocol的export()方法。 -
DubboProtocol的export()方法:完成了 "啟動并監(jiān)聽網(wǎng)絡服務" 的工作,具體是通過HeaderExchanger的bind()方法創(chuàng)建了一個HeaderExchangerServer實現(xiàn)的。 - 本章我們就來介紹
HeaderExchangerServer的 設計架構(gòu) 和 功能實現(xiàn)。
一、入口流程
服務發(fā)布流程中,RegistryProtocol會調(diào)用到DubboProtocol的export()方法,用于完成網(wǎng)絡服務的啟動和監(jiān)聽。
1.1 入口代碼
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
//step1 export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//...省略部分代碼...
//step2 創(chuàng)建ExchangeServer
openServer(url);
return exporter;
}
private void openServer(URL url) {
String key = url.getAddress();
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
//調(diào)用createServer
serverMap.put(key, createServer(url));
} else {
server.reset(url);
}
}
}
private ExchangeServer createServer(URL url) {
//...省略部分代碼,參數(shù)解析之類的...
ExchangeServer server;
try {
//關鍵代碼,使用HeaderExchanger.bind創(chuàng)建HeaderExchangeServer
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
return server;
}
1.2 流程圖解

DubboProtocol.export()流程圖
第一步:生成Exporter:
- 將
AbstractProxyInvoker作為構(gòu)造參數(shù),new出一個DubboExporter; -
DubboExporter維護了AbstractProxyInvoker的生命周期; - 服務url、
DubboExporter放入DubboProtocol的緩存map供消息接收處理使用;
第二步:創(chuàng)建ExchangeServer:
-
AbstractProxyInvoker中獲取服務url; - 服務url作為入?yún)?、同時使用
DubboProtocol包含的網(wǎng)絡事件處理器requestHandler,調(diào)用HeaderExchanger的bind()方法創(chuàng)建ExchangeServer; -
HeaderExchanger中又依賴了NettyTransporter,使用其bind()方法創(chuàng)建NettyServer; -
NettyServer是啟動網(wǎng)絡服務的核心類。HeaderExchangerServer使用NettyServer作為構(gòu)造參數(shù),擴展了它的功能;
二、server端網(wǎng)絡層結(jié)構(gòu)

server端網(wǎng)絡層類圖關系說明
2.1 網(wǎng)絡傳輸層
-
EndPoint為網(wǎng)絡端點的抽象接口,定義了獲取網(wǎng)絡端點地址、連接、及最原始的發(fā)送消息的方法。 -
ChannelHandler為網(wǎng)絡事件處理器接口,定義了Server端監(jiān)聽到各種類型的網(wǎng)絡事件時的處理方法(connected、disconnected、sent、received、caught),Netty中也有類似定義。 -
Server為網(wǎng)絡服務端的抽象接口,繼承了EndPoint的功能,并擴展了獲取與服務端建連的通道Channel的方法。 -
Transporter為網(wǎng)絡傳輸層的抽象接口,核心作用就是提供了創(chuàng)建Server和Client兩個核心接口實現(xiàn)類的方法。
2.2 信息交換層
-
ExchangeHandler,在ChannelHandler接口基礎上,添加了 響應請求 的方法。 -
ExchangeServer,在Server接口基礎上,將獲取Channel的方法擴展為獲取ExchangeChannel的方法。 -
Exchanger為信息交換層的抽象接口,核心作用就是提供了創(chuàng)建ExchangeServer和ExchangeClient兩個核心接口實現(xiàn)類的方法。
2.3 網(wǎng)絡通道Channel

網(wǎng)絡通道接口定義
-
Channel為網(wǎng)絡通道的抽象接口,繼承了EndPoint的功能,并擴展了綁定獲取屬性和獲取通道對端地址的方法。 -
ExchangeChannel,在Channel接口的基礎上,擴展了請求響應模式的功能,并能獲取綁定在通道上的網(wǎng)絡事件監(jiān)聽器。
三、HeaderExchangeServer & NettyServer實現(xiàn)詳解

Server實現(xiàn)層次結(jié)構(gòu)圖
3.1 網(wǎng)絡層
AbstractPeer類(網(wǎng)絡事件處理器和網(wǎng)絡節(jié)點的通用實現(xiàn)):
- 定義了屬性
ChannelHandler和URL,作為構(gòu)造方法入?yún)⒆⑷耄?/li> - 實現(xiàn)了
ChannelHandler和EndPoint接口,ChannelHandler接口的相關方法依賴其channelHandler屬性完成實現(xiàn);
AbstractEndPoint類(加入編解碼功能):
- 定義了構(gòu)造方法,入?yún)瑢傩?code>ChannelHandler和
URL; - 定義了屬性
Codec2,用于編解碼,通過SPI動態(tài)注入; - 定義了timeout/connectTimeout相關超時屬性,由
URL解析賦值; - 對外暴露了獲取
Codec2和超時相關屬性的方法,供上層依賴調(diào)用;
AbstractServer類(網(wǎng)絡服務端通用抽象,抽象出open、close、send的公共流程,并提供了doOpen和doClose的實現(xiàn)擴展):
- 定義了構(gòu)造方法,入?yún)瑢傩?code>ChannelHandler和
URL,并觸發(fā)doOpen()擴展; - 重寫
EndPoint接口的close()方法,觸發(fā)doClose()擴展; - 實現(xiàn)
EndPoint接口的send()方法,遍歷并調(diào)用Channel.send();
NettyServer類(網(wǎng)絡服務端Netty實現(xiàn)類,實現(xiàn)了doOpen、doClose、getChannels三個具體擴展):
- 實現(xiàn)了
doOpen()擴展方法,使用Netty的ServerBootstrap完成服務啟動監(jiān)聽,其網(wǎng)絡世界處理器為this包裝成的NettyHandler; - 實現(xiàn)了
doClose()擴展方法,調(diào)用Netty的boostrap、channel完成網(wǎng)絡資源釋放; - 實現(xiàn)了
getChannels()方法,channels的值由網(wǎng)絡事件處理器在connect、disconnect事件觸發(fā)時變動維護;
@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
// https://issues.jboss.org/browse/NETTY-365
// https://issues.jboss.org/browse/NETTY-379
// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
}
@Override
protected void doClose() throws Throwable {
try {
if (channel != null) {
// unbind.
channel.close();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
Collection<com.alibaba.dubbo.remoting.Channel> channels = getChannels();
if (channels != null && channels.size() > 0) {
for (com.alibaba.dubbo.remoting.Channel channel : channels) {
try {
channel.close();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
if (bootstrap != null) {
// release external resource.
bootstrap.releaseExternalResources();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
if (channels != null) {
channels.clear();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
3.2 交換層
HeaderExchangeServer類(交換層服務端,將網(wǎng)絡層的Channel擴展為交換層的ExchangeChannel,并加入心跳檢測功能):
- 定義了構(gòu)造方法,入?yún)瑢傩?code>Server,用于實現(xiàn)服務端網(wǎng)絡層功能;
- 定義了屬性
定時任務線程池scheduled,用于執(zhí)行“定時心跳收發(fā)及心跳超時監(jiān)測”任務; - 定義了hearbeat/heartbeatTieout相關心跳屬性,由
URL解析賦值; - 構(gòu)造方法中啟動“定時心跳收發(fā)及心跳超時監(jiān)測”任務,超時時“Server斷連、Client斷連重連”;
- 將網(wǎng)絡層
Channel擴展為交換層ExchangeChannel,具體實現(xiàn)后續(xù)另辟章節(jié);