netty服務(wù)端源碼分析之eventloop和eventloopgroup

在眾多的編程語言和網(wǎng)絡(luò)庫中,拿來介紹網(wǎng)絡(luò)編程的例子,echo服務(wù)器和客戶端恐怕是最多的一個例子。netty作為一個在java語言中應(yīng)用非常廣泛、非常優(yōu)秀的網(wǎng)絡(luò)編程框架,echo服務(wù)器和客戶端程序往往是大家第一個接觸的實例程序。很多工程師正是通過echo服務(wù)器和客戶端跨入netty的大門。

我們先看一個echo服務(wù)器的完整實例代碼:

/**
 * Echoes back any received data from a client.
 */
public final class EchoServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    
    public static void main(String[] args) throws Exception {
        
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }
        
        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                // 把ServerBootstrap的channelfactory設(shè)置為ReflectiveChannelFactory
                // 當(dāng)執(zhí)行channelfactory的newChannel方法時,會創(chuàng)建NioServerSocketChannel實例
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
                // 處理server Socket事件
             .handler(new LoggingHandler(LogLevel.INFO))
                // 處理client socket事件
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(new EchoServerHandler());
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

代碼中前面一段跟SSL相關(guān)的代碼我們先跳過,從這兩行代碼開始:

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

這里創(chuàng)建了兩個EventLoopGroup對象bossGroupworkerGroup,bossGroup主要用來處理server socket監(jiān)聽client socket的連接請求,server socket接收了新建連接后,會把connection socket放到workerGroup中進(jìn)行處理,也就是說workerGroup主要用來處理connection socket的網(wǎng)絡(luò)IO和相關(guān)的業(yè)務(wù)邏輯。netty支持很多種線程模型,這種bossGroupworkerGroup的組合是比較典型的。

netty作為一個異步的事件驅(qū)動(event drivern)的網(wǎng)絡(luò)應(yīng)用程序框架,采用非阻塞的多路復(fù)用的網(wǎng)絡(luò)io模型。連接建立、連接關(guān)閉、socket可讀、socket可寫這些狀態(tài)的改變稱為一個個“事件(event)”,當(dāng)然事件也可以是應(yīng)用程序添加的一個任務(wù)。而處理這些event的線程就是一個無限循環(huán)(for or while loop),沒有事件發(fā)生時,線程一直等待事件的產(chǎn)生。當(dāng)有事件發(fā)生時,則處理所有的事件,處理完后則開始下一個循環(huán),繼續(xù)等待新的事件的發(fā)生。所以這種處理邏輯叫做eventloop,eventloop與線程是一一對應(yīng)的,一個eventloop對應(yīng)一個線程,一個線程也只有一個eventloop。而eventloopgroup主要包含了一組eventloop(線程池),當(dāng)然還有許多線程任務(wù)分發(fā)等管理功能。

eventloopgroup/eventloop是netty的核心部件之一,可以說是netty三大核心部件之一,另外兩個是channel和pipeline。eventloopgroup/eventloop定義了netty的線程模型,包括channel對socket的操作、pipeline的業(yè)務(wù)處理、用戶提交的任務(wù)在內(nèi)的所有任務(wù)的分發(fā)調(diào)度和執(zhí)行都是在eventloopgroup/eventloop中進(jìn)行。

這里為eventloopgroup接口實例化的是類NioEventLoopGroup,NioEventLoopGroup基于java nio進(jìn)行網(wǎng)絡(luò)操作。

我們跟蹤一下NioEventLoopGroup的構(gòu)造函數(shù),

NioEventLoopGroup

    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }
    
    public NioEventLoopGroup(int nThreads, Executor executor) {
        this(nThreads, executor, SelectorProvider.provider());
    }
    
    public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }
    
    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                                  final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

然后進(jìn)入他的父類:

MultithreadEventLoopGroup

    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

繼續(xù)進(jìn)入上一級父類:

MultithreadEventExecutorGroup

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }
    
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        
    }

這里進(jìn)入了一個有實質(zhì)內(nèi)容的構(gòu)造函數(shù),在分析這個函數(shù)之前,我們先根據(jù)構(gòu)造函數(shù)的調(diào)用鏈?zhǔn)崂硪幌聵?gòu)造函數(shù)的入?yún)⒌闹怠?/p>

nThreads = (nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads);
executor = null;
chooserFactory = DefaultEventExecutorChooserFactory.INSTANCE;

args = [SelectorProvider SelectorProvider.provider(),
        SelectStrategyFactory DefaultSelectStrategyFactory.INSTANCE,
        RejectedExecutionHandler RejectedExecutionHandlers.reject()]

可見如果NioEventLoopGroup的構(gòu)造函數(shù)如果nThreads為非0值,則為傳入的實際值,如果為0或沒有參數(shù),則為DEFAULT_EVENT_LOOP_THREADS,我們看下DEFAULT_EVENT_LOOP_THREADS的定義,

DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

繼續(xù)跟下去,可以知道NettyRuntime.availableProcessors()默認(rèn)值為Runtime.getRuntime().availableProcessors(),也就是說nThreads值默認(rèn)為CPU核心數(shù)的2倍

接下來我們分析構(gòu)造函數(shù):

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args)

