KCP源碼分析

前 言

當(dāng)游戲?qū)崟r性,打擊感要求比較高,或者需要控制大量游戲單位時,往往會使用幀同步。幀同步則需要可靠的UDP協(xié)議,一般會選擇KCP。有關(guān)KCP的優(yōu)點和特性不再贅述了,網(wǎng)上包括官網(wǎng)有關(guān)于KCP的介紹。本文主要分析KCP的源碼以及實現(xiàn)。

源 碼

一、雙向鏈表 IQUEUEHEAD
KCP中設(shè)計的鏈表是一個環(huán)狀雙向鏈表,結(jié)構(gòu)體只有兩個指針,源碼以及操作接口如下:

struct IQUEUEHEAD {
    struct IQUEUEHEAD *next, *prev;
};

// 向后指針和向前指針均指向自己, 形成環(huán)狀雙向鏈表
#define IQUEUE_INIT(ptr) ( \
    (ptr)->next = (ptr), (ptr)->prev = (ptr))

// MEMBER在TYPE中的偏移量
#define IOFFSETOF(TYPE, MEMBER) ((size_t) &((TYPE *)0)->MEMBER)

// ptr所在type類型對象的首地址
#define ICONTAINEROF(ptr, type, member) ( \
        (type*)( ((char*)((type*)ptr)) - IOFFSETOF(type, member)) )

#define IQUEUE_ENTRY(ptr, type, member) ICONTAINEROF(ptr, type, member)

// node插入到head后面, head->node
#define IQUEUE_ADD(node, head) ( \
    (node)->prev = (head), (node)->next = (head)->next, \
    (head)->next->prev = (node), (head)->next = (node))

// node插入到head前面, node->head
#define IQUEUE_ADD_TAIL(node, head) ( \
    (node)->prev = (head)->prev, (node)->next = (head), \
    (head)->prev->next = (node), (head)->prev = (node))

// 刪除p, n之間的節(jié)點
#define IQUEUE_DEL_BETWEEN(p, n) ((n)->prev = (p), (p)->next = (n))

// 刪除entry, prev->entry->next  ====>>>>  prev->next
#define IQUEUE_DEL(entry) (\
    (entry)->next->prev = (entry)->prev, \
    (entry)->prev->next = (entry)->next, \
    (entry)->next = 0, (entry)->prev = 0)

#define IQUEUE_DEL_INIT(entry) do { \
    IQUEUE_DEL(entry); IQUEUE_INIT(entry); } while (0)

// 鏈表是否為空
#define IQUEUE_IS_EMPTY(entry) ((entry) == (entry)->next)

鏈表結(jié)構(gòu)本身并沒有定義任何value字段,而是讓持有IQUEUEHEAD的結(jié)構(gòu)體自己去定義。當(dāng)持有了IQUEUEHEAD,該結(jié)構(gòu)體就可以通過IQUEUEHEAD將自己構(gòu)建成一個環(huán)狀雙向鏈表,如圖:

持有IQUEUEHEAD的結(jié)構(gòu)體

那如何通過IQUEUEHEAD來獲取到其所在的結(jié)構(gòu)體呢?需要看下IQUEUE_ENTRY宏的展開

(type*)( ((char*)((type*)ptr)) - ((size_t) &((type *)0)->member)) )

意思就是將IQUEUEHEAD指針ptr減去member成員在類型type中的偏移量,這樣就得到了所在類型type的首地址。

二、數(shù)據(jù)段結(jié)構(gòu) IKCPSEG
IKCPSEG是發(fā)送包的數(shù)據(jù)結(jié)構(gòu),其包含了兩部分信息:

  • 發(fā)送信息:conv、cmdfrg、wndts、sn、una、lendata,其中cmd包含四種類型
    • IKCP_CMD_PUSH:發(fā)送數(shù)據(jù)包
    • IKCP_CMD_ACK:返回確認包
    • IKCP_CMD_WASK:探測對端可用窗口大小
    • IKCP_CMD_WINS:告知對端自身可用窗口大小
  • 控制信息:其余的為控制信息,不會發(fā)送給對端。暫存于發(fā)送消息隊列中,用來判斷超時重發(fā)、快速重發(fā)。
