dubbo剖析:三 網(wǎng)絡通信之 -- Server實現(xiàn)

引子:

  • dubbo剖析:一 服務發(fā)布 中,我們講到了 RegistryProtocol.export過程中有一個關鍵步驟,即調(diào)用doLocalExport(final Invoker<T> originInvoker)生成Exporter,其最終調(diào)用了DubboProtocolexport()方法。
  • DubboProtocolexport()方法:完成了 "啟動并監(jiān)聽網(wǎng)絡服務" 的工作,具體是通過HeaderExchangerbind()方法創(chuàng)建了一個HeaderExchangerServer實現(xiàn)的。
  • 本章我們就來介紹HeaderExchangerServer設計架構(gòu)功能實現(xiàn)。

一、入口流程

服務發(fā)布流程中,RegistryProtocol會調(diào)用到DubboProtocolexport()方法,用于完成網(wǎng)絡服務的啟動和監(jiān)聽。

1.1 入口代碼

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        //step1 export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //...省略部分代碼...

        //step2 創(chuàng)建ExchangeServer
        openServer(url);

        return exporter;
    }
    private void openServer(URL url) {
        String key = url.getAddress();
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                //調(diào)用createServer
                serverMap.put(key, createServer(url));
            } else {
                server.reset(url);
            }
        }
    }
    private ExchangeServer createServer(URL url) {
        //...省略部分代碼,參數(shù)解析之類的...
        ExchangeServer server;
        try {
            //關鍵代碼,使用HeaderExchanger.bind創(chuàng)建HeaderExchangeServer
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        return server;
    }

1.2 流程圖解

DubboProtocol.export()流程圖

第一步:生成Exporter:

  • AbstractProxyInvoker作為構(gòu)造參數(shù),new出一個DubboExporter;
  • DubboExporter維護了AbstractProxyInvoker的生命周期;
  • 服務url、DubboExporter放入DubboProtocol的緩存map供消息接收處理使用;

第二步:創(chuàng)建ExchangeServer:

  • AbstractProxyInvoker中獲取服務url;
  • 服務url作為入?yún)?、同時使用DubboProtocol包含的網(wǎng)絡事件處理器requestHandler,調(diào)用HeaderExchangerbind()方法創(chuàng)建ExchangeServer;
  • HeaderExchanger中又依賴了NettyTransporter,使用其bind()方法創(chuàng)建NettyServer;
  • NettyServer是啟動網(wǎng)絡服務的核心類。HeaderExchangerServer使用NettyServer作為構(gòu)造參數(shù),擴展了它的功能;

二、server端網(wǎng)絡層結(jié)構(gòu)

server端網(wǎng)絡層類圖關系說明

2.1 網(wǎng)絡傳輸層

  • EndPoint為網(wǎng)絡端點的抽象接口,定義了獲取網(wǎng)絡端點地址、連接、及最原始的發(fā)送消息的方法。
  • ChannelHandler為網(wǎng)絡事件處理器接口,定義了Server端監(jiān)聽到各種類型的網(wǎng)絡事件時的處理方法(connected、disconnected、sent、received、caught),Netty中也有類似定義。
  • Server為網(wǎng)絡服務端的抽象接口,繼承了EndPoint的功能,并擴展了獲取與服務端建連的通道Channel的方法。
  • Transporter為網(wǎng)絡傳輸層的抽象接口,核心作用就是提供了創(chuàng)建ServerClient兩個核心接口實現(xiàn)類的方法。

2.2 信息交換層

  • ExchangeHandler,在ChannelHandler接口基礎上,添加了 響應請求 的方法。
  • ExchangeServer,在Server接口基礎上,將獲取Channel的方法擴展為獲取ExchangeChannel的方法。
  • Exchanger為信息交換層的抽象接口,核心作用就是提供了創(chuàng)建ExchangeServerExchangeClient兩個核心接口實現(xiàn)類的方法。

2.3 網(wǎng)絡通道Channel

網(wǎng)絡通道接口定義
  • Channel為網(wǎng)絡通道的抽象接口,繼承了EndPoint的功能,并擴展了綁定獲取屬性和獲取通道對端地址的方法。
  • ExchangeChannel,在Channel接口的基礎上,擴展了請求響應模式的功能,并能獲取綁定在通道上的網(wǎng)絡事件監(jiān)聽器。

