有時(shí)候簡(jiǎn)化實(shí)現(xiàn)別人的代碼,有助于你更好的理解代碼,不要一味地讀源代碼。
問(wèn)題來(lái)源
客戶端往服務(wù)器發(fā)送小文件
解決思路
1、使用netty(廢話)
2、只是用ByteBuf
3、自定義一種協(xié)議,用最小的網(wǎng)絡(luò)代價(jià)完成數(shù)據(jù)傳送
實(shí)現(xiàn)
其實(shí)netty有很多的定義好的協(xié)議來(lái)解決各種各樣的問(wèn)題,這篇文章來(lái)自《netty權(quán)威指南》作者李林峰,詳細(xì)介紹了netty的編解碼框架,以及一些常用的編解碼協(xié)議。
在解決這個(gè)問(wèn)題的時(shí)候,我遇到的一個(gè)主要問(wèn)題就是我在客戶端發(fā)送一個(gè)數(shù)據(jù)包,這個(gè)數(shù)據(jù)包的大小可以很大,但是如果只用簡(jiǎn)單的channelRead去讀取數(shù)據(jù)的話得到的數(shù)據(jù)并不是完整的。具體原因參考netty用戶指南中的tcp stream-based傳輸?shù)膯?wèn)題。
我先做了一個(gè)簡(jiǎn)單的協(xié)議設(shè)計(jì):
packet = |文件名長(zhǎng)度|文件名|文件字節(jié)長(zhǎng)度|文件字節(jié)流|
于是就有了客戶端發(fā)送的簡(jiǎn)單代碼
String name = "diagram.png";
FileInputStream fileInputStream = new FileInputStream(new File("src/main/resources/diagram.png"));
byte[] bytes = new byte[fileInputStream.available()];
fileInputStream.read(bytes);
ByteBuf byteBuf = Unpooled.buffer();
byteBuf.writeInt("diagram.png".getBytes().length);
byteBuf.writeBytes("diagram.png".getBytes());
byteBuf.writeInt(bytes.length);
byteBuf.writeBytes(bytes);
channelFuture.channel().writeAndFlush(byteBuf);
這樣發(fā)送沒(méi)有問(wèn)題,因?yàn)閎yteBuf是動(dòng)態(tài)擴(kuò)展的。但是接受的時(shí)候就有問(wèn)題了。如果我們接受比較小的,比如一個(gè)int,我們可以直接這樣寫(xiě)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if(msg instanceof ByteBuf)
{
ByteBuf byteBuf = (ByteBuf)msg;
if(byteBuf.readableBytes() > 4)
{
int result = byteBuf.readInt();
}
}
}
但是當(dāng)長(zhǎng)度很大的時(shí)候,我們就需要解決讀半包的問(wèn)題了。直到讀到完整的數(shù)據(jù)才進(jìn)行處理。但是每次收到的數(shù)據(jù)怎么去判斷是不是和上一個(gè)數(shù)據(jù)是連續(xù)的,如何在沒(méi)有收集到完整數(shù)據(jù)時(shí)不處理數(shù)據(jù)而繼續(xù)接受呢?這是我一直困擾的問(wèn)題。因?yàn)槲野衙恳粋€(gè)byteBuf當(dāng)成一個(gè)message來(lái)想了,其實(shí)不是的,ByteBuf中有兩個(gè)指針readIndex和writeIndex,readIndex永遠(yuǎn)小于writeIndex。大概如下圖所示