/**
|<------------ 4 bytes ------------>|
+--------+--------+--------+--------+
|               conv                | conv: 會話序號
+--------+--------+--------+--------+ 
|  cmd   |  frg   |       wnd       | cmd:  指令標(biāo)識; frg: 分段序號; wnd: 接收窗口大小
+--------+--------+--------+--------+ 
|                ts                 | ts:   發(fā)送的時間戳
+--------+--------+--------+--------+
|                sn                 | sn:   段序號
+--------+--------+--------+--------+
|                una                | una:  未收到的序號
+--------+--------+--------+--------+
|                len                | len:  data數(shù)據(jù)的長度
+--------+--------+--------+--------+
**/
//=====================================================================
// SEGMENT
// kcp的數(shù)據(jù)段結(jié)構(gòu)
//=====================================================================
struct IKCPSEG
{
    struct IQUEUEHEAD node; // 雙向鏈表定義的隊列
    IUINT32 conv;           // conversation, 會話序號
    IUINT32 cmd;            // command, 指令類型
    IUINT32 frg;            // fragment, 分片序號
    IUINT32 wnd;            // window, 可用窗口大小
    IUINT32 ts;             // timestamp, 發(fā)送的時間戳
    IUINT32 sn;             // sequence number, 段序號
    IUINT32 una;            // unacknowledged, 當(dāng)前未收到的序號
    IUINT32 len;            // length, data數(shù)據(jù)的長度
    IUINT32 resendts;       // 超時重發(fā)的時間戳
    IUINT32 rto;            // 超時重傳的時間間隔
    IUINT32 fastack;        // ack跳過的次數(shù), 用于快速重傳
    IUINT32 xmit;           // 發(fā)送的次數(shù)
    char data[1];
};

三、數(shù)據(jù)塊 IKCPCB
IKCPCB用于存放KCP相關(guān)的配置、數(shù)據(jù)緩存、協(xié)議控制的數(shù)據(jù)結(jié)構(gòu),結(jié)構(gòu)如下:

//---------------------------------------------------------------------
// IKCPCB
// kcp的控制塊
//---------------------------------------------------------------------
struct IKCPCB
{
    // conv:會話ID, mtu:最大傳輸單元, mss:最大分片大小, state:連接狀態(tài)
    IUINT32 conv, mtu, mss, state;
    // sun_una:第一個未確認的包, sen_nxt:待發(fā)送包的序號, rcv_nxt:待接收消息的序號
    IUINT32 snd_una, snd_nxt, rcv_nxt;
    // ts_recent(未使用), ts_lastack(未使用), ssthresh:擁塞窗口的閾值
    IUINT32 ts_recent, ts_lastack, ssthresh;
    // rx_rttval: rtt的變化量, rx_srtt: rtt的平滑值(smoothed), 
    // rx_rto: 由ack接收延遲計算出來的重傳超時時間, rx_minrto: 最小重傳超時時間
    IINT32 rx_rttval, rx_srtt, rx_rto, rx_minrto;
    // sen_wnd: 發(fā)送窗口大小, rcv_wnd: 接收窗口大小, rmt_wnd: 遠端可用窗口大小, 
    // cwnd: 擁塞窗口大小, probe: 探查變量
    IUINT32 snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe;
    //currunt: 當(dāng)前時間戳, interval: 內(nèi)部flush刷新間隔, ts_flush: 下次刷新時間戳, xmit: 重發(fā)消息的次數(shù)
    IUINT32 current, interval, ts_flush, xmit;
    // nrcv_buf: snd_buf的長度, nsnd_buf: rcv_buf的長度
    IUINT32 nrcv_buf, nsnd_buf;
    // nrcv_que: rcv_queue的長度, nsnd_que: snd_queue的長度
    IUINT32 nrcv_que, nsnd_que;
    // nodelay: 是否啟動無延遲模式, update: 是否調(diào)用過update函數(shù)的標(biāo)識
    IUINT32 nodelay, updated;
    //ts_probe: 下次探測窗口的時間戳, probe_wait: 探測窗口等待的時間
    IUINT32 ts_probe, probe_wait;
    //dead_link: 最大重傳次數(shù), incr: 可發(fā)送的最大數(shù)據(jù)量
    IUINT32 dead_link, incr;
    struct IQUEUEHEAD snd_queue;    //發(fā)送消息的隊列
    struct IQUEUEHEAD rcv_queue;    //接收消息的隊列
    struct IQUEUEHEAD snd_buf;      //發(fā)送消息的緩存
    struct IQUEUEHEAD rcv_buf;      //接收消息的緩存
    IUINT32 *acklist;               //待發(fā)送的ack的列表, 其內(nèi)容是[sn1, ts1, sn2, ts2 …]的列表
    IUINT32 ackcount;               //當(dāng)前ack數(shù)量
    IUINT32 ackblock;               //acklist的大小
    void *user;                     //用戶指針
    char *buffer;                   //儲存消息字節(jié)流的內(nèi)存
    int fastresend;                 //觸發(fā)快速重傳的重復(fù)ack個數(shù)
    int fastlimit;                  //快速重傳次數(shù)上限
    int nocwnd, stream;             //nocwnd: 取消擁塞控制, stream: 是否采用流傳輸模式
    int logmask;                    //日志的類型
    int (*output)(const char *buf, int len, struct IKCPCB *kcp, void *user); //發(fā)送消息的回調(diào)函數(shù)
    void (*writelog)(const char *log, struct IKCPCB *kcp, void *user);       //寫日志的回調(diào)函數(shù)
};

