Dubbo源碼分析(五)服務(wù)暴露的具體流程(下)

一、服務(wù)暴露

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
    String key = getCacheKey(originInvoker);
    //首先嘗試從緩存中獲取
    ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
    if (exporter == null) {
        synchronized (bounds) {
            exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) {
                //從export中拿到之前的url 即為dubbo協(xié)議的url
                //創(chuàng)建 Invoker 為委托類對象
                final Invoker<?> invokerDelegete = new InvokerDelegete<T>(
                                            originInvoker, getProviderUrl(originInvoker));
                exporter = new ExporterChangeableWrapper<T>((Exporter<T>) 
                                        protocol.export(invokerDelegete), originInvoker);
                //寫入緩存
                bounds.put(key, exporter);
            }
        }
    }
    return exporter;
}

如上代碼,它先嘗試從緩存中獲取,如果沒有則調(diào)用protocol.export去暴露。

在這里的protocol對象其實是一個自適應(yīng)擴展類對象Protocol$Adaptive,我們調(diào)用它的export方法,它會根據(jù)協(xié)議名稱獲取對應(yīng)的擴展實現(xiàn)類,在這里它是DubboProtocol。

不知諸位是否還有印象,我們在第二章節(jié)已經(jīng)說過。通過ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);這句代碼獲取到的其實是Wrapper包裝類的對象,ProtocolListenerWrapper

1、服務(wù)暴露監(jiān)聽

ProtocolListenerWrapper.export方法主要是獲取服務(wù)暴露監(jiān)聽器,在服務(wù)暴露和取消服務(wù)暴露時可以獲得通知。

public class ProtocolListenerWrapper implements Protocol {

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if ("registry".equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        //獲取ExporterListener類型的擴展點加載器
        ExtensionLoader<ExporterListener> extensionLoader =
                    ExtensionLoader.getExtensionLoader(ExporterListener.class);     
        //獲取監(jiān)聽器
        List<ExporterListener> activateExtension = extensionLoader.
                    getActivateExtension(invoker.getUrl(), "exporter.listener");
                    
        //調(diào)用ProtocolFilterWrapper.export繼續(xù)暴露
        Exporter<T> export = protocol.export(invoker);
        List<ExporterListener> exporterListeners = 
                        Collections.unmodifiableList(activateExtension);
        
        //循環(huán)監(jiān)聽器 通知方法。返回ListenerExporterWrapper對象
        ListenerExporterWrapper<T> listenerExporterWrapper = 
                    new ListenerExporterWrapper<>(export, exporterListeners);
        return listenerExporterWrapper;
    }   
}

比如,我們可以創(chuàng)建一個自定義的監(jiān)聽器。

public class MyExporterListener1 implements ExporterListener {
    public void exported(Exporter<?> exporter) throws RpcException {
        System.out.println("111111111111111-------服務(wù)暴露");
    }
    public void unexported(Exporter<?> exporter) {
        System.out.println("111111111111111-------取消服務(wù)暴露");
    }
}

然后創(chuàng)建擴展點配置文件,文件名稱為:
org.apache.dubbo.rpc.ExporterListener
內(nèi)容為:
listener1=org.apache.dubbo.demo.provider.MyExporterListener1

然后在Dubbo配置文件中,這樣定義:
<dubbo:provider listener="listener1" />

那么,當(dāng)服務(wù)暴露完成后,你將會獲得通知。

2、構(gòu)建調(diào)用鏈

上一步在ProtocolListenerWrapper.export方法中,返回之前還調(diào)用了ProtocolFilterWrapper.export。它主要是為了創(chuàng)建包含各種Filter的調(diào)用鏈。

public class ProtocolFilterWrapper implements Protocol {    
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        //創(chuàng)建Filter 過濾鏈的 Invoker
        Invoker<T> tInvoker = buildInvokerChain(invoker, "service.filter","provider");
        //調(diào)用DubboProtocol繼續(xù)暴露
        Exporter<T> export = protocol.export(tInvoker);
        //返回
        return export;
    }
}

這里的重點是buildInvokerChain方法,它來創(chuàng)建調(diào)用鏈攔截器。每次遠程方法執(zhí)行,該攔截都會被執(zhí)行,在Dubbo中已知的Filter有

