netty4用最簡(jiǎn)單的協(xié)議解決一個(gè)半包問(wèn)題

有時(shí)候簡(jiǎn)化實(shí)現(xiàn)別人的代碼,有助于你更好的理解代碼,不要一味地讀源代碼。

問(wèn)題來(lái)源

客戶端往服務(wù)器發(fā)送小文件

解決思路

1、使用netty(廢話)
2、只是用ByteBuf
3、自定義一種協(xié)議,用最小的網(wǎng)絡(luò)代價(jià)完成數(shù)據(jù)傳送

實(shí)現(xiàn)

其實(shí)netty有很多的定義好的協(xié)議來(lái)解決各種各樣的問(wèn)題,這篇文章來(lái)自《netty權(quán)威指南》作者李林峰,詳細(xì)介紹了netty的編解碼框架,以及一些常用的編解碼協(xié)議。

在解決這個(gè)問(wèn)題的時(shí)候,我遇到的一個(gè)主要問(wèn)題就是我在客戶端發(fā)送一個(gè)數(shù)據(jù)包,這個(gè)數(shù)據(jù)包的大小可以很大,但是如果只用簡(jiǎn)單的channelRead去讀取數(shù)據(jù)的話得到的數(shù)據(jù)并不是完整的。具體原因參考netty用戶指南中的tcp stream-based傳輸?shù)膯?wèn)題。

我先做了一個(gè)簡(jiǎn)單的協(xié)議設(shè)計(jì):
packet = |文件名長(zhǎng)度|文件名|文件字節(jié)長(zhǎng)度|文件字節(jié)流|

于是就有了客戶端發(fā)送的簡(jiǎn)單代碼

            String name = "diagram.png";
            FileInputStream fileInputStream = new FileInputStream(new File("src/main/resources/diagram.png"));
            byte[] bytes = new byte[fileInputStream.available()];
            fileInputStream.read(bytes);

            ByteBuf byteBuf = Unpooled.buffer();

            byteBuf.writeInt("diagram.png".getBytes().length);
            byteBuf.writeBytes("diagram.png".getBytes());

            byteBuf.writeInt(bytes.length);
            byteBuf.writeBytes(bytes);
            channelFuture.channel().writeAndFlush(byteBuf);

這樣發(fā)送沒(méi)有問(wèn)題,因?yàn)閎yteBuf是動(dòng)態(tài)擴(kuò)展的。但是接受的時(shí)候就有問(wèn)題了。如果我們接受比較小的,比如一個(gè)int,我們可以直接這樣寫(xiě)

 @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        
        if(msg instanceof ByteBuf)
        {
            ByteBuf byteBuf = (ByteBuf)msg;
            if(byteBuf.readableBytes() > 4)
            {
                int result = byteBuf.readInt();
            }
        }
}

但是當(dāng)長(zhǎng)度很大的時(shí)候,我們就需要解決讀半包的問(wèn)題了。直到讀到完整的數(shù)據(jù)才進(jìn)行處理。但是每次收到的數(shù)據(jù)怎么去判斷是不是和上一個(gè)數(shù)據(jù)是連續(xù)的,如何在沒(méi)有收集到完整數(shù)據(jù)時(shí)不處理數(shù)據(jù)而繼續(xù)接受呢?這是我一直困擾的問(wèn)題。因?yàn)槲野衙恳粋€(gè)byteBuf當(dāng)成一個(gè)message來(lái)想了,其實(shí)不是的,ByteBuf中有兩個(gè)指針readIndex和writeIndex,readIndex永遠(yuǎn)小于writeIndex。大概如下圖所示


ByteBuf示例圖

