Kafka+Spark Streaming如何保證exactly once語(yǔ)義

在Kafka、Storm、Flink、Spark Streaming等分布式流處理系統(tǒng)中(沒(méi)錯(cuò),Kafka本質(zhì)上是流處理系統(tǒng),不是單純的“消息隊(duì)列”),存在三種消息傳遞語(yǔ)義(message delivery semantics),分別是:

  • at least once:每條消息會(huì)被收到1次或多次。例如發(fā)送方S在超時(shí)時(shí)間內(nèi)沒(méi)有收到接收方R的通知(如ack),或者收到了R的報(bào)錯(cuò),就會(huì)不斷重發(fā)消息直至R傳回ack。
  • at most once:每條消息會(huì)被收到0次或1次。也就是說(shuō)S只負(fù)責(zé)向R發(fā)送消息,R也沒(méi)有任何通知機(jī)制。無(wú)論R最終是否收到,S都不會(huì)重發(fā)。
  • exactly once:是上面兩個(gè)的綜合,保證S發(fā)送的每一條消息,R都會(huì)“不重不漏”地恰好收到1次。它是最強(qiáng)最精確的語(yǔ)義,也最難實(shí)現(xiàn)。

在我們的日常工作中,90%的流處理業(yè)務(wù)都是通過(guò)Kafka+Spark Streaming+HDFS來(lái)實(shí)現(xiàn)的(這里Kafka的作用是消息隊(duì)列了)。本篇探討保證exactly once語(yǔ)義的方法。


如上面的圖所示,一個(gè)Spark Streaming程序由三步組成:輸入、處理邏輯、輸出。要達(dá)到exactly once的理想狀態(tài),需要三步協(xié)同進(jìn)行,而不是只與處理邏輯有關(guān)。Kafka與Spark Streaming集成時(shí)有兩種方法:舊的基于receiver的方法,新的基于direct stream的方法。下面兩張圖可以清楚地說(shuō)明。

  • 基于receiver的方法


    基于receiver的方法采用Kafka的高級(jí)消費(fèi)者API,每個(gè)executor進(jìn)程都不斷拉取消息,并同時(shí)保存在executor內(nèi)存與HDFS上的預(yù)寫(xiě)日志(write-ahead log/WAL)。當(dāng)消息寫(xiě)入WAL后,自動(dòng)更新ZooKeeper中的offset。
    它可以保證at least once語(yǔ)義,但無(wú)法保證exactly once語(yǔ)義。雖然引入了WAL來(lái)確保消息不會(huì)丟失,但還有可能會(huì)出現(xiàn)消息已經(jīng)寫(xiě)入WAL,但offset更新失敗的情況,Kafka就會(huì)按上一次的offset重新發(fā)送消息。這種方式還會(huì)造成數(shù)據(jù)冗余(Kafka broker中一份,Spark executor中一份),使吞吐量和內(nèi)存利用率降低?,F(xiàn)在基本都使用下面基于direct stream的方法了。

  • 基于direct stream的方法


    基于direct stream的方法采用Kafka的簡(jiǎn)單消費(fèi)者API,它的流程大大簡(jiǎn)化了。executor不再?gòu)腒afka中連續(xù)讀取消息,也消除了receiver和WAL。還有一個(gè)改進(jìn)就是Kafka分區(qū)與RDD分區(qū)是一一對(duì)應(yīng)的,更可控。
    driver進(jìn)程只需要每次從Kafka獲得批次消息的offset range,然后executor進(jìn)程根據(jù)offset range去讀取該批次對(duì)應(yīng)的消息即可。由于offset在Kafka中能唯一確定一條消息,且在外部只能被Streaming程序本身感知到,因此消除了不一致性,達(dá)到了exactly once。
    不過(guò),由于它采用了簡(jiǎn)單消費(fèi)者API,我們就需要自己來(lái)管理offset。否則一旦程序崩潰,整個(gè)流只能從earliest或者latest點(diǎn)恢復(fù),這肯定是不穩(wěn)妥的。offset管理在之前的文章中提到過(guò),這里不再贅述。

