kafka-kafka+sparkstreaming不丟數(shù)據(jù)的方案

一、從kafka讀數(shù)據(jù)保證不丟失的方案

?0.8版本
lines.foreachRDD( rdd =>{
      //業(yè)務(wù)邏輯代碼處理
     // rdd.map(_._2).flatMap(_.split(","))
     rdd.asInstanceOf[HasOffsetRanges].offsetRanges.map( o =>{
       println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
       //更新偏移量就可以了。
     })
    })

lines對(duì)象里面有偏移量,但是凡事代碼對(duì)這個(gè)對(duì)象做了任何算子的操作(比如map),偏移量就丟失了.所以調(diào)用了createDirectStream之后,只要在多調(diào)用一個(gè)其他的算子,偏移量就丟失了,只能調(diào)用foreachRDD,然后進(jìn)行業(yè)務(wù)邏輯處理.
但是有一個(gè)小小的遺憾,就是KafkaUtils.createDirectStream返回的其實(shí)是一個(gè)DStream,但是調(diào)用了foreachRDD之后變成了RDD把streaming的處理,變成了spark core 去處理了..雖然問(wèn)題不大,但是streaming特有的算子就不能用了,比如 window、updatestatebykey也就是說(shuō),他不能適用100%的場(chǎng)景

?那么怎么寫(xiě)可以100%適用呢?

整體思想:如果我們能在讀取kafka的時(shí)候,傳遞進(jìn)去offset的信息,并且保證這個(gè)offset的更新跟數(shù)據(jù)處理成功與否是原子性的,就能保證數(shù)據(jù)不丟失

image.png

1、擴(kuò)展createDirectStream方法,新建KafkaManager
2、設(shè)置一個(gè)監(jiān)聽(tīng)器,幫我們完成偏移量的提交(因?yàn)?.8默認(rèn)offset保存在zk,所以我們需要調(diào)用kafka往zk寫(xiě)offset的方法,重點(diǎn)是什么時(shí)候?qū)?
3、在讀取kafka的數(shù)據(jù)之前,先把偏移量更新到zk(setOrUpdateOffsets),然后在讀取偏移量(kc.getConsumerOffsets)
?注意0.8版本是保存在zk的,所以我們只需要把offset寫(xiě)入zk,然后調(diào)用kafka的方法去zk讀取就好了
4、保證提交偏移量和數(shù)據(jù)處理邏輯的原子性

NOTE:最后這個(gè)方案,在架構(gòu)設(shè)計(jì)上有一個(gè)缺點(diǎn):
如果企業(yè)中spark streaming太多(上千個(gè)-->幾十個(gè)其實(shí)是沒(méi)問(wèn)題的),會(huì)頻繁的往zk寫(xiě)數(shù)據(jù),這對(duì)zk來(lái)說(shuō)是一個(gè)高并發(fā)的場(chǎng)景,而zk是不適合高并發(fā)的,所以用redis代替是可以的

分別解釋一下:
新建KafkaManager類,新建一個(gè)createDirectStream方法和setOrUpdateOffsets方法

class KakfaManager {
  def createDirectStream(){
  // 在zookeeper上讀取offsets前先根據(jù)實(shí)際情況更新offsets
  setOrUpdateOffsets()
  ...
  //獲取offset
  val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
  //還是調(diào)用的sparkStermaing的API
  KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
        ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
}
}

setOrUpdateOffsets(){
...
//根據(jù)實(shí)際消費(fèi)情況更新消費(fèi)offsets
kc.setConsumerOffsets(groupId, offsets)
...
}

監(jiān)聽(tīng)器(我們只需要把offset放入zk就可以了)

class MyListener implements StreamingListener(){
  @Override
  //batchCompleted對(duì)象里面是有任務(wù)的offset信息
    public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
    //提交偏移量
    //這個(gè)方法是 0.8的依賴提供的API
    //這樣話,就自動(dòng)幫我把offset存到了ZK
   kc.setConsumerOffsets(kafkaParams.get("group.id").get(), topicAndPartitionObjectMap);
}
    /**
     * 批次完成時(shí)調(diào)用的方法
     * @param batchCompleted
     *
     * batchCompleted 對(duì)象里面帶有了偏移量對(duì)信息,所以我提交偏移量對(duì)時(shí)候
     * 就是從這個(gè)對(duì)象里面讀取offset就可以了。
     *
     * 注意:任務(wù)運(yùn)行完了,但是有可能是成功的,也有可能是失敗的
     */
    @Override
    public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {//這個(gè)對(duì)象里面是有任務(wù)的offset信息
      
        //任務(wù)運(yùn)行完了以后會(huì)(任務(wù)成功,)提交偏移量
        //
        //如果本批次里面有任務(wù)失敗了,那么就終止偏移量提交
        scala.collection.immutable.Map<Object, OutputOperationInfo> opsMap = batchCompleted.batchInfo().outputOperationInfos();
//
        Map<Object, OutputOperationInfo> javaOpsMap = JavaConversions.mapAsJavaMap(opsMap);
        for (Map.Entry<Object, OutputOperationInfo> entry : javaOpsMap.entrySet()) {
            //failureReason不等于None(是scala中的None),說(shuō)明有異常,不保存offset
            //OutputOperationInfo task
            //task -> failureReason
            if (!"None".equalsIgnoreCase(entry.getValue().failureReason().toString())) {
                return;
            }
        }
      //  long batchTime = batchCompleted.batchInfo().batchTime().milliseconds();
        /**
         * topic,分區(qū),偏移量
         */
        Map<String, Map<Integer, Long>> offset = getOffset(batchCompleted);

        for (Map.Entry<String, Map<Integer, Long>> entry : offset.entrySet()) {
            String topic = entry.getKey();
            Map<Integer, Long> paritionToOffset = entry.getValue();
            //我只需要在這里把offset放入到zookeeper就可以了。
            for(Map.Entry<Integer,Long> p2o : paritionToOffset.entrySet()){
                Map<TopicAndPartition, Object> map = new HashMap<TopicAndPartition, Object>();

                TopicAndPartition topicAndPartition =
                        new TopicAndPartition(topic,p2o.getKey());

                map.put(topicAndPartition,p2o.getValue());
                scala.collection.immutable.Map<TopicAndPartition, Object>
                        topicAndPartitionObjectMap = TypeHelper.toScalaImmutableMap(map);
                //提交偏移量

                //這個(gè)方法是 0.8的依賴提供的API
                //這樣話,就自動(dòng)幫我把offset存到了ZK
                kc.setConsumerOffsets(kafkaParams.get("group.id").get(), topicAndPartitionObjectMap);
            }

        }
    }

