注:文章中使用的dubbo源碼版本為2.5.4
零、文章目錄
- Consumer發(fā)送請求
- Provider接收請求并發(fā)送響應(yīng)
- Consumer接收響應(yīng)
一、Consumer發(fā)送請求
1.1 代碼入口
- 在 dubbo剖析:二 服務(wù)引用 中講到,服務(wù)引用方根據(jù)引用接口
DemoService,使用dubbo的代理工廠類JavassistProxyFactory.getProxy()創(chuàng)建出該接口的動(dòng)態(tài)代理對象。 - 當(dāng)用戶想調(diào)用
DemoService的相關(guān)方法時(shí),實(shí)際是調(diào)用了代理對象的相關(guān)方法,從InvokerInvocationHandler.invoke()進(jìn)入Consumer請求發(fā)送流程。
1.2 整體流程

- 上圖從上往下展示了服務(wù)引用方發(fā)送一個(gè)RPC請求的關(guān)鍵步驟,經(jīng)歷了“代理層”、“集群層”、“過濾監(jiān)聽擴(kuò)展點(diǎn)”、“調(diào)用協(xié)議層”、“信息交換層”、“網(wǎng)絡(luò)傳輸層”。
- 紫色實(shí)線條表示各層關(guān)鍵類的方法調(diào)用,藍(lán)色虛線表示關(guān)鍵類的初始化過程。
1)代理執(zhí)行(InvokerInvocationHandler.invoke):
- 服務(wù)引用的過程中,由
ReferenceConfig使用JavassistProxyFactory為引用接口創(chuàng)建了代理對象; - 服務(wù)引用方調(diào)用dubbo代理類
DemoService.sayHello時(shí),實(shí)際執(zhí)行InvokerInvocationHandler.invoke()方法,即這是Consumer發(fā)送請求的起點(diǎn); -
InvokerInvocationHandler內(nèi)包含一個(gè)Invoker,在JavassistProxyFactory.getProxy()過程中通過其構(gòu)造器注入,該Invoker為一個(gè)集群路由功能的AbstractClusterInvoker;
2)集群容錯(cuò)+負(fù)載均衡(AbstractClusterInvoker.invoke):
- 服務(wù)引用的過程中,由
RegistryProtocol使用Cluster.join()創(chuàng)建集群Invoker,Cluster由ExtensionLoader.getExtensionLoader(Cluster.class).getExtension("mergeable")動(dòng)態(tài)生成; - 集群
Invoker根據(jù)負(fù)載均衡算法有多種不同實(shí)現(xiàn)類(failover、failfast、failsafe、failback),具體使用哪一種由對應(yīng)的Cluster實(shí)現(xiàn)決定; -
AbstractClusterInvoker通過Directory.list()方法獲取請求路徑對應(yīng)的Invoker列表; -
AbstractClusterInvoker再通過LoadBalance.select()方法從多個(gè)Invoker中選取一個(gè)做本次調(diào)用,即負(fù)載均衡算法(Random、RoundRobin、LeastActive);
3)Filter鏈擴(kuò)展點(diǎn)(ProtocolFilterWrapper + ProtocolListenerWrapper):
- 在
ReferenceConfig進(jìn)行服務(wù)引用的過程中,通過refProtocol.refer()創(chuàng)建Invoker對象; -
refprotocol.refer()先后經(jīng)過修飾類ProtocolFilterWrapper、ProtocolListenerWrapper,最后執(zhí)行RegistryProtocol;ProtocolFilterWrapper和ProtocolListenerWrapper就是Dubbo引入的擴(kuò)展點(diǎn); - 擴(kuò)展點(diǎn)對請求發(fā)送和接收的核心功能流程無影響,目的是以插件的方式進(jìn)行一些輔助功能處理,這里不再進(jìn)一步展開;
4)調(diào)用協(xié)議層執(zhí)行(AbstractInvoker.invoke):
- 經(jīng)過集群路由和擴(kuò)展點(diǎn),現(xiàn)在將直接執(zhí)行
AbstractInvoker.invoke方法,開始真正的遠(yuǎn)程調(diào)用了; - 服務(wù)引用的過程中,由
RegistryDirectory使用Protocol.refer()創(chuàng)建遠(yuǎn)程執(zhí)行AbstractInvoker,Protocol默認(rèn)采用default實(shí)現(xiàn),即DubboProtocol; -
AbstractInvoker有多種協(xié)議的具體實(shí)現(xiàn)(dubbo、rmi、hessian、http),具體使用哪一種協(xié)議由對應(yīng)的Protocol實(shí)現(xiàn)決定,默認(rèn)采用dubbo協(xié)議為DubboInvoker; -
DubboInvoker中包含了ExchangeClient的引用,通過DubboInvoker的構(gòu)造器注入;
5)交換層執(zhí)行(ExchangeClient.request):
- 遠(yuǎn)程執(zhí)行
Invoker通過其引用的ExchangeClient.request完成遠(yuǎn)程調(diào)用請求的發(fā)送并得到ResponseFuture,然后調(diào)用ResponseFuture.get()得到 遠(yuǎn)程調(diào)用結(jié)果Result; - 服務(wù)引用的過程中,由
DubboProtocol使用Exchanger.connect()創(chuàng)建ExchangeClient; -
Exchanger的實(shí)現(xiàn)類為HeaderExchanger,由ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type)動(dòng)態(tài)生成; -
ExchangeClient在Client的基礎(chǔ)上封裝了請求響應(yīng)模式(其以Request、Response、ResponseFuture為核心,后續(xù)單獨(dú)文章講解),這也是交換層的核心功能;
6)網(wǎng)絡(luò)層執(zhí)行(Client.send):
- 交換層
ExchangeClient.request封裝請求響應(yīng)模式后,最終依賴網(wǎng)絡(luò)層Client.send將請求消息通過網(wǎng)絡(luò)發(fā)送給服務(wù)提供方; - 服務(wù)引用的過程中,由
HeaderExchanger使用Transporter.connect()創(chuàng)建Client并完成初始連接操作,Client有多種網(wǎng)絡(luò)層實(shí)現(xiàn)(netty、mina...),具體使用哪一種由對應(yīng)的Transporter實(shí)現(xiàn)決定; -
Transporter有多種網(wǎng)絡(luò)層實(shí)現(xiàn)(netty、mina...),由ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension()動(dòng)態(tài)生成,默認(rèn)為NettyTransporter; - 最后,
NettyClient使用其包含的底層NettyChannel完成網(wǎng)絡(luò)消息發(fā)送的功能;
二、Provider接收請求并發(fā)送響應(yīng)
2.1 代碼入口
- 在 dubbo剖析:一 服務(wù)發(fā)布 中講到,服務(wù)提供方通過
NettyServer完成服務(wù)端創(chuàng)建及監(jiān)聽工作。 - 在
NettyServer的doOpen()階段創(chuàng)建了網(wǎng)絡(luò)事件處理器NettyHandler,當(dāng)服務(wù)端收到客戶端消息時(shí),將觸發(fā)NettyHandler的messageReceived()方法。
2.2 整體流程

