【Netty】Netty的啟動過程一

首先來張網(wǎng)上盛傳的netty框架參考圖,以供跟蹤代碼參考:


netty框架參考圖.jpg

一段標準的Netty服務(wù)端啟動代碼如下:

    public NettyTcpServer() {
        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup(4);
        bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 5)
                .childOption(ChannelOption.TCP_NODELAY, true);
    }

    public void bind(String ip, int port) {
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast("decoder", new ProtoDecoder(upLimit))
                        .addLast("server-handler", new ServerHandler()) 
                        .addLast("encoder", new ProtoEncoder(downLimit));
            }
        });
        InetSocketAddress address = new InetSocketAddress(ip, port);
        try {
            bootstrap.bind(address).sync();
        } catch (InterruptedException e) {
            log.error("bind "+ip+":"+port+" failed", e);
            shutdown();
        }
    }

Netty的服務(wù)端,一般會啟動兩個NioEventLoopGroup線程組(個人感覺用組比用池更準確,這里組指數(shù)組),一個為bossGroup線程組,處理客戶端的連接請求;一個workerGroup線程組,用來處理IO事件。很多人都知道Netty服務(wù)端就是做這事的,今天我們就用源碼來揭示這是如何實現(xiàn)的。
先從NioEventLoopGroup的構(gòu)造方法著手:

public NioEventLoopGroup() {
    this(0);
}

public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}

一路查看實現(xiàn),會來到這里:

    private static final int DEFAULT_EVENT_LOOP_THREADS;
    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
    }

    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

可見,實例化NioEventLoopGroup時,如果在這里沒有設(shè)置參數(shù),也沒有在JVM參數(shù)里設(shè)置“-Dio.netty.eventLoopThreads=x”,那么這個線程組的默認線程數(shù)為CPUx2,否則為設(shè)置的參數(shù)值,最后,可以看到NioEventLoopGroup的具體實現(xiàn)為:

        if (executor == null) {
            //后續(xù)Netty對各種IO事件的處理就是通過此executor創(chuàng)建線程處理的
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        //創(chuàng)建NIOEventLoop數(shù)組
        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                //實例化每個NIOEventLoop,每個NIOEventLoop公用Executor、SelectorProvider、EventExecutorChooserFactory、RejectedExecutionHandlers
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                //......
            }
        }

        //選擇器,選擇由哪個NIOEventLoop處理,注意這里以children作為傳參
        chooser = chooserFactory.newChooser(children);

在實例化NIOEventLoopGroup時,首先創(chuàng)建了一個Executor,而Executor的作用就是通常被用來代替顯示地創(chuàng)建線程的,Executor對象可以用來執(zhí)行Runnable任務(wù),該接口將“任務(wù)提交”從任務(wù)的運行機制中解耦出來,包括線程使用、調(diào)度等細節(jié)。我們看Netty中的它的實現(xiàn)如下,由它的execute方法中threadFactory.newThread(command).start();一句,證實了我們所說的Executor的作用,netty很有可能就是通過這句來創(chuàng)建它的那些IO線程的,我們不妨先猜猜,它是在什么時候執(zhí)行這句的呢?(彩蛋)

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
      threadFactory.newThread(command).start();//創(chuàng)建NettyIO線程,重點代碼,將斷點打于此處,可查看整個Netty線程啟動步驟
    }

再回溯到上面的NIOEventLoopGroup代碼段,由children = new EventExecutor[nThreads];一句,NIOEventLoopGroup實則創(chuàng)建了一個new NioEventLoopGroup(4)中參數(shù)數(shù)量的NIOEventLoop數(shù)組,NIOEventLoop的實例就是通過newChild(executor, args)方法添加的,由下面的幾段代碼可知,每個NIOEventLoop公用Executor、SelectorProvider、EventExecutorChooserFactory、RejectedExecutionHandlers。而newChild(executor, args)方法就是做NIOEventLoop實例的初始化工作,一路跟蹤我們還可發(fā)現(xiàn),NIOEventLoop的任務(wù)隊列用的是LinkedBlockingQueue,大小為Integer.MAX_VALUE,如果沒設(shè)置JVM參數(shù)“-Dio.netty.eventLoop.maxPendingTasks=x”的話

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }

NIOEventLoop構(gòu)造為:

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        selector = openSelector();
        selectStrategy = strategy;
    }

再看該構(gòu)造中的super方法為:

    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        this.executor = ObjectUtil.checkNotNull(executor, "executor");
        taskQueue = newTaskQueue(this.maxPendingTasks);
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }

由以上代碼我們大概知道了NIOEventLoopGroup和NIOEventLoop的實現(xiàn),NIOEventLoopGroup其實是一個NIOEventLoop的數(shù)組,每個NIOEventLoop都公用了一個Executor,后續(xù)創(chuàng)建線程的事都由Executor來創(chuàng)建,由threadFactory.newThread(command).start();一句可知。

