Kafka常見問題整理

1、Kafka如何防止數(shù)據(jù)丟失

1)消費端弄丟數(shù)據(jù)

? 消費者在消費完消息之后需要執(zhí)行消費位移的提交,該消費位移表示下一條需要拉取的消息的位置。Kafka默認位移提交方式是自動提交,但它不是在你每消費一次數(shù)據(jù)之后就提交一次位移,而是每隔5秒將拉取到的每個分區(qū)中的最大的消費位移進行提交。自動位移提交在正常情況下不會發(fā)生消息丟失或重復(fù)消費的現(xiàn)象,唯一可能的情況,你拉取到消息后,消費者那邊剛好進行了位移提交,Kafka那邊以為你已經(jīng)消費了這條消息,其實你剛開始準備對這條消息進行業(yè)務(wù)處理,但你還沒處理完,然后因為某些原因,自己掛掉了,當你服務(wù)恢復(fù)后再去消費,那就是消費下一條消息了,那么這條未處理的消息就相當于丟失了。所以,很多時候并不是說拉取到消息就算消費完成,而是將消息寫入數(shù)據(jù)庫或緩存中,或者是更加復(fù)雜的業(yè)務(wù)處理,在這些情況下,所有的業(yè)務(wù)處理完成才能認為消息被成功消費。Kafka也提供了對位移提交進行手動提交的方式,開啟手動提交的前提是消費者客戶端參數(shù)enable.auto.commit配置為false,

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

? 消費者端手動提交方式提供了兩種,commitSync()同步提交方式和commitAsync()異步提交方式。commitSync()同步提交方式在調(diào)用時Consumer程序會處于阻塞狀態(tài),直到遠端的broker返回提交結(jié)果,這個狀態(tài)才會結(jié)束,這樣會對消費者的性能有一定的影響。commitAsync()異步提交方式在執(zhí)行后會立刻返回,不會被阻塞,但是它也有相應(yīng)的問題產(chǎn)生,如果異步提交失敗后,它雖然也有重試,但是重試提交的位移值可能早已經(jīng)“過期”或者不是最新的值了,因此異步提交的重試其實沒有意義。這里我們可以把同步提交和異步提交相結(jié)合,以達到最理想的效果。

 try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    // 處理消息 record
                }
                consumer.commitAsync();
            }
        } catch (Exception e){
            // 處理異常
        } finally {
            try {
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }

2)Kafka端弄丟數(shù)據(jù)

? 如下圖,副本A為leader副本,副本B為follower副本,它們的HW和LEO都為4。

?
image

? 此時,A中寫入一條消息,它的LEO更新為5,B從A中同步了這條數(shù)據(jù),自己的LEO也更新為5

image

? 之后B再向A發(fā)起請求以拉取數(shù)據(jù),該FetchRequest請求中帶上了B中的LEO信息,A在收到該請求后根據(jù)B的LEO值更新了自己的HW為5,A中雖然沒有更多的消息,但還是在延時一段時間之后返回FetchRresponse,其中也包含了HW信息,最后B根據(jù)返回的HW信息更新自己的HW為5。

image

? 可以看到整個過程中兩者之間的HW同步有一個間隙,B在同步A中的消息之后需要再一輪的FetchRequest/FetchResponse才能更新自身的HW為5。如果在更新HW之前,B宕機了,那么B在重啟之后會根據(jù)之前HW位置進行日志截斷,這樣便會將4這條消息截斷,然后再向A發(fā)送請求拉取消息。此時若A再宕機,那么B就會被選舉為新的leader。B恢復(fù)之后會成為follower,由于follower副本的HW不能比leader副本的HW高,所以還會做一次日志截斷,以此將HW調(diào)整為4。這樣一來4這條數(shù)據(jù)就丟失了(就算A不能恢復(fù),這條數(shù)據(jù)也同樣丟失了)。

image

? 對于這種情況,一般要求起碼設(shè)置如下4個參數(shù):

1)給這個topic設(shè)置replication.factor參數(shù):這個值必須大于1,要求每個partition必須有至少2個副本

2)在kafka服務(wù)端設(shè)置min.insync.replicas參數(shù):這個值必須大于1,這個是要求一個leader至少感知到有至少一個follower還跟自己保持聯(lián)系,沒掉隊,這樣才能確保leader掛了還有一個follower

3)在producer端設(shè)置acks=all或-1:這個是要求每條數(shù)據(jù),必須是寫入所有replica之后,才能認為是寫成功了