- 上圖從上往下表示了服務(wù)提供方接收到一個(gè)網(wǎng)絡(luò)請求時(shí)的處理步驟,經(jīng)歷了一個(gè)
Handler處理器鏈,鏈中的每個(gè)Handler負(fù)責(zé)實(shí)現(xiàn)自己的處理功能。
1)Netty網(wǎng)絡(luò)事件處理器(NettyHandler):
- 繼承自Netty的原生網(wǎng)絡(luò)時(shí)間處理器實(shí)現(xiàn)類
SimpleChannelHandler,定義了網(wǎng)絡(luò)建連(channelConnected)、斷連(channelDisconnected)、消息接收(messageReceived)、異常(exceptionCaught)等事件處理方法; - 維護(hù)了
<ip:port, channel>的對應(yīng)關(guān)系Map<String, Channel>channels,在網(wǎng)絡(luò)建連/斷連時(shí)進(jìn)行相應(yīng)put/remove操作,并暴露給NettyServer使用; - 接收到網(wǎng)絡(luò)消息時(shí),執(zhí)行
messageReceived()方法,將Netty的原生Channel轉(zhuǎn)換為Dubbo封裝的NettyChannel,并將事件傳遞給其包含的ChannelHandler處理;
2)復(fù)合消息處理器(MultiMessageHandler):
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof MultiMessage) {
MultiMessage list = (MultiMessage) message;
for (Object obj : list) {
handler.received(channel, obj);
}
} else {
handler.received(channel, message);
}
}
- 處理
MultiMessage,將其拆分成多個(gè)Message處理;
3)心跳消息處理器(HeartbeatHandler):
- 消息收發(fā)時(shí)重置當(dāng)前通道的最新消息收發(fā)時(shí)間,用于配合
HeaderExchangeServer和HeaderExchangeClient中的心跳檢測任務(wù)HeartBeatTask; - 攔截并處理心跳請求/響應(yīng)消息。對心跳請求消息,構(gòu)建對應(yīng)的心跳響應(yīng)消息并通過Channel發(fā)送回去;對心跳響應(yīng)消息,僅記錄日志后返回,不做功能上的處理;
4)業(yè)務(wù)線程轉(zhuǎn)換處理器(AllChannelHandler):
-
Dubbo通過該處理器完成了 IO線程 與 業(yè)務(wù)線程 的解耦! - 內(nèi)部封裝了業(yè)務(wù)線程池,默認(rèn)使用
FixedThreadPool;
public class FixedThreadPool implements ThreadPool {
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
注意點(diǎn):
a)線程池默認(rèn)業(yè)務(wù)線程數(shù)為200
b)隊(duì)列默認(rèn)采用SynchronousQueue
- 將接收到的網(wǎng)絡(luò)消息事件封裝成可執(zhí)行任務(wù)
ChannelEventRunnable,交由業(yè)務(wù)線程池處理;
5)業(yè)務(wù)解碼處理器(DecodeHandler):
- 進(jìn)行業(yè)務(wù)請求響應(yīng)的解碼工作;
- 對
Request和Response中攜帶的消息體或結(jié)果體,如果其實(shí)現(xiàn)了Decodeable接口,則進(jìn)行一次解碼處理;
6)交換層請求響應(yīng)處理器(HeaderExchangeHandler):
- 交換層真正完成請求響應(yīng)收發(fā)功能的處理器!
- 將網(wǎng)絡(luò)層
Channel轉(zhuǎn)換為交換層ExchangeChannel,為其增加了請求響應(yīng)方法request(); - 判斷收到的網(wǎng)絡(luò)消息類型,根據(jù)類型分別執(zhí)行不同的處理邏輯;
if (message instanceof Request) {
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
//case a: 請求響應(yīng)模型的請求處理
if (request.isTwoWay()) {
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
}
//case b: 單向消息接收的處理
else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
//case c: 請求響應(yīng)模型的響應(yīng)處理
handleResponse(channel, (Response) message);
}
a)請求響應(yīng)模型的Request消息:調(diào)用ExchangeHandlerAdapter.reply()獲取執(zhí)行結(jié)果Result -->
將本地執(zhí)行結(jié)果Result封裝成RPC響應(yīng)Response --> 通過channel.send()發(fā)送RPC響應(yīng);
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
Object msg = req.getData();
try {
// 調(diào)用```ExchangeHandlerAdapter.reply()```獲取執(zhí)行結(jié)果```Result```
Object result = handler.reply(channel, msg);
res.setStatus(Response.OK);
res.setResult(result);
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
}
//將本地執(zhí)行結(jié)果```Result```封裝成RPC響應(yīng)```Response```
return res;
}
b)單向請求消息的處理:調(diào)用ExchangeHandlerAdapter.received()處理請求消息,如果該消息是Invocation則執(zhí)行reply()邏輯但不主動(dòng)發(fā)送RPC響應(yīng)Response;
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
reply((ExchangeChannel) channel, message);
} else {
super.received(channel, message);
}
}
c)請求響應(yīng)模型的Response消息:調(diào)用DefaultFuture.received()處理響應(yīng)消息。
...注:請求響應(yīng)模型(Request,Response,DufaultFuture)相關(guān)后續(xù)專門分析,此處不展開...
7)真正本地實(shí)現(xiàn)類方法的執(zhí)行(ExchangeHandlerAdapter):
-
ExchangeHandlerAdapter由DubboProtocol創(chuàng)建,并實(shí)現(xiàn)了reply()方法; -
reply()方法,實(shí)際通過RPC調(diào)用參數(shù)Invocation從DubboProtocol.exporterMap中獲取到對應(yīng)的本地實(shí)現(xiàn)DubboExporter--> 進(jìn)而獲取到對應(yīng)的本地執(zhí)行AbstractProxyInvoker--> 最終通過AbstractProxyInvoker.invoke()方法,以反射的方式執(zhí)行真正實(shí)現(xiàn)類的對應(yīng)方法,完成RPC請求。
三、Consumer接收響應(yīng)
整體流程與 “Provider接收請求” 一樣,唯一的區(qū)別是在 交換層請求響應(yīng)處理器(HeaderExchangeHandler)步驟中會(huì)執(zhí)行 “分支c:請求響應(yīng)模型的Response消息”,將Response交由DefaultFuture處理。