Netty入門——Server開(kāi)發(fā)(一)

最近一直在看Netty方面的資料,發(fā)現(xiàn)在某寶上面賣得最好的關(guān)于Netty的書是《Netty實(shí)戰(zhàn)》,恕我直言,這本翻譯自《Netty in Action》的中文版對(duì)于想入門的同學(xué)來(lái)說(shuō)真的不太好??赐曛蠖疾恢涝谥v些什么,讓人摸不著頭腦。還是建議大家去看英文原版。我個(gè)人推薦通過(guò)幾個(gè)簡(jiǎn)單的Demo,模仿別人的代碼多造“輪子”,分析Netty的源碼,最后結(jié)合《Netty精髓》,才能更好地入門,當(dāng)然如果想精通還是要運(yùn)用到實(shí)際項(xiàng)目中。
什么是Netty?為什么要用Netty?怎么安裝Netty?這些問(wèn)題大家可以在網(wǎng)上搜索到答案,這邊不再多說(shuō)了。這篇文章主要介紹如何寫一個(gè)支持Http協(xié)議的Server端并對(duì)源碼進(jìn)行分析。

服務(wù)端代碼

public class TestServer {
    public static void main(String[] args) {
        //(1)創(chuàng)建兩個(gè)NIO線程組。bossGroup負(fù)責(zé)接收客戶端的連接,workerGroup負(fù)責(zé)網(wǎng)絡(luò)讀寫
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new  NioEventLoopGroup();
        //(2) serverBootstrap 是NIO服務(wù)端的輔助啟動(dòng)類,可以降低服務(wù)端的開(kāi)發(fā)難度
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        //(3) group將線程組作為參數(shù)加入到serverBootstrap,設(shè)置channel類似JDK中的ServerSocketChannel類
        // 綁定channel初始化類TestServerInitializer
        serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).
                childHandler(new TestServerInitializer());

