傳統(tǒng)游戲項(xiàng)目一般使用TCP協(xié)議進(jìn)行通信,得益于它的穩(wěn)定和可靠,不過(guò)在網(wǎng)絡(luò)不穩(wěn)定的情況下,會(huì)出現(xiàn)丟包嚴(yán)重。
不過(guò)近期有不少基于UDP的應(yīng)用層協(xié)議,聲稱對(duì)UDP的不可靠進(jìn)行了改造,這意味著我們既可以享受網(wǎng)絡(luò)層提供穩(wěn)定可靠的服務(wù),又可以享受它的速度。
KCP就是這樣的一個(gè)協(xié)議
不過(guò)網(wǎng)上說(shuō)的再天花亂墜,我們也得親自調(diào)研,分析源碼和它的機(jī)制,并測(cè)試它的性能,是否滿足項(xiàng)目上線要求。本文從C版本的源碼入手理解KCP的機(jī)制,再研究各種Java版本的實(shí)現(xiàn)
一、KCP協(xié)議
原版源碼(C代碼):https://github.com/skywind3000/kcp
基于底層協(xié)議(一般是UDP)之上,完全在應(yīng)用層實(shí)現(xiàn)類TCP的可靠機(jī)制(快速重傳,擁塞控制等)
二、KCP特性
KCP實(shí)現(xiàn)以下特性,也可參考github中README中對(duì)KCP的定義
| 特性 | 說(shuō)明 | 源碼位置 |
|---|---|---|
| RTO優(yōu)化 | 超時(shí)時(shí)間計(jì)算優(yōu)于TCP | ikcp_update_ack |
| 選擇性重傳 | KCP只重傳真正丟失的數(shù)據(jù)包,TCP會(huì)全部重傳丟失包之后的全部數(shù)據(jù) | ikcp_parse_fastack,ikcp_flush |
| 快速重傳 | 根據(jù)配置,可以在丟失包被跳過(guò)一定次數(shù)后直接重傳,不等RTO超時(shí) | ikcp_parse_fastack,ikcp_flush |
| UNA + ACK | ARQ模型響應(yīng)有兩種,UNA(此編號(hào)前所有包已收到,如TCP),ACK(該編號(hào)包已收到),光用UNA將導(dǎo)致全部重傳,光用ACK則丟失成本太高,以往協(xié)議都是二選其一,而 KCP協(xié)議中,除去單獨(dú)的 ACK包外,所有包都有UNA信息。 | ikcp_flush(每次update,都發(fā)送ACK) |
| 非延遲ACK | KCP可配置是否延遲發(fā)送ACK | ikcp_update_ack |
| 流量控制 | 同TCP的公平退讓原則,發(fā)送窗口大小由:發(fā)送緩存大小、接收端剩余接收緩存大小、丟包退讓及慢啟動(dòng)這四要素決定 | ikcp_input,ikcp_flush |
三、KCP報(bào)文
1. 報(bào)文解析源碼
源碼中對(duì)報(bào)文解析部分代碼如下
data = ikcp_decode32u(data, &conv);
if (conv != kcp->conv) return -1;
data = ikcp_decode8u(data, &cmd);
data = ikcp_decode8u(data, &frg);
data = ikcp_decode16u(data, &wnd);
data = ikcp_decode32u(data, &ts);
data = ikcp_decode32u(data, &sn);
data = ikcp_decode32u(data, &una);
data = ikcp_decode32u(data, &len);
2. 報(bào)文定義
報(bào)文中標(biāo)識(shí)的定義
| 名詞 | 全稱 | 備注 | 作用 |
|---|---|---|---|
| conv | conversation id | 會(huì)話ID | 每個(gè)連接的唯一標(biāo)識(shí) |
| cmd | command | 命令 | 每個(gè)數(shù)據(jù)包指定邏輯 |
| frg | fragment count | 數(shù)據(jù)分段序號(hào) | 根據(jù)mtu(最大傳輸單元)和mss(最大報(bào)文長(zhǎng)度)的數(shù)據(jù)分段 |
| wnd | window size | 接收窗口大小 | 流量控制 |
| ts | timestamp | 時(shí)間戳 | 數(shù)據(jù)包發(fā)送時(shí)間記錄 |
| sn | serial number | 數(shù)據(jù)報(bào)的序號(hào) | 確保包的有序 |
| una | un-acknowledged serial number | 對(duì)端下一個(gè)要接收的數(shù)據(jù)報(bào)序號(hào) | 確保包的有序 |
3. 消息類型
KCP報(bào)文的四種消息類型
const IUINT32 IKCP_CMD_PUSH = 81; // cmd: push data: 推送數(shù)據(jù)
const IUINT32 IKCP_CMD_ACK = 82; // cmd: ack: 對(duì)推送數(shù)據(jù)的確認(rèn)
const IUINT32 IKCP_CMD_WASK = 83; // cmd: window probe (ask): 詢問窗口大小
const IUINT32 IKCP_CMD_WINS = 84; // cmd: window size (tell): 回復(fù)窗口大小
-
報(bào)文結(jié)構(gòu)

