簡介
? Kafka起初是由LinkedIn公司采用Scala語言開發(fā)的一個多分區(qū)、多副本且基于Zookeeper協(xié)調(diào)的分布式消息系統(tǒng),現(xiàn)已捐獻給Apache基金會。目前Kafka已經(jīng)被定為為一個分布式流式處理平臺,它以高吞吐、可持久化、可水平拓展、支持流數(shù)據(jù)處理等多種特性而被廣泛使用。
? Kafka之所以受到越來越多的青睞,與它所”扮演“的三大角色是分不開的:
? 消息系統(tǒng):作為消息中間件,具備系統(tǒng)解耦、冗余存儲、流量削峰、緩沖、異步通信、擴展性、可恢復性等功能。與此同時,Kafka還提供了大多數(shù)消息系統(tǒng)難以實現(xiàn)的消息順序性保障及回溯消費等功能。
? 存儲系統(tǒng):Kafka把消息持久化到磁盤,有效降低了數(shù)據(jù)丟失的風險。也正是得益于Kafka的消息持久化功能和多副本機制,我們可以把Kafka作為長期的數(shù)據(jù)存儲系統(tǒng)來使用,只需要把對應的數(shù)據(jù)保留策略設置為“永久”或啟用主題的日志壓縮功能即可。
? 流式處理平臺:
消息中間件MQ使用場景
· 解耦:
未使用MQ的耦合場景:
? 現(xiàn)有A服務在自己代碼中調(diào)用B服務的接口和C服務的接口發(fā)送數(shù)據(jù)

? 此時新增D服務也需要A服務發(fā)送數(shù)據(jù),則需要A服務在自己代碼里修改,發(fā)送數(shù)據(jù)給D服務,緊接著C服務又說不需要A服務給自己發(fā)送數(shù)據(jù)了

? 負責A服務的人還得考慮,如果調(diào)用的B服務掛了怎么辦?如果D服務訪問超時怎么辦?由于A服務產(chǎn)生了比較關鍵的數(shù)據(jù),許多服務需要A服務發(fā)送該數(shù)據(jù)過來,這也導致了A服務與其他服務的嚴重耦合。
使用MQ解耦場景:

我們自己使用的場景

· 異步:
未使用MQ的同步高延時請求場景:
? 現(xiàn)有一用戶請求,調(diào)用服務A接口

? 我們來計算一下,服務A先是在自己本地執(zhí)行SQL,然后調(diào)用了服務B、服務C和服務D的接口,4個步驟下來,需要耗時的總時長為970ms。用戶通過瀏覽器發(fā)起請求,等待1秒才得到響應,體驗比較差。一般對于用戶的直接的操作,要求是每個請求都必須在200ms內(nèi)完成,對用戶幾乎是無感知的。
使用MQ進行異步化:
[圖片上傳失敗...(image-dc15f-1571550202627)]
? 使用MQ進行異步化之后,此時用戶發(fā)起請求調(diào)用服務A的總耗時變成了20+5=25ms。
· 削峰:
未使用MQ削峰大量用戶請求場景:

使用MQ進行削峰場景:

? MQ中每秒有2000個請求進來,就只有1000個請求出去,結果就是導致在高峰期(假設1個小時)可能有幾十萬甚至上百萬的請求積壓在MQ中,但是高峰期過后,每秒鐘只有20個請求,系統(tǒng)還是會按照每秒1000個請求的速度處理,差不多1個多小時就可以把積壓的上百萬條消息給處理掉,就沒有積壓了。
引入MQ后可能存在的一些問題
系統(tǒng)可用性降低:系統(tǒng)引入的外部依賴越多,越容易掛掉。拿上圖舉例,MQ一旦故障,A服務沒發(fā)發(fā)送消息到MQ了,然后BCD服務也沒發(fā)消費到消息了,整個系統(tǒng)就崩潰了,沒法運轉了。
系統(tǒng)復雜性提高:消息丟失,消息重復,消息順序性問題如何保證?例如A服務本來只需要給B服務發(fā)送一條數(shù)據(jù)就可以了,結果因為A服務和MQ之間協(xié)調(diào)出現(xiàn)問題,A服務不小心把同一條數(shù)據(jù)發(fā)了兩次到MQ中給B服務消費,導致B服務插入兩條一模一樣的數(shù)據(jù)。
一致性問題:如A服務處理完了直接返回成功了,都認為這個請求成功了,但是要BCD服務都寫庫成功才是真正的成功,如果其中有一個寫庫失敗了,這樣數(shù)據(jù)就不一致了。
典型的Kafka體系架構

