# sqoop增量导入并按时间分区

# Settings. Each value can be overridden by a positional argument, in the
# order: mdb hdb table check_col ds. When an argument is omitted (or empty)
# the default below is used, so running the script without arguments behaves
# as before.

mdb="${1:-kaipao}"              # MySQL database to read from
hdb="${2:-zhengyuan}"           # Hive database to write to
table="${3:-water_friend_rel}"  # table name (same name on both sides)
check_col="${4:-create_time}"   # timestamp column used for filtering/partitioning
ds="${5:-2019-04-22}"           # partition date, YYYY-MM-DD


# 1. Check whether the partitioned Hive table (${table}_di) already exists.
#    Its column list, minus Hive's WARN lines, is saved to tmp1.txt and then
#    loaded into $a; an empty $a means no columns were listed.

hive -e "show columns from ${hdb}.${table}_di" \
  | grep -v 'WARN:' >tmp1.txt
a=$(<tmp1.txt)

# 2. Determine how many digits the values of ${check_col} have.

# NOTE(review): the MySQL password is hard-coded and passed on the command
# line; consider moving it to a client option file or an environment variable.

#######################################
# Load a sample timestamp from the cache file ${mdb}.${table}.timestamp,
# creating the file from MySQL when it is missing or empty, and record the
# number of characters in that value.
# Globals:   mdb, table, check_col, ds (read); tf, l (written)
# Arguments: none
# Outputs:   a status message on stdout; may (re)write the cache file
#######################################
detect_timestamp_length() {
  local ts_file="${mdb}.${table}.timestamp"

  tf=''
  # Only read the cache when it exists, so a first run prints no cat error.
  if [[ -f "${ts_file}" ]]; then
    tf=$(<"${ts_file}")
  fi

  if [[ -n "${tf}" ]]; then
    echo "時(shí)間戳長(zhǎng)度文件存在"
  else
    echo "時(shí)間戳長(zhǎng)度文件不存在,需創(chuàng)建"
    # awk drops the header row that mysql prints before the value.
    mysql -u datateam -pRJ4f84S4RjfRxRMm -h172.16.8.4 -A "${mdb}" -e "select max(${check_col}) from ${mdb}.${table} where ${check_col} >unix_timestamp(date_sub('${ds}', interval 30 day))" \
      | awk 'NR>1' >"${ts_file}"
    tf=$(<"${ts_file}")
  fi

  # Measure the value that was actually loaded (the original measured an
  # undefined variable on the freshly-created path and always got 0).
  l=${#tf}
}

detect_timestamp_length


# 3. Run the import: build the partitioned table from a full import when it
#    does not exist yet, otherwise re-import only the ${ds} partition.
#
# NOTE(review): the MySQL password is hard-coded and passed to sqoop on the
# command line; sqoop's --password-file option would keep it out of the
# process list.

#######################################
# Full import: copy the whole MySQL table into Hive with sqoop, then rebuild
# it as ${hdb}.${table}_di partitioned by ds (the date of ${check_col}).
# Globals:   mdb, hdb, table, check_col (read)
# Arguments: $1 - arithmetic suffix appended to ${check_col} in HiveQL before
#                 the cast to timestamp ('/1000' or '*1.0')
# Outputs:   writes the comma-prefixed column list to ${hdb}.${table}.columns
# Returns:   0 on success, 1 as soon as a required step fails
#######################################
full_import() {
  local to_seconds=$1
  local src="${hdb}.${table}"
  local columns_file="${src}.columns"
  local column_count ddl create_head

  hive -e "drop table if exists ${src}" || return 1
  # The JDBC URL is quoted because it contains '?', a shell glob character.
  sqoop import \
    --connect "jdbc:mysql://172.16.8.4:3306/${mdb}?tinyInt1isBit=false" \
    --username datateam --password RJ4f84S4RjfRxRMm \
    --table "${table}" \
    --compression-codec=snappy --as-parquetfile -m 2 \
    --hive-import --hive-overwrite --hive-database "${hdb}" || return 1

  # Save the column names; incremental runs reuse this file for their SELECT.
  hive -e "show columns from ${src}" | grep -v 'WARN:' >"${columns_file}"
  column_count=$(wc -l <"${columns_file}")
  if (( column_count == 0 )); then
    echo "no columns listed for ${src}" >&2
    return 1
  fi

  # Keep the first column_count + 1 lines of the DDL.
  # NOTE(review): assumes "show create table" prints the CREATE line followed
  # by one line per column -- confirm against the Hive version in use.
  ddl=$(hive -e "show create table ${src};" | grep -v 'WARN:')
  create_head=$(head -n "$((column_count + 1))" <<<"${ddl}")
  if [[ -z "${create_head}" ]]; then
    echo "could not read the DDL of ${src}" >&2
    return 1
  fi

  # Prefix every line but the first with a comma so the file content can be
  # used directly as a SELECT list.
  sed -i '2,$s/^/,/' "${columns_file}" || return 1

  # Each step aborts on failure so the final drop never runs on a
  # half-finished rebuild.
  hive -e "alter table ${src} rename to ${src}_tmp;" || return 1
  hive -e "${create_head} partitioned by (ds string) stored as parquet;" || return 1
  hive -e "alter table ${src} rename to ${src}_di;" || return 1

  hive -e "set hive.exec.dynamic.partition.mode=nonstrict;
           set hive.exec.dynamic.partition=true;
           set hive.exec.max.dynamic.partitions.pernode=10000;
           set hive.exec.max.dynamic.partitions=20000;
           set hive.support.quoted.identifiers=none;
           INSERT OVERWRITE TABLE ${src}_di partition(ds)
           select *,to_date(cast(${check_col}${to_seconds} as timestamp)) from ${src}_tmp;" || return 1

  hive -e "drop table ${src}_tmp;"
}

#######################################
# Incremental import: recreate the ${ds} partition of ${hdb}.${table}_di and
# load that day's rows from MySQL into the partition directory with sqoop.
# Globals:   mdb, hdb, table, check_col, ds (read)
# Arguments: $1 - factor that converts seconds to the unit stored in
#                 ${check_col} (1000 or 1)
# Returns:   0 on success, 1 when a prerequisite is missing, otherwise the
#            status of sqoop
#######################################
incremental_import() {
  local ms_multiplier=$1
  local target="${hdb}.${table}_di"
  local columns_file="${hdb}.${table}.columns"
  local columns day_start last_value query scale_sql=''

  # Check the prerequisite before touching the partition.
  if [[ ! -s "${columns_file}" ]]; then
    echo "column list ${columns_file} is missing or empty" >&2
    return 1
  fi
  columns=$(<"${columns_file}")

  # The drop is allowed to fail (status not checked), as in the original.
  hive -e "alter table ${target} drop partition (ds='${ds}');"
  hive -e "alter table ${target} add partition (ds='${ds}');"

  # Start of the day in epoch seconds, as computed by Hive.
  day_start=$(hive -e "select unix_timestamp(concat('${ds}',' 00:00:00'));" \
    | grep -v 'WARN:' | head -n 1)
  day_start=${day_start//[[:space:]]/}
  if [[ ! "${day_start}" =~ ^[0-9]+$ ]]; then
    echo "could not compute the start timestamp of ${ds}" >&2
    return 1
  fi
  last_value=$((day_start * ms_multiplier))
  if (( ms_multiplier != 1 )); then
    scale_sql="*${ms_multiplier}"
  fi

  # \$CONDITIONS stays literal: sqoop replaces that token itself.
  # NOTE(review): the range is (start, next day start]; a row stamped exactly
  # at midnight lands in the previous day's import -- confirm this is intended.
  query="select ${columns} from ${table} where ${check_col} > unix_timestamp('${ds}')${scale_sql} and ${check_col} <= unix_timestamp(date_add('${ds}', interval 1 day))${scale_sql} and \$CONDITIONS"

  sqoop import \
    --connect "jdbc:mysql://172.16.8.4:3306/${mdb}?tinyInt1isBit=false" \
    --username datateam --password RJ4f84S4RjfRxRMm \
    --query "${query}" \
    --compression-codec=snappy --as-parquetfile \
    --target-dir "/user/hive/warehouse/${hdb}.db/${table}_di/ds=${ds}" \
    --incremental append --check-column "${check_col}" \
    --last-value "${last_value}" -m 1
}

#######################################
# Pick the import mode from the probe results.
# Globals:   a (column list of the _di table; empty means it does not exist),
#            l (digit count of the sample timestamp: 13 or 10)
# Outputs:   a status message on stdout
#######################################
run_import() {
  local to_seconds ms_multiplier

  case "${l:-0}" in
    13)
      to_seconds='/1000'
      ms_multiplier=1000
      ;;
    10)
      to_seconds='*1.0'
      ms_multiplier=1
      ;;
    *)
      echo "其他異常"
      return
      ;;
  esac

  if [[ -z "${a:-}" ]]; then
    echo "全量導(dǎo)入"
    full_import "${to_seconds}"
  else
    echo "增量導(dǎo)入,有表結(jié)構(gòu),歷史有數(shù)據(jù),本分區(qū)有數(shù)據(jù)"
    incremental_import "${ms_multiplier}"
  fi
}

run_import
# ©著作权归作者所有,转载或内容合作请联系作者
# 【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
# 平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

# 相关阅读更多精彩内容

# 友情链接更多精彩内容