spark直接讀取Hfile

如果spark在讀取hbase的時(shí)候感覺速度達(dá)不到需求,可以直接讀取hfile進(jìn)行操作,看代碼

package com.yoyi.data.user_profile

import com.yoyi.data.common.Application
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableSnapshotInputFormat}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.Base64
import org.apache.hadoop.mapreduce.Job

object Test extends Application {

  def main(args: Array[String]): Unit = {


    val hconf = CommonLabelFunc.createHbaseConf()
    hconf.set("hbase.rootdir", "hdfs://host:8020/hbase")
//    hconf.set(TableInputFormat.INPUT_TABLE, "demo_label_test")
    val sc = createSparkContext("", Seq())

//    val rdd = sc.textFile("hdfs://ns20.data.yoyi:8020/hbase/data/default/demo_label_test/fd47c44b1b341703f0ab40a1c47959f6/c")
//    rdd.foreach(println)

//    val hbaseContext = new HBaseContext(sc, hconf)
    val scan = new Scan()
//    val rdd = hbaseContext.hbaseRDD(TableName.valueOf("demo_label_test"), scan)
    val proto = ProtobufUtil.toScan(scan)
    hconf.set(TableInputFormat.SCAN, Base64.encodeBytes(proto.toByteArray()))
    val job = Job.getInstance(hconf)
    val snapName = "demo_label_test_snapshot"
    val path = new Path("hdfs://host:8020/tmp/snapshot")
    TableSnapshotInputFormat.setInput(job, snapName, path)
    val rdd = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    rdd.foreach(t => {
    val result = t._2
    val rowkey = new String(result.getRow)
    println(rowkey)
    val cells = tp._3.rawCells()
    for (cell <- cells) {
        val qualifierArray = cell.getQualifierArray
        val valueArray = cell.getValueArray
        val key = new String(qualifierArray, cell.getQualifierOffset, cell.getQualifierLength)
        val value = new String(valueArray, cell.getValueOffset, cell.getValueLength)
        println("key: " + key)
        println("value: " + value)
    }
    
    })
//        println(rdd.count())

    sc.stop()
  }

}
  • 為了保證讀取的hfile在處理期間不會(huì)變化,需要將待處理的表就行快照處理
  • spark直接通過newAPIHadoopRDD的api讀起快照后的表,通過mr的方式讀取并解析hfile

歡迎對(duì)技術(shù)感興趣的小伙伴一起交流學(xué)習(xí)^^

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容