Setting Up Apache Airflow: Standalone and Distributed Environments



Introduction to Airflow

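Apache Airflow is a visual, distributed task scheduling platform (it can also run on a single machine) that orchestrates workflows as DAGs (directed acyclic graphs), similar to schedulers such as Oozie and Azkaban. Airflow was started at Airbnb in 2014, entered the Apache Software Foundation's Incubator in March 2016, and became a top-level Apache project in January 2019. It is written in Python, and workflows (DAGs) are defined programmatically as Python code. When workflows are defined as code, they become more maintainable, versionable, testable, and collaborative.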

Airflow's web UI lets you monitor every node (task) of a workflow, including its run state, duration, and execution logs. You can also change a node's state from the UI, for example marking it as success or failed, or rerunning it. Each task in an Airflow workflow is atomic and retryable: if a task at some step of a workflow fails, it can be retried automatically or manually without rerunning the whole workflow from the beginning.
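In Airflow itself, automatic retries are configured per task through operator arguments such as `retries` and `retry_delay` (often passed via `default_args`). To illustrate the idea of retrying only the failed task rather than the whole workflow, here is a minimal standard-library sketch; `run_with_retries` and `flaky_task` are hypothetical names, not Airflow APIs.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(task: Callable[[], T], retries: int, retry_delay: float) -> T:
    """Run `task`, retrying up to `retries` extra times if it raises.

    Hypothetical helper that mimics the spirit of Airflow's per-task
    `retries` / `retry_delay` settings; it is not Airflow code.
    """
    attempt = 0
    while True:
        try:
            return task()
        except Exception:
            attempt += 1
            if attempt > retries:
                raise  # out of retries: surface the last error
            time.sleep(retry_delay)  # wait before the next attempt


# A hypothetical task that fails twice with a transient error, then succeeds.
calls = {"n": 0}


def flaky_task() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "done"


result = run_with_retries(flaky_task, retries=2, retry_delay=0.0)
print(result, "after", calls["n"], "attempts")
```

Only the failing callable is re-invoked; anything that already succeeded upstream is left alone, which is the property the paragraph above describes.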

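Airflow is most commonly used for data processing and is part of the big data ecosystem. It can of course also schedule tasks unrelated to data processing, but data-processing tasks usually depend on one another, and those dependencies can be complex enough that basic tools such as crontab cannot handle them. That is why they need to be orchestrated and managed by a scheduling platform. For example: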

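  • Time dependencies: a task must wait until a specific point in time before it is triggered
  • External system dependencies: a task depends on an external system and has to call its API
  • Inter-task dependencies: task A can only start after task B finishes, so the two tasks affect each other
  • Resource/environment dependencies: a task consumes a lot of resources, or can only run on specific machines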
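Inter-task dependencies like these form a directed acyclic graph, and any topological order of that graph is a valid run order. The sketch below uses Python's standard-library `graphlib` (available since Python 3.9, the version installed later in this article) with hypothetical task names; it illustrates the concept only and is not how Airflow's scheduler is implemented.

```python
from graphlib import CycleError, TopologicalSorter

# Each key maps a task to the set of tasks it depends on (its upstream tasks).
# The task names are hypothetical.
dependencies = {
    "extract": set(),
    "clean": {"extract"},
    "load_a": {"clean"},
    "load_b": {"clean"},
    "report": {"load_a", "load_b"},
}

# One valid execution order: every task appears after all of its upstream tasks.
order = list(TopologicalSorter(dependencies).static_order())
print(order)

# A cycle (a depends on b, b depends on a) is not a DAG and is rejected.
try:
    list(TopologicalSorter({"a": {"b"}, "b": {"a"}}).static_order())
except CycleError as exc:
    print("cycle detected:", exc.args[1])
```

`load_a` and `load_b` have no dependency on each other, so their relative order is not fixed; a distributed scheduler is free to run them in parallel.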

Airflow's architecture is shown below:


[Figure: Airflow architecture diagram]
  • Metadata Database: Airflow's metadata database, where the Webserver, Executor, and Scheduler store all kinds of state; usually MySQL or PostgreSQL
  • User Interface: the front-end web UI
  • Webserver: the web server that serves the user interface and its operations
  • Scheduler: handles triggering scheduled workflows and submits the tasks in those workflows to the executor
  • Executor: handles task instances. With a local executor it runs inside the scheduler and processes all task instances itself. Most production-ready executors (for example, the CeleryExecutor) instead rely on a message queue such as RabbitMQ or Redis to push task instances to the worker nodes
  • Workers: the worker nodes that actually launch task processes and execute tasks; there can be many workers, each running as an independent process
  • DAG Directory: the directory holding the Python files that define DAGs, each representing an Airflow workflow. Its location is set by the dags_folder option in the Airflow configuration, and it must be accessible to the executor, the scheduler, and the worker nodes
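To make the division of labor concrete, here is a toy, single-process model of the flow: a "scheduler" puts ready task instances on a queue (standing in for the message broker) and several "worker" threads pull and execute them. Everything here (`worker`, `schedule`, the use of `queue.Queue`) is a hypothetical illustration built on the standard library, not Airflow or Celery code. It also skips dependency checks; a real scheduler only queues tasks whose upstream tasks have finished, so the sketch uses independent task names borrowed from the bundled `example_bash_operator` DAG.

```python
import queue
import threading

task_queue = queue.Queue()      # stands in for the broker (e.g. RabbitMQ or Redis)
results = []                    # (worker_name, task_id) pairs, like a result backend
results_lock = threading.Lock()
STOP = None                     # sentinel telling a worker to shut down


def worker(name: str) -> None:
    """Pull task instances off the queue and 'execute' them until told to stop."""
    while True:
        task_id = task_queue.get()
        if task_id is STOP:
            break
        with results_lock:
            results.append((name, task_id))


def schedule(task_ids, n_workers: int = 2) -> None:
    """Toy scheduler: start workers, enqueue ready tasks, then stop the workers."""
    workers = [
        threading.Thread(target=worker, args=(f"worker{i + 1}",))
        for i in range(n_workers)
    ]
    for t in workers:
        t.start()
    for task_id in task_ids:
        task_queue.put(task_id)
    for _ in workers:
        task_queue.put(STOP)  # one sentinel per worker, queued after all tasks
    for t in workers:
        t.join()


# Independent tasks (no dependencies between them), so any worker may take any task.
schedule(["runme_0", "runme_1", "runme_2"])
for worker_name, task_id in results:
    print(f"{task_id} ran on {worker_name}")
```

Which worker picks up which task is not deterministic, which mirrors what you will see later in the distributed setup, where tasks of the same DAG land on different workers.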

For more about Airflow, see the official documentation:


Preparation

1. Prepare a virtual machine or cloud server. I am using a local VM:

  • Operating system: CentOS 7
  • CPU: 8 cores
  • Memory: 16 GB
  • Disk: 20 GB
  • IP: 192.168.243.175

2. Compile and install Python 3. For the installation steps, see:

The version I installed is 3.9.1:

[root@localhost ~]# python3 --version
Python 3.9.1

3. Install Docker. For the installation steps, see:

The version I installed is 19.03.12:

[root@localhost ~]# docker version
Client: Docker Engine - Community
 Version:           19.03.12
 API version:       1.40
 Go version:        go1.13.10
 Git commit:        48a66213fe
 Built:             Mon Jun 22 15:46:54 2020
 OS/Arch:           linux/amd64
 Experimental:      false

Server: Docker Engine - Community
 Engine:
  Version:          19.03.12
  API version:      1.40 (minimum version 1.12)
  Go version:       go1.13.10
  Git commit:       48a66213fe
  Built:            Mon Jun 22 15:45:28 2020
  OS/Arch:          linux/amd64
  Experimental:     false
 containerd:
  Version:          1.2.13
  GitCommit:        7ad184331fa3e55e52b890ea95e65ba581ae3429
 runc:
  Version:          1.0.0-rc10
  GitCommit:        dc9208a3303feef5b3839f4323d9beb36df0a9dd
 docker-init:
  Version:          0.18.0
  GitCommit:        fec3683

4. Install the MySQL database. For the installation steps, see the following article or the official MySQL documentation.

The version I installed is 8.0.21:

> select version();
8.0.21

Setting Up a Standalone Airflow Environment

With the preparation done, we will first set up a standalone Airflow environment. The official documentation is here:

Set the Airflow home directory, where Airflow stores its files:

[root@localhost ~]# vim /etc/profile
export AIRFLOW_HOME=/usr/local/airflow
[root@localhost ~]# source /etc/profile

Installing Airflow takes just one command:

$ pip3 install "apache-airflow==2.1.0" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.1.0/constraints-3.9.txt" -i https://pypi.tuna.tsinghua.edu.cn/simple --default-timeout=6000
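The constraint file URL follows the pattern `constraints-<Airflow version>/constraints-<Python major.minor>.txt`, so it has to match both the Airflow version and the Python version you install with. A small sketch that derives it from the running interpreter; `constraint_url` is a hypothetical helper name.

```python
import sys


def constraint_url(airflow_version: str, python_version: str) -> str:
    """Build the Airflow constraints URL for a given Airflow/Python pair."""
    return (
        "https://raw.githubusercontent.com/apache/airflow/"
        f"constraints-{airflow_version}/constraints-{python_version}.txt"
    )


# Use the major.minor version of the interpreter that will run pip.
py = f"{sys.version_info.major}.{sys.version_info.minor}"
print(constraint_url("2.1.0", py))
```

The printed URL is what goes after `--constraint` in the `pip3 install` command above.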

After the installation finishes, run the following command to initialize the database:

[root@localhost ~]# airflow db init
Traceback (most recent call last):
  File "/usr/local/python/bin/airflow", line 5, in <module>
    from airflow.__main__ import main
  File "/usr/local/python/lib/python3.9/site-packages/airflow/__init__.py", line 34, in <module>
    from airflow import settings
  File "/usr/local/python/lib/python3.9/site-packages/airflow/settings.py", line 35, in <module>
    from airflow.configuration import AIRFLOW_HOME, WEBSERVER_CONFIG, conf  # NOQA F401
  File "/usr/local/python/lib/python3.9/site-packages/airflow/configuration.py", line 1115, in <module>
    conf = initialize_config()
  File "/usr/local/python/lib/python3.9/site-packages/airflow/configuration.py", line 877, in initialize_config
    conf.validate()
  File "/usr/local/python/lib/python3.9/site-packages/airflow/configuration.py", line 202, in validate
    self._validate_config_dependencies()
  File "/usr/local/python/lib/python3.9/site-packages/airflow/configuration.py", line 234, in _validate_config_dependencies
    import sqlite3
  File "/usr/local/python/lib/python3.9/sqlite3/__init__.py", line 23, in <module>
    from sqlite3.dbapi2 import *
  File "/usr/local/python/lib/python3.9/sqlite3/dbapi2.py", line 27, in <module>
    from _sqlite3 import *
ModuleNotFoundError: No module named '_sqlite3'

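This error is expected. By default Airflow uses SQLite as its metadata database and imports the sqlite3 module to validate that setting; a self-compiled Python built without the SQLite development headers has no _sqlite3 module, so the import fails. Since we are going to switch to MySQL anyway, this does not matter. The point of running the command first is to make Airflow generate its configuration files in the directory we set: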

[root@localhost ~]# ls /usr/local/airflow/
airflow.cfg  webserver_config.py
[root@localhost ~]# 
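Because the traceback comes from `import sqlite3`, you can check in advance whether a given Python build has SQLite support, and later whether the `MySQLdb` module provided by mysqlclient is importable. A minimal standard-library check; `has_module` is a hypothetical helper.

```python
import importlib.util


def has_module(name: str) -> bool:
    """Return True if the top-level module `name` can be found by this interpreter."""
    return importlib.util.find_spec(name) is not None


# `_sqlite3` is the C extension behind the `sqlite3` package; it is missing when
# Python was compiled without SQLite headers. `MySQLdb` comes from mysqlclient.
for mod in ("_sqlite3", "MySQLdb"):
    print(f"{mod}: {'available' if has_module(mod) else 'missing'}")
```

Run it with the same `python3` that Airflow uses, since each interpreter has its own set of compiled extensions and installed packages.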

Edit the configuration file:

[root@localhost ~]# vim /usr/local/airflow/airflow.cfg
[core]
dags_folder = /usr/local/airflow/dags
default_timezone = Asia/Shanghai
# Database connection
sql_alchemy_conn = mysql+mysqldb://airflow:123456a.@192.168.1.7:3306/airflow?use_unicode=true&charset=utf8
# Are DAGs paused by default at creation
dags_are_paused_at_creation = False

[webserver]
default_ui_timezone = Asia/Shanghai
# Default DAG view. Valid values are: ``tree``, ``graph``, ``duration``, ``gantt``, ``landing_times``
dag_default_view = graph

[scheduler]
# How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
dag_dir_list_interval = 30
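Besides editing `airflow.cfg`, Airflow also reads any option from an environment variable named `AIRFLOW__{SECTION}__{KEY}` (upper case, double underscores), and such a variable takes precedence over the file. This can be handy in Docker, where passing `-e` flags is an alternative to copying the file into containers. A tiny sketch that derives the variable names for the options above; `airflow_env_var` is a hypothetical helper.

```python
def airflow_env_var(section: str, key: str) -> str:
    """Name of the environment variable that overrides [section] key in airflow.cfg."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


# Values from the configuration above.
overrides = {
    ("core", "dags_folder"): "/usr/local/airflow/dags",
    ("core", "default_timezone"): "Asia/Shanghai",
    ("webserver", "dag_default_view"): "graph",
    ("scheduler", "dag_dir_list_interval"): "30",
}

for (section, key), value in overrides.items():
    print(f"export {airflow_env_var(section, key)}={value}")
```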

Create the database and user in MySQL:

CREATE DATABASE airflow CHARACTER SET utf8;
create user 'airflow'@'%' identified by '123456a.';
grant all privileges on airflow.* to 'airflow'@'%';
flush privileges;
  • Tip: the database character set must be utf8; otherwise Airflow may fail when initializing the database
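The `sql_alchemy_conn` value is a SQLAlchemy URL of the form `mysql+mysqldb://user:password@host:port/database?options`. If the password contains characters such as `@`, `:` or `/`, it must be URL-encoded, or the URL will be split in the wrong place. A sketch using the credentials from this article; `mysql_conn_url` is a hypothetical helper. One caveat, stated as an assumption: airflow.cfg is read with configparser-style interpolation, so a literal `%` produced by URL-encoding may need to be written as `%%` inside the file.

```python
from urllib.parse import quote_plus, urlencode


def mysql_conn_url(user: str, password: str, host: str, port: int, db: str) -> str:
    """Build a mysql+mysqldb SQLAlchemy URL, URL-encoding the user and password."""
    query = urlencode({"use_unicode": "true", "charset": "utf8"})
    return (
        f"mysql+mysqldb://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{db}?{query}"
    )


# The credentials used in this article, and a hypothetical password with special characters.
print(mysql_conn_url("airflow", "123456a.", "192.168.1.7", 3306, "airflow"))
print(mysql_conn_url("airflow", "p@ss:w/rd", "192.168.1.7", 3306, "airflow"))
```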

Install the MySQL client packages; these dependencies are required to build the Python mysqlclient library:

[root@localhost ~]# wget https://cdn.mysql.com//Downloads/MySQL-8.0/mysql-community-devel-8.0.25-1.el7.x86_64.rpm
[root@localhost ~]# wget https://cdn.mysql.com//Downloads/MySQL-8.0/mysql-community-libs-8.0.25-1.el7.x86_64.rpm
[root@localhost ~]# wget https://cdn.mysql.com//Downloads/MySQL-8.0/mysql-community-client-plugins-8.0.25-1.el7.x86_64.rpm
[root@localhost ~]# wget https://cdn.mysql.com//Downloads/MySQL-8.0/mysql-community-client-8.0.25-1.el7.x86_64.rpm
[root@localhost ~]# wget https://cdn.mysql.com//Downloads/MySQL-8.0/mysql-community-common-8.0.25-1.el7.x86_64.rpm
[root@localhost ~]# wget https://cdn.mysql.com//Downloads/MySQL-8.0/mysql-community-libs-compat-8.0.25-1.el7.x86_64.rpm
[root@localhost ~]# yum install -y ./*.rpm

Install gcc and the other build dependencies:

[root@localhost ~]# yum install -y gcc make libffi-devel zlib*

Install mysqlclient:

[root@localhost ~]# pip3 install mysqlclient -i https://pypi.tuna.tsinghua.edu.cn/simple

Initialize the database again:

[root@localhost ~]# airflow db init
...
Initialization done

After a successful initialization, the database contains the following tables:


[Figure: tables created in the airflow database]

Then create an admin user (since no --password option is given, the command prompts for the password):

[root@localhost ~]# airflow users create \
    --username admin \
    --firstname Peter \
    --lastname Parker \
    --role Admin \
    --email spiderman@superhero.org

Start the webserver:

[root@localhost ~]# airflow webserver --port 8080

Start the scheduler:

[root@localhost ~]# airflow scheduler

Run one of the bundled example tasks to check that Airflow is working. If the output shows success, as below, everything is fine:

[root@localhost ~]# airflow tasks run example_bash_operator runme_0 2015-01-01
[2021-06-19 21:44:47,149] {dagbag.py:487} INFO - Filling up the DagBag from /usr/local/airflow/dags
Running <TaskInstance: example_bash_operator.runme_0 2015-01-01T00:00:00+08:00 [success]> on host localhost.localdomain
[2021-06-19 21:44:47,763] {dagbag.py:487} INFO - Filling up the DagBag from /usr/local/python/lib/python3.9/site-packages/airflow/example_dags/example_bash_operator.py
Running <TaskInstance: example_bash_operator.runme_0 2015-01-01T00:00:00+08:00 [success]> on host localhost.localdomain
[root@localhost ~]# 

Common Airflow commands

# Airflow 2.x CLI syntax (the version installed above)

# Run the webserver as a daemon
$ airflow webserver -D

# Run the scheduler as a daemon
$ airflow scheduler -D

# Run a Celery worker as a daemon
$ airflow celery worker -D

# Run a Celery worker as a daemon with a task concurrency of 1
$ airflow celery worker -c 1 -D

# Pause a DAG
$ airflow dags pause $dag_id

# Unpause a DAG, equivalent to switching its toggle from Off to On in the UI
$ airflow dags unpause $dag_id

# List the tasks of a DAG
$ airflow tasks list $dag_id

# Clear task instances
$ airflow tasks clear $dag_id

# Trigger a run of the whole DAG
$ airflow dags trigger $dag_id -r $RUN_ID -e $EXEC_DATE

# Run a single task
$ airflow tasks run $dag_id $task_id $execution_date
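When these commands are driven from another program, building the argument list explicitly avoids shell-quoting problems. A sketch for the Airflow 2.x `dags trigger` subcommand, using its long option names `--run-id` and `--exec-date`; `trigger_cmd` is a hypothetical helper, and the command is only constructed here, not executed.

```python
import shlex
from typing import List, Optional


def trigger_cmd(dag_id: str, run_id: Optional[str] = None,
                exec_date: Optional[str] = None) -> List[str]:
    """Build the argv for `airflow dags trigger` (Airflow 2.x CLI)."""
    cmd = ["airflow", "dags", "trigger", dag_id]
    if run_id is not None:
        cmd += ["--run-id", run_id]
    if exec_date is not None:
        cmd += ["--exec-date", exec_date]
    return cmd


# Hypothetical run id and date for the DAG defined later in this article.
cmd = trigger_cmd("my_dag_example", run_id="manual_test_1", exec_date="2021-06-19")
print(shlex.join(cmd))
# To actually run it (requires Airflow on PATH):
# subprocess.run(cmd, check=True)
```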

Common UI operations

Next, open http://192.168.243.175:8080 and log in to the Airflow web UI:


[Figure: Airflow login page]

After logging in, the home page looks like this:


[Figure: Airflow home page]

You can select the time zone in the top-right corner:


[Figure: time zone selector]

The page lists some example DAGs, which we can trigger manually for testing:


[Figure: manually triggering an example DAG]

[Figure: manually triggering an example DAG (continued)]

Click a DAG to see its details and the run state of each node:


[Figure: DAG details and node states]

Click a node in the DAG to perform actions on it:


[Figure: node action dialog]

Defining a Custom DAG

Next, let's define a simple DAG for Airflow to run. Create a Python file:

[root@localhost ~]# mkdir /usr/local/airflow/dags
[root@localhost ~]# vim /usr/local/airflow/dags/my_dag_example.py

Example code:

from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

# Default arguments applied to every task in this DAG
args = {
    'owner': 'admin',
}

with DAG(
        dag_id='my_dag_example',
        default_args=args,
        schedule_interval='@once',
        start_date=days_ago(2),
        dagrun_timeout=timedelta(minutes=60),
        tags=['my_dag'],
        params={"example_key": "example_value"}
) as dag:
    # Define the nodes (tasks) of the DAG
    first = BashOperator(
        task_id='first',
        bash_command='echo "run first task"',
    )
    middle = BashOperator(
        task_id='middle',
        bash_command='echo "run middle task"',
    )
    last = BashOperator(
        task_id='last',
        bash_command='echo "run last task"',
    )
    
    # Define the upstream/downstream dependencies: first -> middle -> last
    first >> middle >> last
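The `first >> middle >> last` line works because Airflow operators overload `>>`: `a >> b` is equivalent to `a.set_downstream(b)` and returns `b`, so the expression chains from left to right. The toy class below re-creates just that mechanism in plain Python to show what the line does; `ToyTask` is hypothetical and is not Airflow's `BaseOperator`.

```python
class ToyTask:
    """Minimal stand-in for an operator that only tracks dependencies."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.upstream: set = set()
        self.downstream: set = set()

    def __rshift__(self, other: "ToyTask") -> "ToyTask":
        # self >> other: other runs after self.
        self.downstream.add(other.task_id)
        other.upstream.add(self.task_id)
        return other  # returning `other` is what makes chaining work


first, middle, last = ToyTask("first"), ToyTask("middle"), ToyTask("last")
first >> middle >> last  # evaluated as (first >> middle) >> last

for t in (first, middle, last):
    print(t.task_id, "upstream:", sorted(t.upstream), "downstream:", sorted(t.downstream))
```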

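After a short wait, the web UI shows that our custom DAG has already run. Because dags_are_paused_at_creation = False and the DAG uses schedule_interval='@once', the scheduler runs it automatically once it picks up the new file (the DAG directory is rescanned every 30 seconds, per dag_dir_list_interval). The tasks are simple, so the run finishes quickly:


[Figure: my_dag_example run results]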

Check that the node dependencies match what we defined in code:


[Figure: graph view of my_dag_example]

For more on defining DAGs, see the official example code and documentation. The bundled examples are in the following directory:

  • /usr/local/python/lib/python3.9/site-packages/airflow/example_dags

Setting Up a Distributed Airflow Environment

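To run Airflow in a distributed fashion, use the CeleryExecutor, which needs a message broker such as RabbitMQ or Redis to dispatch tasks to the workers. For installation steps, see: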

This article uses RabbitMQ 3.8.9. For testing purposes you can install it quickly with Docker:

[root@localhost ~]# docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.8.9-management
[root@localhost ~]# docker exec -it rabbitmq bash
root@49c8ebed2525:/# rabbitmqctl add_user airflow password  # Add a user
root@49c8ebed2525:/# rabbitmqctl add_vhost airflow_vhost   # Add a virtual host
root@49c8ebed2525:/# rabbitmqctl set_user_tags airflow administrator  # Tag the user as administrator (full access to the management UI)
root@49c8ebed2525:/# rabbitmqctl set_permissions -p airflow_vhost airflow '.*' '.*' '.*'  # Grant the user configure/write/read permissions on the vhost
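Before pointing Airflow at RabbitMQ, it helps to confirm that the AMQP port (5672) is reachable from the machines that will run the scheduler and workers. A minimal standard-library check; `port_open` is a hypothetical helper, and a successful TCP connection only proves that something is listening on the port, not that the credentials or vhost are correct.

```python
import socket


def port_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, or timed out
        return False
```

For example, with the container above running, `port_open("192.168.243.175", 5672)` should return `True`, and the same check on 15672 covers the management UI.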

For the distributed setup we deploy with Docker, because containers are more elastic and easier to deploy, and make it quick to scale out to multiple workers. First, pull the Airflow Docker image:

[root@localhost ~]# docker pull apache/airflow

Copy the Airflow configuration file generated during the local installation:

[root@localhost ~]# cp /usr/local/airflow/airflow.cfg ~
[root@localhost ~]# vim airflow.cfg

Then change its contents as follows:

[core]
# Directory containing the DAG definition files
dags_folder = /opt/airflow/dags
default_timezone = Asia/Shanghai
# Database connection
sql_alchemy_conn = mysql+mysqldb://airflow:123456a.@192.168.1.7:3306/airflow?use_unicode=true&charset=utf8
# The executor class that airflow should use
executor = CeleryExecutor
# Are DAGs paused by default at creation
dags_are_paused_at_creation = False
plugins_folder = /opt/airflow/plugins

[webserver]
default_ui_timezone = Asia/Shanghai
# Default DAG view. Valid values are: ``tree``, ``graph``, ``duration``, ``gantt``, ``landing_times``
dag_default_view = graph

[scheduler]
# How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
dag_dir_list_interval = 30
child_process_log_directory = /opt/airflow/logs/scheduler

[logging]
base_log_folder = /opt/airflow/logs
dag_processor_manager_log_location = /opt/airflow/logs/dag_processor_manager/dag_processor_manager.log

[celery]
# Worker concurrency: how many task instances a worker can run at the same time
worker_concurrency = 16
# Port of the worker's log server (the webserver fetches task logs from it)
worker_log_server_port = 8795
# RabbitMQ connection URL (the Celery broker)
broker_url = amqp://airflow:password@192.168.243.175:5672/airflow_vhost
result_backend = db+mysql://airflow:123456a.@192.168.1.7:3306/airflow?use_unicode=true&charset=utf8
flower_host = 0.0.0.0
flower_port = 5555

[operators]
default_queue = airflow_queue
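Since every node gets a copy of this file, a quick consistency check can catch mistakes before it is copied around. The sketch below parses an excerpt with the standard-library `configparser` and checks that the executor is `CeleryExecutor`, that the database is not SQLite (which Airflow refuses to combine with the CeleryExecutor), that the broker is an AMQP URL, and that the concurrency is positive. The particular set of checks is an illustrative assumption, not a list of rules Airflow enforces; `load_cfg` and `check_celery_config` are hypothetical helpers.

```python
import configparser
from urllib.parse import urlsplit

# Excerpt of the distributed airflow.cfg above (values from this article).
CFG = """
[core]
dags_folder = /opt/airflow/dags
executor = CeleryExecutor
sql_alchemy_conn = mysql+mysqldb://airflow:123456a.@192.168.1.7:3306/airflow?use_unicode=true&charset=utf8

[celery]
worker_concurrency = 16
broker_url = amqp://airflow:password@192.168.243.175:5672/airflow_vhost
"""


def load_cfg(text: str) -> configparser.ConfigParser:
    """Parse airflow.cfg-style INI text."""
    cfg = configparser.ConfigParser()
    cfg.read_string(text)
    return cfg


def check_celery_config(cfg: configparser.ConfigParser) -> list:
    """Return a list of problems found in a CeleryExecutor configuration."""
    problems = []
    if cfg.get("core", "executor") != "CeleryExecutor":
        problems.append("core.executor should be CeleryExecutor")
    if cfg.get("core", "sql_alchemy_conn").startswith("sqlite"):
        problems.append("SQLite cannot be used with the CeleryExecutor")
    if urlsplit(cfg.get("celery", "broker_url")).scheme != "amqp":
        problems.append("celery.broker_url should be an amqp:// URL")
    if cfg.getint("celery", "worker_concurrency") < 1:
        problems.append("celery.worker_concurrency must be >= 1")
    return problems


cfg = load_cfg(CFG)
broker = urlsplit(cfg.get("celery", "broker_url"))
print("broker host/port/vhost:", broker.hostname, broker.port, broker.path.lstrip("/"))
print(check_celery_config(cfg) or "no problems found")
```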

Create a dedicated Docker network for Airflow, so that each node can be given a fixed IP and host entries when its container starts; this also isolates it from other containers' networks:

[root@localhost ~]# docker network create --driver bridge --subnet=172.18.12.0/16 --gateway=172.18.1.1 airflow
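The fixed IPs assigned in the next step must fall inside this subnet, must not collide with the gateway, and must be unique. A small check with the standard-library `ipaddress` module; note that `172.18.12.0/16` has host bits set, so it is parsed with `strict=False`, which normalizes it to the /16 range it denotes, `172.18.0.0/16`. `check_ips` is a hypothetical helper.

```python
import ipaddress

subnet = ipaddress.ip_network("172.18.12.0/16", strict=False)  # -> 172.18.0.0/16
gateway = ipaddress.ip_address("172.18.1.1")

# Container IPs used in the docker run commands of this article.
nodes = {
    "airflow_webserver": "172.18.12.1",
    "airflow_scheduler": "172.18.12.2",
    "airflow_flower": "172.18.12.3",
    "airflow_worker1": "172.18.12.4",
    "airflow_worker2": "172.18.12.5",
}


def check_ips(nodes: dict) -> list:
    """Return problems: IPs outside the subnet, equal to the gateway, or duplicated."""
    problems = []
    seen = set()
    for name, ip_str in nodes.items():
        ip = ipaddress.ip_address(ip_str)
        if ip not in subnet:
            problems.append(f"{name}: {ip} is not in {subnet}")
        if ip == gateway:
            problems.append(f"{name}: {ip} is the gateway")
        if ip in seen:
            problems.append(f"{name}: {ip} is already used")
        seen.add(ip)
    return problems


print(subnet, gateway in subnet, check_ips(nodes) or "all IPs OK")
```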

Then create a container for each node from the image, paying attention to the IP and host settings:

[root@localhost ~]# docker run -d -p 8080:8080 --name airflow_webserver \
--network=airflow --ip 172.18.12.1 --hostname airflow_webserver \
--add-host=airflow_scheduler:172.18.12.2 --add-host=airflow_flower:172.18.12.3 \
--add-host=airflow_worker1:172.18.12.4 --add-host=airflow_worker2:172.18.12.5 \
apache/airflow webserver

[root@localhost ~]# docker run -d --name airflow_scheduler \
--network=airflow --ip 172.18.12.2 --hostname airflow_scheduler \
--add-host=airflow_webserver:172.18.12.1 --add-host=airflow_flower:172.18.12.3 \
--add-host=airflow_worker1:172.18.12.4 --add-host=airflow_worker2:172.18.12.5 \
apache/airflow scheduler

[root@localhost ~]# docker run -d -p 5555:5555 --name airflow_flower \
--network=airflow --ip 172.18.12.3 --hostname airflow_flower \
--add-host=airflow_webserver:172.18.12.1 --add-host=airflow_scheduler:172.18.12.2 \
--add-host=airflow_worker1:172.18.12.4 --add-host=airflow_worker2:172.18.12.5 \
apache/airflow celery flower

[root@localhost ~]# docker run -d -p 8795:8795 --name airflow_worker1 \
--network=airflow --ip 172.18.12.4 --hostname airflow_worker1 \
--add-host=airflow_webserver:172.18.12.1 --add-host=airflow_flower:172.18.12.3 \
--add-host=airflow_scheduler:172.18.12.2 --add-host=airflow_worker2:172.18.12.5 \
apache/airflow celery worker

[root@localhost ~]# docker run -d -p 8796:8795 --name airflow_worker2 \
--network=airflow --ip 172.18.12.5 --hostname airflow_worker2 \
--add-host=airflow_webserver:172.18.12.1 --add-host=airflow_flower:172.18.12.3 \
--add-host=airflow_worker1:172.18.12.4 --add-host=airflow_scheduler:172.18.12.2 \
apache/airflow celery worker
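Each container gets an `--add-host` entry for every other node, which is easy to get wrong when copied by hand. A sketch that generates the same `docker run` argument lists from a single node table; `NODES` and `docker_run_args` are hypothetical names, while the flags themselves are exactly those used in the commands above.

```python
import shlex

# name -> (ip, port mapping or None, airflow subcommand); values from this article.
NODES = {
    "airflow_webserver": ("172.18.12.1", "8080:8080", "webserver"),
    "airflow_scheduler": ("172.18.12.2", None, "scheduler"),
    "airflow_flower": ("172.18.12.3", "5555:5555", "celery flower"),
    "airflow_worker1": ("172.18.12.4", "8795:8795", "celery worker"),
    "airflow_worker2": ("172.18.12.5", "8796:8795", "celery worker"),
}


def docker_run_args(name: str, image: str = "apache/airflow") -> list:
    """Build the docker run argv for one node, with host entries for all other nodes."""
    ip, ports, command = NODES[name]
    args = ["docker", "run", "-d"]
    if ports:
        args += ["-p", ports]
    args += ["--name", name, "--network=airflow", "--ip", ip, "--hostname", name]
    for other, (other_ip, _, _) in NODES.items():
        if other != name:
            args.append(f"--add-host={other}:{other_ip}")
    args.append(image)
    args += command.split()
    return args


for node in NODES:
    print(shlex.join(docker_run_args(node)))
```

Adding a third worker then means adding one line to `NODES`, and every generated command picks up the new host entry.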

Replace the configuration file inside each container with the modified one from the host:

[root@localhost ~]# docker cp ./airflow.cfg airflow_webserver:/opt/airflow/airflow.cfg
[root@localhost ~]# docker cp ./airflow.cfg airflow_scheduler:/opt/airflow/airflow.cfg
[root@localhost ~]# docker cp ./airflow.cfg airflow_flower:/opt/airflow/airflow.cfg
[root@localhost ~]# docker cp ./airflow.cfg airflow_worker1:/opt/airflow/airflow.cfg
[root@localhost ~]# docker cp ./airflow.cfg airflow_worker2:/opt/airflow/airflow.cfg

Drop the tables created by the standalone deployment, then initialize the database again (this runs on the host, whose airflow.cfg points to the same MySQL database):

[root@localhost ~]# airflow db init

Since the old data was deleted, recreate the Airflow admin user:

[root@localhost ~]# airflow users create \
    --username admin \
    --firstname Peter \
    --lastname Parker \
    --role Admin \
    --email spiderman@superhero.org

Then restart every node:

[root@localhost ~]# docker restart airflow_webserver
[root@localhost ~]# docker restart airflow_scheduler
[root@localhost ~]# docker restart airflow_flower
[root@localhost ~]# docker restart airflow_worker1
[root@localhost ~]# docker restart airflow_worker2

After confirming with docker ps that all nodes started successfully, open the Flower web UI (port 5555) to see the online workers and confirm that they are alive:

[Figure: Flower showing the online workers]

Then open the webserver UI and confirm that it is reachable:


[Figure: webserver UI with an empty DAG list]

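Because the /opt/airflow/dags directory inside the containers is empty, the webserver UI shows no DAGs. Now copy the DAG file we wrote earlier into the containers. Note that the DAG file must be synced to every scheduler and worker node, and Airflow must have sufficient permissions on it. For example: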

[root@localhost ~]# chmod 777 /usr/local/airflow/dags/my_dag_example.py  # Grant all permissions to sidestep permission issues (acceptable for a test, too permissive for production)
[root@localhost ~]# docker cp /usr/local/airflow/dags/my_dag_example.py airflow_worker1:/opt/airflow/dags/my_dag_example.py    # Copy to the workers first: if the scheduler gets the file first, it schedules the DAG immediately and the tasks fail on workers that do not have the file yet
[root@localhost ~]# docker cp /usr/local/airflow/dags/my_dag_example.py airflow_worker2:/opt/airflow/dags/my_dag_example.py
[root@localhost ~]# docker cp /usr/local/airflow/dags/my_dag_example.py airflow_scheduler:/opt/airflow/dags/my_dag_example.py
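Because every scheduler and worker must see the same DAG code, it can be worth verifying that the copies are identical. A sketch that compares SHA-256 digests of files; it assumes you have first copied each node's file back to the host (for example with `docker cp <container>:/opt/airflow/dags/my_dag_example.py ./<container>.py`). `sha256_of` and `all_identical` are hypothetical helpers.

```python
import hashlib
from pathlib import Path


def sha256_of(path: Path) -> str:
    """Hex SHA-256 digest of a file, read in 64 KiB chunks."""
    h = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            h.update(chunk)
    return h.hexdigest()


def all_identical(paths: list) -> bool:
    """True if every file in `paths` has the same content digest."""
    digests = {sha256_of(Path(p)) for p in paths}
    return len(digests) <= 1
```

For instance, `all_identical(["airflow_worker1.py", "airflow_worker2.py", "airflow_scheduler.py"])` should return `True` after a correct sync.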

After syncing the DAG file, wait a moment and you will see the DAG being scheduled:


[Figure: my_dag_example being scheduled]

The run succeeds:


[Figure: successful DAG run]

Open the Graph View to see the state of each node:


[Figure: graph view with node states]

Check the log of the first node to see whether it was dispatched to a worker correctly. As shown, it ran on airflow_worker2:

[Figure: log of the first node, run on airflow_worker2]

The middle node, on the other hand, ran on airflow_worker1:

[Figure: log of the middle node, run on airflow_worker1]

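That completes the setup and verification of the distributed Airflow environment. One shortcoming remains: in this architecture the webserver and the scheduler are single points of failure, so the deployment is not highly available. In newer versions (Airflow 2.0 and later) this is fairly easy to solve: simply run multiple webserver and scheduler instances, without the special workarounds older versions needed to make the scheduler highly available. Running multiple schedulers relies on row-level locking in the metadata database (PostgreSQL 9.6+ or MySQL 8+; the MySQL 8.0.21 used here qualifies). For details on scheduler high availability, see the official documentation: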

© Copyright belongs to the author. For reprints or content collaboration, please contact the author.