四、源碼解析
在網(wǎng)絡(luò)四層模型中,KCP和TCP/UDP(傳輸層),IP(網(wǎng)絡(luò)層)等協(xié)議有著本質(zhì)上區(qū)別,理論上KCP是屬于應(yīng)用層協(xié)議。
KCP并不提供協(xié)議實(shí)際收發(fā)處理,它只是在傳輸層只上對(duì)消息和鏈接的一層中間管理。
在KCP的源碼中,它僅僅包含ikcp.c和ikcp.h兩個(gè)文件,僅提供KCP的數(shù)據(jù)管理和數(shù)據(jù)接口,而用戶需要在應(yīng)用層進(jìn)行KCP的調(diào)度
1. 結(jié)構(gòu)體定義
KCP分包結(jié)構(gòu)KCP對(duì)象結(jié)構(gòu)體定義
struct IKCPSEG
{
struct IQUEUEHEAD node;
IUINT32 conv; //用來(lái)標(biāo)記這個(gè)seg屬于哪個(gè)kcp
IUINT32 cmd;//這個(gè)包的指令是: // 數(shù)據(jù) ack 詢問/應(yīng)答窗口大小
IUINT32 frg; //分包時(shí),分包的序號(hào),0為終結(jié)
IUINT32 wnd;//發(fā)送這個(gè)seg的這個(gè)端的 窗口大小--> 遠(yuǎn)端的接收窗口大小
IUINT32 ts; //我不知道為什么要用時(shí)間軸,這個(gè)都1秒,有什么用 ??
IUINT32 sn;//相當(dāng)于tcp的ack
IUINT32 una;//una 遠(yuǎn)端等待接收的一個(gè)序號(hào)
IUINT32 len; //data的長(zhǎng)度
IUINT32 resendts;//重發(fā)的時(shí)間軸
IUINT32 rto;//等于發(fā)送端kcp的 rx_rto->由 計(jì)算得來(lái)
IUINT32 fastack;//ack跳過(guò)的次數(shù),用于快速重傳
IUINT32 xmit;// fastack resend次數(shù)
char data[1];//當(dāng)malloc時(shí),只需要 malloc(sizeof(IKCPSEG)+datalen) 則,data長(zhǎng)=數(shù)據(jù)長(zhǎng)度+1 剛好用來(lái)放0
};
struct IKCPCB
{
//會(huì)話ID,最大傳輸單元,最大分片大小,狀態(tài) mss=mtu-sizeof(IKCPSEG)
IUINT32 conv, mtu, mss, state;
//第一個(gè)未接收到的包,待發(fā)送的包(可以認(rèn)為是tcp的ack自增),接收消息的序號(hào)-> 用來(lái)賦seg的una值
IUINT32 snd_una, snd_nxt, rcv_nxt;
//前兩個(gè)不知道干嘛 擁塞窗口的閾值 用來(lái)控制cwnd值變化的
IUINT32 ts_recent, ts_lastack, ssthresh;
//這幾個(gè)變量是用來(lái)更新rto的
// rx_rttval 接收ack的浮動(dòng)值
// rx_srtt 接收ack的平滑值
// rx_rto 計(jì)算出來(lái)的rto
// rx_minrto 最小rto
IINT32 rx_rttval, rx_srtt, rx_rto, rx_minrto;
//發(fā)送隊(duì)列的窗口大小
//接收隊(duì)列的窗口大小
//遠(yuǎn)端的接收隊(duì)列的窗口大小
//窗口大小
//probe 用來(lái)二進(jìn)制標(biāo)記
IUINT32 snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe;
//時(shí)間軸 時(shí)間間隔 下一次flush的時(shí)間 xmit發(fā)射多少次? 看不到有什么地方用到
IUINT32 current, interval, ts_flush, xmit;
//接收到的數(shù)據(jù)seg個(gè)數(shù)
//需要發(fā)送的seg個(gè)數(shù)
IUINT32 nrcv_buf, nsnd_buf;
//接收隊(duì)列的數(shù)據(jù) seg個(gè)數(shù)
//發(fā)送隊(duì)列的數(shù)據(jù) seg個(gè)數(shù)
IUINT32 nrcv_que, nsnd_que;
//是否為nodelay模式:如果開啟,rto計(jì)算范圍更小
//updated 在調(diào)用flush時(shí),有沒有調(diào)用過(guò)update
IUINT32 nodelay, updated;
//請(qǐng)求訪問窗口的時(shí)間相關(guān) 當(dāng)遠(yuǎn)程端口大小為0時(shí)
IUINT32 ts_probe, probe_wait;
IUINT32 dead_link, incr;
//發(fā)送隊(duì)列
struct IQUEUEHEAD snd_queue;
//接收隊(duì)列
struct IQUEUEHEAD rcv_queue;
//待發(fā)送隊(duì)列
struct IQUEUEHEAD snd_buf;
//待接收隊(duì)列
struct IQUEUEHEAD rcv_buf;
//用來(lái)緩存自己接收到了多少個(gè)ack
IUINT32 *acklist;
IUINT32 ackcount;
IUINT32 ackblock;
//用戶信息
void *user;
//好像就用來(lái)操作數(shù)據(jù)的中轉(zhuǎn)站
char *buffer;
//快速重傳的閾值
int fastresend;
//快速重傳的上限
int fastlimit;
//是否無(wú)視重傳等其它設(shè)置窗口
//steam模式的話,會(huì)將幾個(gè)小包合并成大包
int nocwnd, stream;
int logmask;
int (*output)(const char *buf, int len, struct IKCPCB *kcp, void *user);
void (*writelog)(const char *log, struct IKCPCB *kcp, void *user);
};
2. 接口分析
分析C源碼,KCP作為中間管理層,主要提供以下接口
//---------------------------------------------------------------------
// interface
//---------------------------------------------------------------------
// create a new kcp control object, 'conv' must equal in two endpoint
// from the same connection. 'user' will be passed to the output callback
// output callback can be setup like this: 'kcp->output = my_udp_output'
// 創(chuàng)建kcp對(duì)象,conv必須在兩個(gè)端之間相同,user會(huì)被傳遞到output回調(diào),
// output回調(diào)這樣設(shè)置:kcp->output = my_udp_output
ikcpcb* ikcp_create(IUINT32 conv, void *user);
// release kcp control object
// 釋放kcp對(duì)象
void ikcp_release(ikcpcb *kcp);
// set output callback, which will be invoked by kcp
// 設(shè)置kcp調(diào)用的output回調(diào)
void ikcp_setoutput(ikcpcb *kcp, int (*output)(const char *buf, int len,
ikcpcb *kcp, void *user));
// user/upper level recv: returns size, returns below zero for EAGAIN
// 用戶層/上層 接收消息:返回接收長(zhǎng)度,數(shù)據(jù)讀取錯(cuò)誤返回值小于0
int ikcp_recv(ikcpcb *kcp, char *buffer, int len);
// user/upper level send, returns below zero for error
// 用戶層/上層 發(fā)送消息,錯(cuò)誤返回值小于0
int ikcp_send(ikcpcb *kcp, const char *buffer, int len);
// update state (call it repeatedly, every 10ms-100ms), or you can ask
// ikcp_check when to call it again (without ikcp_input/_send calling).
// 'current' - current timestamp in millisec.
// 更新狀態(tài)(每10ms-100ms調(diào)用一次),或者你可以通過(guò)調(diào)用ikcp_check,
// 來(lái)得知什么時(shí)候再次調(diào)用(不調(diào)用ikcp_input/_send)
// current - 當(dāng)前時(shí)間戳(毫秒)
void ikcp_update(ikcpcb *kcp, IUINT32 current);
// Determine when should you invoke ikcp_update:
// returns when you should invoke ikcp_update in millisec, if there
// is no ikcp_input/_send calling. you can call ikcp_update in that
// time, instead of call update repeatly.
// Important to reduce unnacessary ikcp_update invoking. use it to
// schedule ikcp_update (eg. implementing an epoll-like mechanism,
// or optimize ikcp_update when handling massive kcp connections)
// 決定你什么時(shí)候調(diào)用ikcp_update
// 返回你多少毫秒后應(yīng)該調(diào)用ikcp_update,如果沒有ikcp_input/_send調(diào)用,你可以在那個(gè)時(shí)間
// 調(diào)用ikcp_updates來(lái)代替自己驅(qū)動(dòng)update調(diào)用
// 用于減少不必要的ikcp_update調(diào)用。用這個(gè)來(lái)驅(qū)動(dòng)ikcp_update(比如:實(shí)現(xiàn)類epoll的機(jī)制,
// 或者優(yōu)化處理大量kcp連接時(shí)的ikcp_update調(diào)用)
IUINT32 ikcp_check(const ikcpcb *kcp, IUINT32 current);
// when you received a low level packet (eg. UDP packet), call it
// 接收下層數(shù)據(jù)包(比如:UDP數(shù)據(jù)包)時(shí)調(diào)用
int ikcp_input(ikcpcb *kcp, const char *data, long size);
// flush pending data
// 刷新數(shù)據(jù)
void ikcp_flush(ikcpcb *kcp);
// check the size of next message in the recv queue
// 檢測(cè)接收隊(duì)列里下條消息的長(zhǎng)度
int ikcp_peeksize(const ikcpcb *kcp);
// change MTU size, default is 1400
// 修改MTU長(zhǎng)度,默認(rèn)1400
int ikcp_setmtu(ikcpcb *kcp, int mtu);
// set maximum window size: sndwnd=32, rcvwnd=32 by default
// 設(shè)置最大窗口大小,默認(rèn)值:sndwnd=32, rcvwnd=32
int ikcp_wndsize(ikcpcb *kcp, int sndwnd, int rcvwnd);
// get how many packet is waiting to be sent
// 獲取準(zhǔn)備發(fā)送的數(shù)據(jù)包
int ikcp_waitsnd(const ikcpcb *kcp);
// fastest: ikcp_nodelay(kcp, 1, 20, 2, 1)
// nodelay: 0:disable(default), 1:enable
// interval: internal update timer interval in millisec, default is 100ms
// resend: 0:disable fast resend(default), 1:enable fast resend
// nc: 0:normal congestion control(default), 1:disable congestion control
// 快速設(shè)置:ikcp_nodelay(kcp, 1, 20, 2, 1)
// nodelay:0:使用(默認(rèn)),1:使用
// interval:update時(shí)間(毫秒),默認(rèn)100ms
// resend:0:不適用快速重發(fā)(默認(rèn)), 其他:自己設(shè)置值,若設(shè)置為2(則2次ACK跨越將會(huì)直接重傳)
// nc:0:正常擁塞控制(默認(rèn)), 1:不適用擁塞控制
int ikcp_nodelay(ikcpcb *kcp, int nodelay, int interval, int resend, int nc);
void ikcp_log(ikcpcb *kcp, int mask, const char *fmt, ...);
// setup allocator
// 設(shè)置kcp allocator
void ikcp_allocator(void* (*new_malloc)(size_t), void (*new_free)(void*));
// read conv
// 獲取conv
IUINT32 ikcp_getconv(const void *ptr);
3. 調(diào)度邏輯