但是我們還有一個疑問,那就是Netty什么時候開始啟動這些線程的呢?上述線程的啟動貌似和new NioEventLoopGroup(4)中的參數(shù)也沒有任何關(guān)系,那它怎么知道只創(chuàng)建new NioEventLoopGroup(4)中的參數(shù)個數(shù)的線程呢?我們只知道這個參數(shù)賦值給了children = new EventExecutor[nThreads];但在哪里限制了此參數(shù)的線程數(shù)量呢?即Netty怎么知道最大要啟動這么多個線程呢?線程的啟動數(shù)量和這個children數(shù)組有什么關(guān)系呢?

帶著這些疑問,我們不妨全局搜索這個children這個變量

children全局引用.png

發(fā)現(xiàn)它大多數(shù)時候用于關(guān)閉和終止判斷,這些看不出和啟動線程數(shù)有什么關(guān)系;只有在NioEventLoopGroup的實現(xiàn)中這里將children傳參給了一個字面意思叫選擇器的類,如下:

//選擇器,選擇由哪個NIOEventLoop處理,注意這里children的傳參
chooser = chooserFactory.newChooser(children);

啟動的線程數(shù)是否和這個選擇器有關(guān)呢?我們再跟進去看看

    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTowEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }

發(fā)現(xiàn),netty把這個NIOEventLoop數(shù)組賦值給了它的兩個內(nèi)部類Chooser之一,它們的唯一提供的使用方法是next()方法,看這方法實現(xiàn):executors[Math.abs(idx.getAndIncrement() % executors.length)]貌似這里告訴了netty當前應(yīng)使用哪個NIOEventLoop,看來可以懷疑啟動的線程數(shù)量和這個next()方法有關(guān),那只要全局搜下這個next()方法,就應(yīng)該知道了

next()全局引用.png

有這么多地方調(diào)用,要是第一次看Netty源碼,從next()反推過去看代碼直至看到跟ServerBootstrap或NioEventLoopGroup相關(guān)的next()調(diào)用,太難找了(我第一次看就是- -),而Netty服務(wù)端啟動代碼中間還有那么多的源碼沒看,究竟是在哪里限制了線程的啟動數(shù)量呢?

既然不知道在哪里限制線程數(shù)量的,也不知道何時啟動線程的,但是線程的創(chuàng)建地方我們是找到了的,即executor的初始化地方,如下

        if (executor == null) {
            //后續(xù)Netty對各種IO事件的處理就是通過此executor創(chuàng)建線程處理的
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

點進ThreadPerTaskExecutor,根據(jù)Executor的作用“就是通常被用來代替顯示地創(chuàng)建線程的”,我們懷疑是在這里創(chuàng)建線程的,那么我們何不在此打上斷點呢?

    @Override
    public void execute(Runnable command) {
      threadFactory.newThread(command).start();//創(chuàng)建NettyIO線程,重點代碼,將斷點打于此處,可查看整個Netty線程啟動步驟
    }

打上斷點后,啟動ServerBootstrap,發(fā)現(xiàn)確實調(diào)到這里來了,由此證實了我們猜想的正確性,通過查看調(diào)用堆棧,如下圖所示:

服務(wù)端啟動啟動boss線程.png

一下就可以發(fā)現(xiàn),原來Netty第一個線程的啟動是在綁定地址和端口開始的,再看里面的NioEventLoopGroup(MultithreadEventLoopGroup).register(Channel) 行: 85 這句,

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel); //先選一個NIOEventLoop出來,然后再執(zhí)行它的register(channel)方法
    }

它就是我們在查找next()全局引用圖中(見上一圖next()全局引用)的最小紅框標記的MultithreadEventLoopGroup類下的register(Channel)方法,我們開始不知道netty啟動線程是如何限制數(shù)量的,只懷疑這數(shù)量限制跟next()方法有關(guān),現(xiàn)在前后呼應(yīng)起來了,知道是這么個流程了,即Netty是在綁定服務(wù)器地址和ip時,先啟動一個線程去接受客戶端連接的,這個線程的啟動過程就如上圖所示(客戶端的啟動線程是在connect方法中開始的)。

再回到Netty的服務(wù)端啟動代碼,現(xiàn)在我們換種方式看源碼,之前我們只看了NioEventLoopGroup實現(xiàn),其余的還都沒看,現(xiàn)在我們跳過中間那些源碼,從bootstrap.bind(address).sync();中bind方法開始。

一直跟蹤方法堆棧,暫且先別管其他源碼,我們要先弄懂Netty是如何啟動boss線程組,去接受客戶端的連接,如何啟動worker線程組,去處理各類IO事件。點進看register實現(xiàn):

        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    ...
                }
            }
        }

跟蹤源碼時可知NioEventLoop繼承自SingleThreadEventLoop,而SingleThreadEventLoop繼承自SingleThreadEventExecutor,SingleThreadEventExecutor最終又繼承自Executor,所以在這里最終會進入我們剛打的斷點,再看SingleThreadEventExecutor中的execute方法:

    @Override
    public void execute(Runnable task) {
        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }
    }

