If reading HBase from Spark through the region servers is not fast enough, you can read the HFiles directly instead. Here is the code:
package com.yoyi.data.user_profile

import com.yoyi.data.common.Application
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableSnapshotInputFormat}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.Base64
import org.apache.hadoop.mapreduce.Job

object Test extends Application {

  def main(args: Array[String]): Unit = {
    val hconf = CommonLabelFunc.createHbaseConf()
    hconf.set("hbase.rootdir", "hdfs://host:8020/hbase")
    // hconf.set(TableInputFormat.INPUT_TABLE, "demo_label_test")

    val sc = createSparkContext("", Seq())
    // Reading the HFile directory as plain text does not work -- HFiles are a binary format:
    // val rdd = sc.textFile("hdfs://ns20.data.yoyi:8020/hbase/data/default/demo_label_test/fd47c44b1b341703f0ab40a1c47959f6/c")
    // rdd.foreach(println)
    // Going through the region servers is the slow path we are trying to avoid:
    // val hbaseContext = new HBaseContext(sc, hconf)

    val scan = new Scan()
    // val rdd = hbaseContext.hbaseRDD(TableName.valueOf("demo_label_test"), scan)

    // Serialize the Scan into the configuration; TableSnapshotInputFormat picks it up from there.
    val proto = ProtobufUtil.toScan(scan)
    hconf.set(TableInputFormat.SCAN, Base64.encodeBytes(proto.toByteArray()))

    // Point the job at an existing snapshot; the restore directory must be writable
    // and must live outside the HBase root directory.
    val job = Job.getInstance(hconf)
    val snapName = "demo_label_test_snapshot"
    val path = new Path("hdfs://host:8020/tmp/snapshot")
    TableSnapshotInputFormat.setInput(job, snapName, path)

    // Read the snapshot's HFiles directly, bypassing the region servers.
    val rdd = sc.newAPIHadoopRDD(job.getConfiguration,
      classOf[TableSnapshotInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])

    rdd.foreach(t => {
      val result = t._2
      val rowkey = new String(result.getRow)
      println(rowkey)
      val cells = result.rawCells()
      for (cell <- cells) {
        val key = new String(cell.getQualifierArray, cell.getQualifierOffset, cell.getQualifierLength)
        val value = new String(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
        println("key: " + key)
        println("value: " + value)
      }
    })
    // println(rdd.count())

    sc.stop()
  }
}
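
The Scan that gets serialized into the job configuration also controls which cells the snapshot reader returns, so you can cut down the amount of data read by narrowing it before encoding it. A minimal sketch, reusing the imports from the listing above; the column family name "c" and the rowkey bounds are assumptions for illustration, not values from the real table:

import org.apache.hadoop.hbase.util.Bytes

val scan = new Scan()
scan.addFamily(Bytes.toBytes("c"))         // assumed column family, replace with your own
scan.setStartRow(Bytes.toBytes("user_0"))  // assumed rowkey range, for illustration only
scan.setStopRow(Bytes.toBytes("user_z"))

// Encode the narrowed Scan exactly as in the main listing.
val proto = ProtobufUtil.toScan(scan)
hconf.set(TableInputFormat.SCAN, Base64.encodeBytes(proto.toByteArray()))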
- To make sure the HFiles being read do not change while they are processed, the table has to be snapshotted first (a sketch of creating the snapshot follows below).
- Spark then reads the snapshotted table directly through the newAPIHadoopRDD API, using the MapReduce input format to read and parse the HFiles.
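
The snapshot named in TableSnapshotInputFormat.setInput has to exist before the Spark job runs. It can be created from the HBase shell (snapshot 'demo_label_test', 'demo_label_test_snapshot') or programmatically through the Admin API. A minimal sketch of the latter, assuming the configuration points at the same cluster as hconf above:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

// Take the snapshot that the Spark job above reads. Run this (or the shell
// equivalent) before calling TableSnapshotInputFormat.setInput.
val conf = HBaseConfiguration.create()
val connection = ConnectionFactory.createConnection(conf)
val admin = connection.getAdmin
try {
  admin.snapshot("demo_label_test_snapshot", TableName.valueOf("demo_label_test"))
} finally {
  admin.close()
  connection.close()
}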
Anyone interested in this kind of thing is welcome to get in touch and learn together ^^