第二十節(jié) netty源碼分析之 reactor中的EventLoop01

EventLoopGroup

(如果使用到的是 NIO, 那么通常是 NioEventLoopGroup), 那么這個(gè) NioEventLoopGroup 在 Netty 中到底扮演著什么角色呢?
NIO 的Reactor 模型

  • 補(bǔ)充多線程的reactor模式
Reactor 多線程模型 有如下特點(diǎn):
    有專門一個(gè)線程, 即 Acceptor 線程用于監(jiān)聽客戶端的TCP連接請(qǐng)求.
    客戶端連接的 IO 操作都是由一個(gè)特定的 NIO 線程池負(fù)責(zé). 每個(gè)客戶端連接都與一個(gè)特定的 NIO 線程綁定, 因此在這個(gè)客戶端連接中的所有 IO 操作都是在同一個(gè)線程中完成的.
    客戶端連接有很多, 但是 NIO 線程數(shù)是比較少的, 因此一個(gè) NIO 線程可以同時(shí)綁定到多個(gè)客戶端連接中.
圖片.png

Netty 是 Reactor 模型與NIO的Reactor 本質(zhì)上區(qū)別不是很大。那么和nio中的實(shí)現(xiàn)有哪些不同的。下面我們分析:

reactor 一般是服務(wù)端用的最多,這里我們以EchoServer分析
單線程模式:

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup)
 .channel(NioServerSocketChannel.class)
 ...

多線程模式

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 ...
  • 上面兩端代碼,區(qū)別其實(shí)就是單線程重載方法group。
 @Override
    public ServerBootstrap group(EventLoopGroup group) {
        return group(group, group);
    }

接下來(lái)分析reactor的核心NioEventLoopGroup,來(lái)確定這是個(gè)什么玩意,為什么它能充當(dāng)一個(gè)線程組
類圖如下:


圖片.png
   public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }
    //調(diào)用下面
     public NioEventLoopGroup(int nThreads, Executor executor) {
        this(nThreads, executor, SelectorProvider.provider());
    }
//調(diào)用下面
public NioEventLoopGroup(
            int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
        this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }
    ///調(diào)用下面
     public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
        final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }
//繼續(xù)調(diào)用父類MultithreadEventLoopGroup
 protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
 //在次調(diào)用父類的父類MultithreadEventExecutorGroup
 protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }
    

注意:這里我們初始話executor為null那么后續(xù)我們猜測(cè)應(yīng)該netty會(huì)為我們創(chuàng)建默認(rèn)的executor。SelectorProvider.provider()這個(gè)方法前面介紹過(guò),會(huì)根據(jù)當(dāng)前系統(tǒng)來(lái)選擇核實(shí)的io多路復(fù)用(select、poll、epoll)。DefaultSelectStrategy默認(rèn)策略 。Execution的拒絕策略reject(線程池的拒絕策略)

最后在父類的父類MultithreadEventExecutorGroup構(gòu)造器中

 protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
//創(chuàng)建一個(gè)大小為 nThreads 的 EventExecutor數(shù)組
        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                //newChild的實(shí)現(xiàn)類在NioEventLoopGroup中,返回NioEventLoop
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }
//從DefaultEventExecutorChooserFactory工廠實(shí)現(xiàn)類中的newChooser方法: 根據(jù)線程數(shù)在children 數(shù)組中選出一個(gè)合適的 EventExecutor 實(shí)例
        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }
  1. 已知children是一個(gè)EventExecutor數(shù)組, 而ThreadPerTaskExecutor是Executor,最后使用newChild方法將ThreadPerTaskExecutor封裝成EventLoop放到數(shù)組中
public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

newChild方法將ThreadPerTaskExecutor封裝成EventLoop放到數(shù)組中

@Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }

綜上所述,我們可以先猜測(cè)這個(gè)EventLoop的作用,可能是客戶端一旦和服務(wù)端accept后會(huì)將task丟到從EventExecutor數(shù)組取出一個(gè)EventLoop來(lái)執(zhí)行,那么會(huì)是這樣嗎?我們來(lái)繼續(xù)

簡(jiǎn)要分析下NioEventLoop:


圖片.png

NioEventLoop的繼承很多,這里我們只需了解他的父類SingleThreadEventExecutor 構(gòu)造器中, 通過(guò) threadFactory.newThread 創(chuàng)建了一個(gè)新的 Java 線程. 在這個(gè)線程中所做的事情主要就是調(diào)用 SingleThreadEventExecutor.this.run() 方法, 而因?yàn)?NioEventLoop 實(shí)現(xiàn)了這個(gè)方法, 因此根據(jù)多態(tài)性, 其實(shí)調(diào)用的是 NioEventLoop.run() 方法.

接下來(lái)我們追蹤這個(gè)NioEventLoop是在哪里器作用的,需要注意的是我們使用了兩個(gè)NioEventLoopGroup,一個(gè)是bossGroup一個(gè)是workerGroup

 public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        super.group(parentGroup);
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        }
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = childGroup;
        return this;
    }
  • 作為服務(wù)端我們肯定是要從啟動(dòng)的bind入手分析:
    根據(jù)之前服務(wù)端的分析,我們一路找到ServerBootstrap父類AbstractBootstrap中doBind0這個(gè)方法
private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

根據(jù)前面的分析channel.eventLoop()取得為bossgroup,也就是應(yīng)該accept的線程,正好 channel.bind也同時(shí)印證了我們的猜想。那么接下來(lái)workgroup從哪里來(lái)呢

  • 想一下處理io阻塞事件在netty中一般是一何種形式處理的呢,對(duì)了就是handler,一般在ServerBootstrapAcceptor這handler和客戶端連接后就會(huì)交個(gè)后面的handler處理,在哪里處理就是在childgroup線程組中處理

回想一下,在分析server端的是我們有介紹過(guò)ServerBootstrap實(shí)現(xiàn)的init初始化handler,這里出現(xiàn)過(guò)childGroup,正是我們苦苦尋找的workgroup

//這里初始化的為nioserverchannel
    @Override
    void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
        }

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                //這里從config獲取的handler為parent handler
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
//currentChildGroup、currentChildHandler客戶端的連接的 IO 交互
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

簡(jiǎn)要分析下,像pipeline中添加ChannelInitializer,前面分析pipeline已經(jīng)知道之后再register掉initChannel方法。添加的ServerBootstrapAcceptor這個(gè)handler
在它抄寫了channelread 事件,然后交給childgroup線程處理自定義handler
ServerBootstrapAcceptor中channelRead方法


//inbound事件到來(lái)時(shí),這里就是客戶端和
        @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;

            child.pipeline().addLast(childHandler);

            setChannelOptions(child, childOptions, logger);

            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }

            try {
                //將操作io的handler綁定到childGroup,執(zhí)行完成后斷開childchannel
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }

基本上就暫時(shí)分析enveloop作為netty的reactor模式的核心。bossgroup、workgroup等作用

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 1-netty源碼分析之Server 看netty源碼之后進(jìn)行總結(jié)的第一篇筆記,無(wú)非幫助自己對(duì)于看代碼的一個(gè)總結(jié),...
    致慮閱讀 932評(píng)論 0 5
  • 我們知道, 一個(gè) Netty 程序啟動(dòng)時(shí), 至少要指定一個(gè) EventLoopGroup(如果使用到的是 NIO,...
    tracy_668閱讀 4,636評(píng)論 1 7
  • 從前面的文章中我們已經(jīng)知道了,一個(gè) Netty 程序啟動(dòng)時(shí),至少要指定一個(gè) EventLoopGroup(如果使用...
    WEIJAVA閱讀 396評(píng)論 0 0
  • background netty 是一個(gè)異步事件驅(qū)動(dòng)的網(wǎng)絡(luò)通信層框架,其官方文檔的解釋為 Netty is a N...
    高級(jí)java架構(gòu)師閱讀 662評(píng)論 0 0
  • 在本文中主要是深入了解EventLoop,以便對(duì)netty的線程模型有更好的了解。Netty是Reactor模型的...
    xiehongm_信陵閱讀 1,217評(píng)論 0 2

友情鏈接更多精彩內(nèi)容