SparkStreaming如何維護Kafka消息偏移量

SparkStreaming? ? 維護????Kafka? ? 消息偏移量據個人了解有兩種方式

一、利用????SparkStreaming? ? 自帶的????Checkpoint? ? 方法來維護

二、自己來編寫維護????Kafka? ? 消息偏移量的代碼

首先說明下集群中的各組件版本

Hadoop-version:????2.7.1

Spark-version? ? :????1.6.2

zookeeper-version:????3.4.6

jdk-version:? ? ? ? ? ? ? 1.8

maven-version:? ? ? ? 3.3.3

kafka-version:? ? ? ? ? ?0.10.0

集群搭建方式? ? HDP(ambari)

首先說第一種????SparkStreaming? ? 自帶的????Checkpoint? ? 方法,以下為代碼示例

object Test {

def main(args: Array[String]): Unit = {

//? ? TODO:創(chuàng)建檢查點的位置可以設置為Hdfs如果程序重新啟動spark程序會到此目錄中檢查并恢復

? ? val checkpointDirectory ="";

//? ? TODO:調用getOrCreate方法,這個方法入如果是第一次運行該作業(yè)

? ? //? ? TODO:沒有checkpointDirectory該文件時

? ? //? ? TODO:將會重新創(chuàng)建一個StreamingContext,并從最新或是最老的偏移量處開始消費

? ? val ssc: StreamingContext = StreamingContext.getOrCreate(checkpointDirectory, () => {

createStreaming(checkpointDirectory)

})

ssc.start()

ssc.awaitTermination()

}

//? TODO:你的業(yè)務邏輯不應該寫在main函數中,而是應該寫在創(chuàng)建

? //? TODO:StreamingContext的方法中

? def createStreaming(checkpointDirectory:String): StreamingContext = {

? ??????val brokerlist:String ="192.168.1.1:6667,192.168.1.1:6667"

? ?? ???val topic =Set("test")

? ??????val kafkaParams:Map[String,String] =Map[String,String]("metadata.broker.list" -> brokerlist)

? ??????val conf: SparkConf =new SparkConf()

? ??????????????????????????????????.setMaster("local")

? ??????????????????????????????????.setAppName(Test.getClass.getSimpleName)

? ??????val ssc: StreamingContext =new StreamingContext(conf,Seconds(3))

//? ? TODO:在這里創(chuàng)建檢查點,至于檢查點在哪里創(chuàng)建,具體需要看個人的業(yè)務需求

? ??? ??ssc.checkpoint(checkpointDirectory)

? ??????val data: InputDStream[(String,String)] =

? ??????????????????????KafkaUtils.createDirectStream[String,String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)

//? TODO:以下代碼就是你的業(yè)務邏輯了,這里只是循環(huán)遍歷打印了Key,Value

? ??? ??data.foreachRDD(rdd => {

? ??????????????rdd.foreachPartition(partition => {

? ??????????????????????if (partition.isEmpty) {

? ??????????????????????????????for (tuple <- partition) {

? ??????????????????????????????????println(tuple._1 + tuple._2)

? ??????????????????????????}

? ??????????????????????}

? ??????????????????})

? ??????????????})

? ??????????ssc

? ??????????}

? ??????}

說一下這個方法的缺點:

這個方法不適合總是需要迭代升級的應用,因為這個方法會在你建立檢查點時將你的jar包信息以序列化的方式存在此目錄中,

如果你的作業(yè)掛掉重新啟動時,這時候是沒有問題的,因為什么都沒有改變。

但是在你的應用迭代升級時你的代碼發(fā)生了變化,這是程序會發(fā)現其中的變化,你迭代升級后的版本將無法運行,就算是啟動成功了,

運行的也還是迭代升級之前的代碼。所還是以失敗而告終?。?/p>

在Spark官方文檔中給出了兩個解決辦法

第一個:老的作業(yè)不停機,新作業(yè)個老作業(yè)同時運行一段時間,這樣是不安全的?。?!

會導致數據重復消費,也有可能會發(fā)生數據丟失等問題

第二個:就是我要講的自己維護消息偏移量

以下是自己維護消息偏移量代碼示例

