Spark讀寫HBase之使用Spark自帶的API以及使用Bulk Load將大量數(shù)據(jù)導(dǎo)入HBase

1. 需要的jar包依賴

<!-- Version properties shared by the dependency declarations below. -->
<properties>
        <spark.version>2.3.0</spark.version>
        <hbase.version>1.2.6</hbase.version>
        <scala.main.version>2.11</scala.main.version>
</properties>

<dependencies>
    <!-- Spark core, built against the Scala 2.11 binary version. -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.main.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- NOTE(review): org.apache.hbase:hbase is the aggregator/parent artifact
         (pom packaging); it is likely redundant next to the client/server/common
         jars below — verify whether this entry is actually needed. -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <!-- hbase-server is required for the mapreduce output formats
         (TableOutputFormat / HFileOutputFormat2) used by the examples. -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    
    <!-- JSON parsing used by the sample data-cleaning code; not required for HBase itself. -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>
</dependencies>

2. 寫數(shù)據(jù)到HBase

(1) 使用saveAsNewAPIHadoopDataset()

package com.bonc.rdpe.spark.hbase

import com.alibaba.fastjson.JSON
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase._
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Author: YangYunhe
  * Description: spark 通過內(nèi)置算子寫數(shù)據(jù)到 HBase:使用saveAsNewAPIHadoopDataset()
  * Create: 2018/7/23 15:49
  */
object WriteHBaseWithNewHadoopAPI {

  /**
   * Reads newline-delimited JSON records from a local file and writes one
   * HBase row per record (rowkey = "id", cf1:title = "title") using the
   * new Hadoop API via `saveAsNewAPIHadoopDataset()`.
   *
   * Creates the target table ("news", family "cf1") if it does not exist.
   */
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local")
    val sc = new SparkContext(sparkConf)
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "172.16.13.185:2181")
    val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
    val admin = hbaseConn.getAdmin
    try {
      val input = sc.textFile("file:///D:/data/news_profile_data.txt")
      val jobConf = new JobConf(hbaseConf, this.getClass)
      // Target table for TableOutputFormat.
      jobConf.set(TableOutputFormat.OUTPUT_TABLE, "news")

      // Create the table if it does not exist yet.
      if (!admin.tableExists(TableName.valueOf("news"))) {
        val desc = new HTableDescriptor(TableName.valueOf("news"))
        desc.addFamily(new HColumnDescriptor("cf1"))
        admin.createTable(desc)
      }

      val job = Job.getInstance(jobConf)
      job.setOutputKeyClass(classOf[ImmutableBytesWritable])
      // FIX: TableOutputFormat consumes Mutation values (Put/Delete);
      // Result is a read-side type and was incorrect here.
      job.setOutputValueClass(classOf[Put])
      job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

      val data = input.map { jsonStr =>
        // Parse one JSON record and build the corresponding Put.
        val jsonObject = JSON.parseObject(jsonStr)
        val newsId = jsonObject.get("id").toString.trim
        val title = jsonObject.get("title").toString.trim
        val put = new Put(Bytes.toBytes(newsId))
        put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("title"), Bytes.toBytes(title))
        // The key is ignored by TableOutputFormat; the Put carries the rowkey.
        (new ImmutableBytesWritable, put)
      }

      data.saveAsNewAPIHadoopDataset(job.getConfiguration)
    } finally {
      // FIX: the original leaked the Admin and Connection handles.
      admin.close()
      hbaseConn.close()
      sc.stop()
    }
  }
}

(2) 使用saveAsHadoopDataset()

package com.bonc.rdpe.spark.hbase

import com.alibaba.fastjson.JSON
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Author: YangYunhe
  * Description: spark 通過內(nèi)置算子寫數(shù)據(jù)到 HBase:使用saveAsHadoopDataset()
  * Create: 2018/7/24 11:24
  */
object WriteHBaseWithOldHadoopAPI {

  /**
   * Reads newline-delimited JSON records from a local file and writes one
   * HBase row per record (rowkey = "id", cf1:title = "title") using the
   * old Hadoop API via `saveAsHadoopDataset()` with a JobConf.
   *
   * Creates the target table ("news", family "cf1") if it does not exist.
   */
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local")
    val sc = new SparkContext(sparkConf)
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "172.16.13.185:2181")
    val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
    val admin = hbaseConn.getAdmin
    try {
      val input = sc.textFile("file:///D:/data/news_profile_data.txt")
      val jobConf = new JobConf(hbaseConf, this.getClass)
      // Target table and output format for the old (mapred) API.
      jobConf.set(TableOutputFormat.OUTPUT_TABLE, "news")
      jobConf.setOutputFormat(classOf[TableOutputFormat])

      // Create the table if it does not exist yet.
      if (!admin.tableExists(TableName.valueOf("news"))) {
        val desc = new HTableDescriptor(TableName.valueOf("news"))
        desc.addFamily(new HColumnDescriptor("cf1"))
        admin.createTable(desc)
      }

      val data = input.map { jsonStr =>
        // Parse one JSON record and build the corresponding Put.
        val jsonObject = JSON.parseObject(jsonStr)
        val newsId = jsonObject.get("id").toString.trim
        val title = jsonObject.get("title").toString.trim
        val put = new Put(Bytes.toBytes(newsId))
        put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("title"), Bytes.toBytes(title))
        // The key is ignored by TableOutputFormat; the Put carries the rowkey.
        (new ImmutableBytesWritable, put)
      }

      data.saveAsHadoopDataset(jobConf)
    } finally {
      // FIX: the original leaked the Admin and Connection handles.
      admin.close()
      hbaseConn.close()
      sc.stop()
    }
  }

}