org.apache.dubbo.rpc.filter.EchoFilter
org.apache.dubbo.rpc.filter.GenericFilter
org.apache.dubbo.rpc.filter.GenericImplFilter
org.apache.dubbo.rpc.filter.TokenFilter
org.apache.dubbo.rpc.filter.AccessLogFilter
org.apache.dubbo.rpc.filter.CountFilter
org.apache.dubbo.rpc.filter.ActiveLimitFilter
org.apache.dubbo.rpc.filter.ClassLoaderFilter
org.apache.dubbo.rpc.filter.ContextFilter
org.apache.dubbo.rpc.filter.ConsumerContextFilter
org.apache.dubbo.rpc.filter.ExceptionFilter
org.apache.dubbo.rpc.filter.ExecuteLimitFilter
org.apache.dubbo.rpc.filter.DeprecatedFilter

此時的invoker經(jīng)過各種Filter的包裝,就變成了下面這個樣子:

帶有Filter的調(diào)用鏈

當(dāng)然了,我們也可以自定義Filter。比如像下面這樣:

public class MyFilter1 implements Filter {
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        System.out.println("調(diào)用之前:"+invoker.getUrl().toFullString());
        Result result = invoker.invoke(invocation);
        System.out.println("調(diào)用之后:"+invoker.getUrl().toFullString());
        return result;
    }
}

然后創(chuàng)建擴展點配置文件,文件名稱為:
resources\META-INF\dubbo\com.alibaba.dubbo.rpc.Filter
內(nèi)容為:
myfilter1=org.apache.dubbo.demo.provider.MyFilter1

然后在Dubbo配置文件中,這樣定義:
<dubbo:provider filter="myfilter1"/>

需要注意的是,這樣配置之后,myfilter1會在默認(rèn)的Filter之后。如果你希望在默認(rèn)的Filter前面,那么你可以這樣配置<dubbo:provider filter="myfilter1,default"/>

3、DubboProtocol

經(jīng)過上面各種的搞來搞去,終于可以真正的暴露服務(wù)了。調(diào)用DubboProtocol.export,我們重點兩部分:創(chuàng)建DubboExporter和啟動服務(wù)器。

public class DubboProtocol extends AbstractProtocol {
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
        //服務(wù)標(biāo)識
        //例如:com.viewscenes.netsupervisor.service.InfoUserService:20880
        String key = serviceKey(url);
        //創(chuàng)建 DubboExporter
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        //將 <key, exporter> 鍵值對放入緩存中
        exporterMap.put(key, exporter);
        //省略無關(guān)代碼...
        // 啟動通信服務(wù)器
        openServer(url);
        //優(yōu)化序列化
        optimizeSerialization(url);
        return exporter;
    }
}

3.1、創(chuàng)建DubboExporter

事實上,創(chuàng)建DubboExporter的過程非常簡單,就是調(diào)用構(gòu)造函數(shù)賦值而已。

public class DubboExporter<T> extends AbstractExporter<T> {
    public DubboExporter(Invoker<T> invoker, String key, 
                Map<String, Exporter<?>> exporterMap) {
        super(invoker);
        this.key = key;
        this.exporterMap = exporterMap;
    }
}

3.2、啟動服務(wù)器

private void openServer(URL url) {
    //獲取IP:端口 ,并將它作為服務(wù)器實例的key
    String key = url.getAddress();
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
    if (isServer) {
        //先從緩存中獲取
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
            //創(chuàng)建服務(wù)器實例
            serverMap.put(key, createServer(url));
        } else {
            //重置服務(wù)器
            server.reset(url);
        }
    }
}

如上代碼,Dubbo先從緩存中獲取已啟動的服務(wù)器實例,未命中的話就去創(chuàng)建。如果已經(jīng)存在服務(wù)器實例,就根據(jù)url的內(nèi)容重置服務(wù)器。我們重點分析創(chuàng)建的過程。

