Netty簡單入門

筆者最近在看Netty相關的東西,想把過程中所學到的和感悟記錄下來,于是決定單獨開一個專欄,專門記錄Netty相關的文章。

第一篇就從「簡單入門」開始吧?。。?/p>

Netty簡介

Netty是由JBOSS提供的一個java開源框架,現(xiàn)為 Github上的獨立項目。Netty提供異步的、事件驅動的網(wǎng)絡應用程序框架和工具,用以快速開發(fā)高性能、高可靠性的網(wǎng)絡服務器和客戶端程序。

提取句子主干,首先,Netty是一個網(wǎng)絡應用程序框架,也可以理解為網(wǎng)絡IO框架。利用Netty,開發(fā)者可以快速開發(fā)出一個高性能、高可靠的網(wǎng)絡服務器或客戶端程序。

例如,你要開發(fā)一個RPC框架,生產者需要暴露服務,消費者需要調用服務。生產者和消費者之間如何通信呢?使用什么協(xié)議通信呢?雙方通信的IO模型如何定義呢?通過Netty就可以快速實現(xiàn)。

Netty的特點就是異步的、事件驅動的、高性能的,下面分別說下。

異步

在Netty中,所有的IO操作都是異步的,這意味著如:接收請求,Channel數(shù)據(jù)的讀寫等操作都不會阻塞,Netty會返回一個ChannelFuture,它是一個異步操作的結果占位符。如果開發(fā)者就是想同步調用怎么辦?通過調用ChannelFuture.sync()可以異步轉同步,但是非常不建議這么做,它會阻塞當前線程,這和高性能是相悖的。

ChannelFuture的接口定義:

public interface ChannelFuture extends Future<Void>

泛型是Void,這意味著你并不能通過ChannelFuture獲取到操作操作的結果,但是你可以通過addListener()來注冊回調,Netty會在異步操作完成時觸發(fā)回調,這時你可以知道操作是否成功,以決定后續(xù)的操作。

Netty官方推薦使用addListener()注冊監(jiān)聽來獲取結果,而不是調用await(),await()會阻塞當前線程,這不僅浪費了當前線程資源,而且線程間的切換和數(shù)據(jù)同步需要較大的開銷。
另外需要特別注意的是:不要在ChannelHandler中調用await(),Channel整個生命周期事件都由一個唯一綁定的EventLoop線程處理,執(zhí)行ChannelHandler邏輯的也是EventLoop,調用await()相當于線程本身在等待自己操作完成的一個結果,這會導致死鎖。

相比之下,addListener()是完全非阻塞的,它會注冊一個監(jiān)聽到Channel,當異步操作完成時,EventLoop會負責觸發(fā)回調,性能是最優(yōu)的。

事件驅動

Netty程序是靠事件來驅動執(zhí)行的。

Netty使用不同的事件來通知我們,我們可以根據(jù)已經(jīng)發(fā)生的事件來執(zhí)行對應的動作邏輯。

Netty是一個網(wǎng)絡IO框架,所以事件可以按照出站和入站進行分類:

  • 入站
    1. 連接已激活/失活。
    2. 有數(shù)據(jù)可以讀取。
    3. 用戶自定義事件。
    4. 異常事件。
  • 出站
    1. 打開/關閉到遠程節(jié)點的連接。
    2. 將數(shù)據(jù)write/flush到Socket。

當Channel被注冊到EventLoop后,該EventLoop會開啟線程不斷輪詢,直到Channel有事件發(fā)生。有事件發(fā)生時,EventLoop會觸發(fā)相應的回調,通過ChannelPipeline進行事件的傳播。


image

高性能

Netty開發(fā)服務端程序時,面對海量客戶端連接,還必須保證高性能。

Netty為了高性能做了很多努力和優(yōu)化,這里簡單列下,后面會詳細說明,包括但不僅限于:

  1. 非阻塞的Nio編程,主從Reactor線程模型,只需少量線程即可應對海量連接。
  2. 基于引用計數(shù)算法的內存池化技術,避免ByteBuf的頻繁創(chuàng)建和銷毀。
  3. 更少的內存復制:
    • Socket讀寫數(shù)據(jù)使用堆外內存,避免內存拷貝。
    • CompositeByteBuf組合ByteBuf,實現(xiàn)數(shù)據(jù)零拷貝。
    • 文件傳輸FileRegion避免內存拷貝。
  1. 局部無鎖化,EventLoop串行執(zhí)行事件和任務,避免了線程競爭和數(shù)據(jù)同步。
  2. Netty實現(xiàn)的MpscQueue高性能無鎖隊列。
  3. 反射替換SelectorImpl的selectedKeys,將HashSet替換為數(shù)組,避免哈希沖突。
  4. FastThreadLocal使用數(shù)組代替Hash表,帶來更好的訪問性能。
  5. ... ...想到再補充。

