一、背景
同步業(yè)務(wù)庫(kù)的數(shù)據(jù)到ODS層,之前一直是全量同步數(shù)據(jù),主要考慮IO太大,耗時(shí)太長(zhǎng),重復(fù)拉取同樣的數(shù)據(jù),現(xiàn)在考慮增量同步的方式實(shí)現(xiàn),同時(shí)對(duì)庫(kù)表數(shù)據(jù)做分區(qū)。
二、同步方案
增量同步主要分為兩步,第一步,存量數(shù)據(jù)一次性同步;第二步,在存量數(shù)據(jù)的基礎(chǔ)之上,做增量;后期的每一次同步都是增量同步。以下是具體同步方案:
1)MYSQL存量數(shù)據(jù)同步
用Sqoop同步表中全部數(shù)據(jù)到Hive表中;
2)MYSQL增量數(shù)據(jù)同步
a.根據(jù)hive中最大更新時(shí)間,用Sqoop提取更新時(shí)間為這個(gè)時(shí)間之后的增量數(shù)據(jù);
三、用sqoop實(shí)現(xiàn)mysql數(shù)據(jù)增量同步到數(shù)倉(cāng)
1、存量數(shù)據(jù)同步,腳本如下:
1)獲取表的所有列,把datetime和timestamp類型,統(tǒng)一在java中映射成TIMESTAMP類型,腳本如下:
getcloumns(){
a=''
b=false
cloumns=''
#sqoop eval --connect $conn --username $uname --password $pwd --query 'desc '$table
result=`sqoop eval --connect $conn --username $uname --password $pwd --query 'desc '$table `
for line in $result
do
if [ ${line} == '|' ]; then
continue
fi
if [ ${line} == '---------------------------------------------------------------------------------------------------------' ]; then
b=true
fi
if [ $ == 'false' ]; then
continue
fi
if [ ${line} == 'datetime' ] || [ ${line} == 'timestamp' ]; then
if [[ ${cloumns} != '' ]]; then
cloumns=$cloumns','
fi
cloumns=$cloumns$a'=TIMESTAMP'
fi
a=$line
done
echo $cloumns
}
2) 用sqoop import拉取數(shù)據(jù),腳本如下:
rcloumns=$(getcloumns)
hive_database="mysql_"$database
hive_table=$table
sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true \
--connect $conn \ #連接地址
--username $uname \
--password $pwd \
--table $table \
--delete-target-dir \ #首次同步,清空目標(biāo)目錄
--fields-terminated-by '\001' \ # 指定列分隔符
--lines-terminated-by "\n" \ # 指定行分隔符
--hive-import \ #直達(dá)hive庫(kù)表,不需要從HDFS映射建表
--hive-overwrite \ #覆蓋模式
--create-hive-table \ # sqoop自動(dòng)建表
--hive-database "$hive_database" \ #指定hive 庫(kù)
--hive-table "$hive_table" \ #指定hive表名
--m 1 \ #指定map任務(wù)個(gè)數(shù)
--hive-drop-import-delims \ # 刪除特殊字符
--null-string '\\N' \ #空字符在hive中轉(zhuǎn)義
--null-non-string '\\N' \ #空字符在hive中轉(zhuǎn)義
--map-column-hive $rcloumns
2、增量數(shù)據(jù)同步
1)創(chuàng)建增量同步的sqoop job,腳本如下:
a、從hive中獲取表的最大更新時(shí)間
sql="select max(update_time) from mysql_"$database"."$table";"
if [[ $? -ne 0 ]];then
exit 1
fi
echo $sql
result=`hive -e "$sql"`
if [[ $? -ne 0 ]];then
exit 1
fi
echo "result="$result
last_value=${result:0-19}
echo "last_value="$last_value
b、以上面獲取的最大更新時(shí)間,作為起點(diǎn),創(chuàng)建sqoop job,腳本如下:
jobname="myjob_"$database"_"$table
echo $jobname
hive_database="mysql_"$database
warehouse_dir='/user/hive/warehouse/'$hive_database'.db/'
sqoop job \
--create $jobname \
-- import \
--connect $conn \
--username $uname \
--password $pwd \
--table $table \
--fields-terminated-by '\001' \
--lines-terminated-by "\n" \
--input-fields-terminated-by '\001' \
--input-lines-terminated-by "\n" \
--warehouse-dir $warehouse_dir \
--incremental lastmodified \
--check-column update_time \
--merge-key id \
--last-value "$last_value" \
--m 1 \
--hive-drop-import-delims \
--null-string '\\N' \
--null-non-string '\\N' \
--input-null-string '\\N' \
--input-null-non-string '\\N'
c、創(chuàng)建sqoop job之后,就是執(zhí)行job了,腳本如下:
sqoop job -exec jobName
具體參數(shù)詳解,參考:https://www.cnblogs.com/Alcesttt/p/11432547.html