這里的SparkSQL是指整合了Hive的spark-sql cli(關(guān)于SparkSQL和Hive的整合,見文章后面的參考閱讀).
本質(zhì)上就是通過Hive訪問HBase表,具體就是通過hive-hbase-handler .
hadoop-2.3.0-cdh5.0.0
apache-hive-0.13.1-bin
spark-1.4.0-bin-hadoop2.3
hbase-0.96.1.1-cdh5.0.0
部署情況如下圖:

測試集群,將Spark Worker部署在每臺DataNode上,是為了最大程度的任務(wù)本地化,Spark集群為Standalone模式部署。
其中有三臺機(jī)器上也部署了RegionServer。
這個(gè)部署情況對理解后面提到的任務(wù)本地化調(diào)度有幫助。
1. 拷貝以下HBase的相關(guān)jar包到Spark Master和每個(gè)Spark Worker節(jié)點(diǎn)上的$SPARK_HOME/lib目錄下.
(我嘗試用–jars的方式添加之后,不work,所以采用這種土辦法)
$HBASE_HOME/lib/hbase-client-0.96.1.1-cdh5.0.0.jar
$HBASE_HOME/lib/hbase-common-0.96.1.1-cdh5.0.0.jar
$HBASE_HOME/lib/hbase-protocol-0.96.1.1-cdh5.0.0.jar
$HBASE_HOME/lib/hbase-server-0.96.1.1-cdh5.0.0.jar
$HBASE_HOME/lib/htrace-core-2.01.jar
$HBASE_HOME/lib/protobuf-java-2.5.0.jar
$HBASE_HOME/lib/guava-12.0.1.jar
$HIVE_HOME/lib/hive-hbase-handler-0.13.1.jar
2.配置每個(gè)節(jié)點(diǎn)上的$SPARK_HOME/conf/spark-env.sh,將上面的jar包添加到SPARK_CLASSPATH
export?SPARK_CLASSPATH=$SPARK_HOME/lib/hbase-client-0.96.1.1-cdh5.0.0.jar:
$SPARK_HOME/lib/hbase-common-0.96.1.1-cdh5.0.0.jar:
$SPARK_HOME/lib/hbase-protocol-0.96.1.1-cdh5.0.0.jar:
$SPARK_HOME/lib/hbase-server-0.96.1.1-cdh5.0.0.jar:
$SPARK_HOME/lib/htrace-core-2.01.jar:
$SPARK_HOME/lib/protobuf-java-2.5.0.jar:
$SPARK_HOME/lib/guava-12.0.1.jar:
$SPARK_HOME/lib/hive-hbase-handler-0.13.1.jar:
${SPARK_CLASSPATH}
3.將hbase-site.xml拷貝至${HADOOP_CONF_DIR},由于spark-env.sh中配置了Hadoop配置文件目錄${HADOOP_CONF_DIR},因此會(huì)將hbase-site.xml加載。
hbase-site.xml中主要是以下幾個(gè)參數(shù)的配置:
hbase.zookeeper.quorum
zkNode1:2181,zkNode2:2181,zkNode3:2181
HBase使用的zookeeper節(jié)點(diǎn)
hbase.client.scanner.caching
5000
HBase客戶端掃描緩存,對查詢性能有很大幫助
另外還有一個(gè)參數(shù):zookeeper.znode.parent=/hbase
是HBase在zk中的根目錄,默認(rèn)為/hbase,視實(shí)際情況進(jìn)行配置。
4.重啟Spark集群。
?大數(shù)據(jù)學(xué)習(xí)交流群:724693112 歡迎想學(xué)習(xí)大數(shù)據(jù)和需要大數(shù)據(jù)學(xué)習(xí)資料的同學(xué)來一起學(xué)習(xí)。
hbase中有表lxw1234,數(shù)據(jù)如下:
hbase(main):025:0*?scan?'lxw1234'
ROW COLUMN+CELL
lxw1234.com column=f1:c1,?timestamp=1435624625198,?value=name1
lxw1234.com column=f1:c2,?timestamp=1435624591717,?value=name2
lxw1234.com column=f2:c1,?timestamp=1435624608759,?value=age1
lxw1234.com column=f2:c2,?timestamp=1435624635261,?value=age2
lxw1234.com column=f3:c1,?timestamp=1435624662282,?value=job1
lxw1234.com column=f3:c2,?timestamp=1435624697028,?value=job2
lxw1234.com column=f3:c3,?timestamp=1435624697065,?value=job3
1?row(s)?in?0.0350?seconds
進(jìn)入spark-sql,使用如下語句建表:
CREATE EXTERNAL TABLE lxw1234?(
rowkey?string,
f1 map,
f2 map,
f3 map
)?STORED BY?'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES?("hbase.columns.mapping"?=?":key,f1:,f2:,f3:")
TBLPROPERTIES?("hbase.table.name"?=?"lxw1234");
建好之后,就可以查詢了:
spark-sql>?select?*?from?lxw1234;
lxw1234.com?{"c1":"name1","c2":"name2"}?{"c1":"age1","c2":"age2"}?{"c1":"job1","c2":"job2","c3":"job3"}
Time?taken:?4.726?seconds,?Fetched?1?row(s)
spark-sql>?select?count(1)?from?lxw1234;
1
Time?taken:?2.46?seconds,?Fetched?1?row(s)
spark-sql>
大表查詢,消耗的時(shí)間和通過Hive用MapReduce查詢差不多。
spark-sql>?select?count(1)?from?lxw1234_hbase;
53609638
Time?taken:?335.474?seconds,?Fetched?1?row(s)
在spark-sql中通過insert插入數(shù)據(jù)到HBase表時(shí)候報(bào)錯(cuò):
INSERT INTO TABLE lxw1234
SELECT?'row1'?AS rowkey,
map('c3','name3')?AS f1,
map('c3','age3')?AS f2,
map('c4','job3')?AS f3
FROM lxw1234_a
limit?1;
org.apache.spark.SparkException:?Job?aborted due to stage failure:?Task?0?in?stage?10.0?failed?4?times,
most recent failure:?Lost?task?0.3?in?stage?10.0?(TID?23,?slave013.uniclick.cloud):
java.lang.ClassCastException:?org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat?cannot be cast to org.apache.hadoop.hive.ql.io.HiveOutputFormat
at org.apache.spark.sql.hive.SparkHiveWriterContainer.outputFormat$lzycompute(hiveWriterContainers.scala:74)
at org.apache.spark.sql.hive.SparkHiveWriterContainer.outputFormat(hiveWriterContainers.scala:73)
at org.apache.spark.sql.hive.SparkHiveWriterContainer.getOutputName(hiveWriterContainers.scala:93)
at org.apache.spark.sql.hive.SparkHiveWriterContainer.initWriters(hiveWriterContainers.scala:117)
at org.apache.spark.sql.hive.SparkHiveWriterContainer.executorSideSetup(hiveWriterContainers.scala:86)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:99)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:83)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:83)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Driver?stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
這個(gè)還有待分析。
先看這張圖,該圖為運(yùn)行select * from lxw1234_hbase;這張大表查詢時(shí)候的任務(wù)運(yùn)行圖。