Netty的組件

Channel

Channel譯為「通道」,它代表一個到實體(文件、硬件、Socket等)的開放連接,例如針對網(wǎng)絡有SocketChannl,針對文件有FileChannel等。既然是通道,就代表它可以被打開,也可以被關閉,

Netty沒有使用JDK原生的Channel,而是自己封裝了一個,這樣可以為客戶端和服務端Channel提供一個統(tǒng)一的視圖,使用起來更加方便。

Channel分為兩大類:

  1. 服務端ServerSocketChannel,負責綁定本地端口,監(jiān)聽客戶端的連接請求。
  2. 客戶端SocketChannel,負責和遠程節(jié)點建立連接。

在網(wǎng)絡編程模型中, 服務端和客戶端進行IO數(shù)據(jù)交互的媒介就是Channel,Channel被打開的目的就是與對端進行數(shù)據(jù)交換,你可以通過Channel來給對端發(fā)送數(shù)據(jù),和從對端讀取數(shù)據(jù)。

常用的Channel實現(xiàn)如下:


image

EventLoopGroup和EventLoop

EventLoopGroup本身并不干活,它負責管理一組EventLoop的啟動和停止,它提供一個next()方法從一組EventLoop線程中挑選出一個來執(zhí)行任務。

EventLoopGroup的next()方法依賴一個EventExecutorChooser選擇器,通過選擇器來從一組EventLoop中進行選擇,Netty默認的策略就是簡單輪詢,源碼如下:

/*
創(chuàng)建一個選擇器,從一組EventExecutor中挑選出一個。
Netty默認的選擇策略就是:簡單輪詢。
*/
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
    // 兩種Chooser實現(xiàn)都有一個AtomicLong計數(shù)器,每次next()先自增再取余

    // 如果數(shù)量是2的冪次方數(shù),則采用位運算
    if (isPowerOfTwo(executors.length)) {
        return new PowerOfTwoEventExecutorChooser(executors);
    } else {
        // 否則,對長度進行取余
        return new GenericEventExecutorChooser(executors);
    }
}

// 2的冪次方數(shù)的選擇器,位運算
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        // 計數(shù)器自增 & 長度-1,和HashMap一樣
        return executors[idx.getAndIncrement() & executors.length - 1];
    }
}

// 普通的選擇器,取余
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
    private final AtomicLong idx = new AtomicLong();
    private final EventExecutor[] executors;

    GenericEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
    }
}

EventLoopGroup管理的一組EventLoop應該趨向于處理同一類任務和事件,例如開發(fā)服務端程序,Netty官方推薦的Reactor主從線程模型需要兩個EventLoopGroup:Boss和Worker,Boss專門負責接收客戶端的連接,連接建立后,Boss會將客戶端Channel注冊到Worker中,由Worker來負責后續(xù)的數(shù)據(jù)讀寫事件。

EventLoopGroup可以理解為是一個多線程的線程池,而EventLoop則是一個單線程的線程池,也是真正干活的角色。

EventLoop不僅可以處理Channel的IO事件,還可以執(zhí)行用戶提交的系統(tǒng)任務,因為它本身就是個線程池。此外,它還實現(xiàn)了ScheduledExecutorService接口,因此它還可以執(zhí)行定時任務。最常見的應用場景就是:你可以每隔一段時間檢測一下客戶端連接是否斷開!

EventLoop是如何工作的呢?

拿最常用的NioEventLoop來說,它內部會持有一個Selector多路復用器,初始化時,Selector也會被一同創(chuàng)建,然后當有任務被提交到NioEventLoop時,它會利用ThreadPerTaskExecutor創(chuàng)建一個線程執(zhí)行run()方法。核心就在run()方法里,它會不斷的輪詢,檢查Selector上是否有準備就緒的Channel需要處理,如果有則根據(jù)SelectionKey的事件類型觸發(fā)相應的事件回調,并通過ChannelPipeline將事件傳播出去。
如果沒有準備就緒的Channel,則去檢查taskQueue中是否有待處理的系統(tǒng)任務、或定時任務,如果有則執(zhí)行,否則就阻塞在Selector.select()上,等待準備就緒的Channel。

