Websocket協(xié)議原理與實(shí)現(xiàn)(二)

數(shù)據(jù)的封裝與傳輸

上一篇文章講到Websocket握手協(xié)議的處理,現(xiàn)在開(kāi)始說(shuō)數(shù)據(jù)的傳輸。Websocket數(shù)據(jù)幀的封裝和傳輸其實(shí)和處理握手請(qǐng)求的流程差不太多,都需要通過(guò)bytebuffer寫(xiě)入Socket的輸出流或者從輸入流讀取。這里我們從解析數(shù)據(jù)幀開(kāi)始,知道如何解析數(shù)據(jù)幀,封裝也就不成話下。

我們來(lái)重新復(fù)習(xí)一下Websocket的數(shù)據(jù)傳輸協(xié)議

  0                   1                   2                   3
  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
 +-+-+-+-+-------+-+-------------+-------------------------------+
 |F|R|R|R| opcode|M| Payload len |    Extended payload length    |
 |I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
 |N|V|V|V|       |S|             |   (if payload len==126/127)   |
 | |1|2|3|       |K|             |                               |
 +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
 |     Extended payload length continued, if payload len == 127  |
 + - - - - - - - - - - - - - - - +-------------------------------+
 |                               |Masking-key, if MASK set to 1  |
 +-------------------------------+-------------------------------+
 | Masking-key (continued)       |          Payload Data         |
 +-------------------------------- - - - - - - - - - - - - - - - +
 :                     Payload Data continued ...                :
 + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
 |                     Payload Data continued ...                |
 +---------------------------------------------------------------+

具體每一bit的意思
FIN      1bit 表示信息的最后一幀
RSV 1-3  1bit each 以后備用的 默認(rèn)都為 0
Opcode   4bit 幀類(lèi)型,稍后細(xì)說(shuō)
Mask     1bit 掩碼,是否加密數(shù)據(jù),默認(rèn)必須置為1
Payload  7bit 數(shù)據(jù)的長(zhǎng)度
Masking-key      1 or 4 bit 掩碼
Payload data     (x + y) bytes 數(shù)據(jù)
Extension data   x bytes  擴(kuò)展數(shù)據(jù)
Application data y bytes  程序數(shù)據(jù)

其中較為重要的是Opcode字段,這個(gè)字段表示幀的類(lèi)型,例如這個(gè)傳輸?shù)膸俏谋绢?lèi)型還是二進(jìn)制類(lèi)型,二進(jìn)制類(lèi)型傳輸?shù)臄?shù)據(jù)可以是圖片或者語(yǔ)音之類(lèi)的。

OPCODE:4位
解釋PayloadData,如果接收到未知的opcode,接收端必須關(guān)閉連接。
0x0表示附加數(shù)據(jù)幀
0x1表示文本數(shù)據(jù)幀
0x2表示二進(jìn)制數(shù)據(jù)幀
0x3-7暫時(shí)無(wú)定義,為以后的非控制幀保留
0x8表示連接關(guān)閉
0x9表示ping
0xA表示pong
0xB-F暫時(shí)無(wú)定義,為以后的控制幀保留

當(dāng)我們解析完數(shù)據(jù)幀后,需要根據(jù)Opcode字段的類(lèi)型進(jìn)行對(duì)消息的不同回調(diào)處理,幀數(shù)據(jù)可以如下定義,Opcode通過(guò)枚舉定義出來(lái)

public abstract interface Framedata
{
  public abstract boolean isFin();

  public abstract boolean getTransfereMasked();

  public abstract Opcode getOpcode();

  public abstract ByteBuffer getPayloadData();

  public abstract void append(Framedata paramFramedata)
throws InvalidFrameException;

  public static enum Opcode
  {
    CONTINUOUS, TEXT, BINARY, PING, PONG, CLOSING;
  }
}

解析首先得判斷Fin字段和Opcode字段。這兩個(gè)字段跟一個(gè)消息分片(Fragment)的概念有關(guān),一般的已知長(zhǎng)度的消息,F(xiàn)in值為1,表示結(jié)束,Opcode值不能為0,可以看一下上面代碼的枚舉類(lèi)型,0代表CONTINUOUS,就是說(shuō)有連續(xù)的數(shù)據(jù)幀會(huì)發(fā)送過(guò)來(lái)。

而某些未知長(zhǎng)度的消息,則需要把消息分片發(fā)送。根據(jù)我的理解,實(shí)時(shí)的語(yǔ)音聊天就屬于這種情形。這時(shí)候前面幀的Fin值為0,Opcode值為0,最后的結(jié)束幀F(xiàn)in值為1,Opcode不為0。

首先應(yīng)該是讀取二進(jìn)制流解析數(shù)據(jù),從bytebuffer中取出數(shù)據(jù),按byte來(lái)讀取,一個(gè)byte有8個(gè)bit,數(shù)據(jù)幀是按bit來(lái)定義的,我們還得從byte中解析出具體Websocket協(xié)議中的每一個(gè)幀。

