Incremental Sync of MongoDB Data to Hive (Option 1: Mapping BSON Files)

I. Background

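As the amount of data in MongoDB keeps growing, syncing the full data set into the data warehouse on every run is no longer practical, so we moved to incremental sync. While exploring it, our approach went through several rounds of improvement. This post records how our MongoDB incremental sync evolved.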

II. Option 1: map BSON files to a temporary table, then INSERT OVERWRITE into the final table

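The idea: for the existing (historical) data, run mongodump to produce a complete BSON file and put it on HDFS, create a raw table mapped onto that BSON file, and then copy the data into the final table with insert overwrite table final_table select * from origin_table, which also lets you partition it along the way. Sample scripts:
1. First, dump a BSON file from MongoDB to the local disk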

mongodump --host "$host" --port "$port" --username="$username" --password="$password" --collection "$coll" --db "$db" --out "${localPath}/" --authenticationDatabase="$db"

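2. Then put the file from the local disk onto HDFS, creating the target directory first

hadoop fs -mkdir -p "$hdfsPath/$db/$coll/"
hadoop fs -put -f "${localPath}/$db/$coll.bson" "$hdfsPath/$db/$coll/"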
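Step 6 below only says that the increment is dumped "by condition". The following is a minimal sketch of steps 1 and 2 restricted to one time window, assuming (not stated in the original) that every write sets the updatetime date field and that the installed mongodump accepts Extended JSON such as {"$date": ...} in --query. The run helper, the DRY_RUN switch and the ${coll}_delta HDFS directory are hypothetical names used only for illustration.

```shell
#!/usr/bin/env bash
# Sketch of an incremental dump + upload for one time window.
# Assumptions (hypothetical, not from the original post):
#   - every write sets a BSON date field "updatetime" on the document
#   - the installed mongodump accepts Extended JSON ({"$date": ...}) in --query
#   - the delta goes to its own HDFS directory "${coll}_delta", which is what
#     the temporary table's LOCATION would point at
set -euo pipefail

# With DRY_RUN=1 (the default here) commands are printed instead of executed.
run() {
  if [[ "${DRY_RUN:-1}" == 1 ]]; then
    printf '%s\n' "$*"
  else
    "$@"
  fi
}

# dump_delta START END: dump documents with START <= updatetime < END
# (half-open, so back-to-back windows neither overlap nor leave gaps)
# and upload the resulting .bson file to HDFS.
dump_delta() {
  local start="$1" end="$2"
  local query
  query=$(printf '{"updatetime": {"$gte": {"$date": "%s"}, "$lt": {"$date": "%s"}}}' "$start" "$end")
  local delta_dir="${hdfsPath}/${db}/${coll}_delta"

  run mongodump --host "$host" --port "$port" \
    --username="$username" --password="$password" \
    --authenticationDatabase="$db" \
    --db "$db" --collection "$coll" \
    --query "$query" --out "${localPath}/"
  run hadoop fs -mkdir -p "$delta_dir"
  # -f replaces the previous run's file, so the directory holds only this delta
  run hadoop fs -put -f "${localPath}/${db}/${coll}.bson" "$delta_dir/"
}
```

With host, port, username, password, db, coll, localPath and hdfsPath set as in steps 1 and 2, dump_delta 2021-03-05T00:00:00Z 2021-03-06T00:00:00Z prints the three commands; setting DRY_RUN=0 executes them instead.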

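3. Then create a table in Hive that maps onto the BSON file on HDFS. BSONSerDe and the BSON input/output formats come from the mongo-hadoop connector, so its jars must be on Hive's classpath (for example, added with ADD JAR) before this DDL runs.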

CREATE external TABLE if not exists table_origin(
`_id` string, 
`batch` string, 
`content` string, 
`createtime` timestamp, 
`mobile` string, 
`type` string, 
`updatetime` timestamp
)
comment 'table comment'
row format serde 'com.mongodb.hadoop.hive.BSONSerDe'
stored as inputformat 'com.mongodb.hadoop.mapred.BSONFileInputFormat'
outputformat 'com.mongodb.hadoop.hive.output.HiveBSONFileOutputFormat'
location '<HDFS directory containing the .bson file>';

4. Create the final table with an explicit storage location. The table is stored as Parquet, so Snappy compression is set with the parquet.compression property, and the name table is a reserved word in Hive, so it has to be quoted with backticks.

CREATE TABLE if not exists `table`(
`_id` string, 
`batch` string, 
`content` string, 
`createtime` timestamp, 
`mobile` string,
`type` string, 
`updatetime` timestamp
)
comment 'table comment'
partitioned by (pyear int,pmonth int,pday int)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION '/user/hive/warehouse/mongodb/data/sdk_pro/table'
TBLPROPERTIES ('parquet.compression'='SNAPPY');

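5. INSERT OVERWRITE into the final table. The partition values are computed in the query (dynamic partitioning), which Hive only allows once dynamic partitioning is enabled in non-strict mode.

SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT OVERWRITE TABLE `table`
PARTITION (pyear,pmonth,pday)
SELECT 
t.`_id` , 
t.`batch` , 
t.`content` , 
t.`createtime` , 
t.`mobile` ,
t.`type` , 
t.`updatetime` ,
year(t.createtime) pyear,month(t.createtime) pmonth,day(t.createtime) pday from table_origin t;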
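Step 5 partitions by the year(), month() and day() of createtime as plain ints, and the merge script in step 6 reads one such partition through ${pt_year}, ${pt_month} and ${pt_day}. The original does not show how those placeholders are filled. A minimal sketch, assuming the merge SQL is saved as a file (the name merge_delta.sql is hypothetical) and run with the Hive CLI, with the values passed as --hivevar variables:

```shell
#!/usr/bin/env bash
# Sketch: derive partition values for a run date and build the Hive CLI call
# for the merge script. Assumptions (hypothetical):
#   - the merge SQL is saved as merge_delta.sql and executed with `hive -f`
#   - its ${pt_year}/${pt_month}/${pt_day}/${table_name} come from --hivevar
set -euo pipefail

# partition_args YYYY-MM-DD: split the date and strip leading zeros
# (10# forces base 10, so "08" and "09" are not parsed as octal), giving
# values in the same form as the ints written by year()/month()/day()
partition_args() {
  local y m d
  IFS=- read -r y m d <<< "$1"
  printf -- '--hivevar pt_year=%d --hivevar pt_month=%d --hivevar pt_day=%d' \
    "$((10#$y))" "$((10#$m))" "$((10#$d))"
}

# merge_cmd YYYY-MM-DD TEMP_TABLE: print the command a scheduler would run
merge_cmd() {
  printf 'hive %s --hivevar table_name=%s -f merge_delta.sql\n' \
    "$(partition_args "$1")" "$2"
}
```

For example, merge_cmd 2021-08-09 sms_delta prints hive --hivevar pt_year=2021 --hivevar pt_month=8 --hivevar pt_day=9 --hivevar table_name=sms_delta -f merge_delta.sql, where sms_delta stands for whatever temporary table the increment was mapped to.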

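6. For incremental data, dump only the records that match a filter condition, put them on HDFS, and create a temporary table over them (same scripts as above). The key difference is the merge: the increment is full-outer-joined with the matching partition of the final table, the incremental version wins for any record present in both, and the partition is overwritten. The script below (taken from a different collection, sdk_call_nxcloud_voice_sms) reads only the base partition ${pt_year}/${pt_month}/${pt_day}, so it assumes every incremental record's createAt falls on that day. Because INSERT OVERWRITE with dynamic partitions rewrites every partition that appears in the result, an incremental record from any other day would replace that day's partition with the incremental rows alone. The merge script: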

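-- merge the incremental data into the master table
-- (the partition values are computed in the query, so dynamic partitioning must be enabled)
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;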
with t_delta as (SELECT t.*,year(t.createAt) pyear,month(t.createAt) pmonth,day(t.createAt) pday from ${table_name} t),
t_base as (select b.* from sdk_call_nxcloud_voice_sms b where b.pyear =${pt_year} and b.pmonth = ${pt_month} and b.pday = ${pt_day})
INSERT OVERWRITE TABLE sdk_call_nxcloud_voice_sms
PARTITION (pyear,pmonth,pday)
select  
coalesce(base.id, delta.id) id,
if(delta.id is NULL, base.countryCode,delta.countryCode) countryCode,
if(delta.id is NULL, base.voiceType,delta.voiceType) voiceType,
if(delta.id is NULL, base.messageid,delta.messageid) messageid,
if(delta.id is NULL, base.thirdNotifyState,delta.thirdNotifyState) thirdNotifyState,
if(delta.id is NULL, base.firstData,delta.firstData) firstData,
if(delta.id is NULL, base.secondData,delta.secondData) secondData,
if(delta.id is NULL, base.pyear,delta.pyear) pyear,
if(delta.id is NULL, base.pmonth,delta.pmonth) pmonth,
if(delta.id is NULL, base.pday,delta.pday) pday 
from t_base base 
full outer join t_delta delta on base.pyear = delta.pyear and base.pmonth = delta.pmonth and base.pday = delta.pday and base.id = delta.id;
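The merge rule above (full outer join on the id within one partition; if the increment contains the id, every column is taken from the increment, otherwise from the base) can be tried out locally without Hive. The following is a toy sketch with coreutils join and awk, assuming each partition is a CSV of id,value with unique ids and no embedded commas. It mirrors the SQL's if(delta.id is NULL, base.x, delta.x) choice and is an illustration, not part of the original pipeline; merge_partition is a hypothetical name.

```shell
#!/usr/bin/env bash
# Local sketch of the merge rule in the SQL above, using coreutils join + awk.
# Assumptions (illustrative only): one partition's rows as CSV "id,value",
# ids unique within each file, no commas inside values.
set -euo pipefail

# merge_partition BASE_CSV DELTA_CSV
#   join output fields: id, base value, delta id, delta value
#   (-a 1 -a 2 keeps unmatched rows from both sides = full outer join;
#    -e NULL fills the side that is missing, like SQL NULLs)
merge_partition() {
  local base="$1" delta="$2"
  LC_ALL=C join -t, -a 1 -a 2 -e NULL -o 0,1.2,2.1,2.2 \
    <(LC_ALL=C sort -t, -k1,1 "$base") \
    <(LC_ALL=C sort -t, -k1,1 "$delta") |
  # delta id is NULL -> keep the base row, otherwise the delta row wins
  awk -F, -v OFS=, '{ if ($3 == "NULL") print $1, $2; else print $1, $4 }'
}
```

For example, base rows a,1 b,2 c,3 merged with delta rows b,20 d,40 give a,1 b,20 c,3 d,40: b is updated, d is inserted, and a and c are kept. Like the SQL, this rule cannot express deletions: a document removed from MongoDB never appears in the delta, so its base row survives.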

III. Problems

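Although this approach is simple and easy to pick up, the process involves many steps, and the same data is stored several times over (the local dump, the BSON files on HDFS, and the Parquet final table), taking up a lot of storage. It needed improvement.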
