Airflow

Installing Airflow

Install with pip:

pip install apache-airflow

Switching the default SQLite database to MySQL

To use MySQL as the metadata database, do the following:

  1. pip install 'apache-airflow[mysql]'
  2. Install MySQL (skip this if it is already installed).
  3. Edit airflow.cfg and change the following two settings, switching the executor to LocalExecutor:
executor = LocalExecutor
sql_alchemy_conn = mysql://root:PASSWORD@localhost/Airflow

Here Airflow is the name of a database that must be created in advance.

  1. Run airflow initdb. If it fails with the error Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql,
    add explicit_defaults_for_timestamp = 1 under the mysqld section of MySQL's configuration file my.cnf, then restart mysqld. If MySQL was installed with brew, run:
$ brew services list
$ brew services restart SERVICE_NAME
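
The my.cnf addition described above looks like this (the file's location varies by installation; /usr/local/etc/my.cnf is a common path for Homebrew MySQL, but treat that path as an assumption):

```ini
[mysqld]
explicit_defaults_for_timestamp = 1
```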

Quick start

# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=~/airflow

# install from pypi using pip
pip install apache-airflow

# initialize the database
airflow initdb

# start the web server, default port is 8080
airflow webserver -p 8080

# start the scheduler
airflow scheduler

# visit localhost:8080 in the browser and enable the example dag in the home page

Example

The example from the official documentation:

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(days=1))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)

Instantiating a DAG

tutorial is the dag_id, which uniquely identifies your DAG.

dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))

Tasks

Tasks are generated when operators are instantiated. An object instantiated from an operator is called a task. The task_id parameter serves as the unique identifier for the task.

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

A task must include or inherit the arguments task_id and owner, otherwise Airflow will raise an exception.

Templating with Jinja

Airflow provides a number of built-in parameters and macros.
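
As an illustration of what happens at render time, the snippet below renders an Airflow-style template with plain jinja2 (which Airflow itself uses). The ds_add function here is a simplified stand-in for Airflow's macros.ds_add, written only for this sketch:

```python
from datetime import datetime, timedelta
from types import SimpleNamespace

from jinja2 import Template

def ds_add(ds, days):
    # Simplified stand-in for Airflow's macros.ds_add: shift a YYYY-MM-DD string by N days.
    return (datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)).strftime("%Y-%m-%d")

template = Template('echo "{{ ds }}"; echo "{{ macros.ds_add(ds, 7) }}"')
rendered = template.render(ds="2015-06-01", macros=SimpleNamespace(ds_add=ds_add))
print(rendered)  # echo "2015-06-01"; echo "2015-06-08"
```

In a real DAG this rendering is performed by Airflow for each task run, with ds set to the execution date.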

Setting up dependencies

t1.set_downstream(t2)

# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
t2.set_upstream(t1)

# The bit shift operator can also be
# used to chain operations:
t1 >> t2

# And the upstream dependency with the
# bit shift operator:
t2 << t1

# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3

# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1

Airflow raises an exception when it detects a cycle in the dependencies, or when a dependency is referenced more than once.
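
The cycle check can be illustrated with a depth-first search over the task graph. This is only a sketch of the idea, not Airflow's actual implementation (Airflow ships its own cycle tester):

```python
def has_cycle(downstream):
    # DFS with three colors: WHITE = unvisited, GRAY = on the current path, BLACK = done.
    # `downstream` maps a task_id to the list of task_ids that depend on it.
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {task: WHITE for task in downstream}

    def visit(task):
        color[task] = GRAY
        for nxt in downstream.get(task, []):
            if color.get(nxt, WHITE) == GRAY:   # back edge: found a cycle
                return True
            if color.get(nxt, WHITE) == WHITE and visit(nxt):
                return True
        color[task] = BLACK
        return False

    return any(color[task] == WHITE and visit(task) for task in list(downstream))

print(has_cycle({'t1': ['t2', 't3'], 't2': [], 't3': []}))  # False
print(has_cycle({'t1': ['t2'], 't2': ['t1']}))              # True
```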

Testing

Running the script

Put the code above in a file tutorial.py inside the DAGs folder configured in airflow.cfg (the default is ~/airflow/dags), then run:

python ~/airflow/dags/tutorial.py

Command-line metadata validation

# print the list of active DAGs
airflow list_dags

# prints the list of tasks in the "tutorial" DAG
airflow list_tasks tutorial

# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks tutorial --tree

Testing

airflow test tutorial print_date 2015-06-01

Backfill

Running a backfill actually executes the DAG; the run status can be followed in the UI.

# optional, start a web server in debug mode in the background
# airflow webserver --debug &

# start your backfill on a date range
airflow backfill tutorial -s 2015-06-01 -e 2015-06-07

Airflow Guides

Config

The configuration file is airflow.cfg. Options can also be set through environment variables of the form $AIRFLOW__{SECTION}__{KEY}.
For example, the database connection is set in airflow.cfg like this:

[core]
sql_alchemy_conn = my_conn_string

It can also be set through an environment variable, like

AIRFLOW__CORE__SQL_ALCHEMY_CONN=my_conn_string

You can also append _cmd to a key and supply a bash command whose output is used as the value, for example:

[core]
sql_alchemy_conn_cmd = bash_command_to_run

The keys that support _cmd are:

  • sql_alchemy_conn in [core] section

  • fernet_key in [core] section

  • broker_url in [celery] section

  • result_backend in [celery] section

  • password in [atlas] section

  • smtp_password in [smtp] section

  • bind_password in [ldap] section

  • git_password in [kubernetes] section

Configuration options are resolved in the following order of precedence:

  1. environment variables
  2. values in airflow.cfg
  3. commands (_cmd) in airflow.cfg
  4. Airflow's built-in defaults
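
The precedence order can be sketched as a lookup function (a simplification for illustration; the real logic lives in Airflow's configuration module):

```python
import os
import subprocess

def get_option(section, key, cfg, defaults):
    # Resolve a config option following Airflow's order of precedence (simplified).
    # 1. environment variable AIRFLOW__{SECTION}__{KEY}
    env_name = f"AIRFLOW__{section.upper()}__{key.upper()}"
    if env_name in os.environ:
        return os.environ[env_name]
    section_cfg = cfg.get(section, {})
    # 2. plain value in airflow.cfg
    if key in section_cfg:
        return section_cfg[key]
    # 3. a {key}_cmd entry in airflow.cfg: run the command, use its output
    if key + "_cmd" in section_cfg:
        out = subprocess.check_output(section_cfg[key + "_cmd"], shell=True, text=True)
        return out.strip()
    # 4. built-in default
    return defaults.get(section, {}).get(key)

cfg = {"core": {"sql_alchemy_conn_cmd": "echo my_conn_string"}}
print(get_option("core", "sql_alchemy_conn", cfg, {}))  # my_conn_string
```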