I. Background
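As the amount of data in MongoDB keeps growing, syncing the full dataset into the data warehouse on every run is no longer realistic, so we moved to incremental synchronization. While exploring incremental sync we kept improving and optimizing the approach. This post records how our MongoDB incremental sync evolved.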
II. Option 1: map the BSON files to a staging table, then INSERT OVERWRITE into the final table
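The idea is as follows. For the existing (historical) data, use mongodump to dump a complete BSON file and put it on HDFS. Then create a raw table mapped onto that BSON file, and copy the data into the final table with insert overwrite table final_table select * from origin_table, partitioning it along the way. Sample scripts: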
1. First, dump a BSON file from MongoDB to local disk
mongodump --host $host --port $port --username=$username --password=$password --collection $coll --db $db --out ${localPath}/ --authenticationDatabase=$db
2. Then put it from local disk onto HDFS (creating the target directory first)
hadoop fs -mkdir -p $hdfsPath/$db/$coll/
hadoop fs -put ${localPath}/$db/$coll.bson $hdfsPath/$db/$coll/
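Steps 1 and 2 can also be driven from a script. The sketch below is a minimal Python version that only builds the two command lines and does not run them. Several parts are assumptions for illustration: the helper names, the sample values, and the `updatetime` field used for the optional incremental filter. The `--query` value also assumes a mongodump release that accepts Extended JSON dates of the form `{"$date": "<ISO-8601>"}`.

```python
from __future__ import annotations

import json
import shlex
from datetime import datetime, timezone
from typing import Optional


def build_mongodump_cmd(host: str, port: int, username: str, password: str,
                        db: str, coll: str, local_path: str,
                        since: Optional[datetime] = None) -> list[str]:
    """Return the mongodump argv for step 1 (hypothetical helper).

    When `since` is given, a --query filter keeps only documents whose
    `updatetime` (assumed field name) is at or after `since`, i.e. an
    incremental dump. `since` should be timezone-aware.
    """
    cmd = [
        "mongodump",
        "--host", host,
        "--port", str(port),
        f"--username={username}",
        f"--password={password}",
        "--collection", coll,
        "--db", db,
        "--out", f"{local_path}/",
        f"--authenticationDatabase={db}",
    ]
    if since is not None:
        # --query takes Extended JSON; a date is written as {"$date": "<ISO-8601>"}
        ts = since.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        cmd += ["--query", json.dumps({"updatetime": {"$gte": {"$date": ts}}})]
    return cmd


def build_hdfs_put_cmds(local_path: str, db: str, coll: str,
                        hdfs_path: str) -> list[list[str]]:
    """Return the HDFS commands for step 2 (hypothetical helper)."""
    bson_file = f"{local_path}/{db}/{coll}.bson"  # mongodump writes <out>/<db>/<coll>.bson
    target_dir = f"{hdfs_path}/{db}/{coll}/"
    return [
        ["hadoop", "fs", "-mkdir", "-p", target_dir],           # make sure the target exists
        ["hadoop", "fs", "-put", "-f", bson_file, target_dir],  # -f overwrites an older copy
    ]


if __name__ == "__main__":
    # Sample values only; replace them with real connection settings.
    since = datetime(2024, 1, 1, tzinfo=timezone.utc)
    dump = build_mongodump_cmd("127.0.0.1", 27017, "reader", "secret",
                               "sdk_pro", "calls", "/tmp/dump", since)
    print(shlex.join(dump))
    for cmd in build_hdfs_put_cmds("/tmp/dump", "sdk_pro", "calls", "/data/mongo"):
        print(shlex.join(cmd))
```

To actually execute the commands, each list can be passed to `subprocess.run(cmd, check=True)`. Passing argument lists instead of shell strings avoids quoting problems with passwords or paths that contain spaces. For an incremental run, the file would normally go into a separate staging directory rather than overwrite the full dump.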
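3. Next, create a table in Hive that maps onto the BSON file on HDFS. The mongo-hadoop connector jars that provide the BSON SerDe and input/output formats must be on Hive's classpath, for example added with ADD JAR.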
CREATE external TABLE if not exists table_origin(
`_id` string,
`batch` string,
`content` string,
`createtime` timestamp,
`mobile` string,
`type` string,
`updatetime` timestamp
)
comment 'table comment'
row format serde 'com.mongodb.hadoop.hive.BSONSerDe'
stored as inputformat 'com.mongodb.hadoop.mapred.BSONFileInputFormat'
outputformat 'com.mongodb.hadoop.hive.output.HiveBSONFileOutputFormat'
location '<HDFS directory containing the BSON file>';
4. Create the final table and specify its storage directory
CREATE TABLE if not exists `table`(
`_id` string,
`batch` string,
`content` string,
`createtime` timestamp,
`mobile` string,
`type` string,
`updatetime` timestamp
)
comment 'table comment'
partitioned by (pyear int,pmonth int,pday int)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION '/user/hive/warehouse/mongodb/data/sdk_pro/table'
TBLPROPERTIES ( 'parquet.compression'='SNAPPY');
5. INSERT OVERWRITE into the final table
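-- all three partition columns are dynamic, which Hive's default strict mode rejects
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT OVERWRITE TABLE `table`
PARTITION (pyear,pmonth,pday)
SELECT
t.`_id` ,
t.`batch` ,
t.`content` ,
t.`createtime` ,
t.`mobile` ,
t.`type` ,
t.`updatetime` ,
year(t.createtime) pyear,month(t.createtime) pmonth,day(t.createtime) pday from table_origin t;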
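The last three expressions of the SELECT feed the dynamic partition columns by position, so each row is written to a directory derived from its createtime under the table's LOCATION. The Python sketch below models that mapping. The helper names and sample rows are hypothetical. It also reflects Hive's behavior of sending rows whose partition value is NULL to `__HIVE_DEFAULT_PARTITION__`.

```python
from collections import defaultdict
from datetime import datetime
from typing import Dict, Iterable, List, Optional

# Hive's default name for a partition whose value is NULL
# (configurable through hive.exec.default.partition.name).
DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__"


def partition_dir(createtime: Optional[datetime]) -> str:
    """Partition sub-directory a row lands in, mimicking year()/month()/day()."""
    if createtime is None:
        # year(NULL), month(NULL) and day(NULL) are all NULL
        return "/".join(f"{col}={DEFAULT_PARTITION}" for col in ("pyear", "pmonth", "pday"))
    # int partition values are written without zero padding, e.g. pmonth=3
    return f"pyear={createtime.year}/pmonth={createtime.month}/pday={createtime.day}"


def group_by_partition(rows: Iterable[dict]) -> Dict[str, List[str]]:
    """Group row ids by the partition directory they would be written to."""
    groups: Dict[str, List[str]] = defaultdict(list)
    for row in rows:
        groups[partition_dir(row.get("createtime"))].append(row["_id"])
    return dict(groups)


if __name__ == "__main__":
    sample = [
        {"_id": "a", "createtime": datetime(2024, 3, 5, 10, 0)},
        {"_id": "b", "createtime": datetime(2024, 3, 5, 23, 59)},
        {"_id": "c", "createtime": None},
    ]
    for path, ids in group_by_partition(sample).items():
        print(path, ids)
```

Rows a and b share the directory pyear=2024/pmonth=3/pday=5. Rows with a missing createtime all pile up in the default partition, so it is worth checking the source field before loading.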
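6. For incremental data, dump only the documents that match a filter condition, put them on HDFS, and create a staging table over them (same scripts as above). The key difference is the step that merges the incremental data into the existing table. The merge script is: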
-- merge the incremental data into the full table
-- a dynamic-partition INSERT OVERWRITE replaces every partition it writes, so the base side
-- must cover every partition the delta touches, otherwise existing rows there would be lost
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
with t_delta as (SELECT t.*,year(t.createAt) pyear,month(t.createAt) pmonth,day(t.createAt) pday from ${table_name} t),
t_base as (select b.* from sdk_call_nxcloud_voice_sms b
left semi join (select distinct pyear, pmonth, pday from t_delta) p
on b.pyear = p.pyear and b.pmonth = p.pmonth and b.pday = p.pday)
INSERT OVERWRITE TABLE sdk_call_nxcloud_voice_sms
PARTITION (pyear,pmonth,pday)
select
coalesce(base.id, delta.id) id,
if(delta.id is NULL, base.countryCode,delta.countryCode) countryCode,
if(delta.id is NULL, base.voiceType,delta.voiceType) voiceType,
if(delta.id is NULL, base.messageid,delta.messageid) messageid,
if(delta.id is NULL, base.thirdNotifyState,delta.thirdNotifyState) thirdNotifyState,
if(delta.id is NULL, base.firstData,delta.firstData) firstData,
if(delta.id is NULL, base.secondData,delta.secondData) secondData,
if(delta.id is NULL, base.pyear,delta.pyear) pyear,
if(delta.id is NULL, base.pmonth,delta.pmonth) pmonth,
if(delta.id is NULL, base.pday,delta.pday) pday
from t_base base
full outer join t_delta delta on base.pyear = delta.pyear and base.pmonth = delta.pmonth and base.pday = delta.pday and base.id = delta.id;
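The effect of the merge can be modeled in memory. The Python sketch below is a simplified model, not the production job. It assumes a record's partition (its createAt date) never changes and that `id` is unique within a partition; the function and sample data are hypothetical. The model works as follows:

- For every partition present in the delta, it joins base and delta rows on `id`, and the delta row wins.
- It then replaces that whole partition, mirroring a dynamic-partition INSERT OVERWRITE.
- Partitions the delta does not touch are left as they are.

```python
from typing import Dict, List, Tuple

Partition = Tuple[int, int, int]  # (pyear, pmonth, pday)
Table = Dict[Partition, List[dict]]


def merge_partitions(base: Table, delta: Table) -> Table:
    """Model of the FULL OUTER JOIN merge followed by a dynamic-partition
    INSERT OVERWRITE (simplified sketch, not the production job)."""
    # Partitions the delta does not touch are not rewritten by the overwrite.
    result: Table = {part: list(rows) for part, rows in base.items()}
    for part, delta_rows in delta.items():
        # Base side of the join: existing rows of this partition keyed by id.
        merged = {row["id"]: row for row in base.get(part, [])}
        for row in delta_rows:
            # Matching id: the delta row replaces the base row (the if(delta.id is NULL, ...) logic).
            # New id: the delta row is added (the delta-only side of the full outer join).
            merged[row["id"]] = row
        # The whole partition is replaced by the merged rows.
        result[part] = list(merged.values())
    return result


if __name__ == "__main__":
    base = {(2024, 3, 5): [{"id": 1, "v": "old"}, {"id": 2, "v": "keep"}],
            (2024, 3, 4): [{"id": 9, "v": "x"}]}
    delta = {(2024, 3, 5): [{"id": 1, "v": "new"}, {"id": 3, "v": "added"}]}
    for part, rows in sorted(merge_partitions(base, delta).items()):
        print(part, rows)
```

In the model, dropping the base rows of a touched partition (for example, reading the base from only one day) would silently delete them from the result. This is the same risk the base-side filter in the SQL has to avoid.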
III. Problems
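Although this approach is simple and easy to pick up, the process has many steps. It also keeps several copies of the same data: the BSON dump on local disk, the BSON file on HDFS, and the final table. That repeated duplication takes up a lot of storage, so the approach needed improvement.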