Spark Streaming 數據零丟失之手動維護 offset 到 MySQL

版本信息:

spark:2.2.0
kafka:0.10.1.0
scala:2.11.8
scalikejdbc:3.3.2

Pom文件:

<!-- Version numbers referenced by the dependency declarations below via ${...}. -->
<properties>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.2.0</spark.version>
        <scalikejdbc.version>3.3.2</scalikejdbc.version>
</properties>

<dependencies>
        <!-- Scala standard library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

      <!-- ScalikeJDBC dependencies: the core library (DB, SQL) and the config module (DBs) -->
        <dependency>
            <groupId>org.scalikejdbc</groupId>
            <artifactId>scalikejdbc_2.11</artifactId>
            <version>${scalikejdbc.version}</version>
        </dependency>

        <dependency>
            <groupId>org.scalikejdbc</groupId>
            <artifactId>scalikejdbc-config_2.11</artifactId>
            <version>${scalikejdbc.version}</version>
        </dependency>
       <!-- Spark dependencies; all share ${spark.version} and the Scala 2.11 binary suffix -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Kafka 0.8 integration: supplies the org.apache.spark.streaming.kafka classes
             (KafkaUtils, HasOffsetRanges) imported by the streaming job -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

         <!-- MySQL JDBC driver dependency -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.6</version>
        </dependency>
</dependencies>

application.conf文件

# JDBC connection settings for the MySQL database that holds the offset table.
# NOTE(review): presumably picked up by the job's no-argument DBs.setup() call via the
# db.default prefix; confirm against the scalikejdbc-config documentation.
db.default.driver="com.mysql.jdbc.Driver"
db.default.url="jdbc:mysql://hadoop000:3306/hadoop_train?characterEncoding=utf-8"
db.default.user="root"
db.default.password="root"
# NOTE(review): this key is not under the db.default prefix and is not referenced by the
# code shown in this article; confirm whether anything reads it.
dataSourceClassName=com.mysql.jdbc.jdbc2.optional.MysqlDataSource


# Kafka settings
metadata.broker.list = "192.168.245.100:9092"
# Start consuming from the oldest available data
auto.offset.reset = "smallest"
group.id = "baidu_offset_group"
# Topic list; the job splits this value on "," to build the topic set
kafka.topics = "baidu"
# The two keys below are not read by the streaming job shown in this article
serializer.class = "kafka.serializer.StringEncoder"
request.required.acks = "1"

ValueUtils

package com.soul.bigdata.spark.streaming01

import com.typesafe.config.ConfigFactory
import org.apache.commons.lang3.StringUtils

/**
 * Small helper for reading string settings from the application configuration
 * loaded by `ConfigFactory.load()`.
 */
object ValueUtils {
  /** Configuration loaded once, when this object is first initialised. */
  val load = ConfigFactory.load()

  /**
   * Looks up a string setting.
   *
   * @param key          configuration path to read
   * @param defaultValue value returned when the path is absent, null, or set to an empty string
   * @return the configured value, or `defaultValue` when no non-empty value is configured
   */
  def getStringValue(key: String, defaultValue: String = ""): String = {
    // Config.getString throws ConfigException.Missing for an absent path, which would make
    // the default unreachable in exactly the case it exists for; check hasPath first.
    if (load.hasPath(key)) {
      val value = load.getString(key)
      if (StringUtils.isNotEmpty(value)) value else defaultValue
    } else {
      defaultValue
    }
  }
}

MySQL Offset表

 -- Offset bookkeeping table used by the streaming job.
 -- One row per (topic, groupid, partitions); the job overwrites the row with
 -- "replace into" after each batch, storing that batch's offset range.
 -- On startup the job reads untiloffset as the position to resume from.
 create table baidu_offset(
        topic varchar(32),
        groupid varchar(50),
        partitions int,
        fromoffset bigint,
        untiloffset bigint,
        primary key(topic,groupid,partitions)
        );

代碼:

package com.soul.bigdata.spark.streaming01


import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scalikejdbc.{DB, SQL}
import scalikejdbc.config.DBs

