mysql數(shù)據(jù)增量同步到hive

一、背景

同步業(yè)務(wù)庫(kù)的數(shù)據(jù)到ODS層,之前一直是全量同步數(shù)據(jù),主要考慮IO太大,耗時(shí)太長(zhǎng),重復(fù)拉取同樣的數(shù)據(jù),現(xiàn)在考慮增量同步的方式實(shí)現(xiàn),同時(shí)對(duì)庫(kù)表數(shù)據(jù)做分區(qū)。

二、同步方案

增量同步主要分為兩步,第一步,存量數(shù)據(jù)一次性同步;第二步,在存量數(shù)據(jù)的基礎(chǔ)之上,做增量;后期的每一次同步都是增量同步。以下是具體同步方案:

1)MYSQL存量數(shù)據(jù)同步

用Sqoop同步表中全部數(shù)據(jù)到Hive表中;

2)MYSQL增量數(shù)據(jù)同步

a.根據(jù)hive中最大更新時(shí)間,用Sqoop提取更新時(shí)間為這個(gè)時(shí)間之后的增量數(shù)據(jù);

三、用sqoop實(shí)現(xiàn)mysql數(shù)據(jù)增量同步到數(shù)倉(cāng)

1、存量數(shù)據(jù)同步,腳本如下:

1)獲取表的所有列,把datetime和timestamp類型,統(tǒng)一在java中映射成TIMESTAMP類型,腳本如下:

getcloumns(){
    a=''
    b=false
    cloumns=''
    #sqoop eval  --connect $conn --username $uname --password $pwd --query 'desc '$table
    result=`sqoop eval  --connect $conn --username $uname --password $pwd --query 'desc '$table `
    for line in $result
    do
        if [ ${line} == '|' ]; then
            continue
        fi
        if [ ${line} == '---------------------------------------------------------------------------------------------------------' ]; then
            b=true
        fi
        if [ $ == 'false' ]; then
            continue
        fi
        
        if [ ${line} == 'datetime' ] || [ ${line} == 'timestamp' ]; then
        if [[ ${cloumns} != '' ]]; then
            cloumns=$cloumns','
        fi
            cloumns=$cloumns$a'=TIMESTAMP'
        fi
        a=$line
    done
    echo $cloumns
}

2) 用sqoop import拉取數(shù)據(jù),腳本如下:

rcloumns=$(getcloumns)
hive_database="mysql_"$database
hive_table=$table

sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true \
--connect $conn \  #連接地址
--username $uname \ 
--password $pwd \
--table $table \
--delete-target-dir \ #首次同步,清空目標(biāo)目錄
--fields-terminated-by '\001' \ # 指定列分隔符
--lines-terminated-by "\n" \ # 指定行分隔符
--hive-import \ #直達(dá)hive庫(kù)表,不需要從HDFS映射建表
--hive-overwrite \  #覆蓋模式
--create-hive-table \ # sqoop自動(dòng)建表
--hive-database "$hive_database" \ #指定hive 庫(kù)
--hive-table "$hive_table" \ #指定hive表名
--m 1 \ #指定map任務(wù)個(gè)數(shù)
--hive-drop-import-delims \ # 刪除特殊字符
--null-string '\\N' \ #空字符在hive中轉(zhuǎn)義
--null-non-string '\\N' \ #空字符在hive中轉(zhuǎn)義
--map-column-hive $rcloumns

2、增量數(shù)據(jù)同步

1)創(chuàng)建增量同步的sqoop job,腳本如下:
a、從hive中獲取表的最大更新時(shí)間

sql="select max(update_time) from mysql_"$database"."$table";"
if [[ $? -ne 0 ]];then
       exit 1
fi
echo $sql
result=`hive -e "$sql"`
if [[ $? -ne 0 ]];then
       exit 1
fi
echo "result="$result
last_value=${result:0-19}
echo "last_value="$last_value

b、以上面獲取的最大更新時(shí)間,作為起點(diǎn),創(chuàng)建sqoop job,腳本如下:

jobname="myjob_"$database"_"$table
echo $jobname
hive_database="mysql_"$database
warehouse_dir='/user/hive/warehouse/'$hive_database'.db/'

sqoop job \
--create $jobname \
-- import  \
--connect $conn \
--username $uname \
--password $pwd \
--table $table \
--fields-terminated-by '\001' \
--lines-terminated-by "\n" \
--input-fields-terminated-by '\001' \
--input-lines-terminated-by "\n" \
--warehouse-dir $warehouse_dir \
--incremental lastmodified \
--check-column update_time \
--merge-key id \
--last-value "$last_value" \
--m 1 \
--hive-drop-import-delims \
--null-string '\\N' \
--null-non-string '\\N'  \
--input-null-string '\\N' \
--input-null-non-string '\\N' 

c、創(chuàng)建sqoop job之后,就是執(zhí)行job了,腳本如下:

sqoop job -exec jobName

具體參數(shù)詳解,參考:https://www.cnblogs.com/Alcesttt/p/11432547.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容