Kafka作為輸入源可以保證exactly once,那么處理邏輯呢?答案是顯然的,Spark Streaming的處理邏輯天生具備exactly once語(yǔ)義。
Spark RDD之所以被稱(chēng)為“彈性分布式數(shù)據(jù)集”,是因?yàn)樗哂?strong>不可變、可分區(qū)、可并行計(jì)算、容錯(cuò)的特征。一個(gè)RDD只能由穩(wěn)定的數(shù)據(jù)集生成,或者從其他RDD轉(zhuǎn)換(transform)得來(lái)。如果在執(zhí)行RDD lineage的過(guò)程中失敗,那么只要源數(shù)據(jù)不發(fā)生變化,無(wú)論重新執(zhí)行多少次lineage,都一定會(huì)得到同樣的、確定的結(jié)果。

最后,我們還需要保證輸出過(guò)程也符合exactly once語(yǔ)義。Spark Streaming的輸出一般是靠foreachRDD()算子來(lái)實(shí)現(xiàn),它默認(rèn)是at least once的。如果輸出過(guò)程中途出錯(cuò),那么就會(huì)重復(fù)執(zhí)行直到寫(xiě)入成功。為了讓它符合exactly once,可以施加兩種限制之一:冪等性寫(xiě)入(idempotent write)、事務(wù)性寫(xiě)入(transactional write)。

  • 冪等性寫(xiě)入
    冪等原來(lái)是數(shù)學(xué)里的概念,即f(f(x))=f(x)。冪等寫(xiě)入就是寫(xiě)入多次與寫(xiě)入一次的結(jié)果完全相同,可以自動(dòng)將at least once轉(zhuǎn)化為exactly once。這對(duì)于自帶主鍵或主鍵組的業(yè)務(wù)比較合適(比如各類(lèi)日志、MySQL binlog等),并且實(shí)現(xiàn)起來(lái)比較簡(jiǎn)單。
    但是它要求處理邏輯是map-only的,也就是只能包含轉(zhuǎn)換、過(guò)濾等操作,不能包含shuffle、聚合等操作。如果條件更嚴(yán)格,就只能采用事務(wù)性寫(xiě)入方法。
    stream.foreachRDD { rdd =>
      rdd.foreachPartition { iter =>
        // make sure connection pool is set up on the executor before writing
        SetupJdbc(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)

        iter.foreach { case (key, msg) =>
          DB.autoCommit { implicit session =>
            // the unique key for idempotency is just the text of the message itself, for example purposes
            sql"insert into idem_data(msg) values (${msg})".update.apply
          }
        }
      }
    }
  • 事務(wù)性寫(xiě)入
    這里的事務(wù)與DBMS中的事務(wù)含義基本相同,就是對(duì)數(shù)據(jù)進(jìn)行一系列訪問(wèn)與更新操作所組成的邏輯塊。為了符合事務(wù)的ACID特性https://en.wikipedia.org/wiki/ACID_(computer_science)),必須引入一個(gè)唯一ID標(biāo)識(shí)當(dāng)前的處理邏輯,并且將計(jì)算結(jié)果與該ID一起落盤(pán)。ID可以由主題、分區(qū)、時(shí)間、offset等共同組成。
    事務(wù)操作可以在foreachRDD()時(shí)進(jìn)行。如果數(shù)據(jù)寫(xiě)入失敗,或者offset寫(xiě)入與當(dāng)前offset range不匹配,那么這一批次數(shù)據(jù)都將失敗并且回滾。
// localTx is transactional, if metric update or offset update fails, neither will be committed
    DB.localTx { implicit session =>
      // store metric data
      val metricRows = sql"""
    update txn_data set metric = metric + ${metric}
      where topic = ${osr.topic}
    """.update.apply()
      if (metricRows != 1) {
        throw new Exception("...")
      }

      // store offsets
      val offsetRows = sql"""
    update txn_offsets set off = ${osr.untilOffset}
      where topic = ${osr.topic} and part = ${osr.partition} and off = ${osr.fromOffset}
    """.update.apply()
      if (offsetRows != 1) {
        throw new Exception("...")
      }
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容