private ExchangeServer createServer(URL url) {
    
    //服務(wù)器關(guān)閉時 發(fā)送readonly事件
    url = url.addParameterIfAbsent("channel.readonly.sent","true");
    //設(shè)置心跳檢測
    url = url.addParameterIfAbsent("heartbeat", "60000");
    //獲取服務(wù)器參數(shù) 默認(rèn)為netty
    String str = url.getParameter("server","netty");
    //通過 SPI 檢測是否存在 server 參數(shù)所代表的 Transporter 拓展,不存在則拋出異常
    if (str != null && str.length() > 0 && 
            !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)){
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    }
    //設(shè)置服務(wù)器編解碼器為dubbo
    url = url.addParameter("codec", "dubbo");
    ExchangeServer server;
    try {
        //創(chuàng)建ExchangeServer
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
    str = url.getParameter(Constants.CLIENT_KEY);
    if (str != null && str.length() > 0) {
        Set<String> supportedTypes = ExtensionLoader.
                        getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }
    return server;
}

上面的代碼主要分為兩部分:設(shè)置默認(rèn)參數(shù)和創(chuàng)建服務(wù)器實例。設(shè)置參數(shù)沒什么好說的,下面調(diào)用到HeaderExchanger.bind方法,它只是設(shè)置封裝Handler處理器。

public class HeaderExchanger implements Exchanger {

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        //封裝Handler處理器
        HeaderExchangeHandler headerExchangeHandler = new HeaderExchangeHandler(handler);
        DecodeHandler decodeHandler = new DecodeHandler(headerExchangeHandler);
        //創(chuàng)建服務(wù)器
        Server bind = Transporters.bind(url, decodeHandler);
        //封裝為HeaderExchangeServer對象返回
        HeaderExchangeServer server = new HeaderExchangeServer(bind);
        return server;
    }
}

我們只需關(guān)注Transporters.bind,它負(fù)責(zé)啟動服務(wù)器。

public class Transporters {
    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        //獲取自適應(yīng) Transporter 實例
        Transporter adaptiveExtension = ExtensionLoader.
                getExtensionLoader(Transporter.class).getAdaptiveExtension();
        //調(diào)用NettyServer.bind
        return adaptiveExtension.bind(url, handler);
    }
}

如上代碼,它首先獲取自適應(yīng) Transporter 實例,即TransporterAdaptive。然后根據(jù)傳入的url參數(shù)來加載哪個Transporter,在Dubbo中默認(rèn)是NettyTransporter。需要注意的是,根據(jù)Dubbo版本的不同,有可能使用Netty的版本也不一樣。

比如,筆者在Dubbo2.7快照版本中(還未發(fā)行),看到的Netty配置文件是這樣,說明它默認(rèn)使用的就是Netty4:

netty4=org.apache.dubbo.remoting.transport.netty4.NettyTransporter
netty= org.apache.dubbo.remoting.transport.netty4.NettyTransporter

在Dubbo2.6版本中,看到的Netty配置文件是這樣,說明你只要不指定Netty4,那就使用Netty3

netty=com.alibaba.dubbo.remoting.transport.netty.NettyTransporter
netty4=com.alibaba.dubbo.remoting.transport.netty4.NettyTransporter

不過這些都無傷大雅,我們以Netty3接著看....

public class NettyTransporter implements Transporter {
    public Server bind(URL url, ChannelHandler listener){
        //創(chuàng)建 NettyServer
        return new NettyServer(url, listener);
    }
}
public class NettyServer extends AbstractServer implements Server {
    public NettyServer(URL url, ChannelHandler handler) {
        super(url, ChannelHandlers.wrap(handler, 
            ExecutorUtil.setThreadName(url, "DubboServerHandler")));
    }
}

我們看到, 在NettyTransporter.bind方法里,它調(diào)用的是NettyServer構(gòu)造函數(shù),緊接著又調(diào)用父類的構(gòu)造函數(shù)。

public abstract class AbstractServer extends AbstractEndpoint implements Server {

    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();
    
        //獲取 ip 和端口
        String bindIp = getUrl().getParameter("bind.ip", getUrl().getHost());
        int bindPort = getUrl().getParameter("bind.port", getUrl().getPort());
        if (url.getParameter("anyhost", false) || NetUtils.isInvalidLocalHost(bindIp)) {
            // 設(shè)置 ip 為 0.0.0.0
            bindIp = NetUtils.ANYHOST;
        }
        bindAddress = new InetSocketAddress(bindIp, bindPort);
        this.accepts = url.getParameter("accepts", 0);
        this.idleTimeout = url.getParameter("idle.timeout", 600000);
        try {
            //調(diào)用子類方法 開啟服務(wù)器
            doOpen();
        }
    }
}

