之前一直將offset存到zookeeper,由于streaming程序會對zk有大量的讀取操作,故將offset存到zk不太好,現(xiàn)已將offset都改存到hbase中
kafka版本:0.10.2.0
spark版本:2.0
hbase表結構:groupid名字作為表名,topic名字作為rowkey,列族為info,分區(qū)號作為列名
| kafka_offset:groupid | info:0 | info:1 | info:2 |
|---|---|---|---|
| rowkey(topicName) | 10000 | 10000 | 10000 |
- 改完后streaming程序中的代碼調用
//Initialise the shared HBase connection first (must happen before any other HbaseUtil call)
HbaseUtil.setConf("zk address", "zk port")
//Namespace and table name in HBase where the offsets are stored
val offsetTbName = "kafka_offset:groupId"
HbaseUtil.createTable(offsetTbName, "info")//create the table if it does not exist yet in HBase
//Read the topic/partition offsets from HBase; absence is fine (first run with this groupid)
val fromOffsets: Map[TopicPartition, Long] = OffsetUtil.getFromOffsets
/**
 * param offsets :
 * offsets to begin at on initial startup. If no offset is given for a
 * TopicPartition, the committed offset (if applicable) or kafka param
 * auto.offset.reset will be used.
 * (Quoted from the Spark source comments.) In short: if no stored offset exists for a
 * TopicPartition, auto.offset.reset decides whether consumption starts from earliest or
 * latest, so an empty fromOffsets map on the very first run is not a problem.
 */
//Create the Kafka direct stream
val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,//StreamingContext
PreferConsistent,
Subscribe[String, String](topicsSet, kafkaParams, fromOffsets)
)
kafkaStream.foreachRDD(rdd => {
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
//Business logic omitted
pass...
//Commit the offsets of this batch to HBase
OffsetUtil.saveOffsetToHbase(offsetRanges, "groupId")
})
HbaseUtil.close()
- 附上上面用到的HbaseUtil.scala和OffsetUtil.scala
- HbaseUtil.scala
object HbaseUtil {
// HBase client configuration; populated by setConf and read by the lazy vals below
var conf: Configuration = _
// Shared HBase connection (NOTE(review): the original comment said "thread pool" — it is a
// Connection, which internally pools resources). Lazily built from `conf`, so setConf must
// run before the first access — TODO confirm startup call order.
lazy val connection: Connection = ConnectionFactory.createConnection(conf)
lazy val admin: Admin = connection.getAdmin
/**
 * Initialise the HBase client configuration.
 *
 * @param quorum ZooKeeper quorum address used by HBase
 * @param port   ZooKeeper client port (usually 2181)
 */
def setConf(quorum: String, port: String): Unit = {
  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.set("hbase.zookeeper.quorum", quorum)
  hbaseConf.set("hbase.zookeeper.property.clientPort", port)
  this.conf = hbaseConf
}
/**
 * Create the table if it does not already exist.
 *
 * @param tableName    qualified table name, "namespace:table"
 * @param columnFamily column family the table is created with
 */
def createTable(tableName: String, columnFamily: String): Unit = {
  val name = TableName.valueOf(tableName)
  if (!admin.tableExists(name)) {
    val descriptor = new HTableDescriptor(name)
    descriptor.addFamily(new HColumnDescriptor(columnFamily))
    admin.createTable(descriptor)
  }
}
/**
 * Fetch all cells of one row.
 *
 * @param tableName qualified table name, "namespace:table"
 * @param rowKey    row key to read
 * @return the row's cells; empty when the row does not exist
 */
def getCell(tableName: String, rowKey: String): mutable.Buffer[Cell] = {
  val get = new Get(Bytes.toBytes(rowKey))
  val table = connection.getTable(TableName.valueOf(tableName))
  try {
    val result: Result = table.get(get)
    import scala.collection.JavaConverters._
    // BUG FIX: Result.listCells() returns null for a non-existent row, so the
    // original `result.listCells().asScala` threw an NPE. Return empty instead.
    Option(result.listCells()).map(_.asScala).getOrElse(mutable.Buffer.empty[Cell])
  } finally {
    // BUG FIX: the Table handle was never closed — leaked on every call.
    table.close()
  }
}
/**
 * Insert (or overwrite) a single cell.
 *
 * @param tableName qualified table name, "namespace:table"
 * @param rowKey    row key
 * @param family    column family
 * @param qualifier column qualifier
 * @param value     cell value
 */
def singlePut(tableName: String, rowKey: String, family: String, qualifier: String, value: String): Unit = {
  val put: Put = new Put(Bytes.toBytes(rowKey)) // argument is the row key
  put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value))
  val table: Table = connection.getTable(TableName.valueOf(tableName))
  try {
    table.put(put)
  } finally {
    // BUG FIX: close even when put() throws — the original leaked the Table on failure.
    table.close()
  }
}
/** Release the shared admin and connection handles; call once on shutdown. */
def close(): Unit = {
  // Same order as before: the Admin is closed before the Connection that created it.
  Seq[java.io.Closeable](admin, connection).foreach(_.close())
}
- OffsetUtil.scala
object OffsetUtil {
  /**
   * Read the saved offsets for every topic in AnalysisParam.topicSet from HBase.
   *
   * Row layout: rowkey = topic name, column family "info", qualifier = partition
   * number, value = offset. A missing row (first run for this group) simply
   * contributes nothing — the caller then relies on auto.offset.reset.
   *
   * @return map of TopicPartition -> stored offset; empty when nothing is stored yet
   */
  def getFromOffsets: Map[TopicPartition, Long] = {
    var fromOffsets: Map[TopicPartition, Long] = Map()
    AnalysisParam.topicSet.foreach { topic =>
      // NOTE(review): table name hard-codes the "groupId" suffix while saveOffsetToHbase
      // takes it as a parameter — keep the two in sync, TODO confirm intended group id.
      val table: Table = HbaseUtil.connection.getTable(TableName.valueOf("kafka_offset:groupId"))
      try {
        // Single round-trip: the original did table.exists(get) followed by a second
        // Get via HbaseUtil.getCell (two RPCs and a second, leaked Table handle).
        val result: Result = table.get(new Get(Bytes.toBytes(topic)))
        import scala.collection.JavaConverters._
        // listCells() is null when the row does not exist — skip silently in that case.
        Option(result.listCells()).foreach { cells =>
          cells.asScala.foreach { cell =>
            val partition = Bytes.toString(CellUtil.cloneQualifier(cell)).toInt
            val offset = Bytes.toString(CellUtil.cloneValue(cell)).toLong
            fromOffsets += (new TopicPartition(topic, partition) -> offset)
          }
        }
      } finally {
        // BUG FIX: the Table handle was never closed — leaked once per topic per call.
        table.close()
      }
    }
    fromOffsets
  }

  /**
   * Persist the offsets of a finished batch: one row per topic, qualifier = partition,
   * value = offset, in table "kafka_offset:<groupId>".
   *
   * @param offsetRanges offset ranges of the batch just processed
   * @param groupId      consumer group id (used as the table name suffix)
   */
  def saveOffsetToHbase(offsetRanges: Array[OffsetRange], groupId: String): Unit = {
    offsetRanges.foreach { o =>
      // BUG FIX: commit untilOffset (the position AFTER the processed batch), not
      // fromOffset. Persisting fromOffset made every restart re-consume the whole
      // last completed batch.
      HbaseUtil.singlePut(s"kafka_offset:$groupId", o.topic, "info", o.partition.toString, o.untilOffset.toString)
    }
  }
}