Airflow 101

1. Installation
1.1 Installing Airflow
Installing from PyPi 문서를 보면서 설치합니다.
먼저 dependencies를 설치 합니다.
# 무조건 설치
sudo apt install libpq-dev
# MariaDB 사용시
sudo apt install mariadb-server libmariadb-dev
sudo apt install mariadb-client
# MySQL 사용시
sudo apt-get install mysql-server
sudo apt install libmariadb-dev
pip install --upgrade mysqlclient
pip install --upgrade mysql-connector-python
pip install --upgrade paramiko plyvel
Apache Airflow는 일종의 앱이기도 하면서, 라이브러리이기도 합니다.
라이브러리는 dependencies 를 열어놓고, 앱의 경우는 고정시켜놓습니다.
Apache Airflow의 관리자는 오픈된 환경으로 열어두었지만,
이 경우 간혹 잘못된 dependency 또는 업데이트로 인해서 설치가 안 될 수도 있습니다.
이것을 방지하기 위해서 constraint file을 따로 만들어서 안정화된 버젼을 관리중입니다.
providers 도 참고 합니다.
$ AIRFLOW_VERSION=2.4.3
$ PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
$ CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
$ PROVIDERS="async,postgres,gcp,google,google_auth,amazon,mysql,redis,elasticsearch,ssh,http,apache-hdfs,docker,kubernetes,cncf-kubernetes,jdbc,ocbc,mongo,slack"
$ pip install "apache-airflow[${PROVIDERS}]==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
# 그냥 다 설치 .. ㅎㅎ (그냥 왠만하면 필요한것만 설치할 것.. 너무 많아...)
pip install 'apache-airflow[all]'
1.2 Setting Up Database Backend
기본적으로 Airflow는 SqlLite를 사용합니다.
production환경에서는 변경이
필요하며, 링크를
참조합니다.
본문에서는 MariaDB 또는 MySQL 기반의 backend를 정리했습니다.
sudo vi /etc/mysql/mariadb.conf.d/50-server.cnf
로 다음과 같이 추가 합니다.
[mysqld]
explicit_defaults_for_timestamp = 1
collation-server = utf8_unicode_ci
init-connect='SET NAMES utf8'
character-set-server = utf8
max_allowed_packet=16M
# 8 hours
wait_timeout = 28800
# 8 hours
interactive_timeout = 28800
수정후 sudo systemctl restart mariadb
재시작 합니다.
admin 으로 sudo mysql
이렇게 접속해서 다음을 실행합니다.
실행전 airflow_pass
부분의 패스워드 변경이 필요합니다.
CREATE
DATABASE airflow CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE
USER 'airflow' IDENTIFIED BY 'airflow_password';
GRANT ALL PRIVILEGES ON airflow.* TO
'airflow';
FLUSH
PRIVILEGES;
SHOW
DATABASES;
airflow.cfg
파일을 수정합니다.
cd ~/airflow/
vi airflow.cfg
sql_alchemy_conn 을 수정합니다.
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>?charset=utf8
형식으로 만듭니다.
끝에 ?charset=utf8
넣었는데.. 안넣으면 제 경우는 에러가 발생했습니다.
# sql_alchemy_conn = sqlite:////home/anderson/airflow/airflow.db
sql_alchemy_conn = mysql+mysqldb://airflow:airflow_password@localhost:3306/airflow?charset=utf8
# 샘플 데이터 설치 여부
load_examples = False
# Rest API Authentication 변경
auth_backend = airflow.api.auth.backend.basic_auth
1.3 Initialize DB and Webserver
# 예제 없이 설치
export AIRFLOW__CORE__LOAD_EXAMPLES=False
airflow db init
airflow users create \
--username admin \
--firstname Anderson \
--lastname Jo \
--role Admin \
--email anderson.pytorch@gmail.com \
--password admin
airflow webserver --port 8080
기본 예제를 불러오지 않을려면 airflow.cfg 에서 아래처럼 수정합니다.
load_examples = False
스케쥴러 실행은 다음과 같이 합니다.
airflow scheduler
1.4 Debug Executor
PyCharm 에서 바로 사용 가능 합니다.
if __name__ == "__main__":
from airflow.utils.state import State
dag.clear(dag_run_state=State.NONE)
dag.run()
2. Example
2.1 Settings
대부분의 설정할 내용들은 ~/airflow/airflow.cfg
에 모두 있습니다.
주요 세팅
dags_folder
= /home/anderson/airflow/dags # DAG 위치를 지정합니다.
Concurrency Settings
airflow.cfg | Environment Variables | Default Value |
---|---|---|
parallelism | AIRFLOW__CORE__PARALLELISM | 32 |
dag_concurrency | AIRFLOW__CORE__DAG_CONCURRENCY | 16 |
worker_concurrency | AIRFLOW__CELERY__WORKER_CONCURRENCY | 16 |
parsing_processes | AIRFLOW__SCHEDULER__PARSING_PROCESSES | 2 |
max_active_runs_per_dag | AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG | 16 |
- parallelism : 동시 실행 가능한 tasks 숫자. task 가 queue에 걸려 있으면 증가 시킬 필요 있음
- dag_concurrency : DAG당 실행시킬수 있는 태스크의 갯수. 리소스가 충분한데도 실행이 안되면 dag_concurrency 증가 필요 있음
- worker_concurrency : Maximum Celery Worker 를 가르킴
- parsing_processes : Threads 갯수.
2.2 Example
~/airflow/dags/my_dag.py
에 작성합니다.
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from textwrap import dedent
default_args = dict(
owner='airflow',
depends_on_past=False,
email=['anderson.pytorch@gmail.com'],
email_on_failure=False,
email_on_retry=False,
retries=1,
retry_delay=timedelta(minutes=1))
dag_params = dict(
description='This is SPARTA!',
schedule_interval=timedelta(days=1),
start_date=datetime(2021, 11, 1),
catchup=False,
tags=['example']
)
with DAG('my_dag',
default_args=default_args,
**dag_params) as dag:
t1 = BashOperator(task_id='print_date',
bash_command='date')
t2 = BashOperator(task_id='sleep',
depends_on_past=False, # True 이면 이전 task가 성공해야지 실행됨
bash_command='sleep 3',
retries=3)
templated_command = dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
""")
t3 = BashOperator(task_id='template',
depends_on_past=False,
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'})
t1 >> [t2, t3]
기본적인 명령어
# 치트 리스트
$ airflow cheat-sheet
# DAG 리스트 보기
$ airflow dags list
# 특정 DAG의 tasks 보기
$ airflow tasks list [dag name]
# airflow tasks list my_dag
# 특정 DAG의 tasks를 트리형태로 보기
$ airflow tasks list [dag name] --tree
$ airflow tasks list my_dag --tree
# DAG 실행해보기
$ airflow dags test [dag name] [execution date]
$ airflow dags test my_dag now