/**
 * Spark Streaming job that reads Kafka through `KafkaUtils.createDirectStream` and keeps
 * the consumed offsets in the MySQL table `baidu_offset`.
 *
 * On startup the saved `untiloffset` of every partition belonging to this consumer group
 * is loaded and used as the starting position. After each non-empty batch the batch's
 * offset ranges are written back to the table.
 */
object StreamingOffsetMySQL {

  /**
   * Entry point: builds the streaming context, restores offsets, starts the stream and
   * blocks until termination.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingOffsetMySQL")

    val ssc = new StreamingContext(conf, Seconds(10))

    // Topics to consume, comma-separated in application.conf.
    val topics = ValueUtils.getStringValue("kafka.topics").split(",").toSet

    // Read once here instead of once per partition per batch.
    val groupId = ValueUtils.getStringValue("group.id")

    // Kafka parameters, taken from application.conf through ValueUtils so they can be
    // changed without touching the code.
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> ValueUtils.getStringValue("metadata.broker.list"),
      "auto.offset.reset" -> ValueUtils.getStringValue("auto.offset.reset"),
      "group.id" -> groupId
    )

    // Initialise the ScalikeJDBC connection pool, then restore the saved offsets.
    DBs.setup()
    val fromOffset = loadOffsets(groupId, topics)

    // No saved offsets: let Kafka decide the start position (auto.offset.reset).
    // Saved offsets: resume from exactly those positions.
    val messages = if (fromOffset.isEmpty) {
      println("從頭開始消費...")
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    } else {
      println("從已存在記錄開始消費...")
      val messageHandler = (mm: MessageAndMetadata[String, String]) => (mm.key(), mm.message())
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffset, messageHandler)
    }

    messages.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // "Processing" in this example is just printing the number of records in the batch.
        println("數(shù)據(jù)統(tǒng)計記錄為:" + rdd.count())
        // RDDs produced by the direct stream expose their Kafka offset ranges through
        // HasOffsetRanges; the cast must be applied to the RDD handed to foreachRDD.
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // Offsets are saved only after the batch has been processed, so a failure before
        // this point causes the batch to be re-read on restart rather than skipped.
        saveOffsets(groupId, offsetRanges)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Reads the saved offsets for one consumer group from `baidu_offset`.
   *
   * Only rows of `groupId` are read, and only topics contained in `topics` are kept, so
   * rows written by other consumer groups or for other topics cannot leak into the
   * starting positions.
   *
   * @param groupId consumer group whose offsets are restored
   * @param topics  topics this job is configured to consume
   * @return the saved `untiloffset` for each topic/partition; empty when nothing is stored
   */
  private def loadOffsets(groupId: String, topics: Set[String]): Map[TopicAndPartition, Long] =
    DB.readOnly { implicit session =>
      SQL("select topic, partitions, untiloffset from baidu_offset where groupid = ?")
        .bind(groupId)
        .map(rs => (TopicAndPartition(rs.string("topic"), rs.int("partitions")), rs.long("untiloffset")))
        .list()
        .apply()
    }.filter { case (topicAndPartition, _) => topics.contains(topicAndPartition.topic) }.toMap

  /**
   * Prints and persists the offset ranges of one batch.
   *
   * All partitions are written in a single transaction so the table never holds a
   * partially updated batch.
   *
   * @param groupId      consumer group the offsets belong to
   * @param offsetRanges offset ranges of the batch, one per topic/partition
   */
  private def saveOffsets(groupId: String,
                          offsetRanges: Array[org.apache.spark.streaming.kafka.OffsetRange]): Unit =
    DB.localTx { implicit session =>
      offsetRanges.foreach { range =>
        // Print topic, partition, start offset and end offset of this batch.
        println(s"---${range.topic},${range.partition},${range.fromOffset},${range.untilOffset}---")
        // "replace into" overwrites the row identified by (topic, groupid, partitions).
        SQL("replace into baidu_offset(topic,groupid,partitions,fromoffset,untiloffset) values (?,?,?,?,?)")
          .bind(range.topic, groupId, range.partition, range.fromOffset, range.untilOffset)
          .update().apply()
      }
    }

}

運行


image.png

停掉程序後重新運行,可以看到消費是從 offset 411 開始的,這樣就達到了我們的目的。


image.png
image.png
image.png
©著作權歸作者所有,轉載或內容合作請聯繫作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容