四、KCP主要接口
這里主要看下四個核心接口:

// 發(fā)送消息,將本端數(shù)據(jù)寫入緩存
int ikcp_send(ikcpcb *kcp, const char *buffer, int len);

// 刷新數(shù)據(jù),發(fā)送到對端
void ikcp_flush(ikcpcb *kcp);

// 輸入數(shù)據(jù),將對端數(shù)據(jù)寫入緩存
int ikcp_input(ikcpcb *kcp, const char *data, long size);

// 接收消息
int ikcp_recv(ikcpcb *kcp, char *buffer, int len);

4.1 ikcp_send
將發(fā)送數(shù)據(jù)根據(jù)mss進行分片,分片為KCP的數(shù)據(jù)格式,按序插入到發(fā)送隊列中。分片默認構(gòu)建新的KCP包并添加到隊列,如果使用流模式分片,將判斷發(fā)送隊列的最后一個分片是否填滿mss,若沒有,則用新的數(shù)據(jù)填充分片后再分片。

int ikcp_send(ikcpcb *kcp, const char *buffer, int len)
{
    IKCPSEG *seg;
    int count, i;

    // mss = mtu - 24(IKCP_OVERHEAD KCP報文頭部大小)
    assert(kcp->mss > 0);
    if (len < 0) return -1;

    // append to previous segment in streaming mode (if possible)
    // 使用流模式
    if (kcp->stream != 0) {
        if (!iqueue_is_empty(&kcp->snd_queue)) {
            // 判斷發(fā)送隊列的最后一個分片是否填滿mss
            IKCPSEG *old = iqueue_entry(kcp->snd_queue.prev, IKCPSEG, node);
            if (old->len < kcp->mss) {
                // 用新的數(shù)據(jù)填充分片
                int capacity = kcp->mss - old->len; //分片剩余容量大小
                int extend = (len < capacity)? len : capacity; //填充大小
                seg = ikcp_segment_new(kcp, old->len + extend);
                assert(seg);
                if (seg == NULL) {
                    return -2;
                }
                iqueue_add_tail(&seg->node, &kcp->snd_queue);
                memcpy(seg->data, old->data, old->len);
                if (buffer) {
                    memcpy(seg->data + old->len, buffer, extend);
                    buffer += extend;
                }
                seg->len = old->len + extend;
                seg->frg = 0;
                len -= extend;
                iqueue_del_init(&old->node);
                ikcp_segment_delete(kcp, old);
            }
        }
        // 沒有剩余數(shù)據(jù),返回成功
        if (len <= 0) {
            return 0;
        }
    }

    // 計算分片數(shù)量
    if (len <= (int)kcp->mss) count = 1;
    else count = (len + kcp->mss - 1) / kcp->mss;

    // 分片數(shù)量大于對端接收窗口大小
    if (count >= (int)IKCP_WND_RCV) return -2;

    if (count == 0) count = 1;

    // fragment
    // 將數(shù)據(jù)分片并添加到發(fā)送消息隊列
    for (i = 0; i < count; i++) {
        int size = len > (int)kcp->mss ? (int)kcp->mss : len;
        seg = ikcp_segment_new(kcp, size);
        assert(seg);
        if (seg == NULL) {
            return -2;
        }
        if (buffer && len > 0) {
            memcpy(seg->data, buffer, size);
        }
        seg->len = size;
        // 設(shè)置分片序號,frg從大到小
        seg->frg = (kcp->stream == 0)? (count - i - 1) : 0;
        iqueue_init(&seg->node);
        iqueue_add_tail(&seg->node, &kcp->snd_queue);
        kcp->nsnd_que++;
        if (buffer) {
            buffer += size;
        }
        len -= size;
    }

    return 0;
}