下面是讀取二進(jìn)制流的代碼,值得關(guān)注的是通過(guò)位與運(yùn)算獲取具體每一個(gè)bit的數(shù)據(jù),另外比較麻煩的是playload的處理,以及還需要根據(jù)Mask掩碼來(lái)解密數(shù)據(jù),傳輸協(xié)議里面有一位Mask,代表是否加密數(shù)據(jù),默認(rèn)設(shè)置為1。這里就不細(xì)說(shuō)了

public Framedata translateSingleFrame(ByteBuffer buffer) throws Draft_10.IncompleteException, InvalidDataException {
    int maxpacketsize = buffer.remaining();
    int realpacketsize = 2;
    if (maxpacketsize < realpacketsize)
        throw new IncompleteException(realpacketsize);
    byte b1 = buffer.get();
    boolean FIN = b1 >> 8 != 0;
    byte rsv = (byte) ((b1 & 0x7F) >> 4);
    if (rsv != 0)
        throw new InvalidFrameException("bad rsv " + rsv);
    byte b2 = buffer.get();
    boolean MASK = (b2 & 0xFFFFFF80) != 0;
    int payloadlength = (byte) (b2 & 0x7F);
    Framedata.Opcode optcode = toOpcode((byte) (b1 & 0xF));

    if ((!FIN) && (
            (optcode == Framedata.Opcode.PING) || (optcode == Framedata.Opcode.PONG) || (optcode == Framedata.Opcode.CLOSING))) {
        throw new InvalidFrameException("control frames may no be fragmented");
    }

    if ((payloadlength < 0) || (payloadlength > 125)) {
        if ((optcode == Framedata.Opcode.PING) || (optcode == Framedata.Opcode.PONG) || (optcode == Framedata.Opcode.CLOSING)) {
            throw new InvalidFrameException("more than 125 octets");
        }
        if (payloadlength == 126) {
            realpacketsize += 2;
            if (maxpacketsize < realpacketsize)
                throw new IncompleteException(realpacketsize);
            byte[] sizebytes = new byte[3];
            sizebytes[1] = buffer.get();
            sizebytes[2] = buffer.get();
            payloadlength = new BigInteger(sizebytes).intValue();
        } else {
            realpacketsize += 8;
            if (maxpacketsize < realpacketsize)
                throw new IncompleteException(realpacketsize);
            byte[] bytes = new byte[8];
            for (int i = 0; i < 8; i++) {
                bytes[i] = buffer.get();
            }
            long length = new BigInteger(bytes).longValue();
            if (length > 2147483647L) {
                throw new LimitExedeedException("Payloadsize is to big...");
            }
            payloadlength = (int) length;
        }

    }

    realpacketsize += (MASK ? 4 : 0);

    realpacketsize += payloadlength;

    if (maxpacketsize < realpacketsize) {
        throw new IncompleteException(realpacketsize);
    }
    ByteBuffer payload = ByteBuffer.allocate(checkAlloc(payloadlength));
    if (MASK) {
        byte[] maskskey = new byte[4];
        buffer.get(maskskey);
        for (int i = 0; i < payloadlength; i++)
            payload.put((byte) (buffer.get() ^ maskskey[(i % 4)]));
    } else {
        payload.put(buffer.array(), buffer.position(), payload.limit());
        buffer.position(buffer.position() + payload.limit());
    }
    FrameBuilder frame;
    FrameBuilder frame;
    if (optcode == Framedata.Opcode.CLOSING) {
        frame = new CloseFrameBuilder();
    } else {
        frame = new FramedataImpl1();
        frame.setFin(FIN);
        frame.setOptcode(optcode);
    }
    payload.flip();
    frame.setPayload(payload);
    return frame;
}

讀取完后就是根據(jù)這個(gè)數(shù)據(jù)幀的類(lèi)型來(lái)進(jìn)行不同的回調(diào)