private Map<String, Map<Integer, Long>> getOffset(StreamingListenerBatchCompleted batchCompleted) {
        Map<String, Map<Integer, Long>> map = new HashMap<>();

        scala.collection.immutable.Map<Object, StreamInputInfo> inputInfoMap =
                batchCompleted.batchInfo().streamIdToInputInfo();

        Map<Object, StreamInputInfo> infos = JavaConversions.mapAsJavaMap(inputInfoMap);

        infos.forEach((k, v) -> {
            Option<Object> optOffsets = v.metadata().get("offsets");
            if (!optOffsets.isEmpty()) {
                Object objOffsets = optOffsets.get();
                if (List.class.isAssignableFrom(objOffsets.getClass())) {
                    List<OffsetRange> scalaRanges = (List<OffsetRange>) objOffsets;

                    Iterable<OffsetRange> ranges = JavaConversions.asJavaIterable(scalaRanges);
                    for (OffsetRange range : ranges) {
                        if (!map.containsKey(range.topic())) {
                            map.put(range.topic(), new HashMap<>());
                        }
                        map.get(range.topic()).put(range.partition(), range.untilOffset());
                    }
                }
            }
        });

        return map;
    }
 }

0.10版本

subscribe方式

1、創(chuàng)建一個(gè)監(jiān)聽(tīng)器,繼承StreamingListener,在onBatchCompleted方法中去提交offset

//獲取數(shù)據(jù)源(主題里面讀取offset)
val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
    //設(shè)置監(jiān)聽(tīng)器
    ssc.addStreamingListener(new MyListener(stream))
    val result = stream.map(_.value()).flatMap(_.split(","))
      .map((_, 1))
      .reduceByKey(_ + _)
    result.foreachRDD( rdd =>{
      /**
       * 其它的操作
       */
    })

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
if(offsetRanges != null){
     提交偏移量(Kafka)
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges);
    }

因?yàn)?.10版本的kafka的offset默認(rèn)已經(jīng)不下zk,而是寫(xiě)kafka了(_consumer_offset)

如何保證數(shù)據(jù)exactly-once語(yǔ)義

我們程序是通過(guò)kafka的offset去讀取數(shù)據(jù),我們一般是處理數(shù)據(jù),然后寫(xiě)offset,
正常情況是沒(méi)問(wèn)題的,但是如果處理完了,但是offset沒(méi)寫(xiě)成功,或者處理失敗,offset寫(xiě)成功了
就出現(xiàn)了,重復(fù)消費(fèi),或者丟數(shù)據(jù)的情況;
所以原則上就是保證事務(wù)或者保證原子性,就是處理寫(xiě)庫(kù)操作(保存mysql、HBase)和寫(xiě)offset操作要么一起成功,要么一起失敗就行了
還有就是設(shè)計(jì)成冪等操作,重復(fù)了也無(wú)所謂

import scalikejdbc.{ConnectionPool, DB, _}
//第一步:在Driver端創(chuàng)建數(shù)據(jù)庫(kù)連接池
ConnectionPool.singleton("jdbc:mysql://**", "", "")
//第二步:從數(shù)據(jù)庫(kù)讀取offset
//1)初次啟動(dòng)或重啟時(shí),從指定的Partition、Offset構(gòu)建TopicPartition
//2)運(yùn)行過(guò)程中,每個(gè)Partition、Offset保存在內(nèi)部currentOffsets = Map[TopicPartition, Long]()變量中
//3)后期Kafka Topic分區(qū)動(dòng)態(tài)擴(kuò)展,在運(yùn)行過(guò)程中不能自動(dòng)感知
val initOffset=DB.readOnly(implicit session=>{
      sql"select `partition`,offset from kafka_topic_offset where topic =${topic} and `group`=${group}"
        .map(item=> new TopicPartition(topic, item.get[Int]("partition")) -> item.get[Long]("offset"))
        .list().apply().toMap
    })
//第三步:CreateDirectStream
//從指定的Topic、Partition、Offset開(kāi)始消費(fèi)
    val sourceDStream =KafkaUtils.createDirectStream[String,String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Assign[String,String](initOffset.keys,kafkaParams,initOffset)
    )
sourceDStream.foreachRDD(rdd =>{
//業(yè)務(wù)處理
//在Driver端存儲(chǔ)數(shù)據(jù)、提交Offset
//結(jié)果存儲(chǔ)與Offset提交在同一事務(wù)中原子執(zhí)行
//這里將偏移量保存在Mysql中
//事務(wù)
DB.localTx(
//儲(chǔ)存結(jié)果到mysql
//儲(chǔ)存offset到mysql
)
})

參考文檔:奈學(xué)教育

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容