1. Required jar dependencies
<properties>
    <spark.version>2.3.0</spark.version>
    <hbase.version>1.2.6</hbase.version>
    <scala.main.version>2.11</scala.main.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.main.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <!-- JSON parser used by the sample code in this article; not strictly required -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>
</dependencies>
2. Writing data to HBase
(1) Using saveAsNewAPIHadoopDataset()
package com.bonc.rdpe.spark.hbase

import com.alibaba.fastjson.JSON
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Author: YangYunhe
  * Description: write data to HBase with Spark's built-in operator saveAsNewAPIHadoopDataset()
  * Create: 2018/7/23 15:49
  */
object WriteHBaseWithNewHadoopAPI {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local")
    val sc = new SparkContext(sparkConf)
    val input = sc.textFile("file:///D:/data/news_profile_data.txt")

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "172.16.13.185:2181")
    val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
    val admin = hbaseConn.getAdmin

    val jobConf = new JobConf(hbaseConf, this.getClass)
    // Set the target table name
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "news")

    // Create the table if it does not exist
    if (!admin.tableExists(TableName.valueOf("news"))) {
      val desc = new HTableDescriptor(TableName.valueOf("news"))
      val hcd = new HColumnDescriptor("cf1")
      desc.addFamily(hcd)
      admin.createTable(desc)
    }

    val job = Job.getInstance(jobConf)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    // The values written through TableOutputFormat are Mutations (here: Put)
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val data = input.map(jsonStr => {
      // Data-processing logic: extract id and title from each JSON line
      val jsonObject = JSON.parseObject(jsonStr)
      val newsId = jsonObject.get("id").toString.trim
      val title = jsonObject.get("title").toString.trim
      val put = new Put(Bytes.toBytes(newsId))
      put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("title"), Bytes.toBytes(title))
      // TableOutputFormat ignores the key, so an empty ImmutableBytesWritable is enough
      (new ImmutableBytesWritable, put)
    })

    data.saveAsNewAPIHadoopDataset(job.getConfiguration)

    hbaseConn.close()
    sc.stop()
  }
}
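To spot-check that the write succeeded, a single row can be fetched back with the plain client API. A minimal sketch, assuming the same news table; the rowkey "12345" is a made-up id for illustration:

package com.bonc.rdpe.spark.hbase

import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes

object VerifyWrite {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set(HConstants.ZOOKEEPER_QUORUM, "172.16.13.185:2181")
    val conn = ConnectionFactory.createConnection(conf)
    val table = conn.getTable(TableName.valueOf("news"))
    // "12345" is a hypothetical rowkey; substitute an id that exists in your input
    val result = table.get(new Get(Bytes.toBytes("12345")))
    // getValue returns null when the cell is absent, hence the Option guard
    val title = Option(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("title")))
      .map(v => Bytes.toString(v))
      .getOrElse("<not found>")
    println(s"title = $title")
    table.close()
    conn.close()
  }
}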
(2) Using saveAsHadoopDataset()
package com.bonc.rdpe.spark.hbase

import com.alibaba.fastjson.JSON
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Author: YangYunhe
  * Description: write data to HBase with Spark's built-in operator saveAsHadoopDataset()
  * Create: 2018/7/24 11:24
  */
object WriteHBaseWithOldHadoopAPI {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local")
    val sc = new SparkContext(sparkConf)
    val input = sc.textFile("file:///D:/data/news_profile_data.txt")

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "172.16.13.185:2181")
    val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
    val admin = hbaseConn.getAdmin

    val jobConf = new JobConf(hbaseConf, this.getClass)
    // Set the target table name and the (old-API) output format
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "news")
    jobConf.setOutputFormat(classOf[TableOutputFormat])

    // Create the table if it does not exist
    if (!admin.tableExists(TableName.valueOf("news"))) {
      val desc = new HTableDescriptor(TableName.valueOf("news"))
      val hcd = new HColumnDescriptor("cf1")
      desc.addFamily(hcd)
      admin.createTable(desc)
    }

    val data = input.map(jsonStr => {
      // Data-processing logic: extract id and title from each JSON line
      val jsonObject = JSON.parseObject(jsonStr)
      val newsId = jsonObject.get("id").toString.trim
      val title = jsonObject.get("title").toString.trim
      val put = new Put(Bytes.toBytes(newsId))
      put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("title"), Bytes.toBytes(title))
      (new ImmutableBytesWritable, put)
    })

    data.saveAsHadoopDataset(jobConf)

    hbaseConn.close()
    sc.stop()
  }
}
The two operators above are built on the new Hadoop API and the old Hadoop API respectively, and most of the code is identical. The points to note: the new API works with the Job class while the old API works with the JobConf class, and the new-API classes live under org.apache.hadoop.mapreduce (org.apache.hadoop.hbase.mapreduce on the HBase side) while the old-API classes live under org.apache.hadoop.mapred (org.apache.hadoop.hbase.mapred on the HBase side).
3. Reading data from HBase
The following code uses the newAPIHadoopRDD() operator.
package com.bonc.rdpe.spark.hbase

import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.JavaConversions._

/**
  * Author: YangYunhe
  * Description: read data from HBase with Spark's built-in operator newAPIHadoopRDD()
  * Create: 2018/7/23 15:22
  */
object ReadHBase {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local").setAppName(s"${this.getClass.getSimpleName}")
    // Register ImmutableBytesWritable with Kryo; see the note below the code
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.registerKryoClasses(Array(classOf[ImmutableBytesWritable]))
    val sc = new SparkContext(sparkConf)

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "172.16.13.185:2181")
    // Set the table to scan
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "news")

    val hBaseRDD = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])

    hBaseRDD.take(10).foreach(tuple => {
      val result = tuple._2
      printResult(result)
    })

    sc.stop()
  }

  def printResult(result: Result): Unit = {
    val cells = result.listCells
    for (cell <- cells) {
      printCell(cell)
    }
  }

  def printCell(cell: Cell): Unit = {
    val str =
      s"rowkey: ${Bytes.toString(CellUtil.cloneRow(cell))}, family:${Bytes.toString(CellUtil.cloneFamily(cell))}, " +
      s"qualifier:${Bytes.toString(CellUtil.cloneQualifier(cell))}, value:${Bytes.toString(CellUtil.cloneValue(cell))}, " +
      s"timestamp:${cell.getTimestamp}"
    println(str)
  }
}
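By default the code above scans the entire table and every column. TableInputFormat also understands a number of configuration keys that narrow the scan. A hedged sketch of lines that could be added right after the INPUT_TABLE setting above; the rowkey bounds "10000" and "20000" are made-up values for illustration:

// Optional scan narrowing via TableInputFormat's standard configuration keys
hbaseConf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "cf1")   // scan only this family
hbaseConf.set(TableInputFormat.SCAN_COLUMNS, "cf1:title")   // scan only this column
hbaseConf.set(TableInputFormat.SCAN_ROW_START, "10000")     // start rowkey, inclusive (made-up)
hbaseConf.set(TableInputFormat.SCAN_ROW_STOP, "20000")      // stop rowkey, exclusive (made-up)
hbaseConf.set(TableInputFormat.SCAN_CACHEDROWS, "500")      // rows fetched per scanner RPC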
Note that the code registers the ImmutableBytesWritable class with the Kryo serializer:

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.registerKryoClasses(Array(classOf[ImmutableBytesWritable]))

Without this, the program fails with:

java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable
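If registering the class with Kryo is undesirable, an alternative is to convert each (ImmutableBytesWritable, Result) pair into plain Scala types inside a map, before anything is collected to the driver. A minimal sketch against the hBaseRDD created above, reading the cf1:title column used throughout this article:

// Hedged sketch: extract plain types immediately so that no HBase classes
// ever need to be serialized by Spark.
val rows = hBaseRDD.map { case (rowKey, result) =>
  val key = Bytes.toString(rowKey.get, rowKey.getOffset, rowKey.getLength)
  // getValue returns null if the cell is absent, hence the Option guard
  val title = Option(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("title")))
    .map(v => Bytes.toString(v))
    .getOrElse("")
  (key, title)
}
rows.take(10).foreach(println)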
4. Optimizing writes: Bulk Load

The approaches above insert records into HBase one at a time. This is slow, and the import ties up Region resources for its whole duration, so it is a poor fit for loading a large volume of data in one go. The remedy is to import the data in bulk with Bulk Load.

Bulk Load exploits the fact that HBase keeps its data in HDFS in a fixed format (HFile): a job (classically a MapReduce job, here a Spark job) writes persistent HFiles directly to HDFS, and those files are then loaded into the running cluster in a single step. Because nothing goes through the normal write path, the import does not occupy Region resources or generate massive write I/O, and compared with going through the HBase client API it needs far less CPU and network bandwidth.

The following shows how to bulk-load data into HBase from Spark.
package com.bonc.rdpe.spark.hbase

import com.alibaba.fastjson.JSON
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Author: YangYunhe
  * Description: bulk-load data into HBase from Spark
  * Create: 2018/7/24 13:14
  */
object BulkLoad {

  val zookeeperQuorum = "172.16.13.185:2181"
  val dataSourcePath = "file:///D:/data/news_profile_data.txt"
  val hdfsRootPath = "hdfs://beh/"
  val hFilePath = "hdfs://beh/test/yyh/hbase/bulkload/hfile/"
  val tableName = "news"
  val familyName = "cf1"
  val qualifierName = "title"

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local")
    val sc = new SparkContext(sparkConf)

    val hadoopConf = new Configuration()
    hadoopConf.set("fs.defaultFS", hdfsRootPath)
    val fileSystem = FileSystem.get(hadoopConf)
    val hbaseConf = HBaseConfiguration.create(hadoopConf)
    hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, zookeeperQuorum)
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
    val admin = hbaseConn.getAdmin

    // 0. Prepare the environment
    // Create the HBase table if it does not exist
    if (!admin.tableExists(TableName.valueOf(tableName))) {
      val desc = new HTableDescriptor(TableName.valueOf(tableName))
      val hcd = new HColumnDescriptor(familyName)
      desc.addFamily(hcd)
      admin.createTable(desc)
    }

    // Delete the HFile output directory if it already exists
    if (fileSystem.exists(new Path(hFilePath))) {
      fileSystem.delete(new Path(hFilePath), true)
    }

    // 1. Clean the data that will go into the HFiles. The rowkeys must be sorted,
    // otherwise the job fails with:
    // java.io.IOException: Added a key not lexically larger than previous.
    val data = sc.textFile(dataSourcePath)
      .map(jsonStr => {
        // Data-processing logic: extract id and title from each JSON line
        val jsonObject = JSON.parseObject(jsonStr)
        val rowkey = jsonObject.get("id").toString.trim
        val title = jsonObject.get("title").toString.trim
        (rowkey, title)
      })
      .sortByKey()
      .map(tuple => {
        val kv = new KeyValue(Bytes.toBytes(tuple._1), Bytes.toBytes(familyName), Bytes.toBytes(qualifierName), Bytes.toBytes(tuple._2))
        (new ImmutableBytesWritable(Bytes.toBytes(tuple._1)), kv)
      })

    // 2. Save HFiles on HDFS
    val table = hbaseConn.getTable(TableName.valueOf(tableName))
    val job = Job.getInstance(hbaseConf)
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setMapOutputValueClass(classOf[KeyValue])
    // Copies the table's compression, bloom-filter and block-size settings onto the job
    HFileOutputFormat2.configureIncrementalLoadMap(job, table)
    data.saveAsNewAPIHadoopFile(
      hFilePath,
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      job.getConfiguration // use the job's configuration so the settings above take effect
    )

    // 3. Bulk load the HFiles into HBase
    val bulkLoader = new LoadIncrementalHFiles(hbaseConf)
    val regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName))
    bulkLoader.doBulkLoad(new Path(hFilePath), admin, table, regionLocator)

    hbaseConn.close()
    fileSystem.close()
    sc.stop()
  }
}
Notes:
- The rowkeys must be sorted; for rows with more than one column, see the sketch after these notes.
- The code above uses saveAsNewAPIHadoopFile(); saveAsNewAPIHadoopDataset() works just as well. Replace the following code:
data.saveAsNewAPIHadoopFile(
  hFilePath,
  classOf[ImmutableBytesWritable],
  classOf[KeyValue],
  classOf[HFileOutputFormat2],
  job.getConfiguration
)
with:
job.getConfiguration.set("mapred.output.dir", hFilePath)
data.saveAsNewAPIHadoopDataset(job.getConfiguration)
and the behavior is the same. (mapred.output.dir is the deprecated Hadoop 1 name of this property; on Hadoop 2.x the canonical key is mapreduce.output.fileoutputformat.outputdir, and both are accepted.)
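One caveat the single-column example above does not hit: HFiles require cells to be sorted by family and qualifier within a row as well, not just by rowkey. When writing several columns per row, sort on the (rowkey, qualifier) pair. A hedged sketch, reusing sc, dataSourcePath and familyName from the BulkLoad object; the second column summary is hypothetical and assumes the input JSON carries such a field:

// Hedged sketch: for multiple columns per row, sort by (rowkey, qualifier)
// so the KeyValues arrive in the order HFileOutputFormat2 requires.
val multiColumnData = sc.textFile(dataSourcePath)
  .flatMap { jsonStr =>
    val jsonObject = JSON.parseObject(jsonStr)
    val rowkey = jsonObject.get("id").toString.trim
    // "summary" is a hypothetical extra column for illustration
    Seq((rowkey, "summary", jsonObject.get("summary").toString.trim),
        (rowkey, "title", jsonObject.get("title").toString.trim))
  }
  .sortBy { case (rowkey, qualifier, _) => (rowkey, qualifier) }
  .map { case (rowkey, qualifier, value) =>
    val kv = new KeyValue(Bytes.toBytes(rowkey), Bytes.toBytes(familyName),
      Bytes.toBytes(qualifier), Bytes.toBytes(value))
    (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), kv)
  }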