首先為executor重新賦值:

    if (executor == null) {
        // DefaultThreadFactory設(shè)置: 非daemon、線程優(yōu)先級5、線程名字、線程組為當(dāng)前線程所屬組
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

因為executor入?yún)⒅禐閚ull,所以executor重新賦值為ThreadPerTaskExecutor類的一個新建實例,ThreadPerTaskExecutor就是eventloop的執(zhí)行器,eventloop的所有任務(wù)都是通過調(diào)用ThreadPerTaskExecutorexecute()函數(shù),從而創(chuàng)建一個線程,進(jìn)而執(zhí)行提交的任務(wù)的。ThreadPerTaskExecutor只有一個成員,線程工廠DefaultThreadFactory,這個線程工廠創(chuàng)建的線程實例設(shè)置的線程屬性為:非daemon、線程優(yōu)先級5、線程名的前綴、所屬的線程組與當(dāng)前線程相同,一句話,創(chuàng)建的是一個普通線程,其中線程名的前綴為:
nioEventLoopGroup-poolId-, poolId初始值為0,每創(chuàng)建一個DefaultThreadFactory實例值加1。

然后初始化EventLoopGroup的線程池,具體代碼如下:

        // 創(chuàng)建一個數(shù)組,數(shù)組大小為線程的個數(shù)
        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                // 初始化線程executor
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                // 如果初始化一個線程executor時拋出異常,則執(zhí)行這一段代碼:關(guān)閉前面創(chuàng)建的所有executor,并等待所有線程退出運行
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

初始化過程,在代碼注釋中已經(jīng)說明了,關(guān)鍵語句children[i] = newChild(executor, args);我們先放下,待會再詳細(xì)分析。

線程選擇器初始化:

chooser = chooserFactory.newChooser(children);

chooser實現(xiàn)了eventloopgroup的任務(wù)分派,當(dāng)需要向線程池提交一個新的任務(wù)時,如注冊一個新的服務(wù)端socket到eventloop,則通過chooser選擇一個具體的executor。從前面分析得知,chooserFactory是DefaultEventExecutorChooserFactory的一個實例,它的executor選擇邏輯代碼如下:

    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        // executor數(shù)是2的n次方,用與來計算,改善性能
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        // executor數(shù)不是2的n次方,用除法求余的方式計算
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }

可見,線程池中的個數(shù)是2的n次方與不是2的n次方,具體的計算方法不一樣,但效果都是round-robin的選擇方法。

最后,為線程池的每一個EventExcutor注冊一個結(jié)束后的回調(diào)函數(shù),并把所有的EventExcutor保存到一個不可修改的Set中。具體代碼如下:

        // 線程結(jié)束時的回調(diào)函數(shù)
        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        // 注冊回調(diào)函數(shù)
        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);

這個構(gòu)造函數(shù)分析完了,下面分析eventloop以及它的初始化,也就是前面遺留沒有分析的children[i] = newChild(executor, args); newChild是一個抽象函數(shù):

protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;

我們先看看類NioEventLoopGroup的繼承體系:

NioEventLoopGroup的繼承體系.png

然后從NioEventLoopGroup開始往上找哪個最底層的類實現(xiàn)了newChild()函數(shù)。

然后在類NioEventLoopGroup中找到了它的實現(xiàn),

    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }

可見EventExecutor是NioEventLoop的一個實例,按照分析NioEventLoopGroup一樣的分析過程,這里我不在粘貼具體代碼了,只列出NioEventLoop的關(guān)鍵域的初始值,
NioEventLoop的繼承體系:

NioEventLoop的繼承體系.png

關(guān)鍵域的初始值及說明:

SelectorProvider provider = SelectorProvider.provider()
Selector selector = provider.openSelector()
SelectStrategy selectStrategy = DefaultSelectStrategyFactory.INSTANCE
boolean addTaskWakesUp = false // 添加任務(wù)時不wakeup線程eventloop
int maxPendingTasks = Math.max(16,
SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE)); // 最小值為16,默認(rèn)值為Integer.MAX_VALUE
Queue<Runnable> tailTasks = new LinkedBlockingQueue<Runnable>(maxPendingTasks)
Queue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>(maxPendingTasks)
EventExecutorGroup parent = 執(zhí)行newChild函數(shù)的NioEventLoopGroup實例
Executor executor = ThreadPerTaskExecutor的實例

這里特別說明一下providerselector。
provider在不同的操作系統(tǒng)平臺下有不同的實現(xiàn),windowns平臺是WindowsSelectorProvider。在linux平臺下,如果內(nèi)核版本>=2.6則,具體的實現(xiàn)為EPollSelectorProvider,否則為默認(rèn)的PollSelectorProvider。MAC平臺下則是KQueueSelectorProvider。
selector是netty的一個核心概念,封裝了不同平臺的多路復(fù)用網(wǎng)絡(luò)io模式,eventloop正是通過向selector注冊感興趣的事件(event),然后等待事件發(fā)生,從而實現(xiàn)非阻塞異步處理事件的目的。在linux平臺下,底層通過epoll實現(xiàn),執(zhí)行provider.openSelector()可以理解為執(zhí)行了epoll_init()系統(tǒng)調(diào)用,會返回一個epoll句柄給selector,然后通過selector可以向epoll注冊感興趣的事件,并在事件發(fā)生時獲取發(fā)生的事件,然后對事件進(jìn)行相應(yīng)的處理。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容