4)在producer端設(shè)置retries為很大的一個值:這個是要求一旦寫入失敗,就無限重試,它默認為0,即在發(fā)生異常之后不進行任何重試。

? 當然,設(shè)置了acks等于all或-1之后,會影響一定的性能。Kafka從0.11.0.0(我們公司現(xiàn)在用的版本為0.10.0.0)開始引入了leader epoch的概念,在需要截斷數(shù)據(jù)的時候使用leader epoch作為參考依據(jù)而不是原本的HW。leader epoch代表leader的紀元信息,初始值為0,每當leader變更一次,leader epoch的值就會加1,相當于為leader增設(shè)了一個版本號。引入leader epoch很好的解決了前面所說的數(shù)據(jù)丟失問題,也就不需要去設(shè)置acks=all了。

3)生產(chǎn)者端不會丟失數(shù)據(jù)

? 如果你配置了上面場景的參數(shù),就是當數(shù)據(jù)寫入leader副本和所有follower副本成功后才返回響應(yīng)給生產(chǎn)者,如果寫入不成功,生產(chǎn)者會不斷重試。

2、Kafka 怎么防止重復(fù)消費

? 消費者的自動位移提交方式會帶來重復(fù)消費的問題。假設(shè)剛剛提交完一次消費位移,然后拉取一批消息進行消費,在下一次自動位移提交之前,消費者崩了,那么等消費者恢復(fù)再來消費消息的時候又得從上一次位移提交的地方重新開始,這樣便發(fā)生了重復(fù)消費的現(xiàn)象。

? 其實這里可以類似上面消費端丟失數(shù)據(jù)的情況,很多時候并不是說拉取到消息就算消費完成,而是將消息寫入數(shù)據(jù)庫或緩存中,或者是更加復(fù)雜的業(yè)務(wù)處理,重復(fù)消費也同樣如此,重復(fù)消費不可怕,可怕的是你沒考慮到重復(fù)消費之后,怎么保證冪等性,通俗點說,就一個數(shù)據(jù),或者一個請求,給你重復(fù)來多次,你得確保對應(yīng)的數(shù)據(jù)是不會改變的,不能出錯。這里防止重復(fù)消費,你可以像上面一樣把自動提交改為手動提交,或者是保證消息消費的冪等性。

保證消費消息冪等性

1)如果你是要插入mysql中,可以對其設(shè)置唯一鍵,插入重復(fù)的數(shù)據(jù)只會插入報錯,不會有重復(fù)數(shù)據(jù)產(chǎn)生

2)如果你是要寫入redis中,每次都是set操作,可以保證冪等性

? 如何保證消息消費是冪等性的,需要結(jié)合具體的業(yè)務(wù)來看。

3、Kafka為什么這么快?

1)消息壓縮

? Kafka在對消息進行壓縮,Producer 端壓縮、Broker 端保持、Consum進行解壓縮。它秉承了用時間去換空間的思想,具體來說就是用CPU時間去換磁盤空間或網(wǎng)絡(luò)I/O傳輸量,希望以較小的CPU開銷帶來更少的磁盤占用或更少的網(wǎng)絡(luò)I/O傳輸。Kafka支持多種壓縮算法,如GZIP、Snappy 和 LZ4。

2)數(shù)據(jù)讀寫

? Kafka會把收到的消息都寫入到磁盤中,它絕對不會丟失數(shù)據(jù)。因為磁盤是機械結(jié)構(gòu),每次讀寫都會尋址->寫入,其中尋址是一個“機械動作”,它是最耗時的。所以磁盤最討厭隨機I/O,最喜歡順序I/O。為了提高讀寫硬盤的速度,Kafka就是使用順序I/O。

image

? 如上圖,每個partition在存儲層面可以看作一個可追加的日志文件,收到消息后Kafka會把數(shù)據(jù)順序?qū)懭胛募┪病?/p>

? 即便是順序?qū)懭氪疟P,磁盤的訪問速度還是不可能追上內(nèi)存。所以Kafka的數(shù)據(jù)并不是實時的寫入磁盤,它充分利用了現(xiàn)代操作系統(tǒng)的頁緩存,就是把磁盤中的數(shù)據(jù)緩存到內(nèi)存中,把對磁盤的訪問變?yōu)閷?nèi)存的訪問,來利用內(nèi)存提高I/O效率。

? 除了消息順序追加、頁緩存等技術(shù),Kafka還使用了零拷貝(Zero-Copy)技術(shù)來進一步提升性能。所謂的零拷貝是指將數(shù)據(jù)直接從磁盤文件復(fù)制到網(wǎng)卡設(shè)備中,而不需要經(jīng)由應(yīng)用程序之手,這樣大大提高了應(yīng)用程序的性能,減少了內(nèi)核和用戶模式之間的上下文切換。

