一、前言
相信現(xiàn)在很多App都會(huì)有通訊功能,可能它要求是tcp、udp或者websocket等,每次開(kāi)發(fā)者需要自己再去找個(gè)輪子,這樣繁瑣且耗時(shí),所以本文旨意在打造一個(gè)通用的可配置化的IM SDK。文筆有限,如有不妥或者錯(cuò)誤之處,懇請(qǐng)?jiān)谠u(píng)論、私信或者郵箱里指出,萬(wàn)分感謝。
先上圖


這里直接模擬兩個(gè)用戶通訊,詳情使用讀者可以直接移步Github查看NettyIM
二、功能介紹
- 支持TCP協(xié)議
- 支持WebSocket的ws、wss協(xié)議
- 支持UDP協(xié)議
- 內(nèi)置一套默認(rèn)私有協(xié)議實(shí)現(xiàn)
- 支持?jǐn)嗑€重連、連接重試
- 地址自動(dòng)切換
- 支持消息重發(fā)、消息確認(rèn)機(jī)制
- 支持心跳機(jī)制
- tcp協(xié)議、udp協(xié)議、websocket都支持握手鑒權(quán)
- 提供Netty消息處理器注冊(cè)
- 支持自定義編解碼器
- 連接狀態(tài)、消息狀態(tài)監(jiān)聽(tīng)
- 支持單個(gè)消息設(shè)置是否需要確認(rèn)包、是否失敗重發(fā)
- 支持各種參數(shù)配置
三、Netty
什么是Netty?
Netty 是一個(gè)利用 Java 的高級(jí)網(wǎng)絡(luò)的能力,隱藏其背后的復(fù)雜性而提供一個(gè)易于使用的 API 的客戶端/服務(wù)器框架。
Netty 是一個(gè)廣泛使用的 Java 網(wǎng)絡(luò)編程框架(Netty 在 2011 年獲得了Duke's Choice Award,見(jiàn)https://www.java.net/dukeschoice/2011)。它活躍和成長(zhǎng)于用戶社區(qū),像大型公司 Facebook 和 Instagram 以及流行 開(kāi)源項(xiàng)目如 Infinispan, HornetQ, Vert.x, Apache Cassandra 和 Elasticsearch 等,都利用其強(qiáng)大的對(duì)于網(wǎng)絡(luò)抽象的核心代碼。
以上是摘自《Essential Netty In Action》這本書(shū)
為什么選擇Netty?
Netty是業(yè)界最流行的NIO框架之一,它的健壯性、功能、性能、可定制性和可擴(kuò)展性在同類(lèi)框架中都是首屈一指的,它已經(jīng)得到成百上千的商用項(xiàng)目驗(yàn)證,例如Hadoop的RPC框架avro使用Netty作為底層通信框架。很多其它業(yè)界主流的RPC框架,也使用Netty來(lái)構(gòu)建高性能的異步通信能力。
通過(guò)對(duì)Netty的分析,我們將它的優(yōu)點(diǎn)總結(jié)如下:
- API使用簡(jiǎn)單,開(kāi)發(fā)門(mén)檻低;
- 功能強(qiáng)大,預(yù)置了多種編解碼功能,支持多種主流協(xié)議;
- 定制能力強(qiáng),可以通過(guò)ChannelHandler對(duì)通信框架進(jìn)行靈活的擴(kuò)展;
- 性能高,通過(guò)與其它業(yè)界主流的NIO框架對(duì)比,Netty的綜合性能最優(yōu);
- 成熟、穩(wěn)定,Netty修復(fù)了已經(jīng)發(fā)現(xiàn)的所有JDK NIO BUG,業(yè)務(wù)開(kāi)發(fā)人員不需要再為NIO的BUG而煩惱;
- 社區(qū)活躍,版本迭代周期短,發(fā)現(xiàn)的BUG可以被及時(shí)修復(fù),同時(shí),更多的新功能會(huì)被加入;
- 經(jīng)歷了大規(guī)模的商業(yè)應(yīng)用考驗(yàn),質(zhì)量已經(jīng)得到驗(yàn)證。在互聯(lián)網(wǎng)、大數(shù)據(jù)、網(wǎng)絡(luò)游戲、企業(yè)應(yīng)用、電信軟件等眾多行業(yè)得到成功商用,證明了它可以完全滿足不同行業(yè)的商業(yè)應(yīng)用。
正是因?yàn)檫@些優(yōu)點(diǎn),Netty逐漸成為Java NIO編程的首選框架。
以上是摘自[《Netty 權(quán)威指南》—— 選擇Netty的理由
](http://ifeve.com/netty-2-6/)
四、多種協(xié)議支持
1、 TCP
簡(jiǎn)介:
TCP協(xié)議是一種在計(jì)算機(jī)網(wǎng)絡(luò)中常用的傳輸層協(xié)議,它負(fù)責(zé)提供可靠的、面向連接的數(shù)據(jù)傳輸服務(wù)。TCP保證數(shù)據(jù)的可靠傳輸,提供流量控制、擁塞控制和錯(cuò)誤恢復(fù)等功能,以保證網(wǎng)絡(luò)的可靠性和穩(wěn)定性。TCP常用于許多應(yīng)用層協(xié)議,如HTTP、FTP、Telnet等。
優(yōu)點(diǎn):
- 可靠性
- 應(yīng)答機(jī)制:在TCP協(xié)議中,每個(gè)數(shù)據(jù)包都有一個(gè)序號(hào)和確認(rèn)號(hào),接收端會(huì)對(duì)每個(gè)數(shù)據(jù)包進(jìn)行確認(rèn)應(yīng)答。如果發(fā)送端在一定時(shí)間內(nèi)沒(méi)有收到確認(rèn)應(yīng)答,就會(huì)認(rèn)為數(shù)據(jù)包丟失,需要重新發(fā)送數(shù)據(jù)包。
- 重傳機(jī)制:如果某個(gè)數(shù)據(jù)包沒(méi)有按序到達(dá)或者丟失,接收端會(huì)要求發(fā)送端重新發(fā)送該數(shù)據(jù)包。發(fā)送端會(huì)定期重傳未收到確認(rèn)應(yīng)答的數(shù)據(jù)包,直到接收端確認(rèn)收到為止。
- 滑動(dòng)窗口機(jī)制:TCP協(xié)議使用滑動(dòng)窗口機(jī)制進(jìn)行流量控制。發(fā)送端和接收端都有一個(gè)窗口大小,用于控制數(shù)據(jù)包的發(fā)送和接收。發(fā)送端根據(jù)接收端的窗口大小來(lái)控制發(fā)送速率,接收端根據(jù)自己的窗口大小來(lái)控制接收速率,以避免網(wǎng)絡(luò)擁塞和數(shù)據(jù)丟失。
- 擁塞控制機(jī)制:TCP協(xié)議使用擁塞控制機(jī)制來(lái)避免網(wǎng)絡(luò)擁塞。如果網(wǎng)絡(luò)出現(xiàn)擁塞,TCP會(huì)降低發(fā)送速率,以避免數(shù)據(jù)丟失和網(wǎng)絡(luò)崩潰。
- 有序性:
- 序號(hào)機(jī)制:在TCP協(xié)議中,每個(gè)數(shù)據(jù)包都有一個(gè)序號(hào),用于標(biāo)識(shí)數(shù)據(jù)包在數(shù)據(jù)流中的位置。發(fā)送端會(huì)按照序號(hào)將數(shù)據(jù)包進(jìn)行排序,并將序號(hào)添加到數(shù)據(jù)包的首部。接收端會(huì)按照序號(hào)將數(shù)據(jù)包進(jìn)行排序,以保證數(shù)據(jù)的有序傳輸。
- 應(yīng)答機(jī)制:在TCP協(xié)議中,接收端會(huì)對(duì)每個(gè)數(shù)據(jù)包進(jìn)行確認(rèn)應(yīng)答。如果發(fā)送端在一定時(shí)間內(nèi)沒(méi)有收到確認(rèn)應(yīng)答,就會(huì)認(rèn)為數(shù)據(jù)包丟失,需要重新發(fā)送數(shù)據(jù)包。這樣可以保證數(shù)據(jù)包按序到達(dá)。
- 滑動(dòng)窗口機(jī)制:TCP協(xié)議使用滑動(dòng)窗口機(jī)制進(jìn)行流量控制。發(fā)送端和接收端都有一個(gè)窗口大小,用于控制數(shù)據(jù)包的發(fā)送和接收。發(fā)送端根據(jù)接收端的窗口大小來(lái)控制發(fā)送速率,接收端根據(jù)自己的窗口大小來(lái)控制接收速率,以避免網(wǎng)絡(luò)擁塞和數(shù)據(jù)丟失。
- 面向連接:
TCP協(xié)議在傳輸數(shù)據(jù)之前需要建立連接,傳輸完成后需要釋放連接,這樣可以保證數(shù)據(jù)的有序傳輸
缺點(diǎn):
- 傳輸效率低: TCP協(xié)議提供可靠的數(shù)據(jù)傳輸,但是為了保證數(shù)據(jù)的可靠性和完整性,會(huì)進(jìn)行確認(rèn)和重傳等操作,這會(huì)增加網(wǎng)絡(luò)傳輸?shù)难舆t和開(kāi)銷(xiāo),降低傳輸效率。
- 面向連接: TCP協(xié)議需要在傳輸數(shù)據(jù)之前建立連接,傳輸完成后釋放連接,這樣會(huì)增加網(wǎng)絡(luò)開(kāi)銷(xiāo)和復(fù)雜度。
- 不適合實(shí)時(shí)應(yīng)用: 由于TCP協(xié)議對(duì)數(shù)據(jù)傳輸進(jìn)行確認(rèn)和重傳等操作,這會(huì)增加網(wǎng)絡(luò)延遲,不適合實(shí)時(shí)應(yīng)用,如視頻會(huì)議、在線游戲等。
- 安全性差: TCP協(xié)議沒(méi)有提供加密和身份驗(yàn)證等安全機(jī)制,容易受到網(wǎng)絡(luò)攻擊和竊聽(tīng)。
總結(jié):
如果我們通訊對(duì)實(shí)時(shí)要求不高、但對(duì)數(shù)據(jù)可靠性、完整性、有序性有要求,tcp是個(gè)不錯(cuò)的選擇,但注意這里的可靠、有序性?xún)H代表在傳輸層是可靠的,并不能保證我們應(yīng)用層通訊的可靠性,所以在很多采用TCP協(xié)議的通訊上都會(huì)在應(yīng)用層上加上確認(rèn)機(jī)制和重傳機(jī)制或者使用UDP協(xié)議加上TCP的一些可靠機(jī)制去實(shí)現(xiàn)。
2、UDP
簡(jiǎn)介:
UDP協(xié)議是一種用戶數(shù)據(jù)報(bào)協(xié)議,它是一種簡(jiǎn)單的、無(wú)連接的傳輸層協(xié)議,不提供可靠的數(shù)據(jù)傳輸、數(shù)據(jù)有序性和錯(cuò)誤恢復(fù)機(jī)制。UDP協(xié)議直接將應(yīng)用層的數(shù)據(jù)報(bào)發(fā)送到網(wǎng)絡(luò)層,不需要建立連接和維護(hù)狀態(tài),因此傳輸效率高。UDP協(xié)議常用于實(shí)時(shí)應(yīng)用,如音視頻傳輸、在線游戲等。
優(yōu)點(diǎn):
- 傳輸效率高: UDP協(xié)議不需要建立連接和維護(hù)狀態(tài),直接將應(yīng)用層的數(shù)據(jù)報(bào)發(fā)送到網(wǎng)絡(luò)層,因此傳輸效率高。
- 實(shí)時(shí)性好: 由于UDP協(xié)議不提供可靠性和錯(cuò)誤恢復(fù)機(jī)制,因此能夠快速傳輸數(shù)據(jù)。
- 傳輸數(shù)據(jù)較?。?/strong> UDP協(xié)議的數(shù)據(jù)報(bào)頭較小,只有8個(gè)字節(jié),相比TCP協(xié)議的20個(gè)字節(jié)要小很多,因此在傳輸數(shù)據(jù)量較小的情況下,UDP協(xié)議的開(kāi)銷(xiāo)相對(duì)較小。
- 簡(jiǎn)單: UDP協(xié)議是一種簡(jiǎn)單的協(xié)議,實(shí)現(xiàn)起來(lái)比較容易,適合于一些簡(jiǎn)單的應(yīng)用場(chǎng)景
缺點(diǎn):
- 不可靠性: UDP是無(wú)連接的,因此它不提供可靠的數(shù)據(jù)傳輸。它不會(huì)跟蹤數(shù)據(jù)包是否已到達(dá)目標(biāo),也不會(huì)重新發(fā)送丟失的數(shù)據(jù)包。這意味著,如果數(shù)據(jù)包在傳輸過(guò)程中丟失或損壞,接收方將無(wú)法知道,并且無(wú)法要求發(fā)送方重新發(fā)送。
- 無(wú)序性: UDP不保證數(shù)據(jù)包的順序。如果發(fā)送方發(fā)送的數(shù)據(jù)包按照A、B、C的順序發(fā)送,但接收方卻按照C、A、B的順序接收,那么接收方將無(wú)法正確地重構(gòu)原始數(shù)據(jù)。
- 低效性: 由于UDP不提供數(shù)據(jù)包的可靠性和有序性,因此它可能需要發(fā)送更多的數(shù)據(jù)包來(lái)確保數(shù)據(jù)的正確性。這會(huì)導(dǎo)致網(wǎng)絡(luò)擁塞和低效率的數(shù)據(jù)傳輸。
- 難以控制流量: UDP不提供擁塞控制機(jī)制,因此發(fā)送方可能會(huì)發(fā)送過(guò)多的數(shù)據(jù)包,導(dǎo)致網(wǎng)絡(luò)擁塞。這可能會(huì)對(duì)網(wǎng)絡(luò)中的其他流量產(chǎn)生負(fù)面影響。
總結(jié):
UDP是一種無(wú)連接的傳輸協(xié)議,不保證數(shù)據(jù)包的可靠性、完整性和順序。因此,在使用UDP進(jìn)行數(shù)據(jù)傳輸時(shí),需要在應(yīng)用層自行實(shí)現(xiàn)相關(guān)機(jī)制來(lái)檢測(cè)和糾正錯(cuò)誤,例如在每個(gè)數(shù)據(jù)包中添加序列號(hào)和校驗(yàn)和等信息,來(lái)檢測(cè)數(shù)據(jù)包是否有丟失和損壞、添加seq/ack機(jī)制,確保數(shù)據(jù)發(fā)送到對(duì)端、添加超時(shí)重傳等機(jī)制來(lái)實(shí)現(xiàn)可靠性,還有個(gè)點(diǎn)是數(shù)據(jù)報(bào)大小對(duì)傳輸效率的影響,當(dāng)IP數(shù)據(jù)報(bào)大于MTU,這個(gè)時(shí)候發(fā)送方IP層就需要分片。把數(shù)據(jù)報(bào)分成若干片,使每一片都小于MTU,而接收方IP層則需要進(jìn)行數(shù)據(jù)報(bào)的重組。這樣就會(huì)多做許多事情,可能會(huì)導(dǎo)致數(shù)據(jù)包的丟失或延遲,因?yàn)槊總€(gè)分片都是獨(dú)立傳輸?shù)?,可能?huì)按照不同的路徑到達(dá)目的地,也可能會(huì)在傳輸過(guò)程中丟失一些分片。因此,應(yīng)該盡量避免 UDP 分片。鑒于 Internet 上的標(biāo)準(zhǔn) MTU 值為 576 字節(jié),所以最好將 UDP 的數(shù)據(jù)長(zhǎng)度控制在 548 字節(jié)(MTU(576) - IPHeader(20) - UDPHeader(8)),但是考慮到IP頭部選項(xiàng)和一些沒(méi)有預(yù)料到的其他頭部信息,UDP 數(shù)據(jù)包的最大安全負(fù)載應(yīng)該是 508 字節(jié)(MTU(576) - IPHeader(60) - UDPHeader(8))
3、WebSocket
簡(jiǎn)介:
WebSocket是一種應(yīng)用層協(xié)議,它必須依賴(lài) HTTP 協(xié)議進(jìn)行一次握手 ,握手成功后,數(shù)據(jù)就直接從 TCP 通道傳輸。 協(xié)議支持文本和二進(jìn)制數(shù)據(jù)。客戶端和服務(wù)器都可以發(fā)送和接收這兩種類(lèi)型的數(shù)據(jù)。此外,WebSocket 還支持 ping 和 pong 消息,用于檢測(cè)連接是否仍然處于活動(dòng)狀態(tài)。
優(yōu)點(diǎn):
- 兼容性: 更好的支持 Web,并支持 HTTP 代理和中介
- 實(shí)時(shí)性: WebSocket是基于TCP傳輸層協(xié)議 可以在客戶端和服務(wù)器之間實(shí)現(xiàn)實(shí)時(shí)的雙向通信,使得客戶端可以即時(shí)地接收到服務(wù)器端的數(shù)據(jù),從而實(shí)現(xiàn)實(shí)時(shí)更新
- 安全性:支持使用 SSL/TLS(wss協(xié)議) 加密傳輸數(shù)據(jù),這可以確保數(shù)據(jù)在傳輸過(guò)程中不被竊聽(tīng)或篡改。
- 支持?jǐn)U展: WebSocket 定義了擴(kuò)展,用戶可以擴(kuò)展協(xié)議、實(shí)現(xiàn)部分自定義的子協(xié)議,例如壓縮擴(kuò)展、加密擴(kuò)展、認(rèn)證擴(kuò)展...等。
缺點(diǎn):
- 不支持所有瀏覽器: 盡管 WebSockets 已經(jīng)成為現(xiàn)代瀏覽器的標(biāo)準(zhǔn)功能,但某些舊版本的瀏覽器可能不支持它。這可能會(huì)導(dǎo)致應(yīng)用程序無(wú)法在所有瀏覽器中正常工作
- 安全性問(wèn)題: WebSocket 技術(shù)需要在客戶端和服務(wù)器之間建立持久連接,這可能會(huì)導(dǎo)致一些安全問(wèn)題。例如,攻擊者可以利用 WebSocket 連接來(lái)進(jìn)行跨站點(diǎn)腳本攻擊(XSS)和跨站點(diǎn)請(qǐng)求偽造(CSRF)等攻擊
- 容易受到網(wǎng)絡(luò)波動(dòng)的影響: WebSocket 本身是基于 TCP 協(xié)議實(shí)現(xiàn)的,因此在網(wǎng)絡(luò)波動(dòng)較大的情況下,可能會(huì)出現(xiàn)連接斷開(kāi)、傳輸延遲等問(wèn)題
總結(jié):
如果是考慮到兼容web,且需要tcp協(xié)議的一些特性,且不想自己做一些應(yīng)用層的事情,例如握手、認(rèn)證、加密等。websocket是個(gè)不錯(cuò)的選擇。對(duì)于數(shù)據(jù)格式來(lái)說(shuō),websocket支持了文本和二進(jìn)制兩種,使用者也可以直接簡(jiǎn)單的使用。
五、框架設(shè)計(jì)
1、因?yàn)镮M和OKhttp具有一定的共性, 所以本庫(kù)借鑒OKhttp設(shè)計(jì)思想,來(lái)讓我們看一下構(gòu)造一個(gè)IMClient可以有多精簡(jiǎn)。
- 通用配置
IMClient.Builder builder = new IMClient.Builder()
.setConnectTimeout(10, TimeUnit.SECONDS) //設(shè)置連接超時(shí)
.setResendCount(3)//設(shè)置失敗重試數(shù)
.setConnectRetryInterval(1000,TimeUnit.MILLISECONDS)//連接嘗試間隔
.setConnectionRetryEnabled(true)//是否連接重試
.setSendTimeout(6,TimeUnit.SECONDS)//設(shè)置發(fā)送超時(shí)
.setHeartIntervalBackground(30,TimeUnit.SECONDS)//后臺(tái)心跳間隔
.setEventListener(eventListener!=null?eventListener:new DefaultEventListener(userId)) //事件監(jiān)聽(tīng),可選
.setMsgTriggerReconnectEnabled(true) //如果連接已經(jīng)斷開(kāi),消息發(fā)送是否觸發(fā)重連
.setProtocol(protocol) //哪種協(xié)議 IMProtocol.PRIVATE、IMProtocol.WEB_SOCKET、IMProtocol.UDP
.setOpenLog(true);//是否開(kāi)啟日志
- TCP協(xié)議配置
//以下支持兩種數(shù)據(jù)傳輸格式,一種protobuf,一種string格式
builder.setCodec(codecType == 0?new DefaultTcpProtobufCodec():new DefaultTcpStringCodec())//默認(rèn)的編解碼,開(kāi)發(fā)者可以使用自己的protobuf或者其他格式的編解碼
.setShakeHands(codecType == 0? new DefaultProtobufMessageShakeHandsHandler(getDefaultTcpHands()):new DefaultStringMessageShakeHandsHandler(getDefaultStringHands())) //設(shè)置握手認(rèn)證,可選
.setHeartBeatMsg(codecType == 0? getDefaultProtobufHeart(): getDefaultStringHeart()) //設(shè)置心跳,可選
.setAckConsumer(codecType == 0?new DefaultProtobufAckConsumer():new DefaultStringAckConsumer()) //設(shè)置消息確認(rèn)機(jī)制,如果需要消息回執(zhí),必選
.registerMessageHandler(codecType == 0?new DefaultProtobufMessageReceiveHandler(onMessageArriveListener):new DefaultStringMessageReceiveHandler(onMessageArriveListener)) //消息接收處理器
.registerMessageHandler(codecType == 0?new DefaultReplyReceiveHandler(onReplyListener):new DefaultStringMessageReplyHandler(onReplyListener)) //消息狀態(tài)接收處理器
.registerMessageHandler(codecType == 0?new DefaultProtobufHeartbeatRespHandler():new DefaultStringHeartbeatRespHandler()) //心跳接收處理器
.setTCPLengthFieldLength(2)//本庫(kù)拆包采用消息頭包含消息長(zhǎng)度的協(xié)議,裝包拆包的長(zhǎng)度字段的占用字節(jié)數(shù),默認(rèn)值為2
.addAddress(new Address(ip,9081,Address.Type.TCP))
.setMaxFrameLength(65535*100); //設(shè)置最大幀長(zhǎng) //私有tcp和websocket生效
- WebSocket協(xié)議配置
builder.setHeartBeatMsg(getDefaultWsHeart())
.setAckConsumer(new DefaultWSAckConsumer())
.registerMessageHandler(new DefaultWSMessageReceiveHandler(onMessageArriveListener))
.registerMessageHandler(new DefaultWSMessageReplyHandler(onReplyListener))
.registerMessageHandler(new DefaultWsHeartbeatRespHandler())
.addAddress(new Address(ip,8804,Address.Type.WS))
.setMaxFrameLength(65535*100)
// .addAddress(new Address(ip,8804,Address.Type.WSS))//支持WSS協(xié)議,請(qǐng)?jiān)趕cheme帶上wss標(biāo)識(shí)
.addWsHeader("user",userId); //webSocket特有的,可以用來(lái)鑒權(quán)使用
- UDP協(xié)議配置
builder.setCodec(new DefaultUdpStringCodec(new InetSocketAddress(ip,8804), CharsetUtil.UTF_8)) //String的編解碼,開(kāi)發(fā)者可以設(shè)定為自己的格式
.setShakeHands(new DefaultStringMessageShakeHandsHandler(getDefaultStringHands())) //設(shè)置握手認(rèn)證,可選
.setHeartBeatMsg(getDefaultStringHeart()) //設(shè)置心跳,可選
.setAckConsumer(new DefaultStringAckConsumer()) //設(shè)置確認(rèn)機(jī)制
.registerMessageHandler(new DefaultStringMessageReceiveHandler(onMessageArriveListener)) //消息接收處理器
.registerMessageHandler(new DefaultStringMessageReplyHandler(onReplyListener)) //消息狀態(tài)接收處理器
.registerMessageHandler(new DefaultStringHeartbeatRespHandler()) //心跳接收處理器
.addAddress(new Address(ip, 8804, Address.Type.UDP));
上述很多配置項(xiàng)都是可選項(xiàng),例如你沒(méi)有握手的要求、沒(méi)有心跳設(shè)計(jì)、沒(méi)有消息回執(zhí),setShakeHands、setHeartBeatMsg、setAckConsumer都可以是不設(shè)置的。所有的Default開(kāi)頭的實(shí)現(xiàn),開(kāi)發(fā)者都可以替換成自己的實(shí)現(xiàn)類(lèi)。
整個(gè)框架的核心實(shí)現(xiàn)在幾個(gè)內(nèi)置攔截器中:
Response getResponseWithInterceptorChain(SubsequentCallback callback) throws IOException, InterruptedException, AuthException, SendTimeoutException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
if (client.interceptors()!=null&&client.interceptors().size()>0){
interceptors.addAll(client.interceptors());
}
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client));
// interceptors.add(new CacheInterceptor());
interceptors.add(new ConnectInterceptor(client));
interceptors.add(new CallServerInterceptor(callback));
Interceptor.Chain chain = new RealInterceptorChain(
interceptors, null, null, null, 0, originalRequest,this, eventListener,client.connectTimeout(),
client.sendTimeout());
return chain.proceed(originalRequest);
}
是不是似曾相識(shí)的感覺(jué),這里攔截器功能和okhttp雷同,retryAndFollowUpInterceptor進(jìn)行連接重試、發(fā)送重試、地址切換,BridgeInterceptor主要進(jìn)行數(shù)據(jù)的裝配,ConnectInterceptor是真正的進(jìn)行連接和一些編解碼器等一些配置的地方,CallServerInterceptor進(jìn)行數(shù)據(jù)的寫(xiě)和讀,,完成這一套攔截器,那么我們整體一個(gè)從建立連接到消息發(fā)送和接收的大致流程就有了。
五、鑒權(quán)設(shè)計(jì)
鑒權(quán)設(shè)計(jì)是保證通訊的安全性和可靠性的重要手段。一般來(lái)說(shuō),通訊SDK鑒權(quán)設(shè)計(jì)需要考慮以下幾個(gè)方面:
- 身份認(rèn)證:通訊SDK需要驗(yàn)證客戶端的身份,以確保只有合法的客戶端才能使用SDK提供的服務(wù)。身份認(rèn)證可以采用用戶名密碼、API密鑰等方式進(jìn)行。
- 數(shù)據(jù)加密:通訊SDK需要對(duì)通訊過(guò)程中的數(shù)據(jù)進(jìn)行加密,以保證數(shù)據(jù)的機(jī)密性和完整性。數(shù)據(jù)加密可以采用對(duì)稱(chēng)加密、非對(duì)稱(chēng)加密等方式進(jìn)行。
- 防止中間人攻擊:通訊SDK需要防止中間人攻擊,以保證通訊過(guò)程中的數(shù)據(jù)不被篡改。防中間人攻擊可以采用數(shù)字證書(shū)、SSL/TLS等方式進(jìn)行
- ...等
本庫(kù)采用了身份認(rèn)證,即在消息通訊之前要先與服務(wù)端進(jìn)行身份認(rèn)證。如果需要握手認(rèn)證,在TCP和UDP協(xié)議上開(kāi)發(fā)者需要添加以下配置:
builder.setShakeHands(MessageShakeHandsHandler shakeHandler) //配置握手鑒權(quán)機(jī)制
public interface MessageShakeHandsHandler<K extends Object,T extends Object> {
/**
* 發(fā)送給服務(wù)端的握手包
* @return
*/
K ShakeHands();
/**
* 是否是握手包回應(yīng)包 * @param msg
* @return
*/
boolean isShakeHands(Object msg);
/**
* 客戶端端自己判斷返回的握手認(rèn)證回應(yīng)包是否成功
* @param pack
* @return
*/
boolean isShakeHandsOk(T pack) ;
}
接口中的ShakeHands()方法將在連接建立后會(huì)調(diào)用,發(fā)起一個(gè)握手認(rèn)證,當(dāng)服務(wù)端返回消息后會(huì)經(jīng)過(guò)isShakeHands(Object msg)判別是否是握手響應(yīng)包,是則走isShakeHandsOk(T pack),否則消息流轉(zhuǎn)到下一個(gè)消息處理器。當(dāng)isShakeHandsOk(T pack)返回值代表是否成功,true后會(huì)建立心跳機(jī)制如果有設(shè)置的話,fasle會(huì)立馬斷開(kāi)此連接。
在websocket協(xié)議上由于已存在握手過(guò)程,所以我們不需要自己去寫(xiě)這個(gè)過(guò)程,我們可以在websocket協(xié)議頭上帶上我們的header,填上我們的認(rèn)證信息,然后服務(wù)端對(duì)header去做判斷即可。在websocket協(xié)議上開(kāi)發(fā)者需添加以下配置:
builder.addWsHeader(String key, String value) //添加websocket的協(xié)議頭
總結(jié):
在tcp和websocket下,這里的握手認(rèn)證包,建議開(kāi)發(fā)者可以設(shè)置為用戶id+token的組合形式,用戶id可以知道這個(gè)會(huì)話來(lái)自哪個(gè)客戶端,token用來(lái)檢查連接發(fā)起的是否合法。udp協(xié)議雖然沒(méi)有連接,但是依然可以在業(yè)務(wù)上去做握手認(rèn)證,如果服務(wù)端一直收到一個(gè)用戶的包但之前沒(méi)有做握手認(rèn)證,服務(wù)端可以拒絕處理業(yè)務(wù)或者一些其他處理。
六、心跳設(shè)計(jì)
TCP協(xié)議實(shí)現(xiàn)中是有?;顧C(jī)制的,也就是TCP的KeepAlive機(jī)制,大概就是如果一個(gè)TCP連接在7200秒(2小時(shí))內(nèi)沒(méi)有活動(dòng),則內(nèi)核將發(fā)送9個(gè)keepalive消息,每個(gè)消息之間相隔75秒。如果在發(fā)送完所有keepalive消息后仍然沒(méi)有收到響應(yīng),則連接將被關(guān)閉。這些默認(rèn)參數(shù)顯然是不能滿足我們的要求的。另外還幾個(gè)比較重要的原因,例如NAT超時(shí)、服務(wù)器判斷設(shè)備是否還在線等原因, 所以我們需要實(shí)現(xiàn)自己的一個(gè)心跳機(jī)制。
- 設(shè)置心跳包和心跳間隔
//設(shè)置心跳包,heartBeatMsg的數(shù)據(jù)類(lèi)型一定要是你的編解碼器所支持的格式
builder.setHeartBeatMsg(Object heartBeatMsg)
//設(shè)置前臺(tái)的心跳間隔,這里的間隔是指在無(wú)任何消息發(fā)送情況下的空閑時(shí)間,而不是固定的間隔時(shí)間發(fā)送心跳包
builder.setHeartIntervalForeground(int interval, TimeUnit unit)
builder.setHeartIntervalBackground(long interval, TimeUnit unit)//設(shè)置后臺(tái)的心跳間隔
- 設(shè)置讀空閑和讀空閑是否觸發(fā)重連
//設(shè)置讀空閑是否觸發(fā)重連,如果為true則一段時(shí)間內(nèi)一直如果沒(méi)有收到服務(wù)端返回的任何消息,則觸發(fā)重新連接,false的話,設(shè)置讀空閑的配置失效,不觸發(fā)重連。
builder.setReaderIdleReconnectEnabled(boolean readerIdleReconnectEnabled)
builder.setReaderIdleTimeForeground(long interval, TimeUnit unit)//設(shè)置前臺(tái)讀空閑時(shí)間
setReaderIdleTimeBackground(long interval, TimeUnit unit) //設(shè)置后臺(tái)讀空閑時(shí)間
總結(jié):
其實(shí)心跳機(jī)制還有很多事情是可以做的,不僅是?;?、判斷在線這些,我們還可以利用心跳的RTT(Round-Trip Time,往返時(shí)間)去判斷網(wǎng)絡(luò)情況,我們是否要控制消息發(fā)送速度或者更改連接、改變心跳間隔。因?yàn)橐粋€(gè)網(wǎng)絡(luò)不佳的情況下,我們頻繁去做的消息發(fā)送,消息的延遲和阻塞是必然的,還占用了沒(méi)必要的帶寬。
七、消息確認(rèn)設(shè)計(jì)
TCP協(xié)議是個(gè)可靠面向流的傳輸協(xié)議,內(nèi)部既有確認(rèn)機(jī)制且保證數(shù)據(jù)有序,那我們?yōu)槭裁催€要進(jìn)行ACK機(jī)制設(shè)計(jì)呢,TCP是傳輸層協(xié)議它只能保證傳輸層的可靠,是端到端的,但并不能保證應(yīng)用層的可靠性,例如你應(yīng)用在接收到數(shù)據(jù)的時(shí)候發(fā)生異常,這個(gè)消息是不是就丟了。再比如在高并發(fā)、高負(fù)載、高延遲、不穩(wěn)定網(wǎng)絡(luò)等情況下,TCP 協(xié)議的性能會(huì)受到受到很大影響,且整個(gè)IM系統(tǒng)也不能保證可靠性,對(duì)于一個(gè)IM系統(tǒng)來(lái)說(shuō),可靠的定義至少是不丟消息、消息不重復(fù)、不亂序,這樣才算一個(gè)比較穩(wěn)定的IM。
- 不丟消息
要保證消息的不丟失,可以模擬TCP協(xié)議中的ACK機(jī)制,我們定義一套業(yè)務(wù)層的ACK機(jī)制,只要當(dāng)對(duì)方回了ACK我們才認(rèn)為對(duì)方已經(jīng)收到消息。例如有ClientA --> Server--> ClientB,ClientA發(fā)送消息給Server時(shí)候,需要等待Server返回ACK代表已發(fā)送的,而Server轉(zhuǎn)發(fā)消息給ClientB,需要等待ClientB返回ACK才代表ClientB收到,例如ClientB不在線Server可以把消息儲(chǔ)存起來(lái),等ClientB上線主動(dòng)推送離線消息或者等ClientB上線主動(dòng)拉去。如果ACK回執(zhí)的消息也丟失了呢,這需要去做個(gè)消息發(fā)送重試機(jī)制,如果一定的時(shí)間內(nèi)沒(méi)有收到ACK則重發(fā)此消息,重試一定次數(shù)都沒(méi)有收到ACK則認(rèn)為消息發(fā)送失敗。
- 消息不重復(fù)
在上述的重試機(jī)制中,就可能出現(xiàn)消息重復(fù)的問(wèn)題,例如一端發(fā)送消息給服務(wù)端,在等待ACK的超時(shí)后,客服端重新發(fā)送消息,發(fā)送完后才收到ACK,這樣其實(shí)服務(wù)端就收到兩條一樣的消息。消息去重處理方式比較簡(jiǎn)單,每個(gè)消息帶上一個(gè)msgId,如果已經(jīng)接收到同樣的msgId則直接回ACK,不需要額外處理。
- 不亂序
保證消息的有序,可以借鑒TCP協(xié)議中使用序列號(hào),服務(wù)端為每一個(gè)消息都編上一個(gè)序號(hào)seqid,客戶端根據(jù)服務(wù)端返回的消息回執(zhí)中的seqid去為消息進(jìn)行排序。這樣就可以根據(jù)序列號(hào)和確認(rèn)號(hào)來(lái)保證數(shù)據(jù)的有序傳輸。
在本庫(kù)中實(shí)現(xiàn)了消息確認(rèn)機(jī)制和消息重傳機(jī)制,而消息的去重和消息的排序需要業(yè)務(wù)層自己去實(shí)現(xiàn)。
1、注冊(cè)一個(gè)消息確認(rèn)機(jī)制
/**
* 設(shè)置ACK機(jī)制,如果設(shè)置了,在request里有needACK,則必須收到ACK包 不然會(huì)回調(diào)onFailure
*
* @param ackConsumer
* @return
*/
public Builder setAckConsumer(Consumer ackConsumer) {
this.ackConsumer = ackConsumer;
return this;
}
/**
* 用于特定消息的消費(fèi),例子:我發(fā)送了一個(gè)特別的消息,然后想訂閱該特定消息的后續(xù)響應(yīng)
* @param <T>
*/
public interface Consumer<T extends Object> {
/**
*
* @param t 接收的消息
* @param requestTag 消息的唯一標(biāo)識(shí)
* @return
*/
boolean Observable(T t,String requestTag);
/**
* 處理該消息
* @param t
*/
void accept(T t);
}
接口中的Observable(T t,String requestTag)如果返回true,則代表消息已經(jīng)收到ack,走發(fā)送成功回調(diào)。如果是fasle則會(huì)一直等待一個(gè)消息的發(fā)送周期,超時(shí)、重發(fā)、重試,如果一個(gè)周期里都沒(méi)有正確的ack返回則消息發(fā)送失敗。
accept(T t)會(huì)在Observable(T t,String requestTag)方法返回ture的時(shí)候回調(diào),去處理該消息。
2、構(gòu)建一個(gè)需要消息確認(rèn)回執(zhí)的消息
Request.Builder builder = new Request.Builder().
setNeedACK(appMessage.getHead().getMsgId()) //此消息需要ack,且此消息的ack包的tag是msgId
.setSendRetry(true) // 此消息是否超時(shí)重發(fā),如果一定時(shí)間內(nèi)沒(méi)有收到ACK重發(fā)此消息
.setBody(getStringMsgPack(appMessage)) //發(fā)送的消息內(nèi)容
.build();
Builder setAckConsumer(Consumer ackConsumer)
3、發(fā)送消息
public void sendMsg(Request request, Callback callback){
imClient.newCall(request).enqueue(callback);
}
總結(jié):
如果你的消息是有序的,那么你可以通過(guò)Delay ACk即延遲發(fā)送ack,而不是對(duì)每個(gè)消息都要進(jìn)行確認(rèn),你可以在一段時(shí)間內(nèi)回一個(gè)ack(包含序號(hào))或者在傳輸數(shù)據(jù)的時(shí)候順帶攜帶一個(gè)ack信息,這樣可減少帶寬,提供利用率。收到一個(gè)ack代表ack序號(hào)之前的數(shù)據(jù)都準(zhǔn)確無(wú)誤的收到。
八、寫(xiě)在最后
感謝大家的閱讀!也歡迎大家指出問(wèn)題、提交issue或者評(píng)論都可以哈,希望此篇文章對(duì)你有幫助。Github地址