Spark和Hadoop MapReduce一樣,在任務(wù)調(diào)度時(shí)候都會(huì)考慮數(shù)據(jù)本地化,即”任務(wù)向數(shù)據(jù)靠攏”,盡量將任務(wù)分配到數(shù)據(jù)所在的節(jié)點(diǎn)上運(yùn)行。
基于這點(diǎn),lxw1234_hbase為HBase中的外部表,Spark在解析時(shí)候,通過org.apache.hadoop.hive.hbase.HBaseStorageHandler獲取到表lxw1234_hbase在HBase中的region所在的RegionServer,即:slave004、slave005、slave006 (上面的部署圖中提到了,總共只有三臺RegionServer,就是這三臺),所以,在調(diào)度任務(wù)時(shí)候,首先考慮要往這三臺節(jié)點(diǎn)上分配任務(wù)。
表lxw1234_hbase共有10個(gè)region,因此需要10個(gè)map task來運(yùn)行。
再看一張圖,這是spark-sql cli指定的Executor配置:

每臺機(jī)器上Worker的實(shí)例為2個(gè),每個(gè)Worker實(shí)例中運(yùn)行的Executor為1個(gè),因此,每臺機(jī)器上運(yùn)行兩個(gè)Executor.
那么salve004、slave005、slave006上各運(yùn)行2個(gè)Executor,總共6個(gè),很好,Spark已經(jīng)第一時(shí)間將這6個(gè)Task交給這6個(gè)Executor去執(zhí)行了(NODE_LOCAL Tasks)。
剩下4個(gè)Task,沒辦法,想NODE_LOCAL運(yùn)行,但那三臺機(jī)器上沒有剩余的Executor了,只能分配給其他Worker上的Executor,這4個(gè)Task為ANY Tasks。
正如那張任務(wù)運(yùn)行圖中所示。
通過Hive和spark-sql去訪問HBase表,只是為統(tǒng)計(jì)分析提供了一定的便捷性,個(gè)人覺得性能上的優(yōu)勢并不明顯。
可能Spark通過API去讀取HBase數(shù)據(jù),性能更好些吧,以后再試。
另外,spark-sql有一點(diǎn)好處,就是可以先把HBase中的數(shù)據(jù)cache到一張內(nèi)存表中,然后在這張內(nèi)存表中,
通過SQL去統(tǒng)計(jì)分析,那就爽多了。