sparkstreaming將offset存到hbase

之前一直將offset存到zookeeper,由于streaming程序會對zk有大量的讀取操作,故將offset存到zk不太好,現(xiàn)已將offset都改到hbase中

  • kafka版本:0.10.2.0

  • spark版本:2.0

  • hbase表結構:groupid名字作為表名,topic名字作為rowkey,列族為info,分區(qū)號作為列名

kafka_offset:groupid info:0 info:1 info:2
rowkey(topicName) 10000 10000 10000
  1. 改完后streaming程序中的代碼調用
//先初始化hbase連接對象
HbaseUtil.setConf("zk address", "zk port")
//hbase中存offset的命名空間和表名
val offsetTbName = "kafka_offset:groupId"
HbaseUtil.createTable(offsetTbName, "info")//hbase中不存這個表就創(chuàng)建
//去hbase中獲取topic partition范圍,hbase中不存在也沒關系(第一次用這個groupid的時候)
val fromOffsets: Map[TopicPartition, Long] = OffsetUtil.getFromOffsets
/**
* param offsets :
* offsets to begin at on initial startup.  If no offset is given for a
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
* 引用源碼的注釋,意思大概就是如果第一次獲取不到topicPartition就用auto.offset.reset這個配置來決定是從earliest還是latest開始讀取kafka數(shù)據,也就是說不用擔心fromOffset第一次取為空的情況
*/
//創(chuàng)建kafkaStream
val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
    ssc,//StreamingContext
    PreferConsistent,
    Subscribe[String, String](topicsSet, kafkaParams, fromOffsets)
)

kafkaStream.foreachRDD(rdd => {
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    //忽略邏輯代碼
    pass...
    //提交offset到hbase
    OffsetUtil.saveOffsetToHbase(offsetRanges, "groupId")
})

HbaseUtil.close()
  • 附上上面用到的HbaseUtil.scala和OffsetUtil.scala
  1. HbaseUtil.scala
object HbaseUtil {
  var conf: Configuration = _
  //線程池
  lazy val connection: Connection = ConnectionFactory.createConnection(conf)
  lazy val admin: Admin = connection.getAdmin

  /**
    * hbase conf
    *
    * @param quorum hbase的zk地址
    * @param port   zk端口2181
    * @return
    */
  def setConf(quorum: String, port: String): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", quorum)
    conf.set("hbase.zookeeper.property.clientPort", port)
    this.conf = conf
  }

  /**
    * 如果不存在就創(chuàng)建表
    * @param tableName 命名空間:表名
    * @param columnFamily 列族
    */
  def createTable(tableName: String, columnFamily: String): Unit = {
    val tbName = TableName.valueOf(tableName)
    if (!admin.tableExists(tbName)) {
      val htableDescriptor = new HTableDescriptor(tbName)
      val hcolumnDescriptor = new HColumnDescriptor(columnFamily)
      htableDescriptor.addFamily(hcolumnDescriptor)
      admin.createTable(htableDescriptor)
    }
  }

  /**
    * 獲取hbase單元格內容
    * @param tableName 命名空間:表名
    * @param rowKey rowkey
    * @return 返回單元格組成的List
    */
  def getCell(tableName: String, rowKey: String): mutable.Buffer[Cell] = {
    val get = new Get(Bytes.toBytes(rowKey))
    val table = connection.getTable(TableName.valueOf(tableName))
    val result: Result = table.get(get)
    import scala.collection.JavaConverters._
    result.listCells().asScala
  }

  /**
    * 單條插入
    * @param tableName 命名空間:表名
    * @param rowKey rowkey
    * @param family 列族
    * @param qualifier column列
    * @param value 列值
    */
  def singlePut(tableName: String, rowKey: String, family: String, qualifier: String, value: String): Unit = {
    //單個插入
    val put: Put = new Put(Bytes.toBytes(rowKey)) //參數(shù)是行健
    put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value))

    //獲得表對象
    val table: Table = connection.getTable(TableName.valueOf(tableName))
    table.put(put)
    table.close()
  }

  def close(): Unit = {
    admin.close()
    connection.close()
  }
  1. OffsetUtil.scala
object OffsetUtil {
  //從hbase中獲取offset
  def getFromOffsets: Map[TopicPartition, Long] ={
    var fromOffsets: Map[TopicPartition, Long] = Map()
    AnalysisParam.topicSet.foreach(topic => {
      val get = new Get(Bytes.toBytes(topic))
      val table: Table = HbaseUtil.connection.getTable(TableName.valueOf(s"kafka_offset:groupId"))
      if (table.exists(get)) {
        val cells = HbaseUtil.getCell(s"kafka_offset:groupId", topic)
        cells.foreach(cell => {
          val partition = Bytes.toString(CellUtil.cloneQualifier(cell))
          val offset = Bytes.toString(CellUtil.cloneValue(cell))
          val tp = new TopicPartition(topic, partition.toInt)
          fromOffsets += (tp -> offset.toLong)
        })
      }
    })
    fromOffsets
  }

  //將offset存到hbase
  def saveOffsetToHbase(offsetRanges:Array[OffsetRange],groupId:String): Unit ={
    offsetRanges.foreach(o => {
      val topic = o.topic
      val partition = o.partition
      val offset = o.fromOffset
      HbaseUtil.singlePut(s"kafka_offset:$groupId", topic, "info", partition.toString, offset.toString)
    })
  }
}
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

  • Zookeeper用于集群主備切換。 YARN讓集群具備更好的擴展性。 Spark沒有存儲能力。 Spark的Ma...
    Yobhel閱讀 7,597評論 0 34
  • HBase那些事 @(大數(shù)據工程學院)[HBase, Hadoop, 優(yōu)化, HadoopChen, hbase]...
    分癡閱讀 4,104評論 3 17
  • 參考:http://www.itdecent.cn/p/569106a3008f 最近在逐步跟進Hbase的相關...
    博弈史密斯閱讀 927評論 1 1
  • 還是一樣的雨,默默走在雨里,傘靜靜的掛在包上,也許路人見了,會說我有病吧,有傘不用!我回之以淡笑,這雨可不可以來的...
    望而生灰閱讀 169評論 0 0
  • BitmapShader描述:Shader used to draw a bitmap as a texture....
    大大大寒閱讀 1,458評論 1 1

友情鏈接更多精彩內容