4.2 ikcp_flush
將數(shù)據(jù)包通過用戶提供的發(fā)送函數(shù)發(fā)送到對端,所有的數(shù)據(jù)分片累計超過mtu后才會發(fā)送出去,同時根據(jù)這發(fā)送消息的情況,調(diào)整kcp配置。

void ikcp_flush(ikcpcb *kcp)
{
    IUINT32 current = kcp->current;
    char *buffer = kcp->buffer;
    char *ptr = buffer;
    int count, size, i;
    IUINT32 resent, cwnd;
    IUINT32 rtomin;
    struct IQUEUEHEAD *p;
    int change = 0;
    int lost = 0;
    // 此IKCPSEG被序列化多次,反復(fù)使用
    IKCPSEG seg;

    // 'ikcp_update' haven't been called. 
    if (kcp->updated == 0) return;

    // 序列化為發(fā)送確認包
    seg.conv = kcp->conv;
    seg.cmd = IKCP_CMD_ACK;
    seg.frg = 0;
    seg.wnd = ikcp_wnd_unused(kcp);
    seg.una = kcp->rcv_nxt;
    seg.len = 0;
    seg.sn = 0;
    seg.ts = 0;

    // flush acknowledges
    // 循環(huán)發(fā)送確認包
    count = kcp->ackcount;
    for (i = 0; i < count; i++) {
        size = (int)(ptr - buffer);
        // 超過mtu才發(fā)送,否則下文會不斷序列化seg到buffer中
        if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
            ikcp_output(kcp, buffer, size);
            ptr = buffer;
        }
        // 根據(jù)待發(fā)送的確認包列表acklist,填充段序號和發(fā)送時間戳
        ikcp_ack_get(kcp, i, &seg.sn, &seg.ts);
        // 序列化seg到buffer中
        ptr = ikcp_encode_seg(ptr, &seg);
    }

    kcp->ackcount = 0;

    // probe window size (if remote window size equals zero)
    if (kcp->rmt_wnd == 0) {
        // 探測窗口等待時間為0
        if (kcp->probe_wait == 0) { 
            kcp->probe_wait = IKCP_PROBE_INIT; //重新賦值探測窗口等待時間,默認7秒
            kcp->ts_probe = kcp->current + kcp->probe_wait; //重新賦值下次探測窗口時間戳
        }   
        else {
            // 是否到達探測時間
            if (_itimediff(kcp->current, kcp->ts_probe) >= 0) {
                // 更新探測窗口等待時間和下次探測窗口時間戳
                // 設(shè)置探測變量為 IKCP_ASK_SEND,發(fā)送窗口探測
                if (kcp->probe_wait < IKCP_PROBE_INIT) 
                    kcp->probe_wait = IKCP_PROBE_INIT;
                kcp->probe_wait += kcp->probe_wait / 2;
                if (kcp->probe_wait > IKCP_PROBE_LIMIT)
                    kcp->probe_wait = IKCP_PROBE_LIMIT;
                kcp->ts_probe = kcp->current + kcp->probe_wait;
                kcp->probe |= IKCP_ASK_SEND;
            }
        }
    }   else {
        // 遠端窗口正常,不需要探測,不發(fā)送窗口探測
        kcp->ts_probe = 0;
        kcp->probe_wait = 0;
    }

    // flush window probing commands
    // 是否需要探測對端窗口
    if (kcp->probe & IKCP_ASK_SEND) {
        // 序列化對端窗口探測包
        seg.cmd = IKCP_CMD_WASK;
        size = (int)(ptr - buffer);
        if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
            ikcp_output(kcp, buffer, size);
            ptr = buffer;
        }
        ptr = ikcp_encode_seg(ptr, &seg);
    }

    // flush window probing commands
    // 是否需要告知對端自身窗口大小
    if (kcp->probe & IKCP_ASK_TELL) {
        // 序列化告知對端自身窗口包
        seg.cmd = IKCP_CMD_WINS;
        size = (int)(ptr - buffer);
        if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
            ikcp_output(kcp, buffer, size);
            ptr = buffer;
        }
        ptr = ikcp_encode_seg(ptr, &seg);
    }

    kcp->probe = 0;

    // calculate window size
    // 計算最小發(fā)送窗口,取自身窗口大小與對端窗口大小的最小值
    cwnd = _imin_(kcp->snd_wnd, kcp->rmt_wnd);
    // 如果沒有取消擁塞控制,取配置擁塞窗口、自身窗口和對端窗口三者最小值
    if (kcp->nocwnd == 0) cwnd = _imin_(kcp->cwnd, cwnd);

    // move data from snd_queue to snd_buf
    // 將滿足窗口大小個數(shù)的數(shù)據(jù)分片,從kcp->snd_queue移動到kcp->snd_buf
    while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) {
        IKCPSEG *newseg;
        if (iqueue_is_empty(&kcp->snd_queue)) break;

        newseg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node);

        // 從snd_queue中刪除自身
        iqueue_del(&newseg->node);
        // 添加到snd_buf中,注意這里是插入到了snd_buf的前面
        iqueue_add_tail(&newseg->node, &kcp->snd_buf);
        kcp->nsnd_que--;
        kcp->nsnd_buf++;

        // 設(shè)置數(shù)據(jù)分片屬性
        newseg->conv = kcp->conv;
        newseg->cmd = IKCP_CMD_PUSH;
        newseg->wnd = seg.wnd;
        newseg->ts = current;
        newseg->sn = kcp->snd_nxt++;
        newseg->una = kcp->rcv_nxt;
        newseg->resendts = current;
        newseg->rto = kcp->rx_rto;
        newseg->fastack = 0;
        newseg->xmit = 0;
    }

    // calculate resent
    // 重新計算觸發(fā)快速重傳的重復(fù)ack個數(shù)
    resent = (kcp->fastresend > 0)? (IUINT32)kcp->fastresend : 0xffffffff;
    // 重新計算超時重傳時間,rx_rto的1/8
    rtomin = (kcp->nodelay == 0)? (kcp->rx_rto >> 3) : 0;

    // flush data segments
    for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) {
        IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node);
        int needsend = 0;
        // 第一次發(fā)送
        if (segment->xmit == 0) {
            needsend = 1;
            segment->xmit++;
            segment->rto = kcp->rx_rto;
            segment->resendts = current + segment->rto + rtomin;
        }
        // 超過了重發(fā)時間
        else if (_itimediff(current, segment->resendts) >= 0) {
            needsend = 1;
            segment->xmit++;
            kcp->xmit++;
            if (kcp->nodelay == 0) {
                segment->rto += _imax_(segment->rto, (IUINT32)kcp->rx_rto);
            }   else {
                IINT32 step = (kcp->nodelay < 2)? 
                    ((IINT32)(segment->rto)) : kcp->rx_rto;
                segment->rto += step / 2;
            }
            segment->resendts = current + segment->rto;
            // 發(fā)生超時重發(fā)
            lost = 1;
        }
        // 跳過應(yīng)答的次數(shù)超過了配置
        else if (segment->fastack >= resent) {
            // 發(fā)送次數(shù) <= 快速重傳上限 || 未設(shè)置快速重發(fā)上限
            if ((int)segment->xmit <= kcp->fastlimit || 
                kcp->fastlimit <= 0) {
                needsend = 1;
                segment->xmit++;
                segment->fastack = 0;
                segment->resendts = current + segment->rto;
                // 發(fā)生快速重傳
                change++;
            }
        }

        // 滿足任意以上三個條件,則需要發(fā)送
        if (needsend) {
            int need;
            segment->ts = current;
            segment->wnd = seg.wnd;
            segment->una = kcp->rcv_nxt;

            size = (int)(ptr - buffer);
            need = IKCP_OVERHEAD + segment->len;

            if (size + need > (int)kcp->mtu) {
                ikcp_output(kcp, buffer, size);
                ptr = buffer;
            }

            ptr = ikcp_encode_seg(ptr, segment);

            // 把data數(shù)據(jù)寫入到kcp->buffer
            if (segment->len > 0) {
                memcpy(ptr, segment->data, segment->len);
                ptr += segment->len;
            }

            if (segment->xmit >= kcp->dead_link) {
                kcp->state = (IUINT32)-1;
            }
        }
    }

    // flash remain segments
    // 將剩余的buffer發(fā)送出去
    size = (int)(ptr - buffer);
    if (size > 0) {
        ikcp_output(kcp, buffer, size);
    }

    // update ssthresh
    // 根據(jù)發(fā)送消息情況,調(diào)整kcp配置
    // 是否發(fā)生快速重傳
    if (change) {
        IUINT32 inflight = kcp->snd_nxt - kcp->snd_una;
        kcp->ssthresh = inflight / 2;
        if (kcp->ssthresh < IKCP_THRESH_MIN)
            kcp->ssthresh = IKCP_THRESH_MIN;
        kcp->cwnd = kcp->ssthresh + resent;
        kcp->incr = kcp->cwnd * kcp->mss;
    }

    // 是否發(fā)生超時重發(fā)
    if (lost) {
        kcp->ssthresh = cwnd / 2;
        if (kcp->ssthresh < IKCP_THRESH_MIN)
            kcp->ssthresh = IKCP_THRESH_MIN;
        kcp->cwnd = 1;
        kcp->incr = kcp->mss;
    }

    if (kcp->cwnd < 1) {
        kcp->cwnd = 1;
        kcp->incr = kcp->mss;
    }
}

