Starting the request
The main job at this step is to decide whether the call is one-way, asynchronous, or synchronous.

com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {
    boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
    currentClient.send(inv, isSent);
    RpcContext.getContext().setFuture(null);
    return new RpcResult();
} else if (isAsync) {
    // Build the Future object and store it in the thread-local RpcContext
    ResponseFuture future = currentClient.request(inv, timeout);
    RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
    return new RpcResult();
} else {
    RpcContext.getContext().setFuture(null);
    // Call get() directly and block until the result is returned
    return (Result) currentClient.request(inv, timeout).get();
}
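The three branches above can be modeled without Dubbo. The following is only a minimal sketch: CallModes, CONTEXT_FUTURE and request are hypothetical names, and a CompletableFuture stands in for Dubbo's ResponseFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class CallModes {
    // Hypothetical stand-in for the thread-local future slot kept by RpcContext.
    static final ThreadLocal<CompletableFuture<String>> CONTEXT_FUTURE = new ThreadLocal<>();

    // Hypothetical transport: answers asynchronously with "echo:" + payload.
    static CompletableFuture<String> request(String payload) {
        return CompletableFuture.supplyAsync(() -> "echo:" + payload);
    }

    static String invoke(String payload, boolean oneway, boolean async, long timeoutMs) throws Exception {
        if (oneway) {
            request(payload);                      // fire and forget, reply is ignored
            CONTEXT_FUTURE.set(null);
            return null;
        } else if (async) {
            CONTEXT_FUTURE.set(request(payload));  // caller picks the future up later
            return null;
        } else {
            CONTEXT_FUTURE.set(null);
            // synchronous: block on the same future until the reply or the timeout
            return request(payload).get(timeoutMs, TimeUnit.MILLISECONDS);
        }
    }
}
```

In this model a synchronous call is just an asynchronous call whose future is awaited immediately, which matches the structure of the excerpt.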
Building the network request and creating the Future
This step returns the Future object that the previous step needs.

com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(java.lang.Object, int)
// create request. The ID is set in the constructor
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
    channel.send(req);
} catch (RemotingException e) {
    future.cancel();
    throw e;
}
return future;
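The pattern in this excerpt (assign an ID, register a future under that ID, then send, and cancel if the send fails) can be sketched on its own. PendingRequests, Sender and the ID counter below are hypothetical names for illustration, not Dubbo's classes.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class PendingRequests {
    private static final AtomicLong NEXT_ID = new AtomicLong();
    // Futures waiting for a reply, keyed by request ID.
    static final Map<Long, CompletableFuture<Object>> FUTURES = new ConcurrentHashMap<>();

    // Hypothetical transport hook.
    interface Sender {
        void send(long id, Object data) throws Exception;
    }

    static CompletableFuture<Object> request(Object data, Sender sender) throws Exception {
        long id = NEXT_ID.getAndIncrement();
        CompletableFuture<Object> future = new CompletableFuture<>();
        FUTURES.put(id, future);       // register before sending so a fast reply is not lost
        try {
            sender.send(id, data);
        } catch (Exception e) {
            FUTURES.remove(id);        // nothing will ever answer this ID
            future.cancel(false);
            throw e;
        }
        return future;
    }
}
```

Registering the future before the send matters: if the order were reversed, a reply could arrive before anything is waiting for it.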
Executing the network request
This is where the data is actually sent. Once this method returns, the send phase is complete.
com.alibaba.dubbo.remoting.transport.netty.NettyChannel#send

public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent);
    boolean success = true;
    int timeout = 0;
    try {
        ChannelFuture future = channel.write(message);
        if (sent) {
            // Only wait for the write to complete when "sent" is true
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            success = future.await(timeout);
        }
        Throwable cause = future.getCause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        // Wrap the failure instead of swallowing it
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }
    if (!success) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                + " in timeout(" + timeout + "ms) limit");
    }
}
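The effect of the sent flag can be shown in isolation: the caller only blocks on the write when sent is true. This is a rough sketch in which a CompletableFuture stands in for Netty's ChannelFuture; SendWait is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SendWait {
    // writeFuture is a stand-in for the future returned by the channel write.
    static void send(CompletableFuture<Void> writeFuture, boolean sent, long timeoutMs) throws Exception {
        boolean success = true;
        if (sent) {
            try {
                // Block until the write finishes; a failed write surfaces as ExecutionException.
                writeFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                success = false;
            }
        }
        if (!success) {
            throw new IllegalStateException("Failed to send message in timeout(" + timeoutMs + "ms) limit");
        }
    }
}
```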
Decoding received data
Received data is decoded before anything else happens.

Decoding the request ID and the returned result
com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec#decodeBody
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
    byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
    Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
    // get request id.
    // Read the request ID from the header
    long id = Bytes.bytes2long(header, 4);
    if ((flag & FLAG_REQUEST) == 0) {
        // decode response.
        Response res = new Response(id);
        if ((flag & FLAG_EVENT) != 0) {
            res.setEvent(Response.HEARTBEAT_EVENT);
        }
        // get status.
        byte status = header[3];
        res.setStatus(status);
        if (status == Response.OK) {
            try {
                Object data;
                if (res.isHeartbeat()) {
                    data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                } else if (res.isEvent()) {
                    data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                } else {
                    DecodeableRpcResult result;
                    // Whether decoding runs on the IO thread; defaults to true
                    if (channel.getUrl().getParameter(
                            Constants.DECODE_IN_IO_THREAD_KEY,
                            Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                        result = new DecodeableRpcResult(channel, res, is,
                                (Invocation) getRequestData(id), proto);
                        // Decode right away
                        result.decode();
                    } else {
                        result = new DecodeableRpcResult(channel, res,
                                new UnsafeByteArrayInputStream(readMessageData(is)),
                                (Invocation) getRequestData(id), proto);
                    }
                    data = result;
                }
                // Set the decoded result on the response
                res.setResult(data);
            } catch (Throwable t) {
                res.setStatus(Response.CLIENT_ERROR);
                res.setErrorMessage(StringUtils.toString(t));
            }
        } else {
            res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
        }
        return res;
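The header fields that decodeBody reads (flag at byte 2, status at byte 3, request ID in bytes 4 to 11) can be picked apart with a few bit operations. The sketch below is standalone; the constant values and the 16-byte layout are assumptions based on the Dubbo protocol as I understand it, so check them against ExchangeCodec before relying on them.

```java
import java.nio.ByteBuffer;

class HeaderSketch {
    // Assumed flag bits, not taken from the excerpt above.
    static final int FLAG_REQUEST = 0x80;
    static final int FLAG_EVENT = 0x20;
    static final int SERIALIZATION_MASK = 0x1f;

    static boolean isRequest(byte[] header) {
        return (header[2] & FLAG_REQUEST) != 0;
    }

    static boolean isEvent(byte[] header) {
        return (header[2] & FLAG_EVENT) != 0;
    }

    static int serializationId(byte[] header) {
        return header[2] & SERIALIZATION_MASK;
    }

    static int status(byte[] header) {
        return header[3];
    }

    // Big-endian long starting at offset 4, the same bytes Bytes.bytes2long(header, 4) reads.
    static long requestId(byte[] header) {
        return ByteBuffer.wrap(header, 4, 8).getLong();
    }
}
```

Because the ID travels in the header, the client can match a response to its request before deserializing any of the body.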
Handling the returned message
Heartbeats are distinguished from business requests here, and business processing is handed off to a thread pool.

Breakpoint: com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler#received
public void received(Channel channel, Object message) throws RemotingException {
    ExecutorService cexecutor = getExecutorService();
    try {
        cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
        throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
    }
}
Executed in the thread pool: com.alibaba.dubbo.remoting.transport.DecodeHandler#decode
Handling the callback
Breakpoint: com.alibaba.dubbo.remoting.exchange.support.DefaultFuture#doReceived

Task run by the thread pool: com.alibaba.dubbo.remoting.transport.dispatcher.ChannelEventRunnable#run
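The hand-off from the IO thread to a worker pool can be illustrated with plain java.util.concurrent. This is only a sketch: DispatchSketch is a hypothetical name, and a Consumer stands in for the downstream ChannelHandler.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Consumer;

class DispatchSketch {
    // Called on the "IO thread": wraps the message in a task and returns immediately.
    // The returned future yields the name of the worker thread that ran the handler.
    static Future<String> received(ExecutorService pool, Consumer<Object> handler, Object message) {
        return pool.submit(() -> {
            handler.accept(message);               // business handling happens off the IO thread
            return Thread.currentThread().getName();
        });
    }
}
```

The IO thread stays free to read the next frame while decoding and business logic run on the pool.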
com.alibaba.dubbo.remoting.exchange.support.DefaultFuture#received
public static void received(Channel channel, Response response) {
    try {
        // Look up, by ID, the Future object that was stored when the request was sent
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            future.doReceived(response);
        } else {
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}
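The receiving half of the ID-based matching can be sketched as follows. ResponseCorrelation is a hypothetical name, and a CompletableFuture plays the role of DefaultFuture.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class ResponseCorrelation {
    // Futures registered at request time, keyed by request ID.
    static final Map<Long, CompletableFuture<Object>> FUTURES = new ConcurrentHashMap<>();

    // Returns true if a waiting caller was woken up, false if nobody was waiting.
    static boolean received(long id, Object result) {
        CompletableFuture<Object> future = FUTURES.remove(id);
        if (future == null) {
            return false;              // e.g. the caller already timed out and gave up
        }
        future.complete(result);       // unblocks get() on the requesting thread
        return true;
    }
}
```

Using remove rather than get means each response is delivered at most once, and a duplicate or late response simply finds nothing to complete.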