先簡單介紹下Kafka中的術語:
(1)Producer:生產(chǎn)者,也就是發(fā)送消息的一方。生產(chǎn)者負責創(chuàng)建消息,然后將其投遞到Kafka中。
(2)Consumer:消費者,也就是接收消息的一方。消費者連接到Kafka上并接收消息,進而進行相應的業(yè)務邏輯處理。
(3)Broker:服務代理節(jié)點??梢詫⑵淇醋鲆慌_服務器上部署的一臺Kafka服務器,前提是這臺服務器上只部署了一個Kafka實例。一個或多個Broker組成了一個Kafka集群。
(4)Topic:主題。Kafka中的消息以主題為單位進行歸類,生產(chǎn)者負責將消息發(fā)送到特定的主題,而消費者負責訂閱主題并進行消費。
(5)Partition:分區(qū)。一個topic可以分為多個partition,每個partition是一個有序的隊列。
(6)offset:偏移量。同一個topic下的不同partition包含的消息是不同的,partition在存儲層面可以看作一個可追加的日志文件,消息在被追加到分區(qū)日志的時候都會分配一個特定的偏移量(offset)。offset是消息在分區(qū)中的唯一標識,Kafka通過它來保證消息在分區(qū)中的順序性,不過offset并不跨越分區(qū),也就是說,Kafka保證的是分區(qū)有序而不是主題有序。
如圖,某個主題中有3個分區(qū),消息被順序追加到每個分區(qū)日志文件的尾部。Kafka中的分區(qū)可以分布在不同的broker上,也就是說,一個topic的數(shù)據(jù)可以分布在多個broker上

? Kafka之所以將topic分成多個分區(qū),分布在不同的broker上,就是提供負載均衡的能力。
Kafka多副本機制
? Kafka每個主題可劃分為多個分區(qū),每個分區(qū)又配置有多個副本(Replica)。Kafka為分區(qū)引入了多副本機制,通過增加副本數(shù)量可以提升容災能力。同一分區(qū)的不同副本中保存的是相同的消息(在同一時刻,副本之間并非完全一樣),副本之間是“一主多從”的關系,其中l(wèi)eader副本負責處理讀寫請求,follower副本只負責與leader副本的消息同步。副本處于不同的broker中,當leader副本出現(xiàn)故障時,從follower副本中從新選舉新的leader副本對外提供服務。Kafka通過多副本機制實現(xiàn) 了故障的自動轉移,當Kafka集群中某個新的broker失效時,仍然能保證服務可用。

? 如圖所示,Kafka集群中有3個broker,某個topic中有3個分區(qū),且副本因子(即副本個數(shù))也為3,如此每個分區(qū)便有1個leader副本和2個follower副本。生產(chǎn)者和消費者只與leader副本進行交互,而follower副本只負責消息的同步,很多時候follower副本中的消息相對于leader而言會有一定的滯后。
? 分區(qū)中的所有副本統(tǒng)稱為 AR(Assigned Replicas)。所有與leader副本保持一定程度的同步的副本(包括leader副本在內(nèi))組成 ISR(In-Sync Replicas),ISR集合是AR集合的一個子集。消息會先發(fā)送到leader副本,然后follower副本才能從leader副本中拉取消息進行同步,同步期間內(nèi)follower副本相對于leader副本會有一定程度的滯后。與leader副本同步滯后過多的副本(不包括leader副本)組成OSR(Out-of-Sync Replicas),由此可見,AR = ISR + OSR。在正常情況下,所有的follower副本都應該與leader副本保持一定程度的同步,即 AR = ISR,OSR集合為空。
? leader副本負責維護和跟蹤ISR集合中所有follower副本的滯后狀態(tài),當follower副本落后太多或失效時,leader副本會把它從ISR集合中剔除。這里的落后程度并不是指follower副本與leader副本相差的消息數(shù),而是指follower副本寫入消息的速度慢于leader副本持續(xù)超過10s(默認參數(shù)),則認為follower副本落后太多。如果OSR集合中所有的follower副本“追上”了leader副本的進度,那么leader副本會把它從OSR集合轉移至ISR集合。默認情況下,當leader副本發(fā)生故障時,只有在ISR集合中的副本才有資格被選舉為新的leader。
? ISR與HW和LEO也有緊密的關系。HW是High Watermark的縮寫,俗稱高水位,它標識了一個特定的消息偏移量(offset),消息只能拉取到這個offset之前的消息。