以上兩個算子分別是基于Hadoop新版API和hadoop舊版API實(shí)現(xiàn)的,大部分代碼都一樣,需要注意的是新版API使用中Job類,舊版API使用JobConf類,另外導(dǎo)包的時候新版的相關(guān)jar包在org.apache.hadoop.mapreduce下,而舊版的相關(guān)jar包在org.apache.hadoop.mapred下

3. 從HBase讀數(shù)據(jù)

以下代碼使用newAPIHadoopRDD()算子

package com.bonc.rdpe.spark.hbase

import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.JavaConversions._

/**
  * Author: YangYunhe
  * Description: spark 通過內(nèi)置算子讀取 HBase
  * Create: 2018/7/23 15:22
  */
object ReadHBase {

  /**
   * Reads rows from the HBase table "news" with `newAPIHadoopRDD()` and
   * prints the first 10 results cell by cell.
   *
   * ImmutableBytesWritable is not Java-serializable, so it is registered
   * with Kryo; otherwise Spark throws NotSerializableException when the
   * RDD's records are collected to the driver.
   */
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local").setAppName(s"${this.getClass.getSimpleName}")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.registerKryoClasses(Array(classOf[ImmutableBytesWritable]))
    val sc = new SparkContext(sparkConf)
    try {
      val hbaseConf = HBaseConfiguration.create()
      hbaseConf.set("hbase.zookeeper.quorum", "172.16.13.185:2181")
      hbaseConf.set(TableInputFormat.INPUT_TABLE, "news")

      val hBaseRDD = sc.newAPIHadoopRDD(
        hbaseConf,
        classOf[TableInputFormat],
        classOf[ImmutableBytesWritable],
        classOf[Result])

      hBaseRDD.take(10).foreach { case (_, result) =>
        printResult(result)
      }
    } finally {
      // FIX: the original never stopped the SparkContext.
      sc.stop()
    }
  }

  /** Prints every cell of a scan Result, one line per cell. */
  def printResult(result: Result): Unit = {
    for (cell <- result.listCells) {
      printCell(cell)
    }
  }

  /** Prints rowkey, family, qualifier, value and timestamp of a single cell. */
  def printCell(cell: Cell): Unit = {
    val str =
      s"rowkey: ${Bytes.toString(CellUtil.cloneRow(cell))}, family:${Bytes.toString(CellUtil.cloneFamily(cell))}, " +
      s"qualifier:${Bytes.toString(CellUtil.cloneQualifier(cell))}, value:${Bytes.toString(CellUtil.cloneValue(cell))}, " +
      s"timestamp:${cell.getTimestamp}"
    println(str)
  }

}

需要注意的是,代碼中對ImmutableBytesWritable這個類進(jìn)行了序列化:

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.registerKryoClasses(Array(classOf[ImmutableBytesWritable]))

否則程序就會報(bào)錯:

java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable

4. 寫數(shù)據(jù)的優(yōu)化:Bulk Load

以上寫數(shù)據(jù)的過程是將數(shù)據(jù)一條條插入到HBase中,這種方式運(yùn)行慢,且在導(dǎo)入的過程中占用Region資源導(dǎo)致效率低下,所以很不適合一次性導(dǎo)入大量數(shù)據(jù)。解決辦法就是使用 Bulk Load 方式批量導(dǎo)入數(shù)據(jù)。

Bulk Load 方式由于利用了 HBase 的數(shù)據(jù)信息是按照特定格式存儲在 HDFS 里的這一特性,直接在 HDFS 中生成持久化的 HFile 數(shù)據(jù)格式文件,然后完成巨量數(shù)據(jù)快速入庫的操作,配合 MapReduce 完成這樣的操作,不占用 Region 資源,不會產(chǎn)生巨量的寫入 I/O,所以需要較少的 CPU 和網(wǎng)絡(luò)資源。

Bulk Load 的實(shí)現(xiàn)原理是通過一個 MapReduce Job 來實(shí)現(xiàn)的,通過 Job 直接生成一個 HBase 的內(nèi)部 HFile 格式文件,用來形成一個特殊的 HBase 數(shù)據(jù)表,然后直接將數(shù)據(jù)文件加載到運(yùn)行的集群中。與使用HBase API相比,使用Bulkload導(dǎo)入數(shù)據(jù)占用更少的CPU和網(wǎng)絡(luò)資源。