在netty的設(shè)計(jì)中ByteBuf是可以被重用的,所以可能針對(duì)這一個(gè)ChannelRead一直讀取的是同一個(gè)ByteBuf。這其中readrIndex之前的是已經(jīng)讀取過(guò)的,就是已經(jīng)被調(diào)用readXXX()之后的數(shù)據(jù),可以重新去讀取,readerIndex和writerIndex之前的是當(dāng)前的readableBytes,writerIndex到capacity的是writeableBytes,當(dāng)writerIndex超過(guò)capacity時(shí)就會(huì)擴(kuò)展。同時(shí)為了重用這部分空間,當(dāng)調(diào)用discardBytes時(shí),會(huì)把readerIndex和writerIndex拷貝到開(kāi)頭,這樣前面廢棄的部分就被重用了,也一定程度場(chǎng)避免了擴(kuò)容,節(jié)省了空間。
那如何針對(duì)上面的輸入寫(xiě)B(tài)yteBuf的解碼呢?
先看看netty自帶的解碼器怎么解決這個(gè)問(wèn)題,其中LengthFieldBasedFrameDecoder就是用來(lái)解決這一類(lèi)的問(wèn)題的。在李林峰的文章中有詳細(xì)介紹,這里就不贅述了。
我在之前代碼的基礎(chǔ)上添加了兩行代碼。
//在服務(wù)器的pipeline中添加的這個(gè)解碼器,然后用4個(gè)字節(jié)表示整個(gè)包的長(zhǎng)度,并且廢棄掉這四個(gè)字節(jié)。
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024*1024, 0, 4, 0, 4));
//在發(fā)送的byteBuf頭部添加真?zhèn)€包的長(zhǎng)度
byteBuf.writeInt(4+ name.getBytes().length +4+ bytes.length);
然后我再在ChannelRead中處理剩下的數(shù)據(jù)
packet = |文件名長(zhǎng)度|文件名|文件字節(jié)長(zhǎng)度|文件字節(jié)流|
if(msg instanceof ByteBuf)
{
ByteBuf byteBuf = (ByteBuf)msg;
int nameSize = byteBuf.readInt();
String name = new String(byteBuf.readBytes(nameSize).array(), "UTF-8");
int fileSize = byteBuf.readInt();
FileOutputStream fileOutputStream = new FileOutputStream(new File(name));
fileOutputStream.write(byteBuf.readBytes(fileSize).array());
System.out.println(name + " " + fileSize);
}
問(wèn)題解決,但是自己如何實(shí)現(xiàn)這個(gè)解碼器呢?先看看netty怎么實(shí)現(xiàn)的。
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (discardingTooLongFrame) {
long bytesToDiscard = this.bytesToDiscard;
int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
in.skipBytes(localBytesToDiscard);
bytesToDiscard -= localBytesToDiscard;
this.bytesToDiscard = bytesToDiscard;
failIfNecessary(false);
}
if (in.readableBytes() < lengthFieldEndOffset) {
return null;
}
int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
if (frameLength < 0) {
in.skipBytes(lengthFieldEndOffset);
throw new CorruptedFrameException(
"negative pre-adjustment length field: " + frameLength);
}
frameLength += lengthAdjustment + lengthFieldEndOffset;
if (frameLength < lengthFieldEndOffset) {
in.skipBytes(lengthFieldEndOffset);
throw new CorruptedFrameException(
"Adjusted frame length (" + frameLength + ") is less " +
"than lengthFieldEndOffset: " + lengthFieldEndOffset);
}
if (frameLength > maxFrameLength) {
long discard = frameLength - in.readableBytes();
tooLongFrameLength = frameLength;
if (discard < 0) {
// buffer contains more bytes then the frameLength so we can discard all now
in.skipBytes((int) frameLength);
} else {
// Enter the discard mode and discard everything received so far.
discardingTooLongFrame = true;
bytesToDiscard = discard;
in.skipBytes(in.readableBytes());
}
failIfNecessary(true);
return null;
}
// never overflows because it's less than maxFrameLength
int frameLengthInt = (int) frameLength;
if (in.readableBytes() < frameLengthInt) {
return null;
}
if (initialBytesToStrip > frameLengthInt) {
in.skipBytes(frameLengthInt);
throw new CorruptedFrameException(
"Adjusted frame length (" + frameLength + ") is less " +
"than initialBytesToStrip: " + initialBytesToStrip);
}
in.skipBytes(initialBytesToStrip);
// extract frame
int readerIndex = in.readerIndex();
int actualFrameLength = frameLengthInt - initialBytesToStrip;
ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
in.readerIndex(readerIndex + actualFrameLength);
return frame;
}
好長(zhǎng)。。里面對(duì)于不合理的協(xié)議做了很多假設(shè),并使不合理的輸入快速失敗。但是讓我一個(gè)初學(xué)者寫(xiě)還是寫(xiě)不出來(lái)。所以我假設(shè)協(xié)議就是我設(shè)計(jì)的那樣,簡(jiǎn)化這部分代碼,便于理解。
變量給一個(gè)固定值
private ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
private int maxFrameLength = 1024*10;
private int lengthFieldLength = 4;
private int initialBytesToStrip = 0;
private long tooLongFrameLength;
private long bytesToDiscard;
private boolean failFast = true;
然后寫(xiě)decode函數(shù),就這么簡(jiǎn)單。。
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
int frameLength = (int) in.getUnsignedInt(0);//獲取頭部
if(in.readableBytes() < frameLength)//當(dāng)ByteBuf沒(méi)有達(dá)到長(zhǎng)度時(shí),return null
{
return null;
}
in.skipBytes(4);//舍棄頭部
int index = in.readerIndex();
ByteBuf frame = in.slice(index, frameLength).retain();//取出自己定義的packet包返回給ChannelRead
in.readerIndex(frameLength);//這一步一定要有,不然其實(shí)bytebuf的readerIndex沒(méi)有變,netty會(huì)一直從這里開(kāi)始讀取,將readerIndex移動(dòng)就相當(dāng)于把前面的數(shù)據(jù)處理過(guò)了廢棄掉了。
return frame;
}
所以其實(shí)我們只要不處理bytebuf的數(shù)據(jù)知道可以讀的數(shù)據(jù)達(dá)到我們需要的長(zhǎng)度在處理就可以了。當(dāng)然包的順序不會(huì)出錯(cuò)是由底層tcp保證的,不用關(guān)心。