provider、consumer通信原理


Consumer發(fā)送原理
-->Result result = invoker.invoke(invocation)
--------------------------------------------------------------------------擴(kuò)展點(diǎn)----------------
-->InvokerWrapper.invoke
-->ProtocolFilterWrapper.invoke
-->ConsumerContextFilter.invoke
-->ProtocolFilterWrapper.invoke
-->MonitorFilter.invoke
-->ProtocolFilterWrapper.invoke
-->FutureFilter.invoke
-->ListenerInvokerWrapper.invoke
-->AbstractInvoker.invoke
---------------------------------------------------------------------------擴(kuò)展點(diǎn)---------------
-->doInvoke(invocation)
-->DubboInvoker.doInvoke//為什么DubboInvoker是個(gè)protocol? 因?yàn)镽egistryDirectory.refreshInvoker.toInvokers: protocol.refer
-->ReferenceCountExchangeClient.request
-->HeaderExchangeClient.request
-->HeaderExchangeChannel.request
-->NettyClient.send
-->AbstractPeer.send
-->NettyChannel.send
-->ChannelFuture future = channel.write(message);//最終的目的:通過netty的channel發(fā)送網(wǎng)絡(luò)數(shù)據(jù)
consumer的RegistryDirectory創(chuàng)建DubboInvoker是在zk配置發(fā)生變化回調(diào)RegistryDirectory.notify的時(shí)候。創(chuàng)建DubboInvoker使用的是protocol.refer(serviceType, url)方法,Protocol$Adpative會(huì)有一些Wapper給DubboInvoker添加很多包裝類,所以在調(diào)用鏈中會(huì)有一些filter,這和服務(wù)發(fā)布的時(shí)候是一樣的
#com.alibaba.dubbo.registry.integration.RegistryDirectory#toInvokers
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
......
invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
......
}
#com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
......
ResponseFuture future = currentClient.request(inv, timeout) ;
......
}
provider接收、處理、返回
NettyHandler.messageReceived
-->AbstractPeer.received
-->MultiMessageHandler.received
-->HeartbeatHandler.received
-->AllChannelHandler.received
-->ChannelEventRunnable.run //線程池 執(zhí)行線程
-->DecodeHandler.received
-->HeaderExchangeHandler.received
-->handleRequest(exchangeChannel, request)//網(wǎng)絡(luò)通信接收處理
-->DubboProtocol.reply
-->getInvoker
-->exporterMap.get(serviceKey)//從服務(wù)暴露里面提取
-->DubboExporter.getInvoker()//最終得到一個(gè)invoker
-------------------------------------------------------------------------擴(kuò)展點(diǎn)--------------
-->ProtocolFilterWrapper.invoke
-->EchoFilter.invoke
-->ClassLoaderFilter.invoke
-->GenericFilter.invoke
-->TraceFilter.invoke
-->MonitorFilter.invoke
-->TimeoutFilter.invoke
-->ExceptionFilter.invoke
-->InvokerWrapper.invoke
-------------------------------------------------------------------------擴(kuò)展點(diǎn)--------------
-->AbstractProxyInvoker.invoke
-->JavassistProxyFactory.AbstractProxyInvoker.doInvoke
--> 進(jìn)入真正執(zhí)行的實(shí)現(xiàn)類 DemoServiceImpl.sayHello
....................................
-->channel.send(response);//把接收處理的結(jié)果,發(fā)送回去
-->AbstractPeer.send
-->NettyChannel.send
-->ChannelFuture future = channel.write(message);//數(shù)據(jù)發(fā)回consumer
Consumer的接收原理
//consumer的接收原理
NettyHandler.messageReceived
-->AbstractPeer.received
-->MultiMessageHandler.received
-->HeartbeatHandler.received
-->AllChannelHandler.received
-->ChannelEventRunnable.run //線程池 執(zhí)行線程
-->DecodeHandler.received
-->HeaderExchangeHandler.received
-->handleResponse(channel, (Response) message);
-->HeaderExchangeHandler.handleResponse
-->DefaultFuture.received
-->DefaultFuture.doReceived
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
發(fā)送數(shù)據(jù)是異步的,可以參考netty的案例,通過channel發(fā)送數(shù)據(jù)不能直接拿到結(jié)果,必須通過epoll中的等待回調(diào)事件再取得數(shù)據(jù)
數(shù)據(jù)通信的基本類
NettyChannel
包含一個(gè)send(Object message, boolean sent)方法,內(nèi)置了一個(gè)netty自己的channel可直接發(fā)送數(shù)據(jù)NettyHandler
與原生netty的事件交互,獲取到netty的事件后會(huì)回調(diào)上層hook的handlerChannelEventRunnable
接收到的數(shù)據(jù)的處理DecodeHandler
數(shù)據(jù)的解碼器HeaderExchangeHandler
是一個(gè)中間層,負(fù)責(zé)將解析好的數(shù)據(jù)與上層業(yè)務(wù)的邏輯的流轉(zhuǎn)
handleRequest,handleResponse

