Netty理論三:Netty線程模型

1、Reactor模式:NIO網(wǎng)絡(luò)框架的典型模式

Reactor是網(wǎng)絡(luò)編程中的一種設(shè)計(jì)模式,reactor會(huì)解耦并發(fā)請(qǐng)求的服務(wù)并分發(fā)給對(duì)應(yīng)的事件處理器來處理。目前,許多流行的開源框架都用到了reactor模式,如:netty、node.js、Cindy等,包括java的nio。

何為Reactor線程模型?

Reactor模式是事件驅(qū)動(dòng)的,有一個(gè)或多個(gè)并發(fā)輸入源,有一個(gè)Service Handler,有多個(gè)Request Handlers;這個(gè)Service Handler會(huì)同步的將輸入的請(qǐng)求(Event)多路復(fù)用的分發(fā)給相應(yīng)的Request Handler

image.png

Reactor模式的三種形式
1、Reactor 單線程模式:

image.png

這種實(shí)現(xiàn)方式,和第一章java NIO中單線程N(yùn)IO實(shí)現(xiàn)是一樣的,一個(gè)Reactor處理所有的事情。

image.png

2、Reactor 多線程模式:
編解碼及業(yè)務(wù)處理使用線程池,這樣的話,可以避免IO阻塞(IO阻塞的代價(jià)是非常大的)。

image.png
image.png

3、Reactors 主從模式:
把Reactor分為兩個(gè),一個(gè)負(fù)責(zé)接收,一個(gè)負(fù)責(zé)讀寫,業(yè)務(wù)處理可以用線程池,在服務(wù)端啟動(dòng)時(shí)配置(也可以選擇不用線程池,這個(gè)看具體業(yè)務(wù)需求)

image.png
image.png

2、Netty中如何使用Reactor模式

  • 單線程Reactor 模式
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup ,bossGroup )
  • 多線程 Reactor 模式
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup ,workerGroup )
//Handler使用線程池進(jìn)行處理
  • 主從Reactors 模式(官方推薦)
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup ,workerGroup )

EventLoopGroup初始化是創(chuàng)建創(chuàng)建兩個(gè)NioEventLoopGroup類型的Reactor線程池bossGroup和workGroup分別用來處理客戶端的連接請(qǐng)求(bossGroup)和通道IO事件(workerGroup);
注:new NioEventLoopGroup()默認(rèn)創(chuàng)建cpu核數(shù)*2的線程數(shù)

主從模式的好處有:
1、業(yè)務(wù)解耦:一個(gè)reactor用來處理客戶端連接,一個(gè)reactor用來處理業(yè)務(wù)
2、安全性:業(yè)務(wù)解耦以后,就可以在bossGroup中做一些SSL校驗(yàn)、ip黑名單、登錄之類的安全性校驗(yàn)
3、性能提升:只有通過安全性校驗(yàn)的客戶端才能繼續(xù)進(jìn)行業(yè)務(wù)處理,這樣也能提升處理性能,否則大量的無效客戶端接入和正常的業(yè)務(wù)處理混雜在一起,影響業(yè)務(wù)處理性能。
使用demo:

serverBootstrap.group(boss,worker).handler(new RuleBasedIpFilter()).childHandler(new NettyServerInitializer(this.factoryCode));

3、Netty EventLoop源碼解析

1、NioEventLoopGroup整體結(jié)構(gòu)

image.png

EventExecutorGroup視圖

image.png

new NioEventLoopGroup源碼

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        //EventExecutorGroup里面有一個(gè)EventExecutor數(shù)組,保存了多個(gè)EventExecutor;
        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                //初始化EventExecutor數(shù)組,數(shù)組是NioEventLoop,見下面
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

        //EventExecutorChooser.next()定義選擇EventExecutor的策略;
        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

    @Override
    public EventExecutor next() {
        return chooser.next();
    }

NioEventLoopGroup.class

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }

NioEventLoop.class

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        //創(chuàng)建selector
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }
  • EventExecutorGroup里面有一個(gè)EventExecutor數(shù)組,保存了多個(gè)EventExecutor(NIOEventLoop);
  • EventExecutorGroup是不干什么事情的,當(dāng)收到一個(gè)請(qǐng)后,他就調(diào)用next()獲得一個(gè)它里面的EventExecutor,再調(diào)用這個(gè)executor的方法;
  • EventExecutorChooserFactory.EventExecutorChooser.next()定義選擇EventExecutor的策略(有兩種,都是輪詢);

2、NioEventLoopGroup創(chuàng)建分析

bossGroup

image.png

注:從圖中可以看出,一個(gè)NioEventLoopGroup中包含多個(gè)NioEventLoop,一個(gè)NioEventLoop中包含一個(gè)Selector,Selector監(jiān)聽NioServerSocketChannel,當(dāng)NioServerSocketChannel上有客戶端channel連接后,觸發(fā)Acceptor事件,在ServerBootstrapAcceptor handler中轉(zhuǎn)發(fā)給workGroup

workerGroup

image.png

當(dāng)客戶端channel初次連接時(shí),將其注冊(cè)到workGroup中的NioEventLoop上(通過EventExecuorChooser.next()獲取workGroup中的一個(gè)NioEventLoop),然后NioEventLoop中的Selector不斷輪詢其所管理的NioSocketChannel,如果其中有讀寫事件準(zhǔn)備好,則由DefaultChannelPipeline處理。

3、ServerBootstrap啟動(dòng)流程分析

image.png

4、ServerBootstrap執(zhí)行流程分析

image.png
        // 配置服務(wù)端的NIO線程組
        // 主線程組, 用于接受客戶端的連接,但是不做任何具體業(yè)務(wù)處理,像老板一樣,
        //負(fù)責(zé)接待客戶,不具體服務(wù)客戶
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // 工作線程組, 老板線程組會(huì)把任務(wù)丟給他,讓手下線程組去做任務(wù),服務(wù)客戶
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             //欲加到NioServerSocketChannel Pipeline的handler
             .handler(new LoggingHandler(LogLevel.INFO))
              //欲加到NioSocketChannel(accept()返回的)Pipeline的handler
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
                     ch.pipeline().addLast("decoder", new StringDecoder());
                     ch.pipeline().addLast("encoder", new StringEncoder());
                     ch.pipeline().addLast(new EchoServerHandler());
 
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

            // 綁定端口,開始接收進(jìn)來的連接
            ChannelFuture f = b.bind(port).sync(); // (7)
image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容