如上代碼,在父類的構(gòu)造函數(shù)里面主要是設(shè)置了一些參數(shù),無需多說。接著我們再看子類的doOpen實現(xiàn)。

protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    // 創(chuàng)建 boss 和 worker 線程池
    // 設(shè)置線程的名稱
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(
        boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    
    //創(chuàng)建 ServerBootstrap
    bootstrap = new ServerBootstrap(channelFactory);

    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    channels = nettyHandler.getChannels();
    // 設(shè)置 PipelineFactory
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        @Override
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
            ChannelPipeline pipeline = Channels.pipeline();
            /*int idleTimeout = getIdleTimeout();
            if (idleTimeout > 10000) {
                pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
            }*/
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
    // 綁定到指定的 ip 和端口上
    channel = bootstrap.bind(getBindAddress());
}

以上方法就通過Netty啟動了通信服務(wù)器。熟悉Netty的朋友對這段代碼一定不陌生,如果想了解更多,我們需要關(guān)注一下它的處理器。

處理器

ChannelHandler是Netty中的核心組件之一。在這里,Dubbo使用NettyHandler作為消息處理器。它繼承自SimpleChannelHandler,這說明Netty接收到的事件都會由此類來處理。比如:客戶端連接、客戶端斷開連接、數(shù)據(jù)讀取、網(wǎng)絡(luò)異常...我們重點來看數(shù)據(jù)讀取方法。

@Sharable
public class NettyHandler extends SimpleChannelHandler {

    public NettyHandler(URL url, ChannelHandler handler) {
        this.url = url;
        this.handler = handler;
    }
    //接收到消息
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        try {
            handler.received(channel, e.getMessage());
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }
}

當(dāng)Netty的Selector輪詢到數(shù)據(jù)讀取事件后,將調(diào)用messageReceived方法。在這里,它調(diào)用的是handler.received,由構(gòu)造函數(shù)可得知,此處的handler對象其實是NettyServer對象的實例。

中間它會經(jīng)過AllChannelHandler,在這里會在線程池中分配一個線程去處理。

public class AllChannelHandler extends WrappedChannelHandler {
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        cexecutor.execute(new ChannelEventRunnable(channel, 
                                  handler, ChannelState.RECEIVED, message));
    }
}   

ChannelEventRunnable實現(xiàn)Runnable接口,我們看它的run方法。其實也很簡單,就是根據(jù)事件狀態(tài),繼續(xù)往下調(diào)用。

public class ChannelEventRunnable implements Runnable {
    public void run() {
        switch (state) {
            case CONNECTED:
                try {
                    handler.connected(channel);
                }
                break;
            case DISCONNECTED:
                try {
                    handler.disconnected(channel);
                }
                break;
            case SENT:
                try {
                    handler.sent(channel, message);
                }
                break;
            case RECEIVED:
                try {
                    handler.received(channel, message);
                }
                break;
            case CAUGHT:
                try {
                    handler.caught(channel, exception);
                }
                break;
            default:
                logger.warn("unknown state: " + state + ", message is " + message);
        }
    }
}

再深入的過程我想不必再深究了,無非是業(yè)務(wù)邏輯處理。不過還有另外一個問題,這個線程池是什么樣的?大小多少呢?
通過跟蹤,我們發(fā)現(xiàn)它是在其父類中被初始化的。它也是通過ExtensionLoader加載的

public class WrappedChannelHandler implements ChannelHandlerDelegate {

    protected final ExecutorService executor;
    protected final ChannelHandler handler;
    protected final URL url;
    
    public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        ExtensionLoader<ThreadPool> extensionLoader = 
                            ExtensionLoader.getExtensionLoader(ThreadPool.class);
        ThreadPool adaptiveExtension = extensionLoader.getAdaptiveExtension();
        executor = (ExecutorService) adaptiveExtension.getExecutor(url);
    }
}

然后我們看ThreadPool接口標(biāo)注了默認(rèn)實現(xiàn)@SPI("fixed") ,它是一個固定數(shù)量的線程池。