在netty的設(shè)計(jì)中ByteBuf是可以被重用的,所以可能針對(duì)這一個(gè)ChannelRead一直讀取的是同一個(gè)ByteBuf。這其中readrIndex之前的是已經(jīng)讀取過(guò)的,就是已經(jīng)被調(diào)用readXXX()之后的數(shù)據(jù),可以重新去讀取,readerIndex和writerIndex之前的是當(dāng)前的readableBytes,writerIndex到capacity的是writeableBytes,當(dāng)writerIndex超過(guò)capacity時(shí)就會(huì)擴(kuò)展。同時(shí)為了重用這部分空間,當(dāng)調(diào)用discardBytes時(shí),會(huì)把readerIndex和writerIndex拷貝到開(kāi)頭,這樣前面廢棄的部分就被重用了,也一定程度場(chǎng)避免了擴(kuò)容,節(jié)省了空間。

那如何針對(duì)上面的輸入寫(xiě)B(tài)yteBuf的解碼呢?
先看看netty自帶的解碼器怎么解決這個(gè)問(wèn)題,其中LengthFieldBasedFrameDecoder就是用來(lái)解決這一類(lèi)的問(wèn)題的。在李林峰的文章中有詳細(xì)介紹,這里就不贅述了。
我在之前代碼的基礎(chǔ)上添加了兩行代碼。

//在服務(wù)器的pipeline中添加的這個(gè)解碼器,然后用4個(gè)字節(jié)表示整個(gè)包的長(zhǎng)度,并且廢棄掉這四個(gè)字節(jié)。
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024*1024, 0, 4, 0, 4));

//在發(fā)送的byteBuf頭部添加真?zhèn)€包的長(zhǎng)度
byteBuf.writeInt(4+ name.getBytes().length +4+ bytes.length);

然后我再在ChannelRead中處理剩下的數(shù)據(jù)
packet = |文件名長(zhǎng)度|文件名|文件字節(jié)長(zhǎng)度|文件字節(jié)流|

       if(msg instanceof ByteBuf)
        {
            ByteBuf byteBuf = (ByteBuf)msg;
            int nameSize = byteBuf.readInt();
            String name = new String(byteBuf.readBytes(nameSize).array(), "UTF-8");
            int fileSize = byteBuf.readInt();
            FileOutputStream fileOutputStream = new FileOutputStream(new File(name));
            fileOutputStream.write(byteBuf.readBytes(fileSize).array());
            System.out.println(name + " " + fileSize);
        }

問(wèn)題解決,但是自己如何實(shí)現(xiàn)這個(gè)解碼器呢?先看看netty怎么實(shí)現(xiàn)的。

protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        if (discardingTooLongFrame) {
            long bytesToDiscard = this.bytesToDiscard;
            int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
            in.skipBytes(localBytesToDiscard);
            bytesToDiscard -= localBytesToDiscard;
            this.bytesToDiscard = bytesToDiscard;

            failIfNecessary(false);
        }

        if (in.readableBytes() < lengthFieldEndOffset) {
            return null;
        }

        int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
        long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);

        if (frameLength < 0) {
            in.skipBytes(lengthFieldEndOffset);
            throw new CorruptedFrameException(
                    "negative pre-adjustment length field: " + frameLength);
        }

        frameLength += lengthAdjustment + lengthFieldEndOffset;

        if (frameLength < lengthFieldEndOffset) {
            in.skipBytes(lengthFieldEndOffset);
            throw new CorruptedFrameException(
                    "Adjusted frame length (" + frameLength + ") is less " +
                    "than lengthFieldEndOffset: " + lengthFieldEndOffset);
        }

        if (frameLength > maxFrameLength) {
            long discard = frameLength - in.readableBytes();
            tooLongFrameLength = frameLength;

            if (discard < 0) {
                // buffer contains more bytes then the frameLength so we can discard all now
                in.skipBytes((int) frameLength);
            } else {
                // Enter the discard mode and discard everything received so far.
                discardingTooLongFrame = true;
                bytesToDiscard = discard;
                in.skipBytes(in.readableBytes());
            }
            failIfNecessary(true);
            return null;
        }

        // never overflows because it's less than maxFrameLength
        int frameLengthInt = (int) frameLength;
        if (in.readableBytes() < frameLengthInt) {
            return null;
        }

        if (initialBytesToStrip > frameLengthInt) {
            in.skipBytes(frameLengthInt);
            throw new CorruptedFrameException(
                    "Adjusted frame length (" + frameLength + ") is less " +
                    "than initialBytesToStrip: " + initialBytesToStrip);
        }
        in.skipBytes(initialBytesToStrip);

        // extract frame
        int readerIndex = in.readerIndex();
        int actualFrameLength = frameLengthInt - initialBytesToStrip;
        ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
        in.readerIndex(readerIndex + actualFrameLength);
        return frame;
    }