這里就簡單過一下吧,后面會有源碼解析的文章,敬請期待!

ChannelFuture

前面已經(jīng)說過,Netty是完全異步的IO框架,它所有的操作都會立即返回,不會阻塞在那里,這對于習慣了同步編程的同學可能要適應一下。你不能再調用一個方法,得到一個結果,根據(jù)結果判斷再去執(zhí)行后面的操作。因為此時異步操作可能還沒有執(zhí)行完,ChannelFuture還沒有結果。

ChannelFuture只是一個異步操作的結果占位符,它代表未來可能會發(fā)生的一個結果,這個結果可能是執(zhí)行成功,或是執(zhí)行失敗得到一個異常信息。

你可以通過調用await()阻塞等待這個操作完成,但是Netty不建議這么去做,這樣會阻塞當前線程,浪費線程資源,而且線程間的切換和數(shù)據(jù)同步都是一個較大的開銷。

Netty推薦使用addListener()來注冊一個回調,當操作執(zhí)行完成/異常時,ChannelFuture會向EventLoop提交任務來觸發(fā)回調,你可以在回調方法里根據(jù)操作結果來執(zhí)行后面的業(yè)務邏輯?;卣{和任務是由同一個線程驅動的,這樣就避免了線程間數(shù)據(jù)同步的問題,性能是最好的。

ChannelPromise和ChannelFuture的區(qū)別?
ChannelPromise是ChannelFuture的子類,是一個特殊的可寫的ChannelFuture。前面說過ChannelFuture代表未來操作的一個結果占位符,使用ChannelFuture你只能乖乖等待結果完成然后觸發(fā)回調,這個結果是由Netty來設置,它沒有提供可寫操作。
而ChannelPromise就不同了,它提供了手動設置結果的API:setSuccess()setFailure(),結果只能設置一次,設置完后會自動觸發(fā)回調。

入站事件處理器ChannelInboundHandler所有操作都不需要提供ChannelPromise,因為這些回調是由Netty來主動觸發(fā)的。而出站事件處理器ChannelOutboundHandler很多操作都需要提供一個ChannelPromise,當出站數(shù)據(jù)處理完成時,你需要往ChannelPromise設置結果來通知回調。

ChannelHandler

ChannelHandler是Netty的事件處理器,根據(jù)數(shù)據(jù)的流向,分為入站、出站兩大類。

  • ChannelInboundHandler:入站事件處理器
  • ChannelOutboundHandler:出站事件處理器

當一個ChannelHandler被添加到Channel的Pipeline后,只要EventLoop輪詢到Channel有事件發(fā)生時,就會根據(jù)事件類型觸發(fā)相應的回調。例如:收到對端發(fā)送的數(shù)據(jù),Channel有數(shù)據(jù)可讀時,會觸發(fā)channelRead()方法。你要做的就是實現(xiàn)ChannelHandler類,重寫channelRead()方法,Netty會將讀取到的數(shù)據(jù)包裝成ByteBuf,至于拿到數(shù)據(jù)要做哪些事,那就是你的業(yè)務了。

對于Netty開發(fā)者來說,你的主要工作,就是開發(fā)ChannelHandler,實現(xiàn)ChannelHandler類,重寫你感興趣的事件即可。

常用的類有:ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter,分別處理入站和出站事件的,默認全部通過ctx.fireXXX()無腦向后傳播,你只需要重寫你感興趣的事件,不用被迫重寫所有方法了。

繼承SimpleChannelInboundHandler類,你只需要重寫channelRead0()方法,它會在該方法執(zhí)行完畢后自動釋放內存,防止內存泄漏,這一塊后面會詳細說明。

其他的就是Netty內置的一些開箱即用的編解碼器,可以針對公有協(xié)議(如HTTP)進行編解碼,處理讀/寫半包的問題,SslHandler針對讀寫數(shù)據(jù)進行加解密等等。

需要注意的點:

  1. Netty使用池化技術來復用ByteBuf對象,使用完畢后切記及時釋放資源。
  2. 如果你需要將事件傳播下去,必須手動觸發(fā)fireXXX()方法,Pipeline可不會自動幫你傳遞。
  3. 數(shù)據(jù)讀取需要注意:粘包/拆包 問題。
  4. 注意write()消息積壓問題。

ChannelPipeline

Pipeline譯為「管道」,如果把 網(wǎng)絡數(shù)據(jù) 比作水,那它就像 水管 一樣,讀入的數(shù)據(jù)從管道的頭部流入,經(jīng)過一系列ChannelInboundHandler處理,寫出的數(shù)據(jù)從管道的尾部流入,經(jīng)過一系列ChannelOutboundHandler處理,最終通過Socket發(fā)送給對端。

ChannelPipeline是ChannelHandler的容器,默認實現(xiàn)DefaultChannelPipeline是一個雙向鏈表,頭節(jié)點始終是HeadContext,尾節(jié)點始終是TailContext(頭尾節(jié)點有它們自己的職責所在),你可以往中間添加你自定義的ChannelHandler。

HeadContext頭節(jié)點的職責:對于入站事件,它會無腦向后傳播,確保你定義的ChannelHandler事件會被觸發(fā),對于出站事件,它會轉交給Channel.Unsafe執(zhí)行,例如bindwrite等,因為這些操作是偏底層的,需要和底層類打交道,Netty不希望開發(fā)者去調用這些方法。

TailContext尾節(jié)點的職責:對于出站事件,當然是無腦向后傳遞了,但是對于入站事件,如果前面的Handler沒有釋放讀取的數(shù)據(jù)資源,TailContext會自動釋放,避免內存泄漏。對于異常,如果前面的Handler沒有處理,TailContext會打印日志記錄下來,提醒開發(fā)者需要處理異常。

Bootstrap和ServerBootstrap

Netty的引導類,Bootstrap是客戶端的引導類,ServerBootstrap是服務端的引導類。

一個Netty服務的運行需要多個組件互相配合,使用Bootstrap可以快速組裝這些組件,讓它們協(xié)同運行。當然,你也可以脫離Bootstrap,自己去引導服務,只是完全沒有必要而已。

Bootstrap的創(chuàng)建非常簡單,默認的構造器不需要你傳任何參數(shù),這是因為它需要的參數(shù)很多,可能以后的版本還會發(fā)生改變,因此Netty使用建造者Builder模式來構建Bootstrap。

Bootstrap本身邏輯很簡單,它只是負責組裝組件,核心的邏輯都在各個組件里。下面是一個ServerBootstrap的標準啟動代碼示例,這篇文章先簡單帶過,后面會有詳細的源碼解析。

public class Server {
    public static void main(String[] args) {
        /*
        NioEventLoopGroup創(chuàng)建流程:
            1.創(chuàng)建ThreadPerTaskExecutor,利用FastThreadLocalThread來提升FastThreadLocal的性能
            2.初始化children,創(chuàng)建一組NioEventLoop(此時未創(chuàng)建線程)
            3.創(chuàng)建EventExecutorChooser,需要時從Group中輪詢出一個EventLoop來執(zhí)行任務
         */
        final NioEventLoopGroup boss = new NioEventLoopGroup(1);
        final NioEventLoopGroup worker = new NioEventLoopGroup();
        /*
        NioEventLoop流程:
            1.創(chuàng)建兩個任務隊列:taskQueue、tailTaskQueue
            2.openSelector()創(chuàng)建多路復用器Selector
            3.run()輪詢Selector、taskQueue,串行處理IO事件和Task
        懶啟動,只有在第一次execute()提交任務時才會利用executor創(chuàng)建線程
        對于Boss來說,線程啟動是在調用bind()時,提交一個register()任務
        對于Worker,線程啟動是在Boss接收到客戶端連接時,提交一個register()任務
         */
        new ServerBootstrap()
                .group(boss, worker)
                .option(ChannelOption.SO_BACKLOG, 100)
                //.attr(null,null)
                .childOption(ChannelOption.SO_TIMEOUT, 1000)
                //.childAttr(null,null)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                                ByteBuf buf = Unpooled.wrappedBuffer("hello".getBytes());
                                ctx.writeAndFlush(buf);
                            }
                        });
                    }
                }).bind(9999);
    }
}

