1、過期時間(TTL)
????????通過消息的timestamp字段和ConsumerInterface接口的onConsumer()方法可實現(xiàn)消息的TTL功能,但是有一個局限,就是每條消息的超時時間都是一樣的。
????????我們可以通過消息中的headers字段來做配合,可以將TTL的值設(shè)置為鍵值對的形式保存在消息的headers字段中
2、延時隊列
????????使用攔截器可以來實現(xiàn)延時隊列,但是局限性很大。比如拉取到的消息集合中有一條消息的延時時間很長,其他的消息延時時間很短而被消費,那么該如何處理?
????????如果此時提交消費位移,那么延時時間很長的那條消息會丟失。
????????如果這時不繼續(xù)拉取消息而等待這條延時時間很長的時間到達(dá)延時時間,這樣又會導(dǎo)致消費滯后很多,而且如果這條消息后面的很多消息的延時時間很短,那么也會被這條消息無端地拉長延時時間,從而大大地降低了延時的精度。
????????如果這個時候不提交消費位移而繼續(xù)拉取消息,等待這條延時時間很長的消息滿足條件再提交消費位移,那么在此期間這條消息要駐留在內(nèi)存中,而且需要一個定時機(jī)制來定時檢測是否滿足被消費的條件,那么這類消息很多時必定會引起內(nèi)存的暴漲。
????????有一種方案,在發(fā)送延時消息的時候先不是投遞到要發(fā)送的真實主題(real_topic)中,而是先投遞到一些Kafka內(nèi)部的主題(delay_topic)中,這些內(nèi)部主題對用戶不可見,然后通過一個自定義的服務(wù)拉取這些內(nèi)部主題中的消息,并將滿足條件的消息再投遞到要發(fā)送的真實主題中,消費者所訂閱的還是真實的主題。
????????一般是按照不同的延時等級來劃分的,比如設(shè)定5s、10s、2min、10min等等這些按照延時時間遞增的延時等級,延時的消息按照延時時間投遞到不同等級的主題中,投遞到同一主題中的延時時間會被強(qiáng)制轉(zhuǎn)換為與此延時等級一致的延時時間。雖然有一定的延時誤差,但是誤差可控。
????????每條消息的延時時間可借用timestamp、headers字段。
3、消息路由
????????Kafka默認(rèn)按照主題來路由,也就是說,消息發(fā)往主題之后會被訂閱的消費者全盤接收,這里沒有類似消息路由的功能進(jìn)行二次路由。
????????具體實現(xiàn)方法,可在消息的headers字段中加入一個鍵為“routingkey”、值為特定業(yè)務(wù)標(biāo)示的Header,然后在消費端中使用攔截器挑選出特定業(yè)務(wù)標(biāo)示的信息。多個消費者組訂閱,不同的消費者組根據(jù)不同的routingkey來進(jìn)行消費。
4、消息中間件的選型
? ? ? ? 1、功能維度
? ? ? ? 首先是功能維度,這個決定了能否最大限度地實現(xiàn)開箱即用,繼而縮短項目周期、降低成本等。如果一款消息中間件達(dá)不到需求,那么就需要進(jìn)行二次開發(fā),會增加項目的技術(shù)難度、復(fù)雜度以及延長項目周期。比如優(yōu)先隊列、延時隊列、重試隊列、消費模式、廣播消費、回溯消費、消息堆積+持久化、消息順序性等等。
? ? ? ? 2、性能維度
????????雖然從功能維度上來說,RabbitMQ的優(yōu)勢要大于Kafka,但是Kafka的吞吐量要比RabbitMQ高出1至2個數(shù)量級。
? ? ????3、可靠性和可用性
????????消息丟失是消息中間件不得不面對的一個痛點,消息可靠性是衡量消息中間件好壞的一個關(guān)鍵因素。消息可靠性是指對消息不丟失的保障程度,而消息中間件的可用性是指無故障運行的時間百分比,通常用幾個9來衡量。
? ? ????4、運維管理
? ? ? ? 5、社區(qū)力度及生態(tài)發(fā)展
????????消息中間件選型切記一位地追求性能或功能,性能可以優(yōu)化,功能可以二次開發(fā)。如果要在功能和性能方面做一個抉擇,那么首選性能,因為總體上來說性能優(yōu)化的空間沒有功能擴(kuò)展的空間大。