異步轉(zhuǎn)同步
dubbo的consumer發(fā)送請求是非阻塞的,不會(huì)等待返回值。provider接收是阻塞的會(huì)等待provider調(diào)用invoker處理完直接返回給consumer。
public void start() throws Exception {
for (int i = 0; i < Integer.MAX_VALUE; i ++) {
try {
String hello = demoService.sayHello("world" + i);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] " + hello);
} catch (Exception e) {
e.printStackTrace();
}
Thread.sleep(2000);
}
}
dubbo 是基于netty NIO的非阻塞并行調(diào)用通信。(阻塞 非阻塞 異步 同步 區(qū)別 )dubbo 的通信方式 有3類類型:
異步,有返回值
<dubbo:method name="sayHello" async="true"> </dubbo:method>
Future<String> temp= RpcContext.getContext().getFuture(); hello=temp.get();異步,無返回值
<dubbo:method name="sayHello" return="false"></dubbo:method>異步,變同步(默認(rèn)的通信方式)
A.當(dāng)前線程怎么讓它 “暫停,等結(jié)果回來后,再執(zhí)行”?
B.socket是一個(gè)全雙工的通信方式,那么在多線程的情況下,如何知道那個(gè)返回結(jié)果對應(yīng)原先那條線程的調(diào)用?
通過一個(gè)全局唯一的ID來做consumer 和 provider 來回傳輸。
單工 全雙工 半雙工 區(qū)別
- 單工
在同一時(shí)間只允許一方向另一方傳送信息,而另一方不能向一方傳送 - 全雙工
是指在發(fā)送數(shù)據(jù)的同事也能夠接收數(shù)據(jù),兩者同步進(jìn)行,這好像我們平時(shí)打電話一樣,說話的同事也能夠聽到對方的聲音。目前的網(wǎng)卡一般都支持全雙工 - 半雙工
所謂的半雙工就是指一個(gè)時(shí)間段內(nèi)只有一個(gè)動(dòng)作發(fā)生,舉個(gè)簡單例子,一條窄窄的馬路,同事只能有一輛車通過,當(dāng)目前有兩輛車對開,這種情況下就只能一輛車先過,到頭后另一輛車再開,這個(gè)例子就形象的說明了半雙工的原理。
同步、異步實(shí)現(xiàn)
異步.有返回值時(shí)會(huì)將能獲取結(jié)果的future放在上下文變量中,如果需要取結(jié)果直接從future中阻塞獲取就可以了;異步,無返回值,則不再處理;同步,有返回值會(huì)直接調(diào)用future.get
#com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
protected Result doInvoke(final Invocation invocation) throws Throwable {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
//是否有返回值
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
if (isOneway) {
//2.異步,無返回值
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
//ReferenceCountExchangeClient
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
//1.異步.有返回值
//ReferenceCountExchangeClient 發(fā)送請求
ResponseFuture future = currentClient.request(inv, timeout) ;
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {//
//3. 異步,變同步(默認(rèn)的通信方式)
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
}
- 生成future
#com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(java.lang.Object, int)
public ResponseFuture request(Object request, int timeout) throws RemotingException {
// create request.
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try{
//AbstractPeer
channel.send(req);
}catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
- 阻塞
可以看到所有的DefaultFuture都會(huì)被維護(hù)到FUTURES這個(gè)map中,而且request.getId()為key。當(dāng)我們調(diào)用future.get獲取結(jié)果的時(shí),會(huì)循環(huán)判斷當(dāng)前response是否為空,如果為空就當(dāng)前線程一直等待在done上,直到超時(shí)為止。
public class DefaultFuture implements ResponseFuture {
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
private volatile Response response;
public DefaultFuture(Channel channel, Request request, int timeout){
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// put into waiting map.
FUTURES.put(id, this);
CHANNELS.put(id, channel);
}
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (! isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (! isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (! isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
public boolean isDone() {
return response != null;
}
}
- 喚醒
當(dāng)NettyHandler獲取到數(shù)據(jù)之后傳遞給HeaderExchangeHandler后,會(huì)調(diào)用DefaultFuture的靜態(tài)方法received來設(shè)置response。會(huì)根據(jù)response的id從FUTURES中獲取創(chuàng)建時(shí)緩存的future實(shí)例。如果獲取到了就調(diào)用future.doReceived(response)來設(shè)置返回值,并且通知等待在done上的線程,這時(shí)之前阻塞在future.get()方法上的線程就會(huì)立即返回。
#com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleResponse
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
#com.alibaba.dubbo.remoting.exchange.support.DefaultFuture#received
public static void received(Channel channel, Response response) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}