? 如上圖所示,表示一個分區(qū)中各種偏移量的說明。它代表一個日志文件,這個日志文件中有9條消息,第一條消息的offset為0,最后一條消息的offset為8,offset為9代表下一條待寫入的消息的位置。日志文件的HW為6,表示消費者只能拉取到offset在0至5之間的消息,而offset為6的消息對消費者而言是不可見的。LEO是Log End Offset的縮寫,標識當前日志文件下一條待寫入的消息的offset。分區(qū)ISR集合中的每個副本都會維護自身的的LEO,而集合中最小的LEO即為分區(qū)的HW,對消費者而言,只能消費HW之前的消息。下面舉個例子來更好的說明ISR集合與HW和LEO之間的關系:

? Leader副本接收到生產(chǎn)者發(fā)送的消息,寫入本地磁盤后,會更新其LEO值。

? 在同步過程中,不同的follower副本的同步效率也不盡相同。

? 在某一時刻,follower1完全跟上了leader副本而follower2只同步到了消息3,如此leader副本的LEO為5,follower1的LEO為5,follower2的LEO為4,那么當前分區(qū)的HW取最小值4,此時消費者可以消費到offset為0至3之間的消息。

? 所有的消息都成功寫入了消息3和消息4,整個分區(qū)的HW和LEO都變?yōu)?,因此消費者可以消費到offset為4的消息了。由此可見,Kafka的復制機制既不是完全的同步復制,也不是單純的異步復制。事實上,同步復制要求所有能工作的follower副本都復制完,這條消息才會被確認為已成功提交,這種復制方式極大地影響了性能。而在異步復制方式下,follower副本異步地從leader副本中復制數(shù)據(jù),數(shù)據(jù)只要被leader副本寫入就被認為已經(jīng)成功提交了。在這種情況下,如果follower副本都還沒有復制完而落后于leader副本,突然leader副本宕機,則會造成數(shù)據(jù)丟失。Kafka使用的這種ISR的方式則有效地權衡了數(shù)據(jù)可靠性和性能之間的關系。
生產(chǎn)者
? 一個正常的生產(chǎn)邏輯為以下幾個步驟:配置客戶端參數(shù),創(chuàng)建相應的生產(chǎn)者實例,構建待發(fā)送的消息,發(fā)送消息。
? 客戶端參數(shù)配置
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
? bootstrap.servers:該參數(shù)用來指定生產(chǎn)者客戶端連接Kafka集群所需的broker地址清單,格式為host1:port1,host2:port2,這里不一定需要配置所以的broker地址,因為生產(chǎn)者會從給定的broker里找到其他broker信息。但至少配置2個以上,當其中一個宕機了,能夠保證生產(chǎn)者仍然能連接到kafka集群上。key.serializer和value.serializer指定序列化操作的序列化器。這三個參數(shù)都沒有默認值,所以在配置生產(chǎn)者客戶端時是必填的。
properties.put("acks","0");
? acks,可取值0,1,-1,這個參數(shù)是用來指定分區(qū)中必須要有多少個副本接收到這條消息,之后生產(chǎn)者才會認為這條消息寫入成功。默認值為1,生產(chǎn)者發(fā)送消息之后,只要分區(qū)的leader副本成功寫入消息,那么它就會收到來自服務端的成功響應。如果消息寫入leader副本并返回成功響應給生產(chǎn)者,且在被其他follower副本拉取前l(fā)eader副本崩潰了,那么此時消息還是會丟失,因為新選舉的leader副本中并沒有這條對應的消息。acks=0,生產(chǎn)者發(fā)送消息之后不需要等待任何服務端的響應。這樣可以達到最大的吞吐量,但是也存在問題,如果在消息發(fā)送到寫入Kafka的過程中出現(xiàn)某些異常,導致Kafka沒有接收到這條消息,那么生產(chǎn)者也不知道,消息也就丟失了。acks=-1或acks=all,生產(chǎn)者在消息發(fā)送之后,需要等待ISR中的所有副本都成功寫入消息之后才能夠收到來自服務端的成功響應。設置成-1可以達到最強的可靠性,但這并不意味著消息就一定可靠,因為如果ISR中可能只有l(wèi)eader副本,這樣就退化成acks=1的情況了。所以acks默認為1,是消息可靠性和吞吐量之間的一個折中方案。
? 構建消息,即創(chuàng)建ProducerRecord對象。
public class ProducerRecord<K,V> {
private final String topic; //主題
private final Integer partition; //分區(qū)號
private final K key; //鍵
private final V value; //值
private final Long timestamp; //消息的時間戳
...
}
? 其中topic和partition字段分別指代消息要發(fā)往的主題和分區(qū)號。value是指消息體,即你要發(fā)送的內(nèi)容。key是用來指定消息的鍵,它不僅是消息的附加信息,還可以用來計算分區(qū)號進而可以讓消息發(fā)往特定的分區(qū)。消息以主題為單位進行歸類,而這個key可以讓消息再進行二次歸類,同一個key的消息會被劃分到同一個分區(qū)中。說到key,這里如果要保證消息的順序性,可以把需要保證消息消費順序的指定同一個key。消息在通過send()方法發(fā)往broker的過程中,有可能需要經(jīng)過攔截器、序列化器和分區(qū)器。攔截器一般不是必需的,但序列化器是必需的。生產(chǎn)者需要用序列化器把對象轉換成字節(jié)數(shù)組才能通過網(wǎng)絡發(fā)送給Kafka。

