0. 前置條件
- MySQL數(shù)據(jù)庫
- Python環(huán)境
1. 下載 DataX 工具 (可選)
cd /opt/
wget https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202308/datax.tar.gz
tar -zxvf datax.tar.gz
2. 配置項(xiàng)

3. 執(zhí)行流程-DataX
- 由xxl-job或其他調(diào)度工具發(fā)起同步
- 從
同步任務(wù)讀取作業(yè)信息 - 從
數(shù)據(jù)庫讀取對(duì)應(yīng) param_server_id, from_server_id, to_server_id 的連接方式 - (如有) 讀取動(dòng)態(tài)參數(shù). 例如 param_sql 為
則解析運(yùn)行參數(shù) run_param = {'start_time':'2026-01-01 11:11:11'}select max(update_time) as start_time from ods_order - 生成最終取數(shù)sql, 例如 from_sql 為
則生成最終執(zhí)行sqlselect a,b,c,d from erp_order where update_time>'{start_time}'select a,b,c,d from erp_order where update_time>'2026-01-01 11:11:11' - 用上述參數(shù)生成 datax json 配置文件到臨時(shí)目錄
- 生成 datax 命令行并執(zhí)行任務(wù)
- 解析輸出結(jié)果, 獲取同步行數(shù). 或讀取報(bào)錯(cuò)信息
- 記錄日志
4. 執(zhí)行流程-Pandas
- 由xxl-job或其他調(diào)度工具發(fā)起同步
- 從
同步任務(wù)讀取作業(yè)信息 - 從
數(shù)據(jù)庫讀取對(duì)應(yīng) param_server_id, from_server_id, to_server_id 的連接方式 - (如有) 讀取動(dòng)態(tài)參數(shù). 例如 param_sql 為
則解析運(yùn)行參數(shù) run_param = {'start_time':'2026-01-01 11:11:11'}select max(update_time) as start_time from ods_order - 生成最終取數(shù)sql, 例如 from_sql 為
則生成最終執(zhí)行sqlselect a,b,c,d from erp_order where update_time>'{start_time}'select a,b,c,d from erp_order where update_time>'2026-01-01 11:11:11' - 執(zhí)行 before_write
- 讀取 最終執(zhí)行sql 結(jié)果到 DataFrame
- 寫入目標(biāo)數(shù)據(jù)
- 執(zhí)行 after_write
- 記錄日志
5. 完整部署步驟
5.1. 下載代碼
git clone https://github.com/ts7ming/CheapETL- (或
git clone https://gitee.com/ts7ming/CheapETL)
5.2. 準(zhǔn)備環(huán)境
在MySQL執(zhí)行
etl.sql-
創(chuàng)建
settings.pyDS_CONFIG = { 'conn_type': 'mysql', 'host': 'localhost', 'username': 'root', 'password': 'qiming', 'port': '3306', 'db_name': 'dw' } WORK_DIR = '/app/CheapETL/' DATAX_PY = '/opt/datax/bin/data.py' PY_PATH = 'python3'
5.3. 添加數(shù)據(jù)源
- 在 MySQL
etl_server表添加數(shù)據(jù)源id和連接信息- 如果用datax寫入 doris, 需要單獨(dú)新建數(shù)據(jù)源id, port值為
fe_port,be_port例如9030,8030
- 如果用datax寫入 doris, 需要單獨(dú)新建數(shù)據(jù)源id, port值為
5.4. 配置同步任務(wù)
- 在 MySQL
etl_job_sync表添加同步配置
5.5. 執(zhí)行同步任務(wù)
- 通過 xxl-job, crontab 或其他方式執(zhí)行
python3 script_path sync_id param- 數(shù)據(jù)量小的同步:
script_path=/app/CheapETL/sync.py - 數(shù)據(jù)量大的同步:
script_path=/app/CheapETL/sync_datax.py -
sync_id=etl_job_sync.id - param是指定參數(shù), 優(yōu)先級(jí)高于
etl_job_sync.param_sql
- 數(shù)據(jù)量小的同步:
- 例如:
python3 /app/CheapETL/sync_datax.py 2001 - 例如:
python3 /app/CheapETL/sync.py 2002 --start_date "$(date -d '-1 day' +%Y-%m-%d)" --end_date "$(date +%Y-%m-%d)"