有一個(gè)設(shè)想
當(dāng)有持續(xù)不斷的結(jié)構(gòu)化或非結(jié)構(gòu)化大數(shù)據(jù)集以流(streaming)的方式進(jìn)入分布式計(jì)算平臺(tái), 能夠保存在大規(guī)模分布式存儲(chǔ)上,并且能夠提供準(zhǔn)實(shí)時(shí)SQL查詢(xún),這個(gè)系統(tǒng)多少人求之不得。
今天,咱們就來(lái)介紹一下這個(gè)計(jì)算框架和過(guò)程。
問(wèn)題分解一下
數(shù)據(jù)哪里來(lái)?
假設(shè),你已經(jīng)有一個(gè)數(shù)據(jù)收集的引擎或工具(不在本博客討論范圍內(nèi),請(qǐng)出門(mén)左轉(zhuǎn)Google右轉(zhuǎn)百度),怎么都行, 反正數(shù)據(jù)能以流的方式給出來(lái),塞進(jìn)Kafka類(lèi)似的消息系統(tǒng)。
結(jié)構(gòu)化?非結(jié)構(gòu)化?如何識(shí)別業(yè)務(wù)信息?
關(guān)于結(jié)構(gòu)化或非結(jié)構(gòu)化,也不在今天的主要討論范圍,但是,必須要說(shuō)明的是, 你的數(shù)據(jù)能夠以某種規(guī)則進(jìn)行正則化,比如:空格分隔,CSV,JSON等。咱們今天以Apache網(wǎng)站日志數(shù)據(jù)作為參照。
類(lèi)似如下:
124.67.32.161 - - [10/Apr/2016:05:37:36 +0800] "GET /blog/app_backend.html HTTP/1.1" 200 26450
如何處理?寫(xiě)到哪里去?
拿到數(shù)據(jù),我們需要一些處理,將業(yè)務(wù)邏輯分離開(kāi)來(lái),做成二維表,行列分明,就像是關(guān)系型數(shù)據(jù)庫(kù)的表。這個(gè)事情有Spark DataFrame來(lái)完成。
就像寫(xiě)入關(guān)系型數(shù)據(jù)庫(kù)一樣,我們需要將DataFrame寫(xiě)入某處,這里,就是Parquet文件,天然支持schema,太棒了。
怎么取出來(lái)?還能是SQL?
我們的數(shù)據(jù)已經(jīng)被當(dāng)做“二維表,Table”寫(xiě)入了Parquet,取出來(lái)當(dāng)然也得是“表”或其他什么的,當(dāng)然最好是能暴露出JDBC SQL,相關(guān)人員使用起來(lái)就方便了。
這個(gè)事情交給Spark的 SparkThriftServer 來(lái)完成。
設(shè)計(jì)藍(lán)圖
以上分解似乎完美,一起來(lái)看看“設(shè)計(jì)框架”或“藍(lán)圖”。
[圖片上傳失敗...(image-fe3774-1542717299721)]
算了,不解釋了,圖,自己看。
Coding Style
從Kafka Stream獲取數(shù)據(jù)
// 從Kafka Stream獲取數(shù)據(jù)
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc, String.class, String.class,
StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);
寫(xiě)入Parquet
accessLogsDStream.foreachRDD(rdd -> {
// 如果DF不為空,寫(xiě)入(增加模式)到Parquet文件
DataFrame df = sqlContext.createDataFrame(rdd, ApacheAccessLog.class);
if (df.count() > 0) {
df.write().mode(SaveMode.Append).parquet(Flags.getInstance().getParquetFile());
}
return null;
});
創(chuàng)建Hive表
使用spark-shell,獲取Parquet文件, 寫(xiě)入一個(gè)臨時(shí)表;
scala代碼如下:
import sqlContext.implicits._
val parquetFile = sqlContext.read.parquet("/user/spark/apachelog.parquet")
parquetFile.registerTempTable("logs")
復(fù)制schema到新表鏈接到Parquet文件。
在Hive中復(fù)制表,這里你會(huì)發(fā)現(xiàn),文件LOCATION位置還是原來(lái)的路徑,目的就是這個(gè),使得新寫(xiě)入的文件還在Hive模型中。
我總覺(jué)得這個(gè)方法有問(wèn)題,是不是哪位Hive高人指點(diǎn)一下,有沒(méi)有更好的辦法來(lái)完成這個(gè)工作?
CREATE EXTERNAL TABLE apachelog LIKE logs STORED AS PARQUET LOCATION '/user/spark/apachelog.parquet';
啟動(dòng)你的SparkThriftServer
當(dāng)然,在集群中啟用ThriftServer是必須的工作,SparkThriftServer其實(shí)暴露的是Hive2服務(wù)器,用JDBC驅(qū)動(dòng)就可以訪問(wèn)了。
我們都想要的結(jié)果
本博客中使用的SQL查詢(xún)工具是SQuirreL SQL,具體JDBC配置方法請(qǐng)參照前面說(shuō)的向左向右轉(zhuǎn)。
[圖片上傳失敗...(image-5c31f4-1542717299719)]
結(jié)果看似簡(jiǎn)單,但是經(jīng)歷還是很有挑戰(zhàn)的。
至此,本例已完成。完成代碼見(jiàn) GitHub