        try {
            //(4) server啟動(dòng)類配置完成之后 調(diào)用bind綁定監(jiān)聽(tīng)端口,sync同步等待綁定操作完成
            // 綁定成功之后返回ChannelFuture類似JDK中的Future,用于異步操作回調(diào)通知
            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            //(5) 等待服務(wù)端鏈路關(guān)閉之后main函數(shù)退出
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            //(6) 優(yōu)雅的關(guān)閉兩個(gè)線程池,釋放線程資源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}

上面是一個(gè)簡(jiǎn)單的Server代碼,每行代碼都有注釋,如果不理解這些注釋,沒(méi)關(guān)系下面對(duì)代碼中的主要類源碼進(jìn)行分析。

EventLoopGroup源碼分析

EventLoopGroup 主要有兩個(gè)作用:注冊(cè)channel 以及 執(zhí)行Runnable任務(wù)

EventLoopGroup的源代碼如下

image.png

下面是EventLoopGroup的整體類圖,可以看到NioEventLoopGroup繼承自MultithreadEventLoopGroup,而MultithreadEventLoopGroup實(shí)現(xiàn)了EventLoopGroup。


image.png

繼續(xù)看MultithreadEventLoopGroup源碼發(fā)現(xiàn)他繼承MultithreadEventExecutorGroup。該類的構(gòu)造方法如下


image.png

可以看到MultithreadEventLoopGroup 主要負(fù)責(zé)封裝線程數(shù)組,子類中newChild負(fù)責(zé)具體的初始化。
再看一下子類NioEventLoopGroup中的newChild方法如下。

image.png

經(jīng)過(guò)上述EventLoopGroup源代碼分析我們可以得下面的EventLoopGroup與EventLoop的關(guān)系。


image.png

EventLoop源碼分析

EventLoop負(fù)責(zé)監(jiān)聽(tīng)Channel讀寫事件以及注冊(cè)Channel。我們知道Linux下有三種事件監(jiān)控方法:select、poll、epoll。對(duì)應(yīng)到Netty中有兩種類:NioEventLoop 和 EpollEventLoop。前者使用的是JDK Selector接口實(shí)現(xiàn)Channel事件檢測(cè)(poll方式),而后者是Netty自己實(shí)現(xiàn)epoll對(duì)Channel事件檢測(cè)。
下面重點(diǎn)介紹介紹一個(gè)NioEventLoop。

NioEventLoop繼承自SingleThreadEventExecutor。SingleThreadEventExecutor有如下代碼:

    private final Queue<Runnable> taskQueue;
    private volatile Thread thread;

可以看到SingleThreadEventExecutor包含了一個(gè)線程和一個(gè)隊(duì)列。NioEventLoop 既要執(zhí)行Selector 帶過(guò)來(lái)IO事件,還要執(zhí)行隊(duì)列中的非IO事件。

下面看一下NioEventLoop中線程執(zhí)行邏輯。

    @Override
    protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                    default:
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }

1、 首先switch判斷是否有需要執(zhí)行select過(guò)程,select過(guò)程之后討論。
2、根據(jù)ioRatio(比較IO任務(wù)的時(shí)間占比)來(lái)執(zhí)行IO任務(wù)和非IO任務(wù)。如果ioRatio=100,表示執(zhí)行全部的IO任務(wù)。默認(rèn)ioRatio=50,表示一半時(shí)間執(zhí)行IO任務(wù),另外一半時(shí)間執(zhí)行非IO任務(wù)。如何控制非IO任務(wù)的時(shí)間呢?
runAllTasks函數(shù)中每執(zhí)行64個(gè)任務(wù)會(huì)判斷是否超時(shí)。
接著我們?cè)倏?code>select 函數(shù)做了什么

    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
            for (;;) {
              // 1.定時(shí)任務(wù)截至事時(shí)間快到了,中斷本次輪詢
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }

                // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
                // Selector#wakeup. So we need to check task queue again before executing select operation.
                // If we don't, the task might be pended until select operation was timed out.
                // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
              //2.輪詢過(guò)程中發(fā)現(xiàn)有任務(wù)加入,中斷本次輪詢
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }
                // 3.阻塞式select操作
                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;

                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    // - Selected something,
                    // - waken up by user, or
                    // - the task queue has a pending task.
                    // - a scheduled task is ready for processing
                    break;
                }
                if (Thread.interrupted()) {
                    // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                    // As this is most likely a bug in the handler of the user or it's client library we will
                    // also log it.
                    //
                    // See https://github.com/netty/netty/issues/2426
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely because " +
                                "Thread.currentThread().interrupt() was called. Use " +
                                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                    }
                    selectCnt = 1;
                    break;
                }

                long time = System.nanoTime();
                // 4.解決jdk的nio bug
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis elapsed without anything selected.
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    // The selector returned prematurely many times in a row.
                    // Rebuild the selector to work around the problem.
                    logger.warn(
                            "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                            selectCnt, selector);

                    rebuildSelector();
                    selector = this.selector;

                    // Select again to populate selectedKeys.
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = time;
            }

            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
            // Harmless exception - log anyway
        }
    }

1、計(jì)算select過(guò)程可以執(zhí)行到的時(shí)間點(diǎn)selectDeadLineNanos
2、把時(shí)間點(diǎn)轉(zhuǎn)化成毫秒,如果時(shí)間很短,即timeoutMillis<=0,并且是第一次執(zhí)行 selectCnt==0,那么執(zhí)行selectNow,該函數(shù)返回已經(jīng)準(zhǔn)備好IO操作的select key 集合
3、如果有任務(wù)需要執(zhí)行,那么就跳出for 循環(huán)
4、執(zhí)行selector.select(timeoutMillis),如果有事件那么跳出for 循環(huán),如果沒(méi)有事件發(fā)生,那么會(huì)進(jìn)入下一個(gè)循環(huán)直到timeoutMillis小于0,跳出循環(huán)。 這一步同時(shí)會(huì)對(duì)selectCnt變量加1操作,SELECTOR_AUTO_REBUILD_THRESHOLD 默認(rèn)為512,當(dāng)selectCnt超過(guò)這個(gè)閥值時(shí)就會(huì),重新建立新的Selector,把原來(lái)的Channel遷移到新的Selector 上。為什么要設(shè)立一個(gè)selectCnt變量呢?因?yàn)镴DK nio可能會(huì)有Bug,具體可以參照http://bugs.java.com/bugdatabase/view_bug.do?bug_id=6595055,這個(gè)bug會(huì)導(dǎo)致select 空輪詢,在執(zhí)行select操作時(shí)如果沒(méi)有事件發(fā)生,直接返回,沒(méi)有等待timeoutMillis, 這是一次空輪詢,selectCnt加1。
下圖是NioEventLoop的工作示意圖。

