SOCKET編程關于stream-based的一些思考

附上netty 5 用戶指南地址http://ifeve.com/netty5-user-guide/

流數據的傳輸處理

一個小的Socket Buffer問題

在基于流的傳輸里比如TCP/IP,接收到的數據會先被存儲到一個socket接收緩沖里。不幸的是,基于流的傳輸并不是一個數據包隊列,而是一個字節(jié)隊列。即使你發(fā)送了2個獨立的數據包,操作系統(tǒng)也不會作為2個消息處理而僅僅是作為一連串的字節(jié)而言。因此這是不能保證你遠程寫入的數據就會準確地讀取。舉個例子,讓我們假設操作系統(tǒng)的TCP/TP協(xié)議棧已經接收了3個數據包:
netty5_1.png

由于基于流傳輸的協(xié)議的這種普通的性質,在你的應用程序里讀取數據的時候會有很高的可能性被分成下面的片段。

netty5_2.png

因此,一個接收方不管他是客戶端還是服務端,都應該把接收到的數據整理成一個或者多個更有意思并且能夠讓程序的業(yè)務邏輯更好理解的數據。在上面的例子中,接收到的數據應該被構造成下面的格式:

netty5_3.png

問題總結一下就是客戶端連續(xù)發(fā)了三個包,服務端在收到的時候可能是不同片段大小收到的,如果在每一次收到一片時便處理數據的話,很有可能數據是不完整的。(我的猜測是,TCP底層的窗口機制引起的,窗口的挪動會讓已經完成的數據向上傳送,而沒有收到還會等待。這也是保證順序不錯亂的機制,所以我在一開始考慮會不會有順序錯亂問題,這個應該是在底層已經解決掉了)

Netty提供的解決方案總結

第一種:緩存收到的字節(jié),當緩存的字節(jié)數目到達要求時,讀取buff完成業(yè)務邏輯

(1)在handler生命周期手動緩存
package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelHandlerAdapter {
    private ByteBuf buf;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4); // (1)在生命周期中,聲明一個4字節(jié)的緩存大小
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // (1)在生命周期結束的時候釋放掉
        buf = null;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // (2)將每次收到的字節(jié)流寫進緩存里
        m.release();

        if (buf.readableBytes() >= 4) { // (3)當緩存大小達到4字節(jié)時處理實際業(yè)務
            long currentTimeMillis = (buf.readInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

(2)利用netty的解碼函數來控制字節(jié)

package io.netty.example.time;

public class TimeDecoder extends ByteToMessageDecoder { // (1)extends ByteToMessageDecoder類
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)回調函數將字節(jié)流寫入in中來積累緩存
        if (in.readableBytes() < 4) { // (3)判斷in的大小
            return;
        }

        out.add(in.readBytes(4)); // (4)將4個字節(jié)輸出到out中
    }
}

第二種:利用POJO(Plain Ordinary Java Object)來代替ByteBuf

構造新類型(感覺和javabean一樣)
package io.netty.example.time;

import java.util.Date;

public class UnixTime {

    private final int value;

    public UnixTime() {
        this((int) (System.currentTimeMillis() / 1000L + 2208988800L));
    }

    public UnixTime(int value) {
        this.value = value;
    }

    public int value() {
        return value;
    }

    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}
在decode回調函數中,將bytebuf組裝成unixTime對象
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    if (in.readableBytes() < 4) {
        return;
    }

    out.add(new UnixTime(in.readInt()));
}

總結:關于bytebuf大小的約定,我覺得在發(fā)送的時候在首部加上包頭,即用1字節(jié)來表示整個包的大小,這樣就可以控制可變的緩存大小了。雖然增加了數據量但是更適應于可變的環(huán)境。同樣在使用POJO時,加上類型標識符,也能擁有更加廣泛的可行性。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務發(fā)現,斷路器,智...
    卡卡羅2017閱讀 136,569評論 19 139
  • 個人認為,Goodboy1881先生的TCP /IP 協(xié)議詳解學習博客系列博客是一部非常精彩的學習筆記,這雖然只是...
    貳零壹柒_fc10閱讀 5,195評論 0 8
  • 前奏 https://tech.meituan.com/2016/11/04/nio.html 綜述 netty通...
    jiangmo閱讀 6,221評論 0 13
  • 前言 問題 現如今我們使用通用的應用程序或者類庫來實現系統(tǒng)之間地互相訪問。例如,我們經常使用一個HTTP客戶端來從...
    Kohler閱讀 836評論 0 2
  • 簡介 用簡單的話來定義tcpdump,就是:dump the traffic on a network,根據使用者...
    保川閱讀 6,082評論 1 13

友情鏈接更多精彩內容