KCP關(guān)鍵接口:
-
更新(上層驅(qū)動(dòng)KCP狀態(tài)更新)
ikcp_update:kcp狀態(tài)更新接口,需要上層進(jìn)行調(diào)度,判斷flush時(shí)間,滿足條件調(diào)用ikcp_flush刷新數(shù)據(jù),同時(shí)也負(fù)責(zé)對(duì)收到數(shù)據(jù)的kcp端回復(fù)ACK消息 -
發(fā)送
ikcp_send -> ikcp_update -> ikcp_output
ikcp_send:上層調(diào)用發(fā)送接口,把數(shù)據(jù)根據(jù)mss值進(jìn)行分片,設(shè)置分包編號(hào),放到snd_queue隊(duì)尾
ikcp_flush:發(fā)送數(shù)據(jù)接口,根據(jù)對(duì)端窗口大小,拷貝snd_queue的數(shù)據(jù)到snd_buf,遍歷snd_buf,滿足條件則調(diào)用output回調(diào)(調(diào)用網(wǎng)絡(luò)層的發(fā)送) -
接收
ikcp_input -> ikcp_update -> ikcp_recv
ikcp_input:解析上層輸入數(shù)據(jù),拷貝rcv_buf到rcv_queue
ikcp_recv:數(shù)據(jù)接收接口,上層從rcv_queue中復(fù)制數(shù)據(jù)到網(wǎng)絡(luò)層buffer
五、Java版本
目前github上有幾個(gè)高star的java版本實(shí)現(xiàn),選取最高的三個(gè)進(jìn)行分析
1. https://github.com/szhnet/kcp-netty.git(star:212)
實(shí)現(xiàn)原理:
1.KCP邏輯是源碼的Java翻譯版(一模一樣)
2.UkcpServerChannel繼承ServerChannel,UkcpServerBootStrap
3.用Boss線程EventLoopGroup的read事件來(lái)驅(qū)動(dòng)KCP邏輯
優(yōu)點(diǎn):使用Netty的Boss線程Read事件來(lái)驅(qū)動(dòng)KCP,不用while(true)的驅(qū)動(dòng);使用簡(jiǎn)單,只需使用指定的ServerChannel和ServerBootStrap來(lái)啟動(dòng)Netty
缺點(diǎn):無(wú)明顯缺點(diǎn)
2. https://github.com/beykery/jkcp.git(star:172)
實(shí)現(xiàn)原理:
1.KCP邏輯是源碼的Java翻譯版(一模一樣)
2.啟動(dòng)指定線程數(shù)的KcpThread自定義IO線程池,進(jìn)行KCP邏輯調(diào)度
3.Netty讀消息時(shí)拋到KcpThread自定義IO線程
// 通過(guò)hash選擇IO線程處理
InetSocketAddress sender = dp.sender();
int hash = sender.hashCode();
hash = hash < 0 ? -hash : hash;
this.workers[hash % workers.length].input(dp);
優(yōu)點(diǎn):代碼簡(jiǎn)單明了,容易理解,核心是翻譯版源碼,外殼套的是Netty+自定義IO線程池
缺點(diǎn):IO線程池會(huì)while(true)的調(diào)用KCP的update
3. https://github.com/l42111996/java-Kcp.git(star:187)
實(shí)現(xiàn)原理:
1.KCP邏輯是源碼的Java翻譯版(一模一樣)
2.Netty讀消息時(shí),扔到定時(shí)器,1ms后,拋出任務(wù)到自定義IO線程
優(yōu)點(diǎn):擁有1的全部?jī)?yōu)點(diǎn),也在Netty的讀消息,把消息拋到定時(shí)器去調(diào)用KCP的邏輯,避免了2的無(wú)意義的while(true),同時(shí)實(shí)現(xiàn)功能更全,有上線項(xiàng)目驗(yàn)證(據(jù)作者描述)
缺點(diǎn):Netty相關(guān)邏輯完全封裝起來(lái),不能修改任何Netty參數(shù)(不過(guò)源碼中對(duì)Netty的參數(shù)已配置的很好了)
目前看來(lái),第三種實(shí)現(xiàn)(https://github.com/l42111996/java-Kcp.git)是最理想的方式
如果大家感興趣,后邊會(huì)對(duì)第三種實(shí)現(xiàn)進(jìn)行詳細(xì)的源碼分析
六、性能測(cè)試
近期準(zhǔn)備做性能測(cè)試進(jìn)行對(duì)比,感興趣的朋友可以關(guān)注下
// TODO