1. Installation

1.1 Installing Airflow

Installing from PyPi 문서를 보면서 설치합니다.

먼저 dependencies를 설치 합니다.

# MariaDB 사용시
sudo apt-get install mariadb-server
sudo apt-get install libmariadbclient-dev

# MySQL 사용시
sudo apt-get install mysql-server
sudo apt-get install libmysqlclient-dev
pip install --upgrade mysqlclient
pip install --upgrade mysql-connector-python
pip install --upgrade paramiko plyvel

Apache Airflow는 일종의 앱이기도 하면서, 라이브러리이기도 합니다.
라이브러리는 dependencies 를 열어놓고, 앱의 경우는 고정시켜놓습니다.
Apache Airflow의 관리자는 오픈된 환경으로 열어두었지만,
이 경우 간혹 잘못된 dependency 또는 업데이트로 인해서 설치가 안 될 수도 있습니다.

이것을 방지하기 위해서 constraint file을 따로 만들어서 안정화된 버젼을 관리중입니다.
providers 도 참고 합니다.

AIRFLOW_HOME=~/airflow
AIRFLOW_VERSION=2.2.2
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
PROVIDERS="async,postgres,gcp,google,google_auth,amazon,mysql,redis,elasticsearch,ssh,http,apache-hdfs,docker,kubernetes,cncf-kubernetes,jdbc,ocbc,mongo,slack"
pip install "apache-airflow[${PROVIDERS}]==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

# 그냥 다 설치 .. ㅎㅎ (그냥 왠만하면 필요한것만 설치할 것.. 너무 많아...)
pip install 'apache-airflow[all]'

1.2 Setting Up Database Backend

기본적으로 Airflow는 SqlLite를 사용합니다.
production환경에서는 변경이 필요하며, 링크를 참조합니다.

본문에서는 MariaDB 또는 MySQL 기반의 backend를 정리했습니다.
sudo vi /etc/mysql/mariadb.conf.d/50-server.cnf 로 다음과 같이 추가 합니다.

[mysqld]
explicit_defaults_for_timestamp = 1

수정후 sudo systemctl restart mariadb 재시작 합니다.

CREATE DATABASE airflow CHARACTER SET utf8 COLLATE utf8_unicode_ci;
CREATE USER 'airflow'@'localhost' IDENTIFIED BY 'airflow';
GRANT ALL PRIVILEGES ON airflow.* TO 'airflow'@'localhost';
GRANT ALL PRIVILEGES ON airflow.* TO 'airflow'@'%';
FLUSH PRIVILEGES;
SHOW DATABASES;

airflow.cfg 파일을 수정합니다.

cd ~/airflow/
vi airflow.cfg

sql_alchemy_conn 을 수정합니다.
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>?charset=utf8 형식으로 만듭니다.
끝에 ?charset=utf8 넣었는데.. 안넣으면 제 경우는 에러가 발생했습니다.

# sql_alchemy_conn = sqlite:////home/anderson/airflow/airflow.db
sql_alchemy_conn = mysql+mysqldb://airflow:airflow@localhost:3306/airflow?charset=utf8

# 샘플 데이터 설치 여부
load_examples = False

# Rest API Authentication 변경
auth_backend = airflow.api.auth.backend.basic_auth

1.3 Initialize DB and Webserver

# 예제 없이 설치
export AIRFLOW__CORE__LOAD_EXAMPLES=False
airflow db init

airflow users create \
    --username admin \
    --firstname Anderson \
    --lastname Jo \
    --role Admin \
    --email anderson.pytorch@gmail.com \
    --password admin

airflow webserver --port 8080

기본 예제를 불러오지 않을려면 airflow.cfg 에서 아래처럼 수정합니다.

load_examples = False

스케쥴러 실행은 다음과 같이 합니다.

airflow scheduler

1.4 Debug Executor

PyCharm 에서 바로 사용 가능 합니다.

if __name__ == "__main__":
    from airflow.utils.state import State

    dag.clear(dag_run_state=State.NONE)
    dag.run()

2. Example

2.1 Settings

대부분의 설정할 내용들은 ~/airflow/airflow.cfg 에 모두 있습니다.

주요 세팅

  • dags_folder = /home/anderson/airflow/dags # DAG 위치를 지정합니다.

Concurrency Settings

airflow.cfg Environment Variables Default Value
parallelism AIRFLOW__CORE__PARALLELISM 32
dag_concurrency AIRFLOW__CORE__DAG_CONCURRENCY 16
worker_concurrency AIRFLOW__CELERY__WORKER_CONCURRENCY 16
parsing_processes AIRFLOW__SCHEDULER__PARSING_PROCESSES 2
max_active_runs_per_dag AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG 16
  • parallelism : 동시 실행 가능한 tasks 숫자. task 가 queue에 걸려 있으면 증가 시킬 필요 있음
  • dag_concurrency : DAG당 실행시킬수 있는 태스크의 갯수. 리소스가 충분한데도 실행이 안되면 dag_concurrency 증가 필요 있음
  • worker_concurrency : Maximum Celery Worker 를 가르킴
  • parsing_processes : Threads 갯수.

2.2 Example

~/airflow/dags/my_dag.py 에 작성합니다.

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from textwrap import dedent

default_args = dict(
    owner='airflow',
    depends_on_past=False,
    email=['anderson.pytorch@gmail.com'],
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=1))

dag_params = dict(
    description='This is SPARTA!',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 11, 1),
    catchup=False,
    tags=['example']
)

with DAG('my_dag',
         default_args=default_args,
         **dag_params) as dag:
    t1 = BashOperator(task_id='print_date',
                      bash_command='date')
    t2 = BashOperator(task_id='sleep',
                      depends_on_past=False,  # True 이면 이전 task가 성공해야지 실행됨
                      bash_command='sleep 3',
                      retries=3)

    templated_command = dedent(
        """
        {% for i in range(5) %}
            echo "{{ ds }}"
            echo "{{ macros.ds_add(ds, 7)}}"
            echo "{{ params.my_param }}"
        {% endfor %}
        """)

    t3 = BashOperator(task_id='template',
                      depends_on_past=False,
                      bash_command=templated_command,
                      params={'my_param': 'Parameter I passed in'})
    t1 >> [t2, t3]

기본적인 명령어

# 치트 리스트 
$ airflow cheat-sheet

# DAG 리스트 보기
$ airflow dags list

# 특정 DAG의 tasks 보기
$ airflow tasks list [dag name]
# airflow tasks list my_dag

# 특정 DAG의 tasks를 트리형태로 보기
$ airflow tasks list [dag name] --tree
$ airflow tasks list my_dag --tree

# DAG 실행해보기
$ airflow dags test [dag name] [execution date]
$ airflow dags test my_dag now