Dubbo Client NIO Processing

Starting the request

The entry point mainly decides how the call is made: one-way, asynchronous, or synchronous.

com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke

    boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
    boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
    int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    if (isOneway) {
        // one-way: send the request and return an empty result without waiting for a reply
        boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
        currentClient.send(inv, isSent);
        RpcContext.getContext().setFuture(null);
        return new RpcResult();
    } else if (isAsync) {
        // async: build the Future object and store it in the thread-local RpcContext
        ResponseFuture future = currentClient.request(inv, timeout);
        RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
        return new RpcResult();
    } else {
        // sync: call get() directly and return the result
        RpcContext.getContext().setFuture(null);
        return (Result) currentClient.request(inv, timeout).get();
    }
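
The three branches can be modeled outside Dubbo. The following is a minimal sketch, not Dubbo's code: `InvokeModes`, `Mode`, `CONTEXT_FUTURE`, and the `transport` function are hypothetical names, and a `CompletableFuture` stands in for Dubbo's `ResponseFuture`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical model of the three call styles in doInvoke; not Dubbo code.
final class InvokeModes {
    enum Mode { ONEWAY, ASYNC, SYNC }

    // Plays the role of the thread-bound future slot in RpcContext.
    static final ThreadLocal<CompletableFuture<String>> CONTEXT_FUTURE = new ThreadLocal<>();

    // 'transport' is an assumed function that sends a request and returns a future reply.
    static String invoke(Mode mode, String request,
                         Function<String, CompletableFuture<String>> transport) {
        switch (mode) {
            case ONEWAY:
                transport.apply(request);      // fire and forget
                CONTEXT_FUTURE.set(null);
                return null;                   // empty result
            case ASYNC:
                // the caller picks the future up from the context later
                CONTEXT_FUTURE.set(transport.apply(request));
                return null;                   // empty result for now
            default:
                CONTEXT_FUTURE.set(null);
                return transport.apply(request).join(); // block until the reply arrives
        }
    }
}
```

Only the synchronous branch blocks the calling thread; the asynchronous branch moves the wait to whoever reads the future from the context.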

Building the network request and creating the Future

This step returns the Future object that the previous step needs.

com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(java.lang.Object, int)

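        // create request; the ID is assigned in the Request constructor
        Request req = new Request();
        req.setVersion("2.0.0");
        req.setTwoWay(true);
        req.setData(request);
        // the Future is created before sending so that a reply can always find it
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            // sending failed: nobody will answer, so cancel the pending Future
            future.cancel();
            throw e;
        }
        return future;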
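
The bookkeeping behind this step can be sketched as follows. This is an illustration under stated assumptions, not Dubbo's `DefaultFuture`: `PendingRequests`, `request`, and `pendingCount` are hypothetical names, and the `sender` callback stands in for `channel.send`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

// Hypothetical sketch of request/future bookkeeping; not Dubbo's DefaultFuture.
final class PendingRequests {
    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    // Assigns an ID, registers the future before sending, and unregisters it if the send fails.
    CompletableFuture<Object> request(Object data, BiConsumer<Long, Object> sender) {
        long id = nextId.getAndIncrement();
        CompletableFuture<Object> future = new CompletableFuture<>();
        pending.put(id, future);
        try {
            sender.accept(id, data);
        } catch (RuntimeException e) {
            pending.remove(id);   // no reply will ever arrive for this ID
            future.cancel(false);
            throw e;
        }
        return future;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Registering before sending matters: if the order were reversed, a fast reply could arrive before its future exists.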

Executing the network request

This is where the data is actually sent. Once this method returns, the send phase is complete.
com.alibaba.dubbo.remoting.transport.netty.NettyChannel#send

    public void send(Object message, boolean sent) throws RemotingException {
        super.send(message, sent);

        boolean success = true;
        int timeout = 0;
        try {
            ChannelFuture future = channel.write(message);
            // when "sent" is true, block until the write completes or the timeout elapses
            if (sent) {
                timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                success = future.await(timeout);
            }
            Throwable cause = future.getCause();
            if (cause != null) {
                throw cause;
            }
        } catch (Throwable e) {
            // wrap the failure instead of silently swallowing it
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                    + ", cause: " + e.getMessage(), e);
        }
        if (!success) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                    + " in timeout(" + timeout + "ms) limit");
        }
    }
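
The effect of the `sent` flag can be shown with a small stand-alone sketch. It is not Netty or Dubbo code: `SendAwait` is a hypothetical name, a `CompletableFuture<Void>` stands in for Netty's `ChannelFuture`, and `IllegalStateException` stands in for `RemotingException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of "wait for the write only when sent=true"; not Netty or Dubbo code.
final class SendAwait {
    static void send(CompletableFuture<Void> write, boolean sent, long timeoutMs) {
        if (!sent) {
            // not waiting: only report a failure that is already known
            if (write.isCompletedExceptionally()) {
                throw new IllegalStateException("write failed");
            }
            return;
        }
        try {
            // waiting: block until the write finishes or the timeout elapses
            write.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("not sent within " + timeoutMs + "ms", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("write failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }
}
```

With `sent=false` the caller returns immediately and a later write failure goes unnoticed at this point; with `sent=true` the caller pays for the wait but learns whether the bytes left the process.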

Decoding received data

Decoding is the first thing done with received data.

Decoding the request ID and the returned result:
com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec#decodeBody

 protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
        byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
        Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
        // get the request ID from the header
        long id = Bytes.bytes2long(header, 4);
        if ((flag & FLAG_REQUEST) == 0) {
            // decode response.
            Response res = new Response(id);
            if ((flag & FLAG_EVENT) != 0) {
                res.setEvent(Response.HEARTBEAT_EVENT);
            }
            // get status.
            byte status = header[3];
            res.setStatus(status);
            if (status == Response.OK) {
                try {
                    Object data;
                    if (res.isHeartbeat()) {
                        data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                    } else if (res.isEvent()) {
                        data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                    } else {
                        DecodeableRpcResult result;
                        // whether decoding runs on the I/O thread; the default is true
                        if (channel.getUrl().getParameter(
                                Constants.DECODE_IN_IO_THREAD_KEY,
                                Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                            result = new DecodeableRpcResult(channel, res, is,
                                    (Invocation)getRequestData(id), proto);
                            // decode now, on the I/O thread
                            result.decode();
                        } else {
                            result = new DecodeableRpcResult(channel, res,
                                    new UnsafeByteArrayInputStream(readMessageData(is)),
                                    (Invocation) getRequestData(id), proto);
                        }
                        data = result;
                    }
                    // set the returned result
                    res.setResult(data);
                } catch (Throwable t) {
                    res.setStatus(Response.CLIENT_ERROR);
                    res.setErrorMessage(StringUtils.toString(t));
                }
            } else {
                res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
            }
            return res;
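
The header fields read by the excerpt above can be sketched in isolation. This is a hedged illustration, not Dubbo's codec: `HeaderSketch` is a hypothetical class, and the flag values (`0x80`, `0x20`, `0x1f`) are assumed to match the constants in Dubbo 2.x `ExchangeCodec`; check them against the version in use.

```java
import java.nio.ByteBuffer;

// Hypothetical header reader; flag values are assumed to match Dubbo 2.x ExchangeCodec.
final class HeaderSketch {
    static final int FLAG_REQUEST = 0x80;       // set on requests, clear on responses
    static final int FLAG_EVENT = 0x20;         // set on events such as heartbeats
    static final int SERIALIZATION_MASK = 0x1f; // low 5 bits select the serialization

    final boolean request;
    final boolean event;
    final int serializationId;
    final int status;
    final long id;

    private HeaderSketch(boolean request, boolean event, int serializationId, int status, long id) {
        this.request = request;
        this.event = event;
        this.serializationId = serializationId;
        this.status = status;
        this.id = id;
    }

    // Reads the same positions as the excerpt: flag at [2], status at [3], ID at [4..11].
    static HeaderSketch parse(byte[] header) {
        if (header.length < 12) {
            throw new IllegalArgumentException("header too short: " + header.length);
        }
        int flag = header[2] & 0xff;
        int status = header[3] & 0xff;
        long id = ByteBuffer.wrap(header, 4, 8).getLong(); // big-endian 8-byte ID
        return new HeaderSketch((flag & FLAG_REQUEST) != 0, (flag & FLAG_EVENT) != 0,
                flag & SERIALIZATION_MASK, status, id);
    }
}
```

Because the ID sits in the fixed-size header, the client can match a response to its request before deserializing any of the body.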

Handling the returned message

This step separates heartbeats from business requests and hands business processing off to a thread pool.

Breakpoint: com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler#received

    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

Executed in the thread pool: com.alibaba.dubbo.remoting.transport.DecodeHandler#decode
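
The hand-off from the I/O thread to a worker pool can be sketched like this. It is a simplified illustration, not Dubbo's dispatcher: `DispatchSketch` and its nested `Handler` interface are hypothetical, and a plain string stands in for the channel.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical sketch of dispatching received messages to a worker pool; not Dubbo code.
final class DispatchSketch {
    interface Handler {
        void received(String channel, Object message);
    }

    private final Executor executor;
    private final Handler handler;

    DispatchSketch(Executor executor, Handler handler) {
        this.executor = executor;
        this.handler = handler;
    }

    // Called on the I/O thread: queue the work and return at once, so the
    // I/O thread is free to read the next message from the socket.
    CompletableFuture<Void> received(String channel, Object message) {
        return CompletableFuture.runAsync(() -> handler.received(channel, message), executor);
    }
}
```

The returned future is only there so that a caller or test can wait for the handler to finish; the dispatcher in the excerpt above does not return one.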

Handling the callback

Breakpoint: com.alibaba.dubbo.remoting.exchange.support.DefaultFuture#doReceived

The task run by the thread pool: com.alibaba.dubbo.remoting.transport.dispatcher.ChannelEventRunnable#run

com.alibaba.dubbo.remoting.exchange.support.DefaultFuture#received

    public static void received(Channel channel, Response response) {
        try {
            // look up (and remove) the Future stored under this ID when the request was sent
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                // no pending Future for this ID, e.g. the request has already timed out
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }
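
Matching a response back to its waiting caller can be sketched as follows. This is a minimal illustration under stated assumptions, not Dubbo's implementation: `ResponseMatcher`, `register`, and the boolean return value of `received` are hypothetical, and a `CompletableFuture` stands in for `DefaultFuture`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of completing a pending future by request ID; not Dubbo code.
final class ResponseMatcher {
    private final Map<Long, CompletableFuture<Object>> futures = new ConcurrentHashMap<>();

    // Called when the request is sent.
    CompletableFuture<Object> register(long id) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        futures.put(id, future);
        return future;
    }

    // Called when a response arrives. Returns false when nobody is waiting,
    // for example because the reply is late or was already delivered.
    boolean received(long id, Object result) {
        CompletableFuture<Object> future = futures.remove(id);
        if (future == null) {
            return false;
        }
        future.complete(result); // wakes up the thread blocked in get()/join()
        return true;
    }
}
```

Using `remove` rather than `get` makes delivery happen at most once per ID and keeps the map from growing.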

References

http://flychao88.iteye.com/blog/2190465
http://blog.csdn.net/qq418517226/article/details/51906357