? 如果在構造消息時在ProducerRecord中指定了partition字段,那么就不需要分區(qū)器的作用,如果沒有指定,那么就需要依賴分區(qū)器根據(jù)key這個字段來計算partition的值。在默認分區(qū)器的方法中,如果key部位null,那么默認的分區(qū)器會對key進行哈希,最終根據(jù)等到的哈希值來計算分區(qū)號,有相同key的消息會被寫入同一個分區(qū)。如果key為null,那么消息將會以輪詢的方式發(fā)往主題內(nèi)的各個可用分區(qū)。
消費者
消費者(Consumer):負責訂閱Kafka中的主題(topic),并且從訂閱的主題上拉取消息。
消費組(Consumer Group):每個消費者都有一個對應的消費組,消息發(fā)布到主題后,只會被投遞給訂閱它的每個消費組中的一個消費者。

? 如上圖所示,某個主題共有3個分區(qū),有兩個消費組A和B都訂閱了這個主題。按照Kafka默認的規(guī)則,消費組A中每個消費者分配到1個分區(qū),消費組B中C3分配到兩個分區(qū),C4分配到1個分區(qū)。兩個消費組之間互不影響,每個消費組只能消費所分配到的分區(qū)中的消息,換言之,每一個分區(qū)只能被一個消費組中的一個消費者所消費。消費組是一個邏輯上的概念,它將屬于同一組的消費者歸為一類,每一個消費者只隸屬于一個消費組,課通過消費者客戶端參數(shù)group.id來配置消費組。
對于消息中間件一般有兩種消息投遞模式:
點對點(P2P,Point-to-Point)模式:基于隊列,生產(chǎn)者發(fā)送消息到隊列,消費者從隊列中接收消息。一般是一對一。
發(fā)布/訂閱(Pub/Sub)模式:主題作為消息傳遞的中介,生產(chǎn)者將消息發(fā)布到某個主題,消費者可主題中訂閱消息。該模式在消息的一對多廣播時采用。
? Kafka同時支持兩種消息投遞模式,而這正得益于它的消費者與消費組模型設計:
- 如果所有的消費者都隸屬于同一個消費組,那么所有的消息都會被均勻的投遞給每一個消費者,即每條消息只會被一個消費者處理,這就相當于點對點模式。
- 如果所有的消費者都隸屬于不同的消費組,那么所有的消息都會被廣播給所有的消費者,即每條消息會被所有的消費者處理,這就相當于發(fā)布/訂閱模式。
? 一個正常的消費邏輯步驟:配置消費者客戶端參數(shù),創(chuàng)建消費者實例,訂閱主題,拉取消息并消費,提交消費位移等。
? 配置必要的消費者客戶端參數(shù),有4個參數(shù)是必填的。同生產(chǎn)者一樣,bootstrap.servers、key.deserializer和value.deserializer三個參數(shù)是必配的,只不過key、value的變成了反序列化器,還有一個group.id配置消費者隸屬的消費組名稱。
props.put(ConsumerConfig.GROUP_ID_CONFIG, "goupA");
? 消息的消費一般有兩種模式:推模式——是服務端主動將消息推送給消費者,拉模式——是消費者主動向服務端發(fā)起請求來拉取消息。Kafka中的消費基于拉模式的。Kafka中的消息消費是一個不斷輪詢的過程,消費者所要做的就是重復地調(diào)用poll()方法,返回所訂閱的主題(分區(qū))上的一組消息。
? 消費者消費到的每條消息類型為ConsumerRecord,這個和生產(chǎn)者發(fā)送的消息類型ProducerRecord對應,不過字段更豐富:
public class ConsumerRecord<K,V> {
private final String topic; //消息所屬主題名稱
private final int partition; //消息所屬分區(qū)編號
private final long offset; //消息所屬分區(qū)偏移量
private final long timestamp; //時間戳
private final TimestampType timestampType; //兩種類型,CreateTime消息創(chuàng)建的時間戳, //LogAppendTime消息追加到日志的時間戳
private final K key;
private final V value; //一般業(yè)務應用所要讀取的值
...
}
位移提交
? Kafka中每條消息都有唯一的offset,表示該消息處在的partition中的位置,叫作“偏移量”。消費者中也有一個offset概念,表示消費者消費到分區(qū)中某個消息所在的位置,我們把它與消息的區(qū)分開,可叫作“位移”。在舊消費者客戶端(用Scala編寫的客戶端版本)中,消費位移是保存在ZooKeeper中的,而在新消費者客戶端(用Java編寫的客戶端)中,消費位移存儲在Kafka內(nèi)部的主題_consumer_offsets中。這里將消費位移存儲起來(持久化)的動作稱為“提交”。

? 當前消費者消費的位移為X,但它需要提交的消費位移不是X,而是X+1,它表示下一條需要拉取的消息的位置。在Kafka中默認的消費位移提交方式是自動提交,提交時間默認為5秒,可通過auto.commit.interval.ms配置。
? 自動提交位移的方式非常簡便,但是也會帶來重復消費和消息丟失的問題。
? 假設剛剛提交完一次消費位移,然后拉取一批消息進行消費,在下一次自動位移提交之前,消費者崩了,那么等消費者恢復再來消費消息的時候又得從上一次位移提交的地方重新開始,這樣便發(fā)生了重復消費的現(xiàn)象。我們可以通過減小位移提交時間間隔來減小重復消息的窗口,但這樣并不能避免重復消費的發(fā)送,而且也會使得位移提交更加頻繁。這里我們可以在拿數(shù)據(jù)寫庫前,根據(jù)主鍵去庫中查詢,如果已有,就update一下好了,若是寫入redis,用set存儲,去重。
?