object Test{

? ??????def main(args: Array[String]): Unit = {

//? ? TODO: Zookeeper 集群地址和端口

??? ??????????? val zkHost:String ="192.168.1.1:2181,192.168.1.1:2181,192.168.1.1:2181"

? ? //? ? TODO:Kafka集群地址及端口

??? ??????????? val brokerlist:String ="192.168.1.1:6667,192.168.1.1:6667,192.168.1.1:6667"

? ? //? ? TODO:指定消費 Kafka 主題

??? ??????????? val topic:String ="test"

? ??? ??????????val kafkaParams:Map[String,String] =Map[String,String]("metadata.broker.list" -> brokerlist)

? ??? ??????????val conf: SparkConf =new SparkConf()

? ??????????????????????????????.setMaster("local[8]")

? ??????????????????????????????.setAppName(NGBoss_Dcc_Analysis.getClass.getSimpleName)

//? ? TODO:獲取一個StreamingContext對象

? ?? ???????????val ssc: StreamingContext =new StreamingContext(conf,Seconds(5))

//? ? TODO:將獲取的Zookeeper客戶端

? ??? ??????????val zkClient =new ZkClient(zkHost)

? ??????????????var offsetRanges: Array[OffsetRange] =Array[OffsetRange]()

//? ? TODO:設置在zookeeper中存儲offset的路徑

? ??? ??????????val topicDirs: ZKGroupTopicDirs =new ZKGroupTopicDirs("TEST_TOPIC_spark_streaming", topic)

? ??? ??????????val children = zkClient.countChildren(s"${topicDirs.consumerOffsetDir}")


? ??? ??????????var fromOffsets:Map[TopicAndPartition, Long] =Map()

? ??????????????var kafkaStream: InputDStream[(String, Array[Byte])] =null

? ??? ? ? ? ? ? ? ? ? ? ??if (children >0) {

//TODO:如果 zookeeper 中有保存 offset,我們會利用這個 offset 作為 kafkaStream 的起始位置

//TODO:如果保存過 offset,這里更好的做法,還應該和? kafka 上最小的 offset 做對比,不然會報 OutOfRange 的錯誤

? ? ?? ?????????????????????????for (i <-0 until children) {

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}")

? ??????????????????????????????????val tp =TopicAndPartition(topic, i)

//TODO:將不同 partition 對應的 offset 增加到 fromOffsets 中

? ? ? ?? ???????????fromOffsets += (tp -> partitionOffset.toLong)

? ??????????????????}

//TODO:這個會將 kafka 的消息進行 transform,最終 kafak 的數據都會變成 (topic_name, message) 這樣的 tuple

? ? ?? ???????val messageHandler: (MessageAndMetadata[String, Array[Byte]]) => (String, Array[Byte]) = (mmd:?? ??????????????????????????????MessageAndMetadata[String, Array[Byte]]) => (mmd.topic, mmd.message())

? ????????????kafkaStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, ByteDecoder,?? ??????????????????????????????(String, Array[Byte])](ssc, kafkaParams, fromOffsets, messageHandler)

? ??????????}else {

//TODO:如果未保存,根據 kafkaParam 的配置使用最新或者最舊的 offset

? ? ?? ???????????kafkaStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, ByteDecoder]? ??????????????????????????????(ssc, kafkaParams,Set(topic))

? ??????????????}

? ??????????kafkaStream.foreachRDD(rdd => {

? ??????????????????if (!rdd.isEmpty()) {

? ??????????????????????rdd.foreachPartition(partition=> {

? ??????????????????????????????partition.foreach(tuple => {

? ??????????????????????????????????println(tuple._1 + tuple._2)

? ??????????????????????????????})

? ??????????????????????})

? ??????????????????offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

? ??????????????????????????????for (o <- offsetRanges) {

? ??????????????????????????????val zkPath:String =s"${topicDirs.consumerOffsetDir}/${o.partition}"

? ? ? ? ? //TODO:將該 partition 的 offset 保存到 zookeeper

? ? ? ? ?? ??????????????????????ZkUtils.updatePersistentPath(zkClient, zkPath, o.fromOffset.toString)

? ??????????????????????????}

? ??????????????????????}

? ??????????????????})

? ??????????ssc.start()

? ??????ssc.awaitTermination()

? ??????}

? ??}

第二種方法中offset可以存儲在zookeeper中也可以存儲在數據庫中

自己維護offset是不會發(fā)生Checkpoint?中的問題,是我目前知道的最好的一個解決方案。

但是也有其中的缺點,如果從失敗中恢復運行時不能獲取到Key值,默認它的Key就是主題。

還有就是不能同時消費維護多個主題中的偏移量。

如果你有跟好的解決方案,還請不要吝嗇你的知識?。。。?!

謝謝你可以在百忙之中看完我寫的文章!?。。。。?/b>

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容