手寫RPC框架(5)-Netty入門了解和實(shí)踐

手寫RPC框架
1、手寫一個(gè)RPC框架,看看100個(gè)線程同時(shí)調(diào)用效果如何
2、手寫RPC框架(2)-引入zookeeper做服務(wù)治理
3、手寫RPC框架(3)-引入Hessian序列化工具
4、手寫RPC框架(4)-重寫服務(wù)治理,開啟1000個(gè)線程看看netty的執(zhí)行調(diào)用情況

Netty是基于NIO的的服務(wù)框架,屏蔽了使用Java原生NIO網(wǎng)絡(luò)模型的各種問題,對(duì)外提供靈活的Reactor模型配置,也提供了插拔式的Handler處理器,便于支持各種網(wǎng)絡(luò)協(xié)議和特定業(yè)務(wù)等操作,也是異步事件驅(qū)動(dòng),使得性能能夠更高。此前RPC中關(guān)于Netty的代碼邏輯存在些問題,對(duì)Netty的一些概念也沒有理解到位,所以這次就一起再學(xué)習(xí)Netty,先寫一個(gè)demo有大致的了解和印象,隨后通過問題介紹各個(gè)組件的功能和特點(diǎn),其原因是什么。

  • 粘包、拆包是什么情況,為什么會(huì)發(fā)生這種情況?
  • pipeline 和 handler是什么關(guān)系?
  • pipeline.addLast的順序是如何執(zhí)行的?
  • handler中的各個(gè)fireXXX執(zhí)行順序是怎樣的?
  • 為什么server是2個(gè)EventLoopGroup,而client卻只有1個(gè)EventLoopGroup?

Demo

運(yùn)行效果如下圖

image

服務(wù)端

public class Server {
    public void run(int port) throws InterruptedException {
        EventLoopGroup workGroup = new NioEventLoopGroup();
        EventLoopGroup bossGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap
                    .group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new TimeServerHandler());
                        }
                    });

            ChannelFuture cf = serverBootstrap.bind(port).sync();
            cf.channel().closeFuture().sync();
        } finally {
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new Server().run(10002);
    }
}
public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelRegistered ...");
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelUnregistered ...");
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive ...");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelInactive ...");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println("Client:[" + body + "]");

        String cur = ("Hello, My name is jwfy".equalsIgnoreCase(body) ? "OK" : "ERROR") + System.getProperty("line.separator");
        ctx.writeAndFlush(Unpooled.copiedBuffer(cur.getBytes()));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelReadComplete ...");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

客戶端

public class Client {
    public void connection(String host, int port) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new TimeClientHandler());
                        }
                    });

            ChannelFuture cf = bootstrap.connect(host, port).sync();
            cf.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new Client().connection("127.0.0.1", 10002);
    }
}
public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelRegistered ...");
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelUnregistered ...");
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive ...");
        for(int i = 0; i < 1; i++ ) {
            ctx.writeAndFlush(Unpooled.copiedBuffer(("Hello, My name is jwfy" + System.getProperty("line.separator")).getBytes()));
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelInactive ...");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println("Server:[" + body + "]");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

EventLoopGroup

EventLoopGroup 是一種Reactor多線程模型的抽象,具體實(shí)現(xiàn)一般都是NioEventLoopGroup。而Reactor模型又有單線程、多線程、以及主從多線程模型,他們有什么區(qū)別呢?

單線程模型

image

1個(gè)NIO線程原則上可以負(fù)責(zé)所有IO相關(guān)的請(qǐng)求操作,通過acceptor接收客戶端發(fā)生的TCP請(qǐng)求,當(dāng)鏈接建立成功之后,通過Dispatch將對(duì)于的請(qǐng)求數(shù)據(jù)包裝成bytebuf指派給相關(guān)的handler處理。但是這在某些場(chǎng)景下也不太合適。

  • 一個(gè)NIO線程同時(shí)管理成百上千的客戶端鏈接,會(huì)嚴(yán)重影響性能
  • 當(dāng)NIO線程的負(fù)載很高時(shí),導(dǎo)致處理速度變慢,同時(shí)還可能因?yàn)槟骋徽?qǐng)求影響整個(gè)NIO線程的工作,進(jìn)而影響其他端口的處理請(qǐng)求。故有了多線程模型。