4.3 ikcp_input
輸入處理對端的消息數(shù)據(jù),根據(jù)不同的操作類型進行不同的處理,寫入緩存。計算并調(diào)整窗口。

int ikcp_input(ikcpcb *kcp, const char *data, long size)
{
    IUINT32 prev_una = kcp->snd_una;
    IUINT32 maxack = 0, latest_ts = 0;
    int flag = 0;

    if (ikcp_canlog(kcp, IKCP_LOG_INPUT)) {
        ikcp_log(kcp, IKCP_LOG_INPUT, "[RI] %d bytes", (int)size);
    }

    // 數(shù)據(jù)為空 || 消息大小小于KCP報文
    if (data == NULL || (int)size < (int)IKCP_OVERHEAD) return -1;

    while (1) {
        IUINT32 ts, sn, len, una, conv;
        IUINT16 wnd;
        IUINT8 cmd, frg;
        IKCPSEG *seg;

        if (size < (int)IKCP_OVERHEAD) break;

        // 獲取KCP報文頭部信息
        data = ikcp_decode32u(data, &conv);
        if (conv != kcp->conv) return -1;

        data = ikcp_decode8u(data, &cmd);
        data = ikcp_decode8u(data, &frg);
        data = ikcp_decode16u(data, &wnd);
        data = ikcp_decode32u(data, &ts);
        data = ikcp_decode32u(data, &sn);
        data = ikcp_decode32u(data, &una);
        data = ikcp_decode32u(data, &len);

        size -= IKCP_OVERHEAD;

        if ((long)size < (long)len || (int)len < 0) return -2;

        // 驗證指令類型
        if (cmd != IKCP_CMD_PUSH && cmd != IKCP_CMD_ACK &&
            cmd != IKCP_CMD_WASK && cmd != IKCP_CMD_WINS) 
            return -3;

        // 更新對端接收窗口[1]
        kcp->rmt_wnd = wnd;
        // 確認(刪除)una之前的數(shù)據(jù)包
        ikcp_parse_una(kcp, una);
        // 更新snd_una(未確認包序號)
        ikcp_shrink_buf(kcp);

        if (cmd == IKCP_CMD_ACK) {
            if (_itimediff(kcp->current, ts) >= 0) {
                // 更新rx_rttval rx_srtt rx_rto
                ikcp_update_ack(kcp, _itimediff(kcp->current, ts));
            }
            // 確認sn數(shù)據(jù)包
            ikcp_parse_ack(kcp, sn);
            // 更新snd_una
            ikcp_shrink_buf(kcp);
            // 記錄最后一個包段序號和時間戳
            if (flag == 0) {
                flag = 1;
                maxack = sn;
                latest_ts = ts;
            }   else {
                if (_itimediff(sn, maxack) > 0) {
                #ifndef IKCP_FASTACK_CONSERVE
                    maxack = sn;
                    latest_ts = ts;
                #else
                    if (_itimediff(ts, latest_ts) > 0) {
                        maxack = sn;
                        latest_ts = ts;
                    }
                #endif
                }
            }
            if (ikcp_canlog(kcp, IKCP_LOG_IN_ACK)) {
                ikcp_log(kcp, IKCP_LOG_IN_ACK, 
                    "input ack: sn=%lu rtt=%ld rto=%ld", (unsigned long)sn, 
                    (long)_itimediff(kcp->current, ts),
                    (long)kcp->rx_rto);
            }
        }
        else if (cmd == IKCP_CMD_PUSH) {
            if (ikcp_canlog(kcp, IKCP_LOG_IN_DATA)) {
                ikcp_log(kcp, IKCP_LOG_IN_DATA, 
                    "input psh: sn=%lu ts=%lu", (unsigned long)sn, (unsigned long)ts);
            }
            if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) < 0) {
                // 添加sn確認包
                ikcp_ack_push(kcp, sn, ts);
                // 處理接收窗口范圍內(nèi)的數(shù)據(jù)包
                if (_itimediff(sn, kcp->rcv_nxt) >= 0) {
                    seg = ikcp_segment_new(kcp, len);
                    seg->conv = conv;
                    seg->cmd = cmd;
                    seg->frg = frg;
                    seg->wnd = wnd;
                    seg->ts = ts;
                    seg->sn = sn;
                    seg->una = una;
                    seg->len = len;

                    if (len > 0) {
                        memcpy(seg->data, data, len);
                    }

                    // 處理消息數(shù)據(jù)
                    ikcp_parse_data(kcp, seg);
                }
            }
        }
        else if (cmd == IKCP_CMD_WASK) {
            // ready to send back IKCP_CMD_WINS in ikcp_flush
            // tell remote my window size
            // 更新探測變量
            // 在update時告知對端自身接收窗口的大小
            kcp->probe |= IKCP_ASK_TELL;
            if (ikcp_canlog(kcp, IKCP_LOG_IN_PROBE)) {
                ikcp_log(kcp, IKCP_LOG_IN_PROBE, "input probe");
            }
        }
        else if (cmd == IKCP_CMD_WINS) {
            // do nothing
            // 在上文[1]已經(jīng)更新對端的接收窗口大小
            if (ikcp_canlog(kcp, IKCP_LOG_IN_WINS)) {
                ikcp_log(kcp, IKCP_LOG_IN_WINS,
                    "input wins: %lu", (unsigned long)(wnd));
            }
        }
        else {
            return -3;
        }

        data += len;
        size -= len;
    }

    // 是否有處理確認包
    if (flag != 0) {
        // 遍歷發(fā)送緩沖,序號小于maxack消息的fastack++,以觸發(fā)快速重傳
        ikcp_parse_fastack(kcp, maxack, latest_ts);
    }

    if (_itimediff(kcp->snd_una, prev_una) > 0) {
        // 更新cwnd
        if (kcp->cwnd < kcp->rmt_wnd) {
            IUINT32 mss = kcp->mss;
            if (kcp->cwnd < kcp->ssthresh) {
                // 小于閾值,線性增加
                kcp->cwnd++;
                kcp->incr += mss;
            }   else {
                if (kcp->incr < mss) kcp->incr = mss;
                // incr += mss * (1/n + 1/16)
                kcp->incr += (mss * mss) / kcp->incr + (mss / 16);
                if ((kcp->cwnd + 1) * mss <= kcp->incr) {
                #if 1
                    kcp->cwnd = (kcp->incr + mss - 1) / ((mss > 0)? mss : 1);
                #else
                    kcp->cwnd++;
                #endif
                }
            }
            // 最大不超過rmt_wnd
            if (kcp->cwnd > kcp->rmt_wnd) {
                kcp->cwnd = kcp->rmt_wnd;
                kcp->incr = kcp->rmt_wnd * mss;
            }
        }
    }

    return 0;
}