接下來介紹在spark中如何使用 Bulk Load 方式批量導(dǎo)入數(shù)據(jù)到 HBase 中。

package com.bonc.rdpe.spark.hbase

import com.alibaba.fastjson.JSON
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Author: YangYunhe
  * Description: 
  * Create: 2018/7/24 13:14
  */
object BulkLoad {

  val zookeeperQuorum = "172.16.13.185:2181"
  val dataSourcePath = "file:///D:/data/news_profile_data.txt"
  val hdfsRootPath = "hdfs://beh/"
  val hFilePath = "hdfs://beh/test/yyh/hbase/bulkload/hfile/"
  val tableName = "news"
  val familyName = "cf1"
  val qualifierName = "title"

  /**
   * Bulk-loads JSON records into HBase: cleans the data, writes sorted
   * KeyValues as HFiles on HDFS via HFileOutputFormat2, then hands the
   * HFiles to the cluster with LoadIncrementalHFiles.doBulkLoad().
   *
   * Rowkeys MUST be sorted before writing, otherwise HFileOutputFormat2
   * fails with "Added a key not lexically larger than previous".
   */
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local")
    val sc = new SparkContext(sparkConf)
    val hadoopConf = new Configuration()
    hadoopConf.set("fs.defaultFS", hdfsRootPath)
    val fileSystem = FileSystem.get(hadoopConf)
    val hbaseConf = HBaseConfiguration.create(hadoopConf)
    hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, zookeeperQuorum)
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
    val admin = hbaseConn.getAdmin
    try {
      // 0. Prepare the environment.
      // Create the HBase table if it does not exist yet.
      if (!admin.tableExists(TableName.valueOf(tableName))) {
        val desc = new HTableDescriptor(TableName.valueOf(tableName))
        val hcd = new HColumnDescriptor(familyName)
        desc.addFamily(hcd)
        admin.createTable(desc)
      }
      // Remove a stale HFile output directory, if any.
      if (fileSystem.exists(new Path(hFilePath))) {
        fileSystem.delete(new Path(hFilePath), true)
      }

      // 1. Clean the data. Rowkeys are sorted (sortByKey) because
      // HFileOutputFormat2 requires lexically increasing keys.
      val data = sc.textFile(dataSourcePath)
        .map { jsonStr =>
          val jsonObject = JSON.parseObject(jsonStr)
          val rowkey = jsonObject.get("id").toString.trim
          val title = jsonObject.get("title").toString.trim
          (rowkey, title)
        }
        .sortByKey()
        .map { case (rowkey, title) =>
          val kv = new KeyValue(Bytes.toBytes(rowkey), Bytes.toBytes(familyName), Bytes.toBytes(qualifierName), Bytes.toBytes(title))
          (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), kv)
        }

      // 2. Save HFiles on HDFS.
      val table = hbaseConn.getTable(TableName.valueOf(tableName))
      try {
        val job = Job.getInstance(hbaseConf)
        job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
        job.setMapOutputValueClass(classOf[KeyValue])
        // NOTE(review): configureIncrementalLoadMap does not install the
        // region-aware partitioner; for a multi-region table, prefer
        // HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)
        // — confirm against the target table's region layout.
        HFileOutputFormat2.configureIncrementalLoadMap(job, table)

        data.saveAsNewAPIHadoopFile(
          hFilePath,
          classOf[ImmutableBytesWritable],
          classOf[KeyValue],
          classOf[HFileOutputFormat2],
          hbaseConf
        )

        // 3. Bulk-load the HFiles into HBase.
        val bulkLoader = new LoadIncrementalHFiles(hbaseConf)
        val regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName))
        try {
          bulkLoader.doBulkLoad(new Path(hFilePath), admin, table, regionLocator)
        } finally {
          regionLocator.close()
        }
      } finally {
        // FIX: the original leaked the Table handle.
        table.close()
      }
    } finally {
      // FIX: the original leaked the Admin handle and only closed the
      // connection/filesystem on the success path.
      admin.close()
      hbaseConn.close()
      fileSystem.close()
      sc.stop()
    }
  }
}

說明:

  • rowkey一定要進(jìn)行排序
  • 上面的代碼使用了saveAsNewAPIHadoopFile(),也可以使用saveAsNewAPIHadoopDataset(),把以下代碼:
data.saveAsNewAPIHadoopFile(
  hFilePath,
  classOf[ImmutableBytesWritable],
  classOf[KeyValue],
  classOf[HFileOutputFormat2],
  hbaseConf
)

替換為:

job.getConfiguration.set("mapred.output.dir", hFilePath)
data.saveAsNewAPIHadoopDataset(job.getConfiguration)

即可。

參考文章:

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容