多線程模型

image

同樣是一個(gè)NIO線程接收客戶端的請(qǐng)求調(diào)用,當(dāng)鏈接完成后請(qǐng)求會(huì)分配給一個(gè)NIO線程池,具體的消息序列化反序列化、數(shù)據(jù)處理等任務(wù)可有NIO線程池中的線程完成。

同樣的基本情況下是沒有問題的,但是多個(gè)客戶端連接依舊可能出現(xiàn)性能問題,故有了主從多線程模型

主從多線程模型

主從情況就是從一個(gè)NIO線程變成了一個(gè)NIO線程池,可同時(shí)由多個(gè)NIO線程處理客戶端的請(qǐng)求連接操作,減少因?yàn)樾阅懿蛔銓?dǎo)致的問題,這也是netty推薦的使用方法。

EventLoopGroup 則也是一個(gè)NIO線程池,即可用于客戶端的TCP請(qǐng)連接求,也可用于數(shù)據(jù)的IO處理,所以在上述代碼中觀察發(fā)現(xiàn)服務(wù)端和客戶端的EventLoopGroup個(gè)數(shù)不一樣也是這個(gè)道理,服務(wù)端一個(gè)線程池用來接收客戶端連接,另一個(gè)則用來進(jìn)行讀寫IO操作。

粘包、拆包

眾所周知,網(wǎng)絡(luò)上的傳輸?shù)亩际亲止?jié)流,從TCP/IP協(xié)議角度出發(fā)無法知道具體的業(yè)務(wù)數(shù)據(jù)組裝情況,所以實(shí)際場(chǎng)景中一個(gè)請(qǐng)求可能被分批次傳輸,也有可能因?yàn)檎?qǐng)求數(shù)據(jù)太少故打包多個(gè)請(qǐng)求統(tǒng)一傳輸

image

如上圖,正常的情況是分別有D1和D2兩個(gè)數(shù)據(jù)包發(fā)送到服務(wù)端,但是因?yàn)榫W(wǎng)絡(luò)擁塞比較嚴(yán)重,滑動(dòng)窗口自適應(yīng)的縮小,使得1個(gè)緩沖區(qū)的大小無法裝滿整個(gè)請(qǐng)求體,就會(huì)出現(xiàn)拆包的情況;又例如請(qǐng)求體內(nèi)容較少,無法填充完整緩沖區(qū),那么就會(huì)等待多個(gè)請(qǐng)求把緩沖區(qū)填滿再發(fā)送出去,就會(huì)出現(xiàn)粘包的情況,如下距離:

  • D2和D1 同時(shí)發(fā)送到服務(wù)端,那么服務(wù)端則需要正確的進(jìn)行拆分處理,否則反序列化會(huì)失敗
  • D1和D2 的一部分D2_1 同時(shí)發(fā)送到服務(wù)端,服務(wù)端除了需要把D1拆出來,還需要等待D2_2的到來才能開始處理D2數(shù)據(jù)

必須首先處理好拆包和粘包問題,才能保證收到正常的完整的消息,而netty則幫我們解決了大部分問題了,例如根據(jù)長(zhǎng)度拆分(FixedLengthFrameDecoder),根據(jù)換行符拆分(LineBasedFrameDecoder),又或者分割符拆分(DelimiterBasedFrameDecoder),只是在本Demo中使用的是換行符切分的LineBasedFrameDecoder

ChannelPipeline 和 ChannelHandler

image

ChannelPipeline 是一個(gè)擁有頭(Head)和尾(Tail)的雙向鏈?zhǔn)饺萜?,可自由添加不同的handler處理器以滿足不同的業(yè)務(wù)需求。同時(shí)因?yàn)橛袕耐饨缱x取數(shù)據(jù)和發(fā)送數(shù)據(jù)兩種場(chǎng)景,所以有inbound和outbound兩種情況。

image

ChannelHandler 則是具體的處理器,可通過addLast方式添加到pipeline管道鏈路上,如粘包說的LineBasedFrameDecoder也是一種具體的handler處理器。demo中提到的添加自定義handler代碼塊如下所示

ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TimeServerHandler());

