前 言
當(dāng)游戲?qū)崟r性,打擊感要求比較高,或者需要控制大量游戲單位時,往往會使用幀同步。幀同步則需要可靠的UDP協(xié)議,一般會選擇KCP。有關(guān)KCP的優(yōu)點和特性不再贅述了,網(wǎng)上包括官網(wǎng)有關(guān)于KCP的介紹。本文主要分析KCP的源碼以及實現(xiàn)。
源 碼
一、雙向鏈表 IQUEUEHEAD
KCP中設(shè)計的鏈表是一個環(huán)狀雙向鏈表,結(jié)構(gòu)體只有兩個指針,源碼以及操作接口如下:
struct IQUEUEHEAD {
struct IQUEUEHEAD *next, *prev;
};
// 向后指針和向前指針均指向自己, 形成環(huán)狀雙向鏈表
#define IQUEUE_INIT(ptr) ( \
(ptr)->next = (ptr), (ptr)->prev = (ptr))
// MEMBER在TYPE中的偏移量
#define IOFFSETOF(TYPE, MEMBER) ((size_t) &((TYPE *)0)->MEMBER)
// ptr所在type類型對象的首地址
#define ICONTAINEROF(ptr, type, member) ( \
(type*)( ((char*)((type*)ptr)) - IOFFSETOF(type, member)) )
#define IQUEUE_ENTRY(ptr, type, member) ICONTAINEROF(ptr, type, member)
// node插入到head后面, head->node
#define IQUEUE_ADD(node, head) ( \
(node)->prev = (head), (node)->next = (head)->next, \
(head)->next->prev = (node), (head)->next = (node))
// node插入到head前面, node->head
#define IQUEUE_ADD_TAIL(node, head) ( \
(node)->prev = (head)->prev, (node)->next = (head), \
(head)->prev->next = (node), (head)->prev = (node))
// 刪除p, n之間的節(jié)點
#define IQUEUE_DEL_BETWEEN(p, n) ((n)->prev = (p), (p)->next = (n))
// 刪除entry, prev->entry->next ====>>>> prev->next
#define IQUEUE_DEL(entry) (\
(entry)->next->prev = (entry)->prev, \
(entry)->prev->next = (entry)->next, \
(entry)->next = 0, (entry)->prev = 0)
#define IQUEUE_DEL_INIT(entry) do { \
IQUEUE_DEL(entry); IQUEUE_INIT(entry); } while (0)
// 鏈表是否為空
#define IQUEUE_IS_EMPTY(entry) ((entry) == (entry)->next)
鏈表結(jié)構(gòu)本身并沒有定義任何value字段,而是讓持有IQUEUEHEAD的結(jié)構(gòu)體自己去定義。當(dāng)持有了IQUEUEHEAD,該結(jié)構(gòu)體就可以通過IQUEUEHEAD將自己構(gòu)建成一個環(huán)狀雙向鏈表,如圖:

那如何通過IQUEUEHEAD來獲取到其所在的結(jié)構(gòu)體呢?需要看下IQUEUE_ENTRY宏的展開
(type*)( ((char*)((type*)ptr)) - ((size_t) &((type *)0)->member)) )
意思就是將IQUEUEHEAD指針ptr減去member成員在類型type中的偏移量,這樣就得到了所在類型type的首地址。
二、數(shù)據(jù)段結(jié)構(gòu) IKCPSEG
IKCPSEG是發(fā)送包的數(shù)據(jù)結(jié)構(gòu),其包含了兩部分信息:
- 發(fā)送信息:
conv、cmd、frg、wnd、ts、sn、una、len、data,其中cmd包含四種類型-
IKCP_CMD_PUSH:發(fā)送數(shù)據(jù)包 -
IKCP_CMD_ACK:返回確認包 -
IKCP_CMD_WASK:探測對端可用窗口大小 -
IKCP_CMD_WINS:告知對端自身可用窗口大小
-
- 控制信息:其余的為控制信息,不會發(fā)送給對端。暫存于發(fā)送消息隊列中,用來判斷超時重發(fā)、快速重發(fā)。
/**
|<------------ 4 bytes ------------>|
+--------+--------+--------+--------+
| conv | conv: 會話序號
+--------+--------+--------+--------+
| cmd | frg | wnd | cmd: 指令標(biāo)識; frg: 分段序號; wnd: 接收窗口大小
+--------+--------+--------+--------+
| ts | ts: 發(fā)送的時間戳
+--------+--------+--------+--------+
| sn | sn: 段序號
+--------+--------+--------+--------+
| una | una: 未收到的序號
+--------+--------+--------+--------+
| len | len: data數(shù)據(jù)的長度
+--------+--------+--------+--------+
**/
//=====================================================================
// SEGMENT
// kcp的數(shù)據(jù)段結(jié)構(gòu)
//=====================================================================
struct IKCPSEG
{
struct IQUEUEHEAD node; // 雙向鏈表定義的隊列
IUINT32 conv; // conversation, 會話序號
IUINT32 cmd; // command, 指令類型
IUINT32 frg; // fragment, 分片序號
IUINT32 wnd; // window, 可用窗口大小
IUINT32 ts; // timestamp, 發(fā)送的時間戳
IUINT32 sn; // sequence number, 段序號
IUINT32 una; // unacknowledged, 當(dāng)前未收到的序號
IUINT32 len; // length, data數(shù)據(jù)的長度
IUINT32 resendts; // 超時重發(fā)的時間戳
IUINT32 rto; // 超時重傳的時間間隔
IUINT32 fastack; // ack跳過的次數(shù), 用于快速重傳
IUINT32 xmit; // 發(fā)送的次數(shù)
char data[1];
};
三、數(shù)據(jù)塊 IKCPCB
IKCPCB用于存放KCP相關(guān)的配置、數(shù)據(jù)緩存、協(xié)議控制的數(shù)據(jù)結(jié)構(gòu),結(jié)構(gòu)如下:
//---------------------------------------------------------------------
// IKCPCB
// kcp的控制塊
//---------------------------------------------------------------------
struct IKCPCB
{
// conv:會話ID, mtu:最大傳輸單元, mss:最大分片大小, state:連接狀態(tài)
IUINT32 conv, mtu, mss, state;
// sun_una:第一個未確認的包, sen_nxt:待發(fā)送包的序號, rcv_nxt:待接收消息的序號
IUINT32 snd_una, snd_nxt, rcv_nxt;
// ts_recent(未使用), ts_lastack(未使用), ssthresh:擁塞窗口的閾值
IUINT32 ts_recent, ts_lastack, ssthresh;
// rx_rttval: rtt的變化量, rx_srtt: rtt的平滑值(smoothed),
// rx_rto: 由ack接收延遲計算出來的重傳超時時間, rx_minrto: 最小重傳超時時間
IINT32 rx_rttval, rx_srtt, rx_rto, rx_minrto;
// sen_wnd: 發(fā)送窗口大小, rcv_wnd: 接收窗口大小, rmt_wnd: 遠端可用窗口大小,
// cwnd: 擁塞窗口大小, probe: 探查變量
IUINT32 snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe;
//currunt: 當(dāng)前時間戳, interval: 內(nèi)部flush刷新間隔, ts_flush: 下次刷新時間戳, xmit: 重發(fā)消息的次數(shù)
IUINT32 current, interval, ts_flush, xmit;
// nrcv_buf: snd_buf的長度, nsnd_buf: rcv_buf的長度
IUINT32 nrcv_buf, nsnd_buf;
// nrcv_que: rcv_queue的長度, nsnd_que: snd_queue的長度
IUINT32 nrcv_que, nsnd_que;
// nodelay: 是否啟動無延遲模式, update: 是否調(diào)用過update函數(shù)的標(biāo)識
IUINT32 nodelay, updated;
//ts_probe: 下次探測窗口的時間戳, probe_wait: 探測窗口等待的時間
IUINT32 ts_probe, probe_wait;
//dead_link: 最大重傳次數(shù), incr: 可發(fā)送的最大數(shù)據(jù)量
IUINT32 dead_link, incr;
struct IQUEUEHEAD snd_queue; //發(fā)送消息的隊列
struct IQUEUEHEAD rcv_queue; //接收消息的隊列
struct IQUEUEHEAD snd_buf; //發(fā)送消息的緩存
struct IQUEUEHEAD rcv_buf; //接收消息的緩存
IUINT32 *acklist; //待發(fā)送的ack的列表, 其內(nèi)容是[sn1, ts1, sn2, ts2 …]的列表
IUINT32 ackcount; //當(dāng)前ack數(shù)量
IUINT32 ackblock; //acklist的大小
void *user; //用戶指針
char *buffer; //儲存消息字節(jié)流的內(nèi)存
int fastresend; //觸發(fā)快速重傳的重復(fù)ack個數(shù)
int fastlimit; //快速重傳次數(shù)上限
int nocwnd, stream; //nocwnd: 取消擁塞控制, stream: 是否采用流傳輸模式
int logmask; //日志的類型
int (*output)(const char *buf, int len, struct IKCPCB *kcp, void *user); //發(fā)送消息的回調(diào)函數(shù)
void (*writelog)(const char *log, struct IKCPCB *kcp, void *user); //寫日志的回調(diào)函數(shù)
};
四、KCP主要接口
這里主要看下四個核心接口:
// 發(fā)送消息,將本端數(shù)據(jù)寫入緩存
int ikcp_send(ikcpcb *kcp, const char *buffer, int len);
// 刷新數(shù)據(jù),發(fā)送到對端
void ikcp_flush(ikcpcb *kcp);
// 輸入數(shù)據(jù),將對端數(shù)據(jù)寫入緩存
int ikcp_input(ikcpcb *kcp, const char *data, long size);
// 接收消息
int ikcp_recv(ikcpcb *kcp, char *buffer, int len);
4.1 ikcp_send
將發(fā)送數(shù)據(jù)根據(jù)mss進行分片,分片為KCP的數(shù)據(jù)格式,按序插入到發(fā)送隊列中。分片默認構(gòu)建新的KCP包并添加到隊列,如果使用流模式分片,將判斷發(fā)送隊列的最后一個分片是否填滿mss,若沒有,則用新的數(shù)據(jù)填充分片后再分片。
int ikcp_send(ikcpcb *kcp, const char *buffer, int len)
{
IKCPSEG *seg;
int count, i;
// mss = mtu - 24(IKCP_OVERHEAD KCP報文頭部大小)
assert(kcp->mss > 0);
if (len < 0) return -1;
// append to previous segment in streaming mode (if possible)
// 使用流模式
if (kcp->stream != 0) {
if (!iqueue_is_empty(&kcp->snd_queue)) {
// 判斷發(fā)送隊列的最后一個分片是否填滿mss
IKCPSEG *old = iqueue_entry(kcp->snd_queue.prev, IKCPSEG, node);
if (old->len < kcp->mss) {
// 用新的數(shù)據(jù)填充分片
int capacity = kcp->mss - old->len; //分片剩余容量大小
int extend = (len < capacity)? len : capacity; //填充大小
seg = ikcp_segment_new(kcp, old->len + extend);
assert(seg);
if (seg == NULL) {
return -2;
}
iqueue_add_tail(&seg->node, &kcp->snd_queue);
memcpy(seg->data, old->data, old->len);
if (buffer) {
memcpy(seg->data + old->len, buffer, extend);
buffer += extend;
}
seg->len = old->len + extend;
seg->frg = 0;
len -= extend;
iqueue_del_init(&old->node);
ikcp_segment_delete(kcp, old);
}
}
// 沒有剩余數(shù)據(jù),返回成功
if (len <= 0) {
return 0;
}
}
// 計算分片數(shù)量
if (len <= (int)kcp->mss) count = 1;
else count = (len + kcp->mss - 1) / kcp->mss;
// 分片數(shù)量大于對端接收窗口大小
if (count >= (int)IKCP_WND_RCV) return -2;
if (count == 0) count = 1;
// fragment
// 將數(shù)據(jù)分片并添加到發(fā)送消息隊列
for (i = 0; i < count; i++) {
int size = len > (int)kcp->mss ? (int)kcp->mss : len;
seg = ikcp_segment_new(kcp, size);
assert(seg);
if (seg == NULL) {
return -2;
}
if (buffer && len > 0) {
memcpy(seg->data, buffer, size);
}
seg->len = size;
// 設(shè)置分片序號,frg從大到小
seg->frg = (kcp->stream == 0)? (count - i - 1) : 0;
iqueue_init(&seg->node);
iqueue_add_tail(&seg->node, &kcp->snd_queue);
kcp->nsnd_que++;
if (buffer) {
buffer += size;
}
len -= size;
}
return 0;
}
4.2 ikcp_flush
將數(shù)據(jù)包通過用戶提供的發(fā)送函數(shù)發(fā)送到對端,所有的數(shù)據(jù)分片累計超過mtu后才會發(fā)送出去,同時根據(jù)這發(fā)送消息的情況,調(diào)整kcp配置。
void ikcp_flush(ikcpcb *kcp)
{
IUINT32 current = kcp->current;
char *buffer = kcp->buffer;
char *ptr = buffer;
int count, size, i;
IUINT32 resent, cwnd;
IUINT32 rtomin;
struct IQUEUEHEAD *p;
int change = 0;
int lost = 0;
// 此IKCPSEG被序列化多次,反復(fù)使用
IKCPSEG seg;
// 'ikcp_update' haven't been called.
if (kcp->updated == 0) return;
// 序列化為發(fā)送確認包
seg.conv = kcp->conv;
seg.cmd = IKCP_CMD_ACK;
seg.frg = 0;
seg.wnd = ikcp_wnd_unused(kcp);
seg.una = kcp->rcv_nxt;
seg.len = 0;
seg.sn = 0;
seg.ts = 0;
// flush acknowledges
// 循環(huán)發(fā)送確認包
count = kcp->ackcount;
for (i = 0; i < count; i++) {
size = (int)(ptr - buffer);
// 超過mtu才發(fā)送,否則下文會不斷序列化seg到buffer中
if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer;
}
// 根據(jù)待發(fā)送的確認包列表acklist,填充段序號和發(fā)送時間戳
ikcp_ack_get(kcp, i, &seg.sn, &seg.ts);
// 序列化seg到buffer中
ptr = ikcp_encode_seg(ptr, &seg);
}
kcp->ackcount = 0;
// probe window size (if remote window size equals zero)
if (kcp->rmt_wnd == 0) {
// 探測窗口等待時間為0
if (kcp->probe_wait == 0) {
kcp->probe_wait = IKCP_PROBE_INIT; //重新賦值探測窗口等待時間,默認7秒
kcp->ts_probe = kcp->current + kcp->probe_wait; //重新賦值下次探測窗口時間戳
}
else {
// 是否到達探測時間
if (_itimediff(kcp->current, kcp->ts_probe) >= 0) {
// 更新探測窗口等待時間和下次探測窗口時間戳
// 設(shè)置探測變量為 IKCP_ASK_SEND,發(fā)送窗口探測
if (kcp->probe_wait < IKCP_PROBE_INIT)
kcp->probe_wait = IKCP_PROBE_INIT;
kcp->probe_wait += kcp->probe_wait / 2;
if (kcp->probe_wait > IKCP_PROBE_LIMIT)
kcp->probe_wait = IKCP_PROBE_LIMIT;
kcp->ts_probe = kcp->current + kcp->probe_wait;
kcp->probe |= IKCP_ASK_SEND;
}
}
} else {
// 遠端窗口正常,不需要探測,不發(fā)送窗口探測
kcp->ts_probe = 0;
kcp->probe_wait = 0;
}
// flush window probing commands
// 是否需要探測對端窗口
if (kcp->probe & IKCP_ASK_SEND) {
// 序列化對端窗口探測包
seg.cmd = IKCP_CMD_WASK;
size = (int)(ptr - buffer);
if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer;
}
ptr = ikcp_encode_seg(ptr, &seg);
}
// flush window probing commands
// 是否需要告知對端自身窗口大小
if (kcp->probe & IKCP_ASK_TELL) {
// 序列化告知對端自身窗口包
seg.cmd = IKCP_CMD_WINS;
size = (int)(ptr - buffer);
if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer;
}
ptr = ikcp_encode_seg(ptr, &seg);
}
kcp->probe = 0;
// calculate window size
// 計算最小發(fā)送窗口,取自身窗口大小與對端窗口大小的最小值
cwnd = _imin_(kcp->snd_wnd, kcp->rmt_wnd);
// 如果沒有取消擁塞控制,取配置擁塞窗口、自身窗口和對端窗口三者最小值
if (kcp->nocwnd == 0) cwnd = _imin_(kcp->cwnd, cwnd);
// move data from snd_queue to snd_buf
// 將滿足窗口大小個數(shù)的數(shù)據(jù)分片,從kcp->snd_queue移動到kcp->snd_buf
while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) {
IKCPSEG *newseg;
if (iqueue_is_empty(&kcp->snd_queue)) break;
newseg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node);
// 從snd_queue中刪除自身
iqueue_del(&newseg->node);
// 添加到snd_buf中,注意這里是插入到了snd_buf的前面
iqueue_add_tail(&newseg->node, &kcp->snd_buf);
kcp->nsnd_que--;
kcp->nsnd_buf++;
// 設(shè)置數(shù)據(jù)分片屬性
newseg->conv = kcp->conv;
newseg->cmd = IKCP_CMD_PUSH;
newseg->wnd = seg.wnd;
newseg->ts = current;
newseg->sn = kcp->snd_nxt++;
newseg->una = kcp->rcv_nxt;
newseg->resendts = current;
newseg->rto = kcp->rx_rto;
newseg->fastack = 0;
newseg->xmit = 0;
}
// calculate resent
// 重新計算觸發(fā)快速重傳的重復(fù)ack個數(shù)
resent = (kcp->fastresend > 0)? (IUINT32)kcp->fastresend : 0xffffffff;
// 重新計算超時重傳時間,rx_rto的1/8
rtomin = (kcp->nodelay == 0)? (kcp->rx_rto >> 3) : 0;
// flush data segments
for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) {
IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node);
int needsend = 0;
// 第一次發(fā)送
if (segment->xmit == 0) {
needsend = 1;
segment->xmit++;
segment->rto = kcp->rx_rto;
segment->resendts = current + segment->rto + rtomin;
}
// 超過了重發(fā)時間
else if (_itimediff(current, segment->resendts) >= 0) {
needsend = 1;
segment->xmit++;
kcp->xmit++;
if (kcp->nodelay == 0) {
segment->rto += _imax_(segment->rto, (IUINT32)kcp->rx_rto);
} else {
IINT32 step = (kcp->nodelay < 2)?
((IINT32)(segment->rto)) : kcp->rx_rto;
segment->rto += step / 2;
}
segment->resendts = current + segment->rto;
// 發(fā)生超時重發(fā)
lost = 1;
}
// 跳過應(yīng)答的次數(shù)超過了配置
else if (segment->fastack >= resent) {
// 發(fā)送次數(shù) <= 快速重傳上限 || 未設(shè)置快速重發(fā)上限
if ((int)segment->xmit <= kcp->fastlimit ||
kcp->fastlimit <= 0) {
needsend = 1;
segment->xmit++;
segment->fastack = 0;
segment->resendts = current + segment->rto;
// 發(fā)生快速重傳
change++;
}
}
// 滿足任意以上三個條件,則需要發(fā)送
if (needsend) {
int need;
segment->ts = current;
segment->wnd = seg.wnd;
segment->una = kcp->rcv_nxt;
size = (int)(ptr - buffer);
need = IKCP_OVERHEAD + segment->len;
if (size + need > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer;
}
ptr = ikcp_encode_seg(ptr, segment);
// 把data數(shù)據(jù)寫入到kcp->buffer
if (segment->len > 0) {
memcpy(ptr, segment->data, segment->len);
ptr += segment->len;
}
if (segment->xmit >= kcp->dead_link) {
kcp->state = (IUINT32)-1;
}
}
}
// flash remain segments
// 將剩余的buffer發(fā)送出去
size = (int)(ptr - buffer);
if (size > 0) {
ikcp_output(kcp, buffer, size);
}
// update ssthresh
// 根據(jù)發(fā)送消息情況,調(diào)整kcp配置
// 是否發(fā)生快速重傳
if (change) {
IUINT32 inflight = kcp->snd_nxt - kcp->snd_una;
kcp->ssthresh = inflight / 2;
if (kcp->ssthresh < IKCP_THRESH_MIN)
kcp->ssthresh = IKCP_THRESH_MIN;
kcp->cwnd = kcp->ssthresh + resent;
kcp->incr = kcp->cwnd * kcp->mss;
}
// 是否發(fā)生超時重發(fā)
if (lost) {
kcp->ssthresh = cwnd / 2;
if (kcp->ssthresh < IKCP_THRESH_MIN)
kcp->ssthresh = IKCP_THRESH_MIN;
kcp->cwnd = 1;
kcp->incr = kcp->mss;
}
if (kcp->cwnd < 1) {
kcp->cwnd = 1;
kcp->incr = kcp->mss;
}
}
4.3 ikcp_input
輸入處理對端的消息數(shù)據(jù),根據(jù)不同的操作類型進行不同的處理,寫入緩存。計算并調(diào)整窗口。
int ikcp_input(ikcpcb *kcp, const char *data, long size)
{
IUINT32 prev_una = kcp->snd_una;
IUINT32 maxack = 0, latest_ts = 0;
int flag = 0;
if (ikcp_canlog(kcp, IKCP_LOG_INPUT)) {
ikcp_log(kcp, IKCP_LOG_INPUT, "[RI] %d bytes", (int)size);
}
// 數(shù)據(jù)為空 || 消息大小小于KCP報文
if (data == NULL || (int)size < (int)IKCP_OVERHEAD) return -1;
while (1) {
IUINT32 ts, sn, len, una, conv;
IUINT16 wnd;
IUINT8 cmd, frg;
IKCPSEG *seg;
if (size < (int)IKCP_OVERHEAD) break;
// 獲取KCP報文頭部信息
data = ikcp_decode32u(data, &conv);
if (conv != kcp->conv) return -1;
data = ikcp_decode8u(data, &cmd);
data = ikcp_decode8u(data, &frg);
data = ikcp_decode16u(data, &wnd);
data = ikcp_decode32u(data, &ts);
data = ikcp_decode32u(data, &sn);
data = ikcp_decode32u(data, &una);
data = ikcp_decode32u(data, &len);
size -= IKCP_OVERHEAD;
if ((long)size < (long)len || (int)len < 0) return -2;
// 驗證指令類型
if (cmd != IKCP_CMD_PUSH && cmd != IKCP_CMD_ACK &&
cmd != IKCP_CMD_WASK && cmd != IKCP_CMD_WINS)
return -3;
// 更新對端接收窗口[1]
kcp->rmt_wnd = wnd;
// 確認(刪除)una之前的數(shù)據(jù)包
ikcp_parse_una(kcp, una);
// 更新snd_una(未確認包序號)
ikcp_shrink_buf(kcp);
if (cmd == IKCP_CMD_ACK) {
if (_itimediff(kcp->current, ts) >= 0) {
// 更新rx_rttval rx_srtt rx_rto
ikcp_update_ack(kcp, _itimediff(kcp->current, ts));
}
// 確認sn數(shù)據(jù)包
ikcp_parse_ack(kcp, sn);
// 更新snd_una
ikcp_shrink_buf(kcp);
// 記錄最后一個包段序號和時間戳
if (flag == 0) {
flag = 1;
maxack = sn;
latest_ts = ts;
} else {
if (_itimediff(sn, maxack) > 0) {
#ifndef IKCP_FASTACK_CONSERVE
maxack = sn;
latest_ts = ts;
#else
if (_itimediff(ts, latest_ts) > 0) {
maxack = sn;
latest_ts = ts;
}
#endif
}
}
if (ikcp_canlog(kcp, IKCP_LOG_IN_ACK)) {
ikcp_log(kcp, IKCP_LOG_IN_ACK,
"input ack: sn=%lu rtt=%ld rto=%ld", (unsigned long)sn,
(long)_itimediff(kcp->current, ts),
(long)kcp->rx_rto);
}
}
else if (cmd == IKCP_CMD_PUSH) {
if (ikcp_canlog(kcp, IKCP_LOG_IN_DATA)) {
ikcp_log(kcp, IKCP_LOG_IN_DATA,
"input psh: sn=%lu ts=%lu", (unsigned long)sn, (unsigned long)ts);
}
if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) < 0) {
// 添加sn確認包
ikcp_ack_push(kcp, sn, ts);
// 處理接收窗口范圍內(nèi)的數(shù)據(jù)包
if (_itimediff(sn, kcp->rcv_nxt) >= 0) {
seg = ikcp_segment_new(kcp, len);
seg->conv = conv;
seg->cmd = cmd;
seg->frg = frg;
seg->wnd = wnd;
seg->ts = ts;
seg->sn = sn;
seg->una = una;
seg->len = len;
if (len > 0) {
memcpy(seg->data, data, len);
}
// 處理消息數(shù)據(jù)
ikcp_parse_data(kcp, seg);
}
}
}
else if (cmd == IKCP_CMD_WASK) {
// ready to send back IKCP_CMD_WINS in ikcp_flush
// tell remote my window size
// 更新探測變量
// 在update時告知對端自身接收窗口的大小
kcp->probe |= IKCP_ASK_TELL;
if (ikcp_canlog(kcp, IKCP_LOG_IN_PROBE)) {
ikcp_log(kcp, IKCP_LOG_IN_PROBE, "input probe");
}
}
else if (cmd == IKCP_CMD_WINS) {
// do nothing
// 在上文[1]已經(jīng)更新對端的接收窗口大小
if (ikcp_canlog(kcp, IKCP_LOG_IN_WINS)) {
ikcp_log(kcp, IKCP_LOG_IN_WINS,
"input wins: %lu", (unsigned long)(wnd));
}
}
else {
return -3;
}
data += len;
size -= len;
}
// 是否有處理確認包
if (flag != 0) {
// 遍歷發(fā)送緩沖,序號小于maxack消息的fastack++,以觸發(fā)快速重傳
ikcp_parse_fastack(kcp, maxack, latest_ts);
}
if (_itimediff(kcp->snd_una, prev_una) > 0) {
// 更新cwnd
if (kcp->cwnd < kcp->rmt_wnd) {
IUINT32 mss = kcp->mss;
if (kcp->cwnd < kcp->ssthresh) {
// 小于閾值,線性增加
kcp->cwnd++;
kcp->incr += mss;
} else {
if (kcp->incr < mss) kcp->incr = mss;
// incr += mss * (1/n + 1/16)
kcp->incr += (mss * mss) / kcp->incr + (mss / 16);
if ((kcp->cwnd + 1) * mss <= kcp->incr) {
#if 1
kcp->cwnd = (kcp->incr + mss - 1) / ((mss > 0)? mss : 1);
#else
kcp->cwnd++;
#endif
}
}
// 最大不超過rmt_wnd
if (kcp->cwnd > kcp->rmt_wnd) {
kcp->cwnd = kcp->rmt_wnd;
kcp->incr = kcp->rmt_wnd * mss;
}
}
}
return 0;
}
4.4 ikcp_recv
從rcv_queue中讀取一幀完整的數(shù)據(jù),再將rcv_buf中的連續(xù)分片轉(zhuǎn)移到rcv_queue,最后判斷是否需要告知自身窗口大小。
int ikcp_recv(ikcpcb *kcp, char *buffer, int len)
{
struct IQUEUEHEAD *p;
int ispeek = (len < 0)? 1 : 0;
int peeksize;
int recover = 0;
IKCPSEG *seg;
assert(kcp);
if (iqueue_is_empty(&kcp->rcv_queue))
return -1;
if (len < 0) len = -len;
// 獲取一幀的數(shù)據(jù)大小
peeksize = ikcp_peeksize(kcp);
if (peeksize < 0)
return -2;
if (peeksize > len)
return -3;
// 標(biāo)記可以開始窗口恢復(fù)
if (kcp->nrcv_que >= kcp->rcv_wnd)
recover = 1;
// merge fragment
for (len = 0, p = kcp->rcv_queue.next; p != &kcp->rcv_queue; ) {
// 將一幀完整的數(shù)據(jù)從rcv_queue復(fù)制到data緩沖區(qū)
int fragment;
seg = iqueue_entry(p, IKCPSEG, node);
p = p->next;
if (buffer) {
memcpy(buffer, seg->data, seg->len);
buffer += seg->len;
}
len += seg->len;
fragment = seg->frg;
if (ikcp_canlog(kcp, IKCP_LOG_RECV)) {
ikcp_log(kcp, IKCP_LOG_RECV, "recv sn=%lu", (unsigned long)seg->sn);
}
if (ispeek == 0) {
iqueue_del(&seg->node);
ikcp_segment_delete(kcp, seg);
kcp->nrcv_que--;
}
if (fragment == 0)
break;
}
assert(len == peeksize);
// move available data from rcv_buf -> rcv_queue
// 將rcv_buf中連續(xù)段序號的數(shù)據(jù)包移動到rcv_queue
while (! iqueue_is_empty(&kcp->rcv_buf)) {
seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node);
// 判斷段序號是否連續(xù) && 接收隊列長度小于自身接收窗口
if (seg->sn == kcp->rcv_nxt && kcp->nrcv_que < kcp->rcv_wnd) {
iqueue_del(&seg->node);
kcp->nrcv_buf--;
// 添加到rcv_queue
iqueue_add_tail(&seg->node, &kcp->rcv_queue);
kcp->nrcv_que++;
kcp->rcv_nxt++;
} else {
// 待接收數(shù)據(jù)包的序號不連續(xù),break,在下一次接收再提交
break;
}
}
// fast recover
if (kcp->nrcv_que < kcp->rcv_wnd && recover) {
// ready to send back IKCP_CMD_WINS in ikcp_flush
// tell remote my window size
// 下一次update時,發(fā)送窗口大小告知包
kcp->probe |= IKCP_ASK_TELL;
}
return len;
}