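Introduction
- This article analyzes asynchronous invocation in Dubbo 2.6.7, covering both how an asynchronous call is sent and how its response is handled.
- It includes a few simple examples that show how asynchronous calls are used.
Asynchronous invocation overview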

Dubbo asynchronous call process
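- Focus on the behavior of userThread: once the user issues the call, a Future is set in the RpcContext (steps 1, 2 and 3 in the figure above). In the DubboInvoker code shown later, this happens inside doInvoke(), right after the request is handed to the client.
- The user then takes the Future from RpcContext and waits on it; everything else is handled by IOThread (steps 4 and 5 in the figure above).
- When the server responds, the result is stored in the Future that was placed in RpcContext, and the waiting UserThread is notified.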
Asynchronous call usage example
<dubbo:reference id="fooService" interface="com.alibaba.foo.FooService">
<dubbo:method name="findFoo" async="true" />
</dubbo:reference>
<dubbo:reference id="barService" interface="com.alibaba.bar.BarService">
<dubbo:method name="findBar" async="true" />
</dubbo:reference>
// This method would normally return a Foo, but in async mode it returns null immediately
fooService.findFoo(fooId);
// Get the Future of the call just made right away; the next call on this thread overwrites it
Future<Foo> fooFuture = RpcContext.getContext().getFuture();
// Call a method on another service
barService.findBar(barId);
// Get the Future of that call right away as well
Future<Bar> barFuture = RpcContext.getContext().getFuture();
// At this point the two service methods are executing concurrently
// Wait for the first call to finish; the thread sleeps and is woken when the call completes.
Foo foo = fooFuture.get();
// Same as above
Bar bar = barFuture.get();
// If the first call takes 5 seconds and the second takes 6, the whole sequence finishes in about 6 seconds.
- 1. The asynchronous flow starts by invoking the service, here fooService.findFoo().
- 2. Obtain the Future from the RPC context with RpcContext.getContext().getFuture().
- 3. Call future.get() to fetch the result; if no result is available yet, the current thread is suspended until one arrives.
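The three steps can be tried out without a Dubbo provider. The following is only a minimal sketch built on plain JDK classes: AsyncContextSketch, callAsync and getFuture are hypothetical names standing in for RpcContext and the generated service proxy, and a small thread pool plays the role of the remote side.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class AsyncContextSketch {
    // Stand-in for the Future slot in RpcContext: one slot per calling thread.
    private static final ThreadLocal<Future<?>> CURRENT = new ThreadLocal<>();

    // Two daemon worker threads play the role of the remote providers.
    private static final ExecutorService POOL = Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r, "fake-provider");
        t.setDaemon(true);
        return t;
    });

    // Starts the work, stores its Future in the slot, and returns null at once.
    static <T> T callAsync(Callable<T> remoteCall) {
        CURRENT.set(POOL.submit(remoteCall));
        return null;
    }

    // Reads the Future of the most recent call made by this thread.
    @SuppressWarnings("unchecked")
    static <T> Future<T> getFuture() {
        return (Future<T>) CURRENT.get();
    }

    public static void main(String[] args) throws Exception {
        callAsync(() -> { Thread.sleep(300); return "foo"; });
        Future<String> fooFuture = getFuture();   // read before the next call overwrites it
        callAsync(() -> { Thread.sleep(400); return "bar"; });
        Future<String> barFuture = getFuture();

        // Both tasks are already running, so the total wait is roughly the longer of the two.
        System.out.println(fooFuture.get() + " " + barFuture.get());
    }
}
```

Because the slot holds a single Future per thread, reading it after the second call would return the second call's Future only, which is why each Future is fetched immediately after its own call.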
Dubbo asynchronous call stack

- Consumer => InvokerInvocationHandler => DubboInvoker => HeaderExchangeClient.
InvokerInvocationHandler
public class InvokerInvocationHandler implements InvocationHandler {
private final Invoker<?> invoker;
public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
}
- InvokerInvocationHandler.invoke() creates the RpcInvocation object for the RPC call. This object is passed along for the duration of a single call and acts as that call's context.
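The same idea can be reproduced with a JDK dynamic proxy. This is a sketch only: GreetingService, CallInfo and SimpleInvoker are hypothetical stand-ins for a service interface, RpcInvocation and Invoker, and none of them are Dubbo types.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

final class ProxySketch {
    // Hypothetical service interface, used only for this illustration.
    interface GreetingService {
        String greet(String name);
    }

    // Hypothetical stand-in for RpcInvocation: the method name and the arguments.
    static final class CallInfo {
        final String methodName;
        final Object[] arguments;

        CallInfo(Method method, Object[] arguments) {
            this.methodName = method.getName();
            this.arguments = arguments == null ? new Object[0] : arguments;
        }
    }

    // Hypothetical stand-in for Invoker: receives the packaged call.
    interface SimpleInvoker {
        Object invoke(CallInfo call);
    }

    static GreetingService proxyFor(SimpleInvoker invoker) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Methods declared on Object are answered locally by the invoker itself.
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            // Every interface method is packaged into one per-call object.
            return invoker.invoke(new CallInfo(method, args));
        };
        return (GreetingService) Proxy.newProxyInstance(
                GreetingService.class.getClassLoader(),
                new Class<?>[] {GreetingService.class},
                handler);
    }

    public static void main(String[] args) {
        GreetingService service = proxyFor(
                call -> call.methodName + Arrays.toString(call.arguments));
        System.out.println(service.greet("dubbo")); // prints greet[dubbo]
    }
}
```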
RpcInvocation
public class RpcInvocation implements Invocation, Serializable {
private static final long serialVersionUID = -4355285085441097045L;
// method name
private String methodName;
// parameter types
private Class<?>[] parameterTypes;
// argument values
private Object[] arguments;
// attachments passed through with the call context
private Map<String, String> attachments;
private transient Invoker<?> invoker;
public RpcInvocation() {
}
public RpcInvocation(Method method, Object[] arguments) {
this(method.getName(), method.getParameterTypes(), arguments, null, null);
}
public RpcInvocation(Method method, Object[] arguments, Map<String, String> attachment) {
this(method.getName(), method.getParameterTypes(), arguments, attachment, null);
}
public RpcInvocation(String methodName, Class<?>[] parameterTypes, Object[] arguments) {
this(methodName, parameterTypes, arguments, null, null);
}
public RpcInvocation(String methodName, Class<?>[] parameterTypes, Object[] arguments, Map<String, String> attachments) {
this(methodName, parameterTypes, arguments, attachments, null);
}
public RpcInvocation(String methodName, Class<?>[] parameterTypes, Object[] arguments, Map<String, String> attachments, Invoker<?> invoker) {
this.methodName = methodName;
this.parameterTypes = parameterTypes == null ? new Class<?>[0] : parameterTypes;
this.arguments = arguments == null ? new Object[0] : arguments;
this.attachments = attachments == null ? new HashMap<String, String>() : attachments;
this.invoker = invoker;
}
}

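- The core fields of RpcInvocation are the method name, the parameter types, the argument values and the attached context data.
- methodName is the method name.
- parameterTypes holds the parameter types.
- arguments holds the argument values.
- attachments holds the context data passed along with the call.
Asynchronous call flow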

[Figure: Dubbo asynchronous call flow]
DubboInvoker
public class DubboInvoker<T> extends AbstractInvoker<T> {
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
// whether the call is asynchronous
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
// whether the call is one-way
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
// timeout in milliseconds
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {
// one-way send: no response is expected
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
// asynchronous send: keep the future for the caller
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
// synchronous send: block until the response arrives
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
}
- An asynchronous call sends the request with ResponseFuture future = currentClient.request(inv, timeout).
- It then saves the future into RpcContext with RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)).
- A synchronous call runs RpcContext.getContext().setFuture(null) to clear the Future held in RpcContext.
- It then runs currentClient.request(inv, timeout).get() and blocks until the response arrives.
- The difference is that the synchronous path calls get() right after sending the request and waits for the result, while the asynchronous path saves the future into RpcContext and returns an empty RpcResult without waiting.
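The three branches can be condensed into a small JDK-only sketch. Everything here is an assumption made for illustration: DispatchSketch, Mode, request and the FUTURE slot are hypothetical, CompletableFuture replaces ResponseFuture, and the fake transport simply echoes its input.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class DispatchSketch {
    enum Mode { ONEWAY, ASYNC, SYNC }

    // Stand-in for the Future slot in RpcContext (one per calling thread).
    static final ThreadLocal<CompletableFuture<String>> FUTURE = new ThreadLocal<>();

    // Hypothetical transport: answers on another thread by echoing the payload.
    static CompletableFuture<String> request(String payload) {
        return CompletableFuture.supplyAsync(() -> "echo:" + payload);
    }

    static String invoke(Mode mode, String payload, long timeoutMillis) throws Exception {
        if (mode == Mode.ONEWAY) {
            request(payload);                 // fire and forget, nothing to wait on
            FUTURE.set(null);
            return null;
        } else if (mode == Mode.ASYNC) {
            FUTURE.set(request(payload));     // the caller picks the future up later
            return null;
        } else {
            FUTURE.set(null);
            // Synchronous: send, then block on the very same kind of future.
            return request(payload).get(timeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(invoke(Mode.SYNC, "a", 1000));   // prints echo:a
        System.out.println(invoke(Mode.ASYNC, "b", 1000));  // prints null
        System.out.println(FUTURE.get().get());             // prints echo:b
    }
}
```

The sketch makes the point visible that the sending code is identical in the synchronous and asynchronous branches; only who calls get(), and when, differs.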
HeaderExchangeChannel
final class HeaderExchangeChannel implements ExchangeChannel {
private static final Logger logger = LoggerFactory.getLogger(HeaderExchangeChannel.class);
private static final String CHANNEL_KEY = HeaderExchangeChannel.class.getName() + ".CHANNEL";
private final Channel channel;
private volatile boolean closed = false;
HeaderExchangeChannel(Channel channel) {
if (channel == null) {
throw new IllegalArgumentException("channel == null");
}
this.channel = channel;
}
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
// store the channel, the request and the timeout in a DefaultFuture
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
// hand the message to the underlying channel for sending
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
}
- HeaderExchangeChannel.request() creates a Request object; the fields set here are version, twoWay and data.
- In channel.send(), channel refers to the NettyClient object.
- HeaderExchangeChannel.request() returns a DefaultFuture; on the asynchronous path DubboInvoker wraps it in a FutureAdapter and stores it in RpcContext.
- In short, request() creates the Request, creates a DefaultFuture that holds it, and calls NettyClient.send() to send the message asynchronously.
Request
public class Request {
public static final String HEARTBEAT_EVENT = null;
public static final String READONLY_EVENT = "R";
private static final AtomicLong INVOKE_ID = new AtomicLong(0);
private final long mId;
private String mVersion;
private boolean mTwoWay = true;
private boolean mEvent = false;
private boolean mBroken = false;
private Object mData;
public Request() {
mId = newId();
}
public Request(long id) {
mId = id;
}
private static long newId() {
// getAndIncrement() When it grows to MAX_VALUE, it will grow to MIN_VALUE, and the negative can be used as ID
return INVOKE_ID.getAndIncrement();
}
public long getId() {
return mId;
}
}

- The key field of Request is INVOKE_ID, a static AtomicLong shared by all requests; each new Request takes its unique mId from it.
- The core fields of Request are shown in the code above; mData holds the RpcInvocation object.
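The comment in newId() about growing past MAX_VALUE can be checked directly with AtomicLong. In this small sketch, RequestIdSketch is a hypothetical class, and the configurable start value exists only so the wrap-around can be observed without issuing 2^63 ids.

```java
import java.util.concurrent.atomic.AtomicLong;

final class RequestIdSketch {
    private final AtomicLong counter;

    // The start value is a parameter purely for demonstration purposes.
    RequestIdSketch(long start) {
        this.counter = new AtomicLong(start);
    }

    // Returns the current value and advances the counter atomically.
    long newId() {
        return counter.getAndIncrement();
    }

    public static void main(String[] args) {
        RequestIdSketch ids = new RequestIdSketch(0);
        System.out.println(ids.newId()); // prints 0
        System.out.println(ids.newId()); // prints 1

        RequestIdSketch nearMax = new RequestIdSketch(Long.MAX_VALUE);
        System.out.println(nearMax.newId() == Long.MAX_VALUE); // prints true
        System.out.println(nearMax.newId() == Long.MIN_VALUE); // prints true
    }
}
```

The counter overflows silently into negative values, so ids stay distinct until the full 64-bit range has been used.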
DefaultFuture
public class DefaultFuture implements ResponseFuture {
private static final Logger logger = LoggerFactory.getLogger(DefaultFuture.class);
// channels waiting for a response, keyed by request id
private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>();
// DefaultFuture instances waiting for a response, keyed by request id
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
static {
// timeout detection thread
Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer");
th.setDaemon(true);
th.start();
}
// invoke id.
private final long id;
private final Channel channel;
private final Request request;
private final int timeout;
// the core lock and done fields
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
private final long start = System.currentTimeMillis();
private volatile long sent;
private volatile Response response;
private volatile ResponseCallback callback;
public DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// put into waiting map.
FUTURES.put(id, this);
CHANNELS.put(id, channel);
}
// timeout detection task: RemotingInvocationTimeoutScan
private static class RemotingInvocationTimeoutScan implements Runnable {
@Override
public void run() {
while (true) {
try {
// walk over every stored DefaultFuture and check it for timeout
for (DefaultFuture future : FUTURES.values()) {
if (future == null || future.isDone()) {
continue;
}
// timeout handling
if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {
// build a Response whose id is the Request's unique id, mId
// future.getId() is equivalent to request.getId()
Response timeoutResponse = new Response(future.getId());
// set timeout status.
timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
// run the timeout response through DefaultFuture.received
DefaultFuture.received(future.getChannel(), timeoutResponse);
}
}
Thread.sleep(30);
} catch (Throwable e) {
}
}
}
}
// handles both real responses and timeout responses: received => doReceived
public static void received(Channel channel, Response response) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
}
} finally {
CHANNELS.remove(response.getId());
}
}
// notifies the waiting thread of the asynchronous result via done.signal()
private void doReceived(Response res) {
// the lock provides mutual exclusion
lock.lock();
try {
response = res;
if (done != null) {
// wake the waiting thread
done.signal();
}
} finally {
lock.unlock();
}
// if a callback is configured, invoke it
if (callback != null) {
invokeCallback(callback);
}
}
// the methods below implement the blocking wait
public Object get() throws RemotingException {
return get(timeout);
}
@Override
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
private Object returnFromResponse() throws RemotingException {
Response res = response;
if (res == null) {
throw new IllegalStateException("response cannot be null");
}
if (res.getStatus() == Response.OK) {
return res.getResult();
}
if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
}
throw new RemotingException(channel, res.getErrorMessage());
}
}
- DefaultFuture is the core of the asynchronous implementation; notification is built on a ReentrantLock and its Condition.
- Lock lock = new ReentrantLock() is a mutex that keeps a single DefaultFuture thread-safe.
- Condition done = lock.newCondition() is the wake-up mechanism for a single DefaultFuture.
- DefaultFuture starts one static daemon thread running RemotingInvocationTimeoutScan, which scans for timed-out DefaultFuture objects.
- The static map FUTURES holds the DefaultFuture of every request that is still waiting for a response.
- When RemotingInvocationTimeoutScan finds a timed-out future, it runs DefaultFuture's received => doReceived flow with a timeout response.
- A normal response goes through the same received => doReceived flow.
- DefaultFuture.get() performs the wait, implemented with done.await().
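The lock and condition mechanics can be isolated in a few lines. The following is a simplified sketch, not Dubbo's code: ConditionFutureSketch is a hypothetical class, it uses the JDK's java.util.concurrent.TimeoutException rather than Dubbo's own exception, it calls signalAll() instead of signal(), and it tracks the remaining wait with awaitNanos() instead of re-checking the wall clock.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

final class ConditionFutureSketch<T> {
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private volatile T response; // null means "no response yet" in this sketch

    boolean isDone() {
        return response != null;
    }

    // Called by the thread that receives the response.
    void received(T value) {
        lock.lock();
        try {
            response = value;
            done.signalAll(); // wake every thread blocked in get()
        } finally {
            lock.unlock();
        }
    }

    // Called by the user thread; blocks until a response arrives or the timeout expires.
    T get(long timeoutMillis) throws InterruptedException, TimeoutException {
        long remaining = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (!isDone()) { // loop guards against spurious wake-ups
                if (remaining <= 0) {
                    throw new TimeoutException("no response within " + timeoutMillis + " ms");
                }
                remaining = done.awaitNanos(remaining);
            }
            return response;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws Exception {
        ConditionFutureSketch<String> future = new ConditionFutureSketch<>();
        new Thread(() -> future.received("pong"), "fake-io-thread").start();
        System.out.println(future.get(1000)); // prints pong
    }
}
```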
Handling a normal response
public class HeaderExchangeHandler implements ChannelHandlerDelegate {
protected static final Logger logger = LoggerFactory.getLogger(HeaderExchangeHandler.class);
public static String KEY_READ_TIMESTAMP = HeartbeatHandler.KEY_READ_TIMESTAMP;
public static String KEY_WRITE_TIMESTAMP = HeartbeatHandler.KEY_WRITE_TIMESTAMP;
private final ExchangeHandler handler;
public HeaderExchangeHandler(ExchangeHandler handler) {
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
this.handler = handler;
}
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
}
- For a normal response, the path received => handleResponse leads to DefaultFuture.received().
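Since both a real response and the timeout scanner end up in DefaultFuture.received(), removing the entry from the pending map decides which of the two wins. The sketch below illustrates that race under simplifying assumptions: PendingRegistrySketch and its methods are hypothetical, time is passed in explicitly to keep the example deterministic, and a "TIMEOUT" marker string replaces Dubbo's Response status codes.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class PendingRegistrySketch {
    // Hypothetical entry: the future plus the data needed for timeout checks.
    static final class Pending {
        final CompletableFuture<String> future = new CompletableFuture<>();
        final long startMillis;
        final long timeoutMillis;

        Pending(long startMillis, long timeoutMillis) {
            this.startMillis = startMillis;
            this.timeoutMillis = timeoutMillis;
        }
    }

    // Requests still waiting for a response, keyed by request id.
    private final Map<Long, Pending> pending = new ConcurrentHashMap<>();

    CompletableFuture<String> register(long id, long nowMillis, long timeoutMillis) {
        Pending entry = new Pending(nowMillis, timeoutMillis);
        pending.put(id, entry);
        return entry.future;
    }

    // Shared by real responses and by the sweep; whoever removes the entry first wins.
    boolean received(long id, String result) {
        Pending entry = pending.remove(id);
        if (entry == null) {
            return false; // already answered or already timed out
        }
        entry.future.complete(result);
        return true;
    }

    // One pass of the timeout scan; returns how many requests were expired.
    int sweep(long nowMillis) {
        int expired = 0;
        for (Map.Entry<Long, Pending> e : pending.entrySet()) {
            Pending entry = e.getValue();
            boolean overdue = nowMillis - entry.startMillis > entry.timeoutMillis;
            if (overdue && received(e.getKey(), "TIMEOUT")) {
                expired++;
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        PendingRegistrySketch registry = new PendingRegistrySketch();
        CompletableFuture<String> first = registry.register(1L, 0L, 100L);
        CompletableFuture<String> second = registry.register(2L, 0L, 1000L);

        System.out.println(registry.received(1L, "ok"));   // prints true
        System.out.println(registry.sweep(1500L));         // prints 1
        System.out.println(registry.received(2L, "late")); // prints false
        System.out.println(first.getNow(null) + " " + second.getNow(null)); // prints ok TIMEOUT
    }
}
```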
Conclusion
- At the lowest level, Dubbo's asynchronous call flow relies on the asynchronous send of NettyClient.
- Every Request in the flow carries a unique identifier (an incrementing number).
- The core logic lives in DefaultFuture, which is built on ReentrantLock and its Condition.