4、消息隊列時間開銷最大的在哪兒?

? 根據(jù)上面對Kafka的分析,可以類推作為一個消息中間件所需的時間開銷主要在以下兩個方面:1)消息讀寫 2)網(wǎng)絡(luò)傳輸

5、Kafka跟其他消息隊列的差異與適應(yīng)的場景是哪些?

? 簡單介紹下比較常用的消息中間件:

? RabbitMQ是采用Erlang語言實現(xiàn)的AMQP協(xié)議的消息中間件,最初起源于金融系統(tǒng),用于在分布式系統(tǒng)中存儲和轉(zhuǎn)發(fā)消息。RabbitMQ發(fā)展到今天,被越來越多的人認可,這和它在可靠性、可用性、擴展性、功能豐富等方面的卓越表現(xiàn)是分不開的。

? RocketMQ是阿里開源的消息中間件,目前已捐獻給Apache基金會,它是由Java語言開發(fā)的,具備高吞吐量、高可用性、適合大規(guī)模分布式系統(tǒng)應(yīng)用等特點,經(jīng)歷過“雙十一”的洗禮,實力不容小覷。

從以下幾個方面來分析Kafka與其它常用的消息中間件的差異:

可靠性:Kafka的ISR機制保證其高可用,一主多從,leader副本掛掉后,可以自動選舉新的leader;RocketMQ也支持主從機制保證其高可用,通過設(shè)定brokerId=0來設(shè)置master,不支持主從切換,master失效以后,從slave中進行消費;RabbitMQ也是支持主從機制保證高可用,master掛掉以后,最早加入集群的slave成為master,支持主從自動切換。

單機吞吐量:RabbitMQ單機吞吐量在萬級別之內(nèi),吞吐量比RocketMQ和Kafka要低了一個數(shù)量級;RocketMQ和Kafka單機吞吐量可以維持在十萬級別。

應(yīng)用場景:RabbitMQ在金融支付領(lǐng)域使用較多,而在日志處理、大數(shù)據(jù)等方面Kafka使用居多,而RocketMQ目前在阿里集團被廣泛應(yīng)用于交易、充值、流計算、消息推送、日志流式處理、binglog分發(fā)等場景,支撐了阿里多次雙十一活動。

6、Kafka在我們系統(tǒng)中的應(yīng)用,生產(chǎn)者、消費者分別是什么?groupid是什么,topic是什么?

image

?我們的BinaryBinlogKafkaProducer

 while (true) {
            try {
                if(BinlogUtil.isCache(binlog.getHeader().getTableName())){
                    if(!TopicUtil.isExit(binlog.getHeader().getSchemaName()+"."+binlog.getHeader().getTableName())){
                        //創(chuàng)建topic
                        TopicUtil.createTopic(binlog.getHeader().getSchemaName()+"."+binlog.getHeader().getTableName());
                    }
                    send(binlog.getHeader().getSchemaName()+"."+binlog.getHeader().getTableName(),binlog.toByteArray());
                }else {
                    send(topic, binlog.toByteArray());
                }
                break;
            } catch (FailedToSendMessageException e) {
               // 處理異常
          }
}

topic設(shè)置有個判斷,如果環(huán)境變量中有設(shè)置cacheTable這個參數(shù),則設(shè)置topic為“庫名.表名",若沒有,則使用“Binlog”作為topic

1571622783461.png

消費者

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface MessageConsumer {

    String groupId() default "";

    String zkHost() default "127.0.0.1";

}

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface MessageConsumerAction {

    String topic() default "Binlog";

    String eventType();

}

消費者的groupId設(shè)置在MessageConsumerServiceImpl中,設(shè)置的是className

           String className = context.getIface() instanceof Proxy ? ((Class) 
           ifaceClass.getMethod("getTargetClass").invoke(context.getIface())).getName() : ifaceClass.getName();
            groupId = "".equals(groupId) ? className : ifaceClass.getName();
           
1571630301269.png

消費者的消費是通過注解使用的

@MessageConsumer
@Transactional(value ="iplm_reportdb", rollbackFor = Array(classOf[Exception]))
class ReportBinlogServiceImplextends ReportBinlogService{
         def init(): Unit ={
                     // 初始化加載緩存
        }
      @MessageConsumerAction(topic = "Binlog")
      override def onReportModuleBinlogMessage(message: ByteBuffer): Unit = {
                // 處理binlog
     }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容