4.4 ikcp_recv
從rcv_queue中讀取一幀完整的數(shù)據(jù),再將rcv_buf中的連續(xù)分片轉(zhuǎn)移到rcv_queue,最后判斷是否需要告知自身窗口大小。

int ikcp_recv(ikcpcb *kcp, char *buffer, int len)
{
    struct IQUEUEHEAD *p;
    int ispeek = (len < 0)? 1 : 0;
    int peeksize;
    int recover = 0;
    IKCPSEG *seg;
    assert(kcp);

    if (iqueue_is_empty(&kcp->rcv_queue))
        return -1;

    if (len < 0) len = -len;

    // 獲取一幀的數(shù)據(jù)大小
    peeksize = ikcp_peeksize(kcp);

    if (peeksize < 0) 
        return -2;

    if (peeksize > len) 
        return -3;

    // 標(biāo)記可以開始窗口恢復(fù)
    if (kcp->nrcv_que >= kcp->rcv_wnd)
        recover = 1;

    // merge fragment
    for (len = 0, p = kcp->rcv_queue.next; p != &kcp->rcv_queue; ) {
        // 將一幀完整的數(shù)據(jù)從rcv_queue復(fù)制到data緩沖區(qū)
        int fragment;
        seg = iqueue_entry(p, IKCPSEG, node);
        p = p->next;

        if (buffer) {
            memcpy(buffer, seg->data, seg->len);
            buffer += seg->len;
        }

        len += seg->len;
        fragment = seg->frg;

        if (ikcp_canlog(kcp, IKCP_LOG_RECV)) {
            ikcp_log(kcp, IKCP_LOG_RECV, "recv sn=%lu", (unsigned long)seg->sn);
        }

        if (ispeek == 0) {
            iqueue_del(&seg->node);
            ikcp_segment_delete(kcp, seg);
            kcp->nrcv_que--;
        }

        if (fragment == 0) 
            break;
    }

    assert(len == peeksize);

    // move available data from rcv_buf -> rcv_queue
    // 將rcv_buf中連續(xù)段序號的數(shù)據(jù)包移動到rcv_queue
    while (! iqueue_is_empty(&kcp->rcv_buf)) {
        seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node);
        // 判斷段序號是否連續(xù) && 接收隊列長度小于自身接收窗口
        if (seg->sn == kcp->rcv_nxt && kcp->nrcv_que < kcp->rcv_wnd) {
            iqueue_del(&seg->node);
            kcp->nrcv_buf--;
            // 添加到rcv_queue
            iqueue_add_tail(&seg->node, &kcp->rcv_queue);
            kcp->nrcv_que++;
            kcp->rcv_nxt++;
        }   else {
            // 待接收數(shù)據(jù)包的序號不連續(xù),break,在下一次接收再提交
            break;
        }
    }

    // fast recover
    if (kcp->nrcv_que < kcp->rcv_wnd && recover) {
        // ready to send back IKCP_CMD_WINS in ikcp_flush
        // tell remote my window size
        // 下一次update時,發(fā)送窗口大小告知包
        kcp->probe |= IKCP_ASK_TELL;
    }

    return len;
}