image.png

bind過(guò)程分析

ServerBootstrap 繼承自AbstractBootstrap,AbstractBootstrap的bind過(guò)程如下圖。

image.png

其中doBind的代碼如下:

    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

1、initAndRegister返回ChannelFuture實(shí)例regFuture,以此來(lái)判斷initAndRegister 是否執(zhí)行完畢。

  • 如果執(zhí)行完畢那么調(diào)用doBind0進(jìn)行socket綁定
  • 否則添加listener監(jiān)聽(tīng)器,當(dāng)initAndRegister完成時(shí),再調(diào)用doBind0進(jìn)行綁定

2、initAndRegister函數(shù)會(huì)創(chuàng)建NioServerSocketChannel,主要做一些初始化的工作,包括pipeline添加handler,把channel注冊(cè)到seletor上。
3、回到ServerBootstrap類,ServerBootstrap的init(Channel channel)方法,會(huì)添加handler到channel的pipeline中該handler就是ServerBootstrapAcceptor。代碼如下:


        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });

4、如何把channel注冊(cè)到selector上呢?EventLoopGroup bossGroup會(huì)選擇內(nèi)部的一個(gè)EventLoop來(lái)執(zhí)行實(shí)際的注冊(cè)行為然后開(kāi)始將該Channel注冊(cè)到上述EventLoopGroup bossGroup。注冊(cè)成功后會(huì)執(zhí)行上述的initChannel方法。
bind過(guò)程結(jié)束,當(dāng)Selector檢測(cè)到NioServerSocketChannel有新的連接事件時(shí),就會(huì)交給NioServerSocketChannel的ChannelPipeline中的ServerBootstrapAcceptor處理。
ServerBootstrapAcceptor做兩件事:

  • 為新的Channel的ChannelPipeline配置我們上述代碼中的childHandler指定的ChannelHandler
  • 將新的Channel注冊(cè)到了上述EventLoopGroup workerGroup中

sync介紹

bind方法是異步方法,返回ChannelFuture,sync可以等待該異步過(guò)程結(jié)束。每一個(gè)ChannelFuture都是和一個(gè)Channel綁定的,而每一個(gè)ChannelFuture又有一個(gè)closeFuture方法,然后調(diào)用sync方法等待ChannelFuture的結(jié)束,只有當(dāng)Channel 關(guān)閉,closeFuture才會(huì)被調(diào)用,一般正常情況下不會(huì)被調(diào)用,所以主線程會(huì)一直阻塞在sync方法上。

注意點(diǎn)

Reactor模型中可以通過(guò)多個(gè)Acceptor線程加快accept操作,我們是否可以增大bossGroup線程數(shù)來(lái)加快accept呢?答案是否,沒(méi)有效果因?yàn)閎ossGroup中多線程是為了綁定多個(gè)端口,ServerSocketChannel創(chuàng)建后會(huì)綁定到bossGroup中的一個(gè)Eventloop, 由他來(lái)負(fù)責(zé)accept操作。

Tomocat 6開(kāi)始也支持NIO模型,可以開(kāi)啟多個(gè)Acceptor線程,可以參照這篇文章


參考文章:
http://www.itdecent.cn/p/e577803f0fb8
https://my.oschina.net/pingpangkuangmo/blog/742929

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容