Download and install Airflow
Install with pip
pip install apache-airflow
Switch the default database from SQLite to MySQL
If you want to use MySQL as the database, run the following command:
pip install 'apache-airflow[mysql]'
- Install MySQL (skip this step if you already have it).
- Edit the airflow.cfg file and change the following two settings, switching the executor to LocalExecutor:
executor = LocalExecutor
# The SqlAlchemy connection string to the metadata database
sql_alchemy_conn = mysql://root:PASSWORD@localhost/Airflow
Here Airflow is a database that you have created in advance.
- Run the airflow initdb command. If it fails with the error Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql, add explicit_defaults_for_timestamp = 1 under the [mysqld] section of MySQL's configuration file my.cnf and restart mysqld (as sketched after this list). If MySQL was installed with brew, run:
$ brew services list
$ brew services restart SERVICE_NAME
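For reference, a minimal sketch of those two setup steps; the database name Airflow matches the connection string above, and the my.cnf location varies by platform. First, create the metadata database in the MySQL shell:

CREATE DATABASE Airflow;

Then add the timestamp setting to my.cnf:

[mysqld]
explicit_defaults_for_timestamp = 1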
Quick start
# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=~/airflow
# install from pypi using pip
pip install apache-airflow
# initialize the database
airflow initdb
# start the web server, default port is 8080
airflow webserver -p 8080
# start the scheduler
airflow scheduler
# visit localhost:8080 in the browser and enable the example dag in the home page
Example
The example below is from the official documentation:
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(days=1))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7) }}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)
Instantiating a DAG
tutorial is the dag_id, which uniquely identifies your DAG.
dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))
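schedule_interval also accepts a cron expression or one of Airflow's presets instead of a timedelta; a sketch of the same daily schedule written as a preset:

# equivalent daily schedule using the '@daily' preset
dag = DAG('tutorial', default_args=default_args, schedule_interval='@daily')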
Tasks
Tasks are generated when operators are instantiated. An object instantiated from an operator is called a task. The task_id parameter serves as the task's unique identifier.
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)
A task must include or inherit the arguments task_id and owner; otherwise Airflow raises an exception. In the example above, both tasks inherit owner from default_args, and t2 overrides the inherited retries with an explicit retries=3.
Templating with Jinja
Airflow ships with a set of built-in parameters and macros (such as ds, the execution date, and the macros module) that can be referenced in Jinja templates, as templated_command above demonstrates.
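As a minimal sketch of using these built-ins directly, reusing the dag object from the example above (the task_id here is made up for illustration):

t_dates = BashOperator(
    task_id='print_exec_dates',  # hypothetical task name
    # {{ ds }} is the execution date; macros.ds_add shifts it by N days
    bash_command='echo "run: {{ ds }}, a week later: {{ macros.ds_add(ds, 7) }}"',
    dag=dag)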
Setting up dependencies
t1.set_downstream(t2)
# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
t2.set_upstream(t1)
# The bit shift operator can also be
# used to chain operations:
t1 >> t2
# And the upstream dependency with the
# bit shift operator:
t2 << t1
# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3
# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1
Airflow raises an exception when it detects a cycle in your DAG or when the same dependency is registered more than once.
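For instance, a sketch of a dependency Airflow would reject, because it closes a cycle between the two tasks:

# t1 -> t2 -> t1 forms a cycle; Airflow raises an exception when it parses the DAG file
t1 >> t2 >> t1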
Testing
Running the script
Place the code above in a file tutorial.py inside the DAG folder configured in airflow.cfg (~/airflow/dags by default), then run it. If the script completes without raising an exception, the DAG has no obvious errors and Airflow can load it:
python ~/airflow/dags/tutorial.py
Command-line metadata validation
# print the list of active DAGs
airflow list_dags
# prints the list of tasks in the "tutorial" DAG
airflow list_tasks tutorial
# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks tutorial --tree
Testing
airflow test runs a single task instance for a given execution date. It executes the task locally, prints its output to stdout, and does not record state in the database:
airflow test tutorial print_date 2015-06-01
Backfill
Running a backfill actually executes the DAG over the given date range; you can follow the run status in the UI. Backfill respects task dependencies, emits logs into files, and records status in the database.
# optional, start a web server in debug mode in the background
# airflow webserver --debug &
# start your backfill on a date range
airflow backfill tutorial -s 2015-06-01 -e 2015-06-07
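While the backfill runs, the state of an individual DAG run can also be checked from the command line (Airflow 1.x CLI); for one of the dates above:

# prints the state (running, success, failed, ...) of the 2015-06-01 run
airflow dag_state tutorial 2015-06-01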
Airflow Guides
Config
Airflow is configured through the file airflow.cfg. Options can also be set via environment variables of the form $AIRFLOW__{SECTION}__{KEY}.
For example,
the database connection is configured in airflow.cfg like this:
[core]
sql_alchemy_conn = my_conn_string
It can also be set with an environment variable, like:
AIRFLOW__CORE__SQL_ALCHEMY_CONN=my_conn_string
You can also append _cmd to a key and supply a bash command; the command's output is used as the value. For example:
[core]
sql_alchemy_conn_cmd = bash_command_to_run
The options that support _cmd are:
- sql_alchemy_conn in the [core] section
- fernet_key in the [core] section
- broker_url in the [celery] section
- result_backend in the [celery] section
- password in the [atlas] section
- smtp_password in the [smtp] section
- bind_password in the [ldap] section
- git_password in the [kubernetes] section
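For example, a sketch that keeps a secret out of airflow.cfg by reading it from an external file at startup; the file path is an assumption:

[core]
# the Fernet key is read from a file instead of being stored inline
fernet_key_cmd = cat /path/to/fernet_key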
Configuration options are resolved in the following order of precedence:
- environment variable
- value in airflow.cfg
- command in airflow.cfg
- Airflow's built-in defaults
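As a quick illustration of that order: if sql_alchemy_conn is set both in airflow.cfg and in the environment, the environment variable wins:

# the environment variable takes precedence over the value in airflow.cfg
export AIRFLOW__CORE__SQL_ALCHEMY_CONN=mysql://root:PASSWORD@localhost/Airflow
airflow initdb   # connects to MySQL regardless of what airflow.cfg says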