# Job parameters.  The original hard-coded values are kept as defaults,
# but each one can now be overridden by a positional argument, as the
# original TODO comment intended:
#   $1 = MySQL source database   $2 = Hive target database
#   $3 = table name              $4 = incremental check column
#   $5 = partition date (yyyy-MM-dd)
mdb=${1:-kaipao}                 # MySQL (source) database
hdb=${2:-zhengyuan}              # Hive (target) database
table=${3:-water_friend_rel}     # table to sync
check_col=${4:-create_time}      # epoch column driving incremental loads
ds=${5:-2019-04-22}              # partition date being loaded
# 1. Detect whether the partitioned Hive table (${table}_di) already exists:
#    'show columns' prints the column list only when the table is present,
#    so a non-empty $a means "table exists".  Hive CLI WARN lines are
#    stripped so they are not mistaken for columns.
hive -e "show columns from ${hdb}.${table}_di" | grep -v 'WARN:' > tmp1.txt
a=$(cat tmp1.txt)
# 2. Determine the epoch width of ${check_col}: 13 digits = milliseconds,
#    10 digits = seconds.  The sampled value is cached in a per-table file
#    so MySQL is only queried once.
tf=$(cat "${mdb}.${table}.timestamp" 2>/dev/null)  # cache may not exist yet; suppress expected cat error
if [ -n "${tf}" ]; then
  echo "時(shí)間戳長(zhǎng)度文件存在"
  l=${#tf}
else
  echo "時(shí)間戳長(zhǎng)度文件不存在,需創(chuàng)建"
  # Sample a recent max(check_col) from MySQL to learn the epoch width
  # (awk 'NR>1' drops the column-name header line).
  # NOTE(review): credentials on the command line leak via `ps`; prefer a
  # MySQL option file — confirm with ops before changing.
  mysql -u datateam -pRJ4f84S4RjfRxRMm -h172.16.8.4 -A ${mdb} -e "select max(${check_col}) from ${mdb}.${table} where ${check_col} >unix_timestamp(date_sub('${ds}', interval 30 day))" | awk 'NR>1' > "${mdb}.${table}.timestamp"
  tf=$(cat "${mdb}.${table}.timestamp")
  # BUG FIX: the original read ${#m1} — an undefined variable — so l was
  # always 0 on a cache miss and the script fell through to "其他異常".
  l=${#tf}
fi
# 3. Dispatch on (does the _di table exist?, epoch width) to either a full
#    rebuild or an incremental partition load.  The four original branches
#    were near-duplicates; they are factored into two helpers parameterized
#    by the epoch unit (milliseconds vs seconds).

# Full load: sqoop the whole MySQL table into Hive, then rebuild it as a
# table partitioned by ds, converting the check column into the partition
# date.  $1 = Hive expression turning ${check_col} into a timestamp.
full_import() {
  local ts_expr=$1
  local b c
  hive -e "drop table if exists ${hdb}.${table}"
  # NOTE(review): --password on the command line is visible in `ps`;
  # consider --password-file.  Left unchanged pending ops sign-off.
  sqoop import --connect jdbc:mysql://172.16.8.4:3306/${mdb}?tinyInt1isBit=false --username datateam --password RJ4f84S4RjfRxRMm --table ${table} --compression-codec=snappy --as-parquetfile -m 2 --hive-import --hive-overwrite --hive-database ${hdb}
  # Count the imported table's columns (b) so we can slice the DDL below.
  hive -e "show columns from ${hdb}.${table}" | grep -v 'WARN:' > tmp1.txt
  b=$(wc -l < tmp1.txt)
  hive -e "show create table ${hdb}.${table};" > 1
  # BUG FIX: the original used head -$[$+1] — the variable name was lost
  # inside the arithmetic.  We want the CREATE line plus the b column
  # lines of the DDL dump, i.e. the first b+1 lines.
  c=$(head -$((b + 1)) 1)
  # Save the column list for later incremental --query imports; every
  # line after the first is prefixed with a comma to form a select list.
  hive -e "show columns from ${hdb}.${table}" | grep -v 'WARN:' > ${hdb}.${table}.columns
  sed -i '2,500s/^/,/' ${hdb}.${table}.columns
  # Swap: park the flat import, recreate the same schema partitioned by
  # ds, rename it to ${table}_di, then copy rows in with dynamic
  # partitioning and drop the parked copy.
  hive -e "alter table ${hdb}.${table} rename to ${hdb}.${table}_tmp;"
  hive -e "${c} partitioned by (ds string) stored as parquet;"
  hive -e "alter table ${hdb}.${table} rename to ${hdb}.${table}_di;"
  hive -e "set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.dynamic.partition=true;
set hive.exec.max.dynamic.partitions.pernode=10000;
set hive.exec.max.dynamic.partitions=20000;
set hive.support.quoted.identifiers=none;
INSERT OVERWRITE TABLE ${hdb}.${table}_di partition(ds)
select *,to_date(cast(${ts_expr} as timestamp)) from ${hdb}.${table}_tmp;"
  hive -e "drop table ${hdb}.${table}_tmp;"
}

# Incremental load: recreate the ds partition, then sqoop only the rows
# whose check column falls within the ds day (exclusive start, inclusive
# end).  $1 = factor converting epoch seconds to the column's unit
# (1000 for ms, 1 for s); $2 = matching SQL suffix ('*1000' or '').
incr_import() {
  local factor=$1 mult=$2
  local j1 j f g
  hive -e "alter table ${hdb}.${table}_di drop partition (ds='${ds}');"
  hive -e "alter table ${hdb}.${table}_di add partition (ds='${ds}');"
  # Start-of-day epoch (seconds) for ds, scaled to the column's unit; used
  # as sqoop's --last-value lower bound.
  hive -e "select unix_timestamp(concat('${ds}',' 00:00:00'));" > tmp2.txt
  j1=$(head -1 tmp2.txt)
  j=$((j1 * factor))
  f=$(cat ${hdb}.${table}.columns)
  # \$CONDITIONS is substituted by sqoop itself for split ranges; the
  # empty shell variable just keeps the quoted query expanding cleanly.
  CONDITIONS=''
  g="select ${f} from ${table}"
  sqoop import --connect jdbc:mysql://172.16.8.4:3306/${mdb}?tinyInt1isBit=false --username datateam --password RJ4f84S4RjfRxRMm --query " ${g} where ${check_col} > unix_timestamp('${ds}')${mult} and ${check_col} <= unix_timestamp(date_add('${ds}', interval 1 day))${mult} and \$CONDITIONS" --compression-codec=snappy --as-parquetfile --target-dir /user/hive/warehouse/${hdb}.db/${table}_di/ds=${ds} --incremental append --check-column ${check_col} --last-value ${j} -m 1
}

# $a (set above): empty = target table missing; $l: epoch digit count.
# Bare `l -eq 13` and `! -n` from the original are replaced with the
# explicit, quoted forms.
if [[ -z "$a" && "$l" -eq 13 ]]; then
  echo "全量導(dǎo)入"
  full_import "${check_col}/1000"
elif [[ -n "$a" && "$l" -eq 13 ]]; then
  echo "增量導(dǎo)入,有表結(jié)構(gòu),歷史有數(shù)據(jù),本分區(qū)有數(shù)據(jù)"
  incr_import 1000 '*1000'
elif [[ -z "$a" && "$l" -eq 10 ]]; then
  echo "全量導(dǎo)入"
  full_import "${check_col}*1.0"
elif [[ -n "$a" && "$l" -eq 10 ]]; then
  echo "增量導(dǎo)入,有表結(jié)構(gòu),歷史有數(shù)據(jù),本分區(qū)有數(shù)據(jù)"
  incr_import 1 ''
else
  echo "其他異常"
fi
# ---- Scraped page footer below: not shell code; commented out so the script parses ----
# sqoop增量導(dǎo)入并按時(shí)間分區(qū)
# ?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
# 【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
# 平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。
# 【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
# 平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。
# 相關(guān)閱讀更多精彩內(nèi)容
# - Sqoop支持兩種方式的全量數(shù)據(jù)導(dǎo)入和增量數(shù)據(jù)導(dǎo)入,同時(shí)可以指定數(shù)據(jù)是否以并發(fā)形式導(dǎo)入。下面依次來(lái)看: 全量數(shù)據(jù)導(dǎo)...
# - (2018-06-19-周二 22:03:57)
# - sqoop是可以配置job自動(dòng)運(yùn)行的,能自動(dòng)記錄上次同步的時(shí)間,不過如果任務(wù)失敗就不方便重跑了(這方面經(jīng)驗(yàn)不足)。...
# - 需求 有2張大的mysql表,量級(jí)分別是1億和4.5億(太大了,DBA的同學(xué)正在考慮分表),而且數(shù)據(jù)是增量的,需要...
# - (2017-11-15-周三 23:02:19) ①以百分比的方式,定義導(dǎo)入元素的優(yōu)先級(jí)(Alt+P) 注:0%是...