好長(zhǎng)。。里面對(duì)于不合理的協(xié)議做了很多假設(shè),并使不合理的輸入快速失敗。但是讓我一個(gè)初學(xué)者寫(xiě)還是寫(xiě)不出來(lái)。所以我假設(shè)協(xié)議就是我設(shè)計(jì)的那樣,簡(jiǎn)化這部分代碼,便于理解。
變量給一個(gè)固定值

    private ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
    private int maxFrameLength = 1024*10;
    private int lengthFieldLength = 4;
    private int initialBytesToStrip = 0;
    private long tooLongFrameLength;
    private long bytesToDiscard;
    private boolean failFast = true;

然后寫(xiě)decode函數(shù),就這么簡(jiǎn)單。。

 protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {

        int frameLength = (int) in.getUnsignedInt(0);//獲取頭部
        if(in.readableBytes() < frameLength)//當(dāng)ByteBuf沒(méi)有達(dá)到長(zhǎng)度時(shí),return null
        {
            return null;
        }
        in.skipBytes(4);//舍棄頭部
        int index =  in.readerIndex();
        ByteBuf frame = in.slice(index, frameLength).retain();//取出自己定義的packet包返回給ChannelRead

        in.readerIndex(frameLength);//這一步一定要有,不然其實(shí)bytebuf的readerIndex沒(méi)有變,netty會(huì)一直從這里開(kāi)始讀取,將readerIndex移動(dòng)就相當(dāng)于把前面的數(shù)據(jù)處理過(guò)了廢棄掉了。
        return  frame;
    }

所以其實(shí)我們只要不處理bytebuf的數(shù)據(jù)知道可以讀的數(shù)據(jù)達(dá)到我們需要的長(zhǎng)度在處理就可以了。當(dāng)然包的順序不會(huì)出錯(cuò)是由底層tcp保證的,不用關(guān)心。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 前奏 https://tech.meituan.com/2016/11/04/nio.html 綜述 netty通...
    jiangmo閱讀 6,219評(píng)論 0 13
  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,568評(píng)論 19 139
  • 國(guó)家電網(wǎng)公司企業(yè)標(biāo)準(zhǔn)(Q/GDW)- 面向?qū)ο蟮挠秒娦畔?shù)據(jù)交換協(xié)議 - 報(bào)批稿:20170802 前言: 排版 ...
    庭說(shuō)閱讀 12,420評(píng)論 6 13
  • 前言 問(wèn)題 現(xiàn)如今我們使用通用的應(yīng)用程序或者類(lèi)庫(kù)來(lái)實(shí)現(xiàn)系統(tǒng)之間地互相訪問(wèn)。例如,我們經(jīng)常使用一個(gè)HTTP客戶端來(lái)從...
    Kohler閱讀 834評(píng)論 0 2
  • 文/鄉(xiāng)土依舊 都知道冬吃蘿卜對(duì)健康有益,所以當(dāng)?shù)厝藗兞?xí)慣種植晚秋季蘿卜,可趕在大地封凍之前收獲。 一來(lái)供自家食用,...
    小小有夢(mèng)閱讀 297評(píng)論 0 2

友情鏈接更多精彩內(nèi)容