參考:
KCP - A Fast and Reliable ARQ Protocol
kcp源碼分析-知乎

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 概念: FastDFS是余慶(前阿里巴巴架構(gòu)師,現(xiàn)易到用車架構(gòu)師)開發(fā)的一個開源的輕量級分布式文件系統(tǒng),對于小文件...
    yingyingguigui閱讀 5,988評論 0 2
  • 分布式緩存 客戶端緩存的使用場景: 1、對數(shù)據(jù)一致性要求不高 2、需要的緩存數(shù)量是可控的 3、提升一些并發(fā)能力 緩...
    IM后海大鯊魚閱讀 1,198評論 0 0
  • 面試官:為什么用Okhttp,而不選擇其它網(wǎng)絡(luò)框架? 支持HTTP2/SPDY,允許所有同一個主機地址的請求共享同...
    鵬城十八少閱讀 2,074評論 1 4
  • channel是golang中特有的一種數(shù)據(jù)結(jié)構(gòu),通常與goroutine一起使用,下面我們就介紹一下這種數(shù)據(jù)結(jié)構(gòu)...
    cfanbo閱讀 333評論 0 0
  • redis是什么 redis是一個非關(guān)系型數(shù)據(jù)庫,以KV結(jié)構(gòu)存儲數(shù)據(jù),提供了多種數(shù)據(jù)類型和各自的本地方法,單線程處...
    小丸子的呆地閱讀 629評論 0 2

友情鏈接更多精彩內(nèi)容