通過這種添加方式形成了下面的鏈路

HEAD                                             TAIL
LineBasedFrameDecoder -> StringDecoder ->  TimeServerHandler
  • netty讀取的規(guī)則是從head開始的,先進(jìn)行拆分、粘包的處理,再反序列化,后面交由具體的業(yè)務(wù)處理器,是inbound類處理器
  • netty寫出的規(guī)則是從tail開始的,先進(jìn)行數(shù)據(jù)的序列號(hào),再發(fā)送出去,是outbound類處理器

這整個(gè)鏈路是比較清晰完整的,如果把StringDecoder和LineBasedFrameDecoder的處理器順序換一下,則會(huì)發(fā)現(xiàn)出現(xiàn)錯(cuò)誤,如下圖

image

圈住的地方換行符就是我們代碼中添加的 System.getProperty("line.separator") 換行導(dǎo)致,因?yàn)檫@個(gè)就1次調(diào)用,所以會(huì)發(fā)現(xiàn)只進(jìn)行了字符串的轉(zhuǎn)換,并沒有進(jìn)行拆包處理,再次把請(qǐng)求的數(shù)據(jù)量加大些,再測(cè)試看看

image

會(huì)發(fā)現(xiàn)服務(wù)端接收到的數(shù)據(jù)全部錯(cuò)誤了,沒有一個(gè)正確,切記不要把handler處理器順序搞錯(cuò),如下圖是netty源碼中關(guān)于順序的說明情況。

image

Handler 生命周期

handler在處理的時(shí)候是有著一定的順序,例如服務(wù)端先接收請(qǐng)求的注冊(cè),等到TCP/IP三次握手完成后,相當(dāng)于channel激活完成,開始接受客戶端正常的請(qǐng)求調(diào)用,然后返回響應(yīng)結(jié)果等,客戶端關(guān)閉后,服務(wù)端也需要進(jìn)行取消激活,關(guān)閉注冊(cè)的操作,以放棄該channel的管理操作。通過對(duì)其各個(gè)步驟的生命周期的管理,可以實(shí)現(xiàn)自定義的各種管理和控制。fireXXX又被包裝成類似于channelRegistered的名字,如下圖的調(diào)用過程

**該圖來自 http://www.jiangxindc.com/view/2398**

如本demo的運(yùn)行結(jié)果也可以很明顯的看出其執(zhí)行鏈路。

image

結(jié)束

到此netty的學(xué)習(xí)就結(jié)束了,并沒有介紹的太深入,也只是把常用的組件知識(shí)梳理了一遍,以便于我們?cè)谑褂胣etty的時(shí)候注意到這些問題,以發(fā)揮netty的最大功效,文中很多內(nèi)容都參考自《Netty權(quán)威指南》,大家如果有興趣的話可以自行閱讀學(xué)習(xí)和加強(qiáng)理解,下一期將會(huì)進(jìn)行RPC代碼中的netty部門的改造。

如代碼存在的問題歡迎提出~

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 前言 Netty是一個(gè)異步事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框架,用于快速開發(fā)可維護(hù)的高性能協(xié)議服務(wù)器和客戶端。 Netty4...
    L2先森閱讀 8,483評(píng)論 2 1
  • 筆者所有文章第一時(shí)間發(fā)布于:hhbbz的個(gè)人博客 Netty的簡(jiǎn)單介紹 Netty 是一個(gè) NIO client-...
    蝦餃閱讀 527評(píng)論 0 0
  • 在學(xué)習(xí)netty源碼之前,應(yīng)該對(duì)netty的基本用法有所了解,由于netty大多數(shù)時(shí)候用于開發(fā)服務(wù)器端程序,因此下...
    史圣杰閱讀 648評(píng)論 0 2
  • 今天感恩節(jié)哎,感謝一直在我身邊的親朋好友。感恩相遇!感恩不離不棄。 中午開了第一次的黨會(huì),身份的轉(zhuǎn)變要...
    余生動(dòng)聽閱讀 10,805評(píng)論 0 11
  • 彩排完,天已黑
    劉凱書法閱讀 4,467評(píng)論 1 3

友情鏈接更多精彩內(nèi)容