進入startThread()最終來到這里:

    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {//這里就是每個NIOEventLoop中的executor,它會執(zhí)行NIOEventLoop中的executor中的threadFactory.newThread(command).start();這句
            @Override
            public void run() { //注意這里是線程啟動后需執(zhí)行的代碼
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }
                
                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                }
            }
        });
    }

這便創(chuàng)建并啟動了netty的處理客戶端連接的線程。線程啟動后需要做什么事呢?根據(jù)我們的了解,Netty的bossGroup線程組和workerGroup線程組啟動后,要分別時刻處理客戶端的連接和IO事件,那么這些線程應(yīng)該具備某種功能,需要時時刻刻知道是否有客戶端連接或IO事件,根據(jù)以往經(jīng)驗,這個實現(xiàn)往往是用無限for循環(huán)實現(xiàn)的,我們只要找到哪里有for死循環(huán)的地方或類似功能的地方即可。再點進SingleThreadEventExecutor.this.run(),來到NioEventLoop的run方法,我們要找到哪里具備如此功能:

@Override
    protected void run() {
        for (;;) {
            try {
                //可以在此處打上斷點,驗證啟動后是否會進這里無限循環(huán)
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                        // fallthrough
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    processSelectedKeys();
                    runAllTasks();
                } else {
                    final long ioStartTime = System.nanoTime();

                    processSelectedKeys();

                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } catch (Throwable t) {
            }
        }
    }

一進來就發(fā)現(xiàn)是無限循環(huán)了,根據(jù)以往對無限循環(huán)的認知,在無限循環(huán)里往往是可以時時刻刻做某種事的,再點進processSelectedKeys()方法,查看哪里在做跟連接和處理IO相關(guān)的事了,最終找到這里:

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        try {
            int readyOps = k.readyOps();
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
                if (!ch.isOpen()) {
                    return;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                ch.unsafe().forceFlush();
            }
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

由SelectionKey.OP_CONNECT、SelectionKey.OP_ACCEPT、SelectionKey.OP_READ、SelectionKey.OP_WRITE字面猜想,很可能就是這里處理連接和IO事件的,我們先在上面run方法里的switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) 這句打上斷點,然后再分別在這些if分支里再打上斷點,驗證我們的猜想,重新啟動netty服務(wù)端:

啟動netty服務(wù)端進入run方法無限循環(huán).png

果然進入了此斷點,驗證了我們猜想的正確性,但這里只是啟動了處理客戶端連接的線程,這里還無法進入后面打的那些if分支的,因為還沒有客戶端請求連接和發(fā)送數(shù)據(jù)操作,因此,先放開run里的斷點,我們再啟動一個netty客戶端(netty服務(wù)端和netty客戶端分別用的是《使用Netty+Protobuf實現(xiàn)游戲TCP通信》的源碼):

netty客戶端在connect方法中啟動了自己的線程.png

可見,客戶端在自己的connect()方法中啟動了自己的線程,請求后,客戶端進入了SelectionKey.OP_CONNECT分支,即發(fā)起請求連接

客戶端進入SelectionKey.OP_CONNECT分支發(fā)起請求連接.png

而服務(wù)端,則進入了SelectionKey.OP_ACCEPT分支,此時readyOps=16,接受連接請求

服務(wù)端進入SelectionKey.OP_ACCEPT分支接受連接.png

若我們提前在threadFactory.newThread(command).start()打上斷點的話,如下:

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }

會發(fā)現(xiàn),服務(wù)端會再次(第二次)進入此斷點,第一次服務(wù)端進入此斷點,是啟動boss線程,第二次進入是啟動worker線程,若客戶端還有數(shù)據(jù)發(fā)給服務(wù)端,則服務(wù)端還會再次進入SelectionKey.OP_READ分支,此時readyOps=1

服務(wù)端再次進入SelectionKey.OP_READ讀取IO數(shù)據(jù).png

最終來個簡單流程回顧:
1)首先服務(wù)端啟動netty -> 服務(wù)端netty會啟動boss線程;
2)客戶端啟動netty -> 客戶端netty會啟動自己請求連接線程,客戶端進入SelectionKey.OP_CONNECT分支;
3)服務(wù)端進入SelectionKey.OP_ACCEPT分支;
4)服務(wù)端netty -> 啟動worker線程,接受客戶端的連接;
如果客戶端再發(fā)送數(shù)據(jù)給服務(wù)端:
5)服務(wù)端進入SelectionKey.OP_READ,讀取客戶端發(fā)送的數(shù)據(jù);
如果服務(wù)端也發(fā)送數(shù)據(jù)給客戶端:
6)客戶端進入SelectionKey.OP_READ,讀取服務(wù)端發(fā)送的數(shù)據(jù);

我們現(xiàn)在已經(jīng)大致知道了是如何啟動boss線程的了,那么worker線程又是如何啟動的呢?超過worker線程上限,netty又如何知道不需要再啟動線程了呢?

限于篇幅,將在下篇講解,敬請期待...

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容