第一個Netty程序

OK,前面介紹完Netty的組件,現(xiàn)在我們就基于這些組件,來寫第一個Netty程序。

下面是一個Echo服務實例,如果有新的客戶端接入,服務端會打印一句話,如果客戶端向服務端發(fā)送數(shù)據(jù),服務端會打印數(shù)據(jù)內容,并將數(shù)據(jù)原樣寫回給客戶端,一個非常簡單的程序。

EchoServer服務端標準實現(xiàn):

public class EchoServer {
    // 綁定的端口
    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public static void main(String[] args) {
        // 啟動Echo服務
        new EchoServer(9999).start();
    }

    public void start() {
        /*
        bossGroup負責客戶端的接入
        workerGroup負責IO數(shù)據(jù)的讀寫
         */
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        NioEventLoopGroup worker = new NioEventLoopGroup();
        new ServerBootstrap()
                .group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        sc.pipeline().addLast(new ChannelInboundHandlerAdapter(){

                            @Override
                            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                super.channelActive(ctx);
                                System.out.println("有新的客戶端連接...");
                            }

                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                /*
                                原樣寫回給客戶端,因為OutBoundHandler還要使用,因此不能釋放msg。
                                底層數(shù)據(jù)寫完后會自動釋放。
                                 */
                                byte[] bytes = ByteBufUtil.getBytes(((ByteBuf) msg));
                                System.out.println("接受到數(shù)據(jù):" + new String(bytes));
                                ctx.writeAndFlush(msg);
                            }

                            @Override
                            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                // 出現(xiàn)異常了
                                cause.printStackTrace();
                                ctx.channel().close();
                            }
                        });
                    }
                })
                .bind(port);
    }
}

