優(yōu)雅地使用 DataX 和 Python 同步數(shù)據(jù)

0. 前置條件

  • MySQL數(shù)據(jù)庫
  • Python環(huán)境

1. 下載 DataX 工具 (可選)

cd /opt/
wget https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202308/datax.tar.gz
tar -zxvf datax.tar.gz

2. 配置項(xiàng)

3. 執(zhí)行流程-DataX

  1. 由xxl-job或其他調(diào)度工具發(fā)起同步
  2. 同步任務(wù) 讀取作業(yè)信息
  3. 數(shù)據(jù)庫 讀取對(duì)應(yīng) param_server_id, from_server_id, to_server_id 的連接方式
  4. (如有) 讀取動(dòng)態(tài)參數(shù). 例如 param_sql 為
        select max(update_time) as start_time from ods_order
    
    則解析運(yùn)行參數(shù) run_param = {'start_time':'2026-01-01 11:11:11'}
  5. 生成最終取數(shù)sql, 例如 from_sql 為
        select a,b,c,d from erp_order where update_time>'{start_time}'
    
    則生成最終執(zhí)行sql
        select a,b,c,d from erp_order where update_time>'2026-01-01 11:11:11'
    
  6. 用上述參數(shù)生成 datax json 配置文件到臨時(shí)目錄
  7. 生成 datax 命令行并執(zhí)行任務(wù)
  8. 解析輸出結(jié)果, 獲取同步行數(shù). 或讀取報(bào)錯(cuò)信息
  9. 記錄日志

4. 執(zhí)行流程-Pandas

  1. 由xxl-job或其他調(diào)度工具發(fā)起同步
  2. 同步任務(wù) 讀取作業(yè)信息
  3. 數(shù)據(jù)庫 讀取對(duì)應(yīng) param_server_id, from_server_id, to_server_id 的連接方式
  4. (如有) 讀取動(dòng)態(tài)參數(shù). 例如 param_sql 為
        select max(update_time) as start_time from ods_order
    
    則解析運(yùn)行參數(shù) run_param = {'start_time':'2026-01-01 11:11:11'}
  5. 生成最終取數(shù)sql, 例如 from_sql 為
        select a,b,c,d from erp_order where update_time>'{start_time}'
    
    則生成最終執(zhí)行sql
        select a,b,c,d from erp_order where update_time>'2026-01-01 11:11:11'
    
  6. 執(zhí)行 before_write
  7. 讀取 最終執(zhí)行sql 結(jié)果到 DataFrame
  8. 寫入目標(biāo)數(shù)據(jù)
  9. 執(zhí)行 after_write
  10. 記錄日志

5. 完整部署步驟

5.1. 下載代碼

  • git clone https://github.com/ts7ming/CheapETL
  • (或 git clone https://gitee.com/ts7ming/CheapETL)

5.2. 準(zhǔn)備環(huán)境

  • 在MySQL執(zhí)行 etl.sql

  • 創(chuàng)建 settings.py

    DS_CONFIG = {
        'conn_type': 'mysql',
        'host': 'localhost', 
        'username': 'root', 
        'password': 'qiming', 
        'port': '3306', 
        'db_name': 'dw'
    }
    WORK_DIR = '/app/CheapETL/'
    DATAX_PY = '/opt/datax/bin/data.py'
    PY_PATH = 'python3'
    

5.3. 添加數(shù)據(jù)源

  • 在 MySQL etl_server 表添加數(shù)據(jù)源id和連接信息
    • 如果用datax寫入 doris, 需要單獨(dú)新建數(shù)據(jù)源id, port值為 fe_port,be_port 例如 9030,8030

5.4. 配置同步任務(wù)

  • 在 MySQL etl_job_sync 表添加同步配置

5.5. 執(zhí)行同步任務(wù)

  • 通過 xxl-job, crontab 或其他方式執(zhí)行 python3 script_path sync_id param
    • 數(shù)據(jù)量小的同步: script_path = /app/CheapETL/sync.py
    • 數(shù)據(jù)量大的同步: script_path = /app/CheapETL/sync_datax.py
    • sync_id = etl_job_sync.id
    • param是指定參數(shù), 優(yōu)先級(jí)高于 etl_job_sync.param_sql
  • 例如: python3 /app/CheapETL/sync_datax.py 2001
  • 例如: python3 /app/CheapETL/sync.py 2002 --start_date "$(date -d '-1 day' +%Y-%m-%d)" --end_date "$(date +%Y-%m-%d)"
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容