附上netty 5 用戶指南地址http://ifeve.com/netty5-user-guide/
流數據的傳輸處理
一個小的Socket Buffer問題
在基于流的傳輸里比如TCP/IP,接收到的數據會先被存儲到一個socket接收緩沖里。不幸的是,基于流的傳輸并不是一個數據包隊列,而是一個字節(jié)隊列。即使你發(fā)送了2個獨立的數據包,操作系統(tǒng)也不會作為2個消息處理而僅僅是作為一連串的字節(jié)而言。因此這是不能保證你遠程寫入的數據就會準確地讀取。舉個例子,讓我們假設操作系統(tǒng)的TCP/TP協(xié)議棧已經接收了3個數據包:
netty5_1.png
由于基于流傳輸的協(xié)議的這種普通的性質,在你的應用程序里讀取數據的時候會有很高的可能性被分成下面的片段。

netty5_2.png
因此,一個接收方不管他是客戶端還是服務端,都應該把接收到的數據整理成一個或者多個更有意思并且能夠讓程序的業(yè)務邏輯更好理解的數據。在上面的例子中,接收到的數據應該被構造成下面的格式:

netty5_3.png
問題總結一下就是客戶端連續(xù)發(fā)了三個包,服務端在收到的時候可能是不同片段大小收到的,如果在每一次收到一片時便處理數據的話,很有可能數據是不完整的。(我的猜測是,TCP底層的窗口機制引起的,窗口的挪動會讓已經完成的數據向上傳送,而沒有收到還會等待。這也是保證順序不錯亂的機制,所以我在一開始考慮會不會有順序錯亂問題,這個應該是在底層已經解決掉了)
Netty提供的解決方案總結
第一種:緩存收到的字節(jié),當緩存的字節(jié)數目到達要求時,讀取buff完成業(yè)務邏輯
(1)在handler生命周期手動緩存
package io.netty.example.time;
import java.util.Date;
public class TimeClientHandler extends ChannelHandlerAdapter {
private ByteBuf buf;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
buf = ctx.alloc().buffer(4); // (1)在生命周期中,聲明一個4字節(jié)的緩存大小
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
buf.release(); // (1)在生命周期結束的時候釋放掉
buf = null;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg;
buf.writeBytes(m); // (2)將每次收到的字節(jié)流寫進緩存里
m.release();
if (buf.readableBytes() >= 4) { // (3)當緩存大小達到4字節(jié)時處理實際業(yè)務
long currentTimeMillis = (buf.readInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
(2)利用netty的解碼函數來控制字節(jié)
package io.netty.example.time;
public class TimeDecoder extends ByteToMessageDecoder { // (1)extends ByteToMessageDecoder類
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)回調函數將字節(jié)流寫入in中來積累緩存
if (in.readableBytes() < 4) { // (3)判斷in的大小
return;
}
out.add(in.readBytes(4)); // (4)將4個字節(jié)輸出到out中
}
}
第二種:利用POJO(Plain Ordinary Java Object)來代替ByteBuf
構造新類型(感覺和javabean一樣)
package io.netty.example.time;
import java.util.Date;
public class UnixTime {
private final int value;
public UnixTime() {
this((int) (System.currentTimeMillis() / 1000L + 2208988800L));
}
public UnixTime(int value) {
this.value = value;
}
public int value() {
return value;
}
@Override
public String toString() {
return new Date((value() - 2208988800L) * 1000L).toString();
}
}
在decode回調函數中,將bytebuf組裝成unixTime對象
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() < 4) {
return;
}
out.add(new UnixTime(in.readInt()));
}
總結:關于bytebuf大小的約定,我覺得在發(fā)送的時候在首部加上包頭,即用1字節(jié)來表示整個包的大小,這樣就可以控制可變的緩存大小了。雖然增加了數據量但是更適應于可變的環(huán)境。同樣在使用POJO時,加上類型標識符,也能擁有更加廣泛的可行性。