EchoClient標準實現(xiàn):

public class EchoClient {
    private final String host;//遠程IP
    private final int port;//遠程端口

    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public static void main(String[] args) {
        new EchoClient("127.0.0.1", 9999).start();
    }

    public void start() {
        // 客戶端只需要一個WorkerGroup
        NioEventLoopGroup worker = new NioEventLoopGroup();
        new Bootstrap()
                .group(worker)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        sc.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                super.channelActive(ctx);
                                System.out.println("連接建立,開始發(fā)送【hello】...");
                                ctx.writeAndFlush(Unpooled.wrappedBuffer("hello".getBytes()));
                            }

                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                String data = ((ByteBuf) msg).toString(Charset.defaultCharset());
                                System.out.println("收到服務端數(shù)據(jù):" + data);
                            }
                        });
                    }
                }).connect(host, port);//連接服務端
    }
}

如上,只需少量代碼就可以快速開發(fā)出一個Echo服務,Netty向開發(fā)者屏蔽了底層實現(xiàn),你甚至都不需要知道Selector多路復用器,Channel是何時注冊到Selector上的?Netty是如何處理IO事件的?網(wǎng)絡數(shù)據(jù)是如何被讀入的?又是如何被寫出的?你只需要開發(fā)ChannelHandler,寫好回調邏輯,等待Netty調用即可。

如果使用JDK原生類網(wǎng)絡編程,Bio和Nio兩種不同的模式代碼風格差異非常大,如果需要在兩者之間做切換,工作量非常巨大。而Netty就顯得非常靈活,只需要將NioEventLoopGroupNioSocketChannel換成OioEventLoopGroupOioSocketChannel即可快速切換,這是Netty易用性和靈活性的極好體現(xiàn)。

總結

Netty作為事件驅動的異步IO框架,在保證高性能的同時,還擁有非常好的靈活性和可擴展性。使用Netty你可以快速構建你的網(wǎng)絡服務,或者開發(fā)一個框架,需要進行節(jié)點間的通信和數(shù)據(jù)傳輸,使用Netty來幫助你完成底層的通信是非常方便和高效的。例如阿里的Dubbo、Facebook的Thrift等RPC框架都使用Netty來完成底層的通信,你甚至可以使用Netty來定制一套你們公司內部的私有協(xié)議,非???!

入門就先寫到這里吧,后面會有Netty服務端啟動全流程的源碼分析,一步一步看看Netty到底干了什么,后面針對Netty對高性能所做出的努力也會單獨寫篇文章,包括FastThreadLocal也會分析源碼,看看Netty是如何提升ThreadLocal的性能的。敬請期待吧?。?!

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容