public class FixedThreadPool implements ThreadPool {
    public Executor getExecutor(URL url) {
        //設(shè)置線程池參數(shù)
        String name = url.getParameter("threadname", "Dubbo");
        int threads = url.getParameter("threads", 200);
        int queues = url.getParameter("queues",0);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}

由此我們可以回答上面的問題了,Dubbo中的線程池是固定線程數(shù)量大小為200的線程池。如果線程池滿了怎么辦?我們再看下它的拒絕策略。

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    String msg = String.format("Thread pool is EXHAUSTED!" +
                    " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
                    " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
            threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
            e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
            url.getProtocol(), url.getIp(), url.getPort());
    logger.warn(msg);
    dumpJStack();
    throw new RejectedExecutionException(msg);
}

學(xué)到了嗎?

  • 打印錯誤信息
  • 導(dǎo)出線程棧信息
  • 拋出異常

到此,關(guān)于服務(wù)暴露的過程就分析完了。整個過程比較復(fù)雜,大家在分析的過程中耐心一些。并且多寫 Demo 進行斷點調(diào)試,以便能夠更好的理解代碼邏輯。

二、服務(wù)注冊

服務(wù)注冊就是把已經(jīng)暴露的服務(wù)信息注冊到第三方平臺,以供消費者使用。我們把目光回到RegistryProtocol.export方法,我們以zookeeper注冊中心為例。

1、創(chuàng)建注冊中心

首先,需要根據(jù)配置文件的信息獲取到注冊中心的url,比如以zookeeper為例:
zookeeper://192.168.139.131:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo_producer1&client=zkclient&dubbo=2.6.2......

我們直接來到ZookeeperRegistry,這里的重點是調(diào)用connect方法創(chuàng)建Zookeeper 客戶端。

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    //省略部分代碼...
    
    //創(chuàng)建zookeeper客戶端
    zkClient = zookeeperTransporter.connect(url);
    zkClient.addStateListener(new StateListener() {
        public void stateChanged(int state) {
            if (state == RECONNECTED) {
                try {
                    //重新連接事件
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
}

在這里有一點需要注意,Dubbo官網(wǎng)說,連接zookeeper缺省使用zkclient。

2.2.0 版本開始缺省為 zkclient 實現(xiàn),以提升 zookeeper 客戶端的健狀性。

但從代碼上看,它默認(rèn)使用的是curator客戶端。@SPI("curator") 這一點比較費解,所以如果想使用zkclient,要在配置文件中指定:
<dubbo:registry address="zookeeper://192.168.139.131:2181?client=zkclient"/>

然后我們接著往下繼續(xù)看,最終調(diào)用zkclient的方法完成zookeeper客戶端的創(chuàng)建。

public ZkclientZookeeperClient(URL url) {
    
    //異步調(diào)用ZkClient創(chuàng)建客戶端
    client = new ZkClientWrapper(url.getBackupAddress(), 30000);
    //監(jiān)聽zookeeper狀態(tài)
    client.addListener(new IZkStateListener() {
        @Override
        public void handleStateChanged(KeeperState state) throws Exception {
            ZkclientZookeeperClient.this.state = state;
            if (state == KeeperState.Disconnected) {
                stateChanged(StateListener.DISCONNECTED);
            } else if (state == KeeperState.SyncConnected) {
                stateChanged(StateListener.CONNECTED);
            }
        }
        @Override
        public void handleNewSession() throws Exception {
            stateChanged(StateListener.RECONNECTED);
        }
    });
    client.start();
}

2、創(chuàng)建節(jié)點

創(chuàng)建節(jié)點很簡單,就是將服務(wù)配置數(shù)據(jù)寫入到 Zookeeper 的某個路徑的節(jié)點下。

protected void doRegister(URL url) {
    try {
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " 
            to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

我們看下zookeeper中已經(jīng)創(chuàng)建好的節(jié)點信息:


服務(wù)配置信息

三、總結(jié)

至此,Dubbo中的服務(wù)暴露全過程我們已經(jīng)分析完了。由于篇幅問題,筆者將它們分為了上下兩篇共8000余字。篇幅比較大,邏輯也較為復(fù)雜,如果文章有不妥錯誤之處,希望大家提出寶貴意見。

我們再回憶一下整個流程:

  • 通過Spring onApplicationEvent事件調(diào)用入口方法
  • 配置信息檢查以及缺省值設(shè)置
  • 創(chuàng)建Invoker
  • 構(gòu)建調(diào)用鏈
  • 本地暴露
  • 啟動Netty服務(wù)器
  • 創(chuàng)建zookeeper客戶端以及服務(wù)注冊
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容