前言
最開始在Kafka 概述中提到了mirc-batch(微批處理),mirc-batch是Kafka 高性能的一個非常重要的原因,這一下子就使Kafka 成為了一個擁有近乎流式處理框架的的高吞吐級別,但是mirc相對于流式處理還是存在很大差異的,但是一些所謂的流式處理框架使用的也有mirc-batch(比如說spark Streaming),當(dāng)然啦一些正統(tǒng)的流式處理框架,比如說storm、Flink使用的都是典型的流式處理。
本文按照 批處理、微批處理、流式處理來說一下為什么Kafka選擇了micr-batch。
在介紹之前先說一下幾個經(jīng)典概念:
響應(yīng)時間:
響應(yīng)時間通常是評定一個系統(tǒng)或者網(wǎng)站最直觀的感受,狹義上來說響應(yīng)時間是指系統(tǒng)對于請求作出響應(yīng)的時間,但是現(xiàn)在對于響應(yīng)時間有了更多表現(xiàn),比如說前端的首屏加載時長等,也是對于響應(yīng)時間的綜合表現(xiàn)(不僅是一個單系統(tǒng)服務(wù),更多是各方綜合的結(jié)果)。
吞吐量&高吞吐:
吞吐量最直觀的概念是指系統(tǒng)在單位時間內(nèi)所處理請求的數(shù)量。對于無并發(fā)的系統(tǒng),吞吐量和響應(yīng)時間是嚴(yán)格的反比關(guān)系。歷史上并發(fā)的出現(xiàn)打破了這個規(guī)律,也為提升吞吐量帶來的新的生機。對于單用戶系統(tǒng)來說,可能響應(yīng)時間是最重要的,而對于現(xiàn)在互聯(lián)網(wǎng)大多數(shù)服務(wù)而言,吞吐量可能是最重要的(當(dāng)然啦可用性什么也非常重要)。所謂的高吞吐就是說可以持有非常高吞吐量的一個表現(xiàn)了。
時延
標(biāo)準(zhǔn)的定義是指數(shù)據(jù)經(jīng)過網(wǎng)絡(luò)或者鏈路從一端到另一端的所消耗的時間,時延其實是分很多種的,發(fā)送時延、傳輸時延等,但是其實概念基本上是類似的,從一個點到另一個點,這個點可能是狀態(tài)也可能是操作與操作之間的時間間隔,在kafka中所指的通常就是消息時延,這個特性對于一個消息系統(tǒng)來說是十分重要的,比如producer所能提交消息的速度。這個在kafka里和吞吐量是十分相關(guān)的。
時延和吞吐量通常是無法同時兼顧的,我們在提升一個指標(biāo)的同時,可能要犧牲另一個,所以要根據(jù)具體的業(yè)務(wù)場景來做一個衡量。
批處理
批處理是指一改當(dāng)初串行處理的模式,作業(yè)出現(xiàn)后就立馬進行處理,而是說按批次對于作業(yè)(請求)進行處理。批處理具有一個典型的特點,就是吞吐量高,CPU利用率十分出色。具體來說就是把具體的要處理的數(shù)據(jù)(作業(yè) || 請求)按照性質(zhì)或者某些屬性進行分組或者分批,再成組或者成批的提交到對應(yīng)的計算系統(tǒng)。批處理出現(xiàn)的非常早,回一下當(dāng)初的操作系統(tǒng)可能就對于早期的多道批處理系統(tǒng)&單道批處理系統(tǒng)有印象了。
通常來說,批處理是一種將作業(yè)提交給計算系統(tǒng)后就不再干預(yù),通常是非常低的交互性或者根本無交互性可言。業(yè)界有非常多經(jīng)典的實現(xiàn)比如說Hadoop(MapReduce) 計算,根據(jù)系統(tǒng)的特性,我們通常會發(fā)現(xiàn)批處理所處理的作業(yè)或者數(shù)據(jù)都是些龐大并且離線已經(jīng)存儲好的數(shù)據(jù)(有界、持久、海量),都是些對實時性幾乎沒有什么要求的場景,比如大數(shù)據(jù)報表的生產(chǎn)、模型的訓(xùn)練。
流式處理
流式處理是指對于隨時可能進入系統(tǒng)的數(shù)據(jù)進行計算處理,相對于批處理來說算是種截然不同的處理方式,無需正對整個數(shù)據(jù)集進行計算操作,而是說來了就干,實時性非常好,處理速度快,結(jié)果立馬可用,同一時間僅處理一條數(shù)據(jù)。
常見的流式處理框架有storm、Flink(Spark Streaming 嚴(yán)格意義上來說不算是流式處理),流式處理通常用于,分析監(jiān)控對實時性要求非常高的系統(tǒng)的錯誤日志,或者其他以時間為衡量標(biāo)準(zhǔn)的數(shù)據(jù)流。
微批處理
micr-batch 是一種借鑒了批處理及流式處理的特性,針對吞吐量及時延做了下兼顧(通常是適當(dāng)?shù)膿p失時延 來提升吞吐量)。批的數(shù)量或者規(guī)則不再那么大,而是劃分為小批次或者微批次,從而提升吞吐量的同時,對于時延方面,別做出那么大的損失。
來看看Kafka的實現(xiàn),因為是一個實時的消息系統(tǒng),所以說純粹的批處理不現(xiàn)實,比較下單純串行處理方式吞吐量又不夠,所以Kafka 采用了micr-batch的處理方式。重新來看看這張producer的圖:

在producer端消息被生產(chǎn)之后并不是直接發(fā)送的,而是在accumulator上緩存一下,然后集中發(fā)送出,這樣就簡單的實現(xiàn)了micr-batch,那帶來的改善是什么樣子的呢?繼續(xù)看
比如說Kafka處理一條消息需要2ms,那么對應(yīng)的吞吐量最多500,時延為2ms。
現(xiàn)在把Kafka 消息不立即發(fā)送而是說等一等一塊發(fā),等大約8ms,假設(shè)這段時間積攢了500條消息。
看一下吞吐量的變化 :5000/(0.002s +0.008s)=50000,提升了大約100倍,如果積攢的消息數(shù)量是100條,那么帶來的提升就可能是200倍,這個跟kafka producer 消息的生產(chǎn)速度是非常相關(guān)的(決定了所能帶來的提升,需要根據(jù)具體的場景來確定等待的數(shù)量,這里producer是通過對應(yīng)參數(shù)來控制的:
batch:
buffer.memory 指定producer待發(fā)送消息緩沖區(qū)的內(nèi)存大小,默認(rèn)32m,如果需要更改就使用這個參數(shù)進行修改。這里需要注意的是當(dāng)producer端寫消息的速度超過了專屬IO線程發(fā)送消息的速度,并且緩沖區(qū)的消息數(shù)量超過buffer.memory指定的大小時,producer會拋出異常通知用戶介入處理,這個緩沖區(qū)的大小需要根據(jù)實際場景來確定。
batch.size 指一個batch的大小,它直接決定了一個batch中存在的消息數(shù)量,這個直接與producer的吞吐量及延時等直接相關(guān)。
linger.size
producer端會專門劃出一部分內(nèi)存用于待發(fā)送消息的緩存,batch.size決定了發(fā)送消息數(shù)量,同時間接決定了消息緩存時存在的延時。linger.size 就是針對這一點設(shè)計出來的,它決定了消息被投放進緩沖區(qū)時是否立馬被發(fā)送,默認(rèn)參數(shù)是0(立即發(fā)送),這個大多數(shù)情況下是合理的,但是會很大程度上拉低kafka的吞吐量。
關(guān)于上述的一些處理特性,我們除了需要了解之后更好的去使用Kafka,感覺更需要學(xué)會這種解決問題的思路,對于一些需要吞吐量的場景也可以去借鑒這種micr-batch 的實現(xiàn)。