Dubbo Consumer Response Process

Introduction

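  • This article analyzes how a Dubbo Consumer handles a response from the Provider. It is split into two parts: the Dubbo Client initialization flow and the Dubbo Client response flow.

  • The initialization part focuses on how the Client connects and how the handlers are wrapped around one another.

  • The response part walks through the response-handling path, which builds on the structures set up during initialization.

  • Along the way, the article also explains why the Client side of Dubbo 2.6.5 can end up with too many threads.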


Consumer Client Initialization Flow

DubboProtocol

public class DubboProtocol extends AbstractProtocol {

    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
            // related code omitted
    };

    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);
        // getClients() initializes the Client objects while the DubboInvoker is being created
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }

    private ExchangeClient[] getClients(URL url) {
        // whether to share connection
        boolean service_share_connect = false;
        int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
        // if not configured, connection is shared, otherwise, one connection for one service
        if (connections == 0) {
            service_share_connect = true;
            connections = 1;
        }

        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (service_share_connect) {
                clients[i] = getSharedClient(url);
            } else {
                clients[i] = initClient(url);
            }
        }
        return clients;
    }

    private ExchangeClient initClient(URL url) {

        String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

        ExchangeClient client;
        try {
            if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
                client = new LazyConnectExchangeClient(url, requestHandler);
            } else {
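                // the Exchange layer performs the actual connect
                client = Exchangers.connect(url, requestHandler);
            }
        } catch (RemotingException e) {
            // rethrow so that client is never returned unassigned
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }
        return client;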
    }
}
  • During refer(), DubboProtocol creates a DubboInvoker, and creating the DubboInvoker initializes the ExchangeClient objects through getClients().
  • Each ExchangeClient is initialized through the connect() method of Exchangers.
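
The connection-sharing rule in getClients() can be isolated in a small standalone sketch. The names ConnectionPlan and plan() are hypothetical and not part of Dubbo; only the rule itself (a connections value of 0 means one shared client, any other value means that many dedicated clients) is taken from the excerpt above.

```java
// Hypothetical sketch, not Dubbo code: mirrors the decision made in getClients().
final class ConnectionPlan {
    final boolean shared;   // true when the service reuses the shared client
    final int clientCount;  // number of ExchangeClient slots to fill

    private ConnectionPlan(boolean shared, int clientCount) {
        this.shared = shared;
        this.clientCount = clientCount;
    }

    // configuredConnections stands for the "connections" URL parameter (default 0).
    static ConnectionPlan plan(int configuredConnections) {
        if (configuredConnections == 0) {
            // not configured: share a single connection
            return new ConnectionPlan(true, 1);
        }
        // configured: one dedicated client per requested connection
        return new ConnectionPlan(false, configuredConnections);
    }
}

class ConnectionPlanDemo {
    public static void main(String[] args) {
        ConnectionPlan unconfigured = ConnectionPlan.plan(0);
        ConnectionPlan dedicated = ConnectionPlan.plan(3);
        System.out.println(unconfigured.shared + " " + unconfigured.clientCount);
        System.out.println(dedicated.shared + " " + dedicated.clientCount);
    }
}
```

In the real method the shared branch calls getSharedClient(url) and the dedicated branch calls initClient(url) for every slot.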


Exchangers

public class Exchangers {

    public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        // getExchanger() returns HeaderExchanger
        return getExchanger(url).connect(url, handler);
    }
}



public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        // wrapping order: DecodeHandler => HeaderExchangeHandler => requestHandler
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }
}
  • HeaderExchanger calls Transporters.connect() internally.
  • The handlers are wrapped as DecodeHandler => HeaderExchangeHandler => requestHandler, outermost first.
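
The wrapping is a plain decorator chain: each handler holds a reference to the one it wraps. The sketch below uses a hypothetical one-method interface (SketchHandler) and a generic wrapper (NamedWrapper) instead of Dubbo's real ChannelHandler types, and it only demonstrates construction and delegation order. In the real response path HeaderExchangeHandler hands a Response to DefaultFuture rather than delegating it to requestHandler.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal handler interface; Dubbo's ChannelHandler has more methods.
interface SketchHandler {
    void received(String message, List<String> trace);
}

// A wrapper that records its own name and then delegates to the handler it wraps.
class NamedWrapper implements SketchHandler {
    private final String name;
    private final SketchHandler delegate;

    NamedWrapper(String name, SketchHandler delegate) {
        this.name = name;
        this.delegate = delegate;
    }

    @Override
    public void received(String message, List<String> trace) {
        trace.add(name);
        delegate.received(message, trace);
    }
}

class WrappingDemo {
    // Builds DecodeHandler => HeaderExchangeHandler => requestHandler, outermost first.
    static SketchHandler build() {
        SketchHandler requestHandler = (message, trace) -> trace.add("requestHandler");
        return new NamedWrapper("DecodeHandler",
                new NamedWrapper("HeaderExchangeHandler", requestHandler));
    }

    // Sends one message through the chain and returns the order in which handlers ran.
    static List<String> trace(String message) {
        List<String> trace = new ArrayList<>();
        build().received(message, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(trace("hello"));
    }
}
```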


Transporters

public class Transporters {

    public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        ChannelHandler handler;
        if (handlers == null || handlers.length == 0) {
            handler = new ChannelHandlerAdapter();
        } else if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        // obtain the NettyTransporter and call its connect() method
        return getTransporter().connect(url, handler);
    }

    public static Transporter getTransporter() {
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    }

}

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }

}
  • Transporters obtains the NettyTransporter and calls its connect() method.
  • NettyTransporter.connect() constructs a NettyClient; the listener argument is the DecodeHandler.
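
Transporters.connect() picks the handler by how many handlers it was given. Because HeaderExchanger passes exactly one DecodeHandler, the single-handler branch applies here. The sketch below reproduces that three-way choice with strings in place of handler objects; HandlerSelection and select() are hypothetical names used only for illustration.

```java
// Hypothetical sketch of the selection rule in Transporters.connect().
class HandlerSelection {
    static String select(String... handlers) {
        if (handlers == null || handlers.length == 0) {
            // nothing supplied: fall back to a no-op adapter
            return "adapter";
        } else if (handlers.length == 1) {
            // exactly one handler: use it directly
            return handlers[0];
        } else {
            // several handlers: fan out through a dispatcher
            return "dispatcher(" + String.join(",", handlers) + ")";
        }
    }

    public static void main(String[] args) {
        System.out.println(select());
        System.out.println(select("DecodeHandler"));
        System.out.println(select("a", "b"));
    }
}
```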


NettyClient

public class NettyClient extends AbstractClient {

    private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);

    private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS, new DefaultThreadFactory("NettyClientWorker", true));

    private Bootstrap bootstrap;

    private volatile Channel channel; // volatile, please copy reference to use

    public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
        super(url, wrapChannelHandler(url, handler));
    }

    @Override
    protected void doOpen() throws Throwable {
        final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
        bootstrap = new Bootstrap();
        bootstrap.group(nioEventLoopGroup)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
                .channel(NioSocketChannel.class);

        if (getConnectTimeout() < 3000) {
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
        } else {
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
        }

        bootstrap.handler(new ChannelInitializer() {

            @Override
            protected void initChannel(Channel ch) throws Exception {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                        .addLast("decoder", adapter.getDecoder())
                        .addLast("encoder", adapter.getEncoder())
                        .addLast("handler", nettyClientHandler);
            }
        });
    }

    @Override
    protected void doConnect() throws Throwable {
        long start = System.currentTimeMillis();
        ChannelFuture future = bootstrap.connect(getConnectAddress());
        try {
            boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
            // other code omitted
        } finally {
            if (!isConnected()) {
                //future.cancel(true);
            }
        }
    }

}


public abstract class AbstractClient extends AbstractEndpoint implements Client {

    protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
        url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
        url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
        return ChannelHandlers.wrap(handler, url);
    }
}


public class ChannelHandlers {

    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        // the adaptive Dispatcher's dispatch() returns an AllChannelHandler
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
}
  • The NettyClient constructor wraps the handler once more through wrapChannelHandler().
  • ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch() returns an AllChannelHandler.
  • The resulting wrapping order is MultiMessageHandler => HeartbeatHandler => AllChannelHandler => DecodeHandler => HeaderExchangeHandler => requestHandler.
  • The handler held by NettyClientHandler is the NettyClient itself: doOpen() passes this to the NettyClientHandler constructor.
  • NettyClientHandler starts running once the NettyClient receives a response message.


AbstractPeer

public abstract class AbstractPeer implements Endpoint, ChannelHandler {

    private final ChannelHandler handler;
    private volatile URL url;

    public AbstractPeer(URL url, ChannelHandler handler) {
        this.url = url;
        this.handler = handler;
    }

    @Override
    public void received(Channel ch, Object msg) throws RemotingException {
        if (closed) {
            return;
        }
        handler.received(ch, msg);
    }
}
  • AbstractPeer is a base class of NettyClient. Its handler field is the MultiMessageHandler passed up from the NettyClient constructor.
  • AbstractPeer is the entry point for responses on the Client side. Only received() is shown here; the other callback methods can be found in the implementing classes.


AllChannelHandler

public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }
}

public class WrappedChannelHandler implements ChannelHandlerDelegate {

    protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
    protected final ExecutorService executor;
    protected final ChannelHandler handler;
    protected final URL url;

    public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        // build the executor
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
            componentKey = Constants.CONSUMER_SIDE;
        }
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }
}
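  • The constructor of WrappedChannelHandler, the parent class of AllChannelHandler, creates the executor.
  • On the consumer side the executor is scoped to the connection: every connection gets its own executor, so the number of thread pools grows with the number of connections. This per-connection executor is what drives the excessive Client-side thread count in Dubbo 2.6.5 mentioned in the introduction.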
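
A rough way to see the effect is to model a connection as an object that creates its own executor in its constructor, as WrappedChannelHandler does. SketchConnection and PerConnectionExecutorDemo are hypothetical names, and the cached thread pool is an assumption made for illustration. The point is only that N connections produce N separate pools, each free to spawn its own threads.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for one client connection: like WrappedChannelHandler,
// constructing it creates a dedicated executor.
class SketchConnection {
    final ExecutorService executor = Executors.newCachedThreadPool();

    void close() {
        executor.shutdownNow();
    }
}

class PerConnectionExecutorDemo {
    // Opens the given number of connections and counts the distinct executors created.
    static int distinctExecutors(int connections) {
        List<SketchConnection> opened = new ArrayList<>();
        // identity-based set: two executors count as the same only if they are the same object
        Set<ExecutorService> executors = Collections.newSetFromMap(new IdentityHashMap<>());
        for (int i = 0; i < connections; i++) {
            SketchConnection connection = new SketchConnection();
            opened.add(connection);
            executors.add(connection.executor);
        }
        for (SketchConnection connection : opened) {
            connection.close();
        }
        return executors.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctExecutors(5));
    }
}
```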


Handler Wrapping Relationship

[Figure: Handler wrapping relationship]


Consumer Client Response Flow

Consumer Client Response, Phase One
[Figure: call stack of Consumer response phase one]
  • The figure above shows the call stack for phase one of the Consumer response.
  • Calls proceed in the order NettyClientHandler => NettyClient => MultiMessageHandler => HeartbeatHandler => AllChannelHandler.


NettyClientHandler
public class NettyClientHandler extends ChannelDuplexHandler {

    private final URL url;

    private final ChannelHandler handler;

    public NettyClientHandler(URL url, ChannelHandler handler) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        this.url = url;
        this.handler = handler;
    }


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }

    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise future)
            throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        try {
            handler.disconnected(channel);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        try {
            handler.received(channel, msg);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }
}
  • The methods of NettyClientHandler handle the connection, disconnection, and read events of the channel.


AllChannelHandler
public class AllChannelHandler extends WrappedChannelHandler {

    public void received(Channel channel, Object message) throws RemotingException {
        // obtain the executor (thread pool) for this connection
        ExecutorService cexecutor = getExecutorService();
        try {
            // build a ChannelEventRunnable and submit it to the pool
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if(message instanceof Request && t instanceof RejectedExecutionException){
                Request request = (Request)message;
                if(request.isTwoWay()){
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
}


public class WrappedChannelHandler implements ChannelHandlerDelegate {

    protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
    protected final ExecutorService executor;
    protected final ChannelHandler handler;
    protected final URL url;

    public ExecutorService getExecutorService() {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
            cexecutor = SHARED_EXECUTOR;
        }
        return cexecutor;
    }
}
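  • AllChannelHandler submits a ChannelEventRunnable to the consumer-side thread pool.
  • ExecutorService cexecutor = getExecutorService() obtains that pool: one ExecutorService per connection, with SHARED_EXECUTOR as the fallback when the connection's executor is null or already shut down.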
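
The fallback rule in getExecutorService() is small enough to test in isolation. The sketch below is a standalone restatement of it; ExecutorFallback and choose() are hypothetical names, and the two single-thread executors exist only for the demonstration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical restatement of the rule in WrappedChannelHandler.getExecutorService().
class ExecutorFallback {
    // Returns the connection's own executor unless it is missing or shut down,
    // in which case the shared executor is used instead.
    static ExecutorService choose(ExecutorService own, ExecutorService shared) {
        if (own == null || own.isShutdown()) {
            return shared;
        }
        return own;
    }

    public static void main(String[] args) {
        ExecutorService shared = Executors.newSingleThreadExecutor();
        ExecutorService own = Executors.newSingleThreadExecutor();
        System.out.println(choose(own, shared) == own);     // own executor is usable
        own.shutdown();
        System.out.println(choose(own, shared) == shared);  // falls back after shutdown
        shared.shutdown();
    }
}
```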


Consumer Client Response, Phase Two
[Figure: call stack of Consumer response phase two]
  • The figure above shows the call stack for phase two of the Consumer response.
  • Calls proceed in the order ChannelEventRunnable => DecodeHandler => HeaderExchangeHandler => DefaultFuture.


ChannelEventRunnable

public class ChannelEventRunnable implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(ChannelEventRunnable.class);

    private final ChannelHandler handler;
    private final Channel channel;
    private final ChannelState state;
    private final Throwable exception;
    private final Object message;



    public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message, Throwable exception) {
        this.channel = channel;
        this.handler = handler;
        this.state = state;
        this.message = message;
        this.exception = exception;
    }

    @Override
    public void run() {
        if (state == ChannelState.RECEIVED) {
            try {
                handler.received(channel, message);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                        + ", message is " + message, e);
            }
        } else {
            switch (state) {
            case CONNECTED:
                try {
                    handler.connected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case DISCONNECTED:
                try {
                    handler.disconnected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case SENT:
                try {
                    handler.sent(channel, message);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is " + message, e);
                }
                // stop here so that SENT does not fall through into CAUGHT
                break;
            case CAUGHT:
                try {
                    handler.caught(channel, exception);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is: " + message + ", exception is " + exception, e);
                }
                break;
            default:
                logger.warn("unknown state: " + state + ", message is " + message);
            }
        }

    }
}
  • The executor thread runs ChannelEventRunnable.run(), which dispatches on the channel state.
  • The handler inside ChannelEventRunnable is the DecodeHandler.
  • DecodeHandler in turn calls into the HeaderExchangeHandler.


HeaderExchangeHandler

public class HeaderExchangeHandler implements ChannelHandlerDelegate {

    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                // handle a request
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {
                        Response response = handleRequest(exchangeChannel, request);
                        channel.send(response);
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
                // handle a response
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                // handle telnet and other string commands
                if (isClientSide(channel)) {
                    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                    logger.error(e.getMessage(), e);
                } else {
                    String echo = handler.telnet(channel, (String) message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }


    static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }
    }
}
  • HeaderExchangeHandler.received() distinguishes requests, responses, and strings and handles each differently.
  • The Consumer's response handling lives in handleResponse().
  • handleResponse() ultimately calls DefaultFuture.received().
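
The type-based branching in received() can be summarized with a simplified router. SketchRequest, SketchResponse, and MessageRouter are hypothetical stand-ins for Dubbo's Request and Response handling. The sketch leaves out event requests and the client-side check on string messages, and it returns a label for the chosen branch instead of doing any work.

```java
// Hypothetical message types standing in for Dubbo's Request and Response.
final class SketchRequest {
    final boolean twoWay;

    SketchRequest(boolean twoWay) {
        this.twoWay = twoWay;
    }
}

final class SketchResponse {
    final boolean heartbeat;

    SketchResponse(boolean heartbeat) {
        this.heartbeat = heartbeat;
    }
}

class MessageRouter {
    // Returns a label for the branch a message of this type would take.
    static String route(Object message) {
        if (message instanceof SketchRequest) {
            return ((SketchRequest) message).twoWay
                    ? "handle request and reply"
                    : "handle one-way request";
        } else if (message instanceof SketchResponse) {
            // heartbeat responses are dropped; everything else completes a pending future
            return ((SketchResponse) message).heartbeat
                    ? "ignore heartbeat"
                    : "complete pending future";
        } else if (message instanceof String) {
            return "telnet command";
        }
        return "pass through";
    }

    public static void main(String[] args) {
        System.out.println(route(new SketchResponse(false)));
        System.out.println(route(new SketchRequest(true)));
    }
}
```

On the consumer, the branch that matters is the non-heartbeat response, which is the one that reaches DefaultFuture.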


DefaultFuture

public class DefaultFuture implements ResponseFuture {

    public static void received(Channel channel, Response response) {
        try {
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at "
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                        + ", response " + response
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                        + " -> " + channel.getRemoteAddress()));
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }


    private void doReceived(Response res) {
        lock.lock();
        try {
            response = res;
            if (done != null) {
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }
}
  • DefaultFuture stores the response object and wakes the waiting consumer thread by signaling a Condition (done.signal()) under the lock, not a semaphore.
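
The hand-off between the thread that reads the response and the thread that issued the call can be sketched with a lock and a Condition. SketchFuture is a hypothetical, much-reduced analogue of DefaultFuture: it keeps one pending future per request id, and received() looks the future up, stores the response, and signals the waiter. Timeout cleanup, callbacks, and the channel map are left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified analogue of DefaultFuture.
class SketchFuture {
    private static final Map<Long, SketchFuture> FUTURES = new ConcurrentHashMap<>();

    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private String response; // guarded by lock

    SketchFuture(long id) {
        FUTURES.put(id, this);
    }

    // Called by the thread that decoded the response.
    // Returns false when no caller is waiting for this id (for example after a timeout).
    static boolean received(long id, String response) {
        SketchFuture future = FUTURES.remove(id);
        if (future == null) {
            return false;
        }
        future.doReceived(response);
        return true;
    }

    private void doReceived(String res) {
        lock.lock();
        try {
            response = res;
            done.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Blocks the caller until a response arrives or the timeout elapses.
    String get(long timeoutMillis) throws InterruptedException, TimeoutException {
        long remaining = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (response == null) {
                if (remaining <= 0) {
                    throw new TimeoutException("no response within " + timeoutMillis + " ms");
                }
                remaining = done.awaitNanos(remaining);
            }
            return response;
        } finally {
            lock.unlock();
        }
    }
}

class SketchFutureDemo {
    // One caller thread waits while a second thread delivers the response.
    static String roundTrip() {
        try {
            SketchFuture future = new SketchFuture(1L);
            Thread io = new Thread(() -> SketchFuture.received(1L, "pong"));
            io.start();
            String result = future.get(1000);
            io.join();
            return result;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip());
    }
}
```

Checking the response in a loop around awaitNanos() covers both spurious wake-ups and the case where the response arrives before the caller starts waiting.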
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容