private void decodeFrames(ByteBuffer socketBuffer) {
    try {
        List frames = this.draft.translateFrame(socketBuffer);//讀取二進(jìn)制流
        for (Framedata f : frames) {
            if (DEBUG)
                System.out.println("matched frame: " + f);
            Framedata.Opcode curop = f.getOpcode();
            boolean fin = f.isFin();

            if (curop == Framedata.Opcode.CLOSING) {  //關(guān)閉幀
                int code = 1005;
                String reason = "";
                if ((f instanceof CloseFrame)) {
                    CloseFrame cf = (CloseFrame) f;
                    code = cf.getCloseCode();
                    reason = cf.getMessage();
                }
                if (this.readystate == WebSocket.READYSTATE.CLOSING) {
                    closeConnection(code, reason, true);
                } else if (this.draft.getCloseHandshakeType() == Draft.CloseHandshakeType.TWOWAY)
                    close(code, reason, true);
                else {
                    flushAndClose(code, reason, false);
                }
            } else if (curop == Framedata.Opcode.PING) {  //Ping
                this.wsl.onWebsocketPing(this, f);
            } else if (curop == Framedata.Opcode.PONG) {    //Pong
                this.wsl.onWebsocketPong(this, f);
            } else if ((!fin) || (curop == Framedata.Opcode.CONTINUOUS)) {    //分片消息
                if (curop != Framedata.Opcode.CONTINUOUS) {
                    if (this.current_continuous_frame_opcode != null)
                        throw new InvalidDataException(1002, "Previous continuous frame sequence not completed.");
                    this.current_continuous_frame_opcode = curop;
                } else if (fin) {
                    if (this.current_continuous_frame_opcode == null)
                        throw new InvalidDataException(1002, "Continuous frame sequence was not started.");
                    this.current_continuous_frame_opcode = null;
                } else if (this.current_continuous_frame_opcode == null) {
                    throw new InvalidDataException(1002, "Continuous frame sequence was not started.");
                }
                try {
                    this.wsl.onWebsocketMessageFragment(this, f);
                } catch (RuntimeException e) {
                    this.wsl.onWebsocketError(this, e);
                }
            } else {  //普通消息
                if (this.current_continuous_frame_opcode != null)
                    throw new InvalidDataException(1002, "Continuous frame sequence not completed.");
                if (curop == Framedata.Opcode.TEXT)   //文本消息
                    try {
                        this.wsl.onWebsocketMessage(this, Charsetfunctions.stringUtf8(f.getPayloadData()));
                    } catch (RuntimeException e) {
                        this.wsl.onWebsocketError(this, e);
                    }
                else if (curop == Framedata.Opcode.BINARY)    //二進(jìn)制消息
                    try {
                        this.wsl.onWebsocketMessage(this, f.getPayloadData());
                    } catch (RuntimeException e) {
                        this.wsl.onWebsocketError(this, e);
                    }
                else
                    throw new InvalidDataException(1002, "non control or continious frame expected");
            }
        }
    } catch (InvalidDataException e1) {
        this.wsl.onWebsocketError(this, e1);
        close(e1);
        return;
    }
}

現(xiàn)在說(shuō)一下控制幀的處理,WebSocket控制幀有3種:Close(關(guān)閉幀)、Ping以及Pong。Close關(guān)閉幀很容易理解,客戶端如果接受到了就關(guān)閉連接,客戶端也可以發(fā)送關(guān)閉幀給服務(wù)端。Ping和Pong是websocket里的心跳,用來(lái)保證客戶端是在線的,一般來(lái)說(shuō)只有服務(wù)端給客戶端發(fā)送Ping,然后客戶端發(fā)送Pong來(lái)回應(yīng),表明自己仍然在線。

我們來(lái)看一下Nathan RajlichJava-Websocket代碼,客戶端對(duì)Ping的處理很簡(jiǎn)單,把收到的Ping幀改一下Opcode的類(lèi)型,就可以發(fā)送回給服務(wù)端了。可以看到客戶端處理服務(wù)端發(fā)送的Pong回調(diào)的方法是空的。

public void onWebsocketPing(WebSocket conn, Framedata f) {
    FramedataImpl1 resp = new FramedataImpl1(f);
    resp.setOptcode(Framedata.Opcode.PONG);
    conn.sendFrame(resp);
}

public void onWebsocketPong(WebSocket conn, Framedata f) {
}

這篇暫時(shí)寫(xiě)到這里好了,下一篇寫(xiě)寫(xiě)Websocket Client和Websocket Server的實(shí)現(xiàn)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 上篇介紹了HTTP1.1協(xié)議的基本內(nèi)容,這篇文章將繼續(xù)分析WebSocket協(xié)議,然后對(duì)這兩個(gè)進(jìn)行簡(jiǎn)單的比較。 W...
    TheAlchemist閱讀 36,900評(píng)論 15 113
  • 前言 本文為初入研究 Websocket協(xié)議,對(duì)于真正應(yīng)用中,各種語(yǔ)言都有實(shí)現(xiàn)庫(kù),建議采用庫(kù),而不是自己實(shí)現(xiàn),本文...
    MeIsLZHua閱讀 8,665評(píng)論 6 39
  • 明天和意外, 也許真的不知道誰(shuí)會(huì)先來(lái), 有些事, 如果現(xiàn)在不做, 也許就再也不會(huì)去做了。 有沒(méi)有人和我徒步直到泰山...
    璟遷閱讀 483評(píng)論 0 49
  • 一撇一捺是個(gè)人,人字好寫(xiě)人難做。一捺一撇,寫(xiě)錯(cuò)了方向變成X,錯(cuò)敗一生。 簡(jiǎn)簡(jiǎn)單單的兩個(gè)筆畫(huà),寫(xiě)起來(lái)容易做起來(lái)卻千辛...
    馨蘭若雪閱讀 1,366評(píng)論 0 8
  • 前幾天在群里聊天,看到一個(gè)不經(jīng)常冒泡的M姐突然發(fā)言,她準(zhǔn)備回國(guó)離婚。原因是因?yàn)榧彝ケ┝Α?起初大家以為她是開(kāi)玩笑,...
    狗蛋先生閱讀 1,230評(píng)論 0 1

友情鏈接更多精彩內(nèi)容