三、HeaderExchangeServer & NettyServer實現(xiàn)詳解

Server實現(xiàn)層次結(jié)構(gòu)圖

3.1 網(wǎng)絡層

AbstractPeer類(網(wǎng)絡事件處理器和網(wǎng)絡節(jié)點的通用實現(xiàn)):

  • 定義了屬性ChannelHandlerURL,作為構(gòu)造方法入?yún)⒆⑷耄?/li>
  • 實現(xiàn)了ChannelHandlerEndPoint接口,ChannelHandler接口的相關方法依賴其channelHandler屬性完成實現(xiàn);

AbstractEndPoint類(加入編解碼功能):

  • 定義了構(gòu)造方法,入?yún)瑢傩?code>ChannelHandler和URL;
  • 定義了屬性Codec2,用于編解碼,通過SPI動態(tài)注入;
  • 定義了timeout/connectTimeout相關超時屬性,由URL解析賦值;
  • 對外暴露了獲取Codec2和超時相關屬性的方法,供上層依賴調(diào)用;

AbstractServer類(網(wǎng)絡服務端通用抽象,抽象出openclose、send的公共流程,并提供了doOpendoClose的實現(xiàn)擴展):

  • 定義了構(gòu)造方法,入?yún)瑢傩?code>ChannelHandler和URL,并觸發(fā)doOpen()擴展;
  • 重寫EndPoint接口的close()方法,觸發(fā)doClose()擴展;
  • 實現(xiàn)EndPoint接口的send()方法,遍歷并調(diào)用Channel.send();

NettyServer類(網(wǎng)絡服務端Netty實現(xiàn)類,實現(xiàn)了doOpen、doClose、getChannels三個具體擴展):

  • 實現(xiàn)了doOpen()擴展方法,使用Netty的ServerBootstrap完成服務啟動監(jiān)聽,其網(wǎng)絡世界處理器為this包裝成的NettyHandler
  • 實現(xiàn)了doClose()擴展方法,調(diào)用Netty的boostrapchannel完成網(wǎng)絡資源釋放;
  • 實現(xiàn)了getChannels()方法,channels的值由網(wǎng)絡事件處理器在connect、disconnect事件觸發(fā)時變動維護;
    @Override
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);

        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();
        // https://issues.jboss.org/browse/NETTY-365
        // https://issues.jboss.org/browse/NETTY-379
        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                /*int idleTimeout = getIdleTimeout();
                if (idleTimeout > 10000) {
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                }*/
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
        // bind
        channel = bootstrap.bind(getBindAddress());
    }
    @Override
    protected void doClose() throws Throwable {
        try {
            if (channel != null) {
                // unbind.
                channel.close();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            Collection<com.alibaba.dubbo.remoting.Channel> channels = getChannels();
            if (channels != null && channels.size() > 0) {
                for (com.alibaba.dubbo.remoting.Channel channel : channels) {
                    try {
                        channel.close();
                    } catch (Throwable e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            if (bootstrap != null) {
                // release external resource.
                bootstrap.releaseExternalResources();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            if (channels != null) {
                channels.clear();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    }

3.2 交換層

HeaderExchangeServer類(交換層服務端,將網(wǎng)絡層的Channel擴展為交換層的ExchangeChannel,并加入心跳檢測功能):

  • 定義了構(gòu)造方法,入?yún)瑢傩?code>Server,用于實現(xiàn)服務端網(wǎng)絡層功能;
  • 定義了屬性定時任務線程池scheduled,用于執(zhí)行“定時心跳收發(fā)及心跳超時監(jiān)測”任務;
  • 定義了hearbeat/heartbeatTieout相關心跳屬性,由URL解析賦值;
  • 構(gòu)造方法中啟動“定時心跳收發(fā)及心跳超時監(jiān)測”任務,超時時“Server斷連、Client斷連重連”;
  • 將網(wǎng)絡層Channel擴展為交換層ExchangeChannel,具體實現(xiàn)后續(xù)另辟章節(jié);
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容