Airflow는 Python을 기반으로 DAG를 작성하여 Workflow를 관리하는 도구이다. 수많은 데이터 파이프라인을 Reliable하게 관리, 실행, 모니터링하여 오케스트레이션할 수 있도록 도와준다.
Workflow 관리 도구로 Oozie가 있지만, Hadoop과 강한 Coupling을 가지기 떄문에 많은 기업에서 사용되는 Airflow를 익히려고 한다.
Airflow의 구성
Scheduler: Workflow를 스케줄링하는 데몬이다.
Web Server: Airflow의 웹 인터페이스를 제공하는 웹 서버이다. Flask와 Gunicorn을 이용하여 인터페이스를 제공한다.
MetaStore: 메타데이터가 저장되는 데이터베이스이다. 주로 Postgresql을 추천하지만, SQL Alchemy와 호환 가능한 MySQL이나 SQLite도 이용이 가능하다.
Executor: 어떤 환경에서 Task가 실행될지를 정의하는 메커니즘을 뜻한다. Debug, Local, Sequential, Celery 등의 타입이 존재한다. Executer는 pluggable하여 뛰어난 확장성을 지닌다.
Worker: 실제 Task를 처리하는 컴포넌트이다.
Operator, Sensor
Operator와 Sensor는 DAG의 노드가 되는 작업의 유형을 나타내며, 작업의 생성을 담당한다.
Operator는 Bash 커맨드 실행, Python 코드 실행, Email 전송, DB 조작 등 다양하게 연산화 된 작업을 제공한다. 작업을 수행하거나 다른 시스템에 작업을 수행하도록 하는 지시하는 Action Operator, 한 시스템에서 다른 시스템으로 데이터를 이동 시키는 Transfer Operator, 특정 조건이 충족될 때까지 반복으로 실행되는 Sensor Operator의 세가지 유형을 가진다.
Sensor는 Operator의 특별한 타입으로, 이벤트를 기다리며 해당 조건 충족시에만 이후 진행될 downstream작업을 수행할 수 있게 해준다. 효율성을 위해 세가지 유형의 Sensing 방법을 가지는데, 전체 런타임동안 worker를 감시하는 poke, 특정 상황에만 worker의 상태를 체크하며 그 사이에는 sleep하는 reschedule, Airflow 2.4.0 버전부터 지원하는 batch poke 형태의 smart sensor가 있다.
모든 Operator는 Base Operator 에서 상속되어 생성되고, 모든 Sensor 는 timeout과 poke_interval 등을 포함하는 Base Sensor Operator에서 상속되어 생성된다.
Task, Task Instance
Task는 데이터 파이프라인에 존재하는 개념적 Operator를 의미한다.
Task Instance는 데이터 파이프라인이 Trigger되어 실행될 때 메모리에 생성된 Task의 인스턴스이다.
Airflow Cluster
Multi-Node 환경에서 각 worker들은 서로의 존재를 몰라도 된다. 각자에게 할당되는 작업만 수행하며 그러함에도 정상적으로 실행 결과를 반환한다.
1.
Airflow Scheduler는 주기적으로 각종 정보를 담은 Metastore에 접근해 작업해야 할 DAG가 등록되었는지 확인한다.
2.
사용자가 Airflow WebServer에서 수동으로 DAG를 트리거하는 등 DAG 작업 스케줄에 무관하게 특정 DAG 작업이 실행되어야 하면, Scheduler 데몬은 DagRun 인스턴스를 Metastore에 생성하고 Queueing 서비스에 메시지를 밀어넣어 DAG 내 개별 작업을 수행한다. 각 메시지는 DAG Id, 작업 Id, 실행해야 할 함수의 이름, BashOperator 작업일 경우 bash 코드 등 작업의 정보를 담는다
3.
worker 데몬에 의해 관리되는 여러개의 celeryd 프로세스는 Queueing 서비스에서 규칙적으로 수행되어야 하는 작업을 확인하고 pull한다. 한 celeryd 프로세스가 작업 메시지를 pull하면, 메시지에 입각해 제공된 코드를 실행하며 성공 실패 여부가 담긴 작업 인스턴스를 Metastore에 기록한다.
Airflow Deployment
최신 버전의 Airflow는 CentOS7에서 관련 Dependency를 지원하지 않는다. kojipkg에서 맘먹고 세팅하거나 미리 환경이 설치된 Docker 이미지를 가져올 수 있겠지만, 난이도가 매우 높다. 그런 이유로 좀 더 낮은 2.0.0 버전을 사용하여 진행하였다.
모든 노드에 Airflow를 설치하여 작업을 분산하거나 특정 노드에만 할당한다. 각종 dependency 설치를 위해 Dockerfile의 일부를 다음과 같이 구성하였다.
RUN AIRFLOW_VERSION=2.0.0 && \
PYTHON_VERSION=3.6 && \
mkdir /opt/airflow && \
mkdir /opt/airflow/airflow-${AIRFLOW_VERSION} && \
mkdir /opt/airflow/airflow-${AIRFLOW_VERSION}/dags && \
mkdir /opt/airflow/airflow-${AIRFLOW_VERSION}/logs && \
ln -s /opt/airflow/airflow-${AIRFLOW_VERSION} /opt/airflow/current && \
cd /opt/airflow && \
pip3 install --upgrade pip && \
pip install --upgrade setuptools &&\
pip install -U pip setuptools wheel && \
export SLUGIFY_USES_TEXT_UNIDECODE=yes && \
yum -y install epel-release gcc gcc-c++ glibc-core glibc-common mysql-devel python-devel python-setuptools python3-devel python3-pip openldap-devel && \
pip install pytz pyOpenSSL ndg-httpsclient mysqlclient celery flower apache-airflow==${AIRFLOW_VERSION} \
--constraint https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt
Docker
복사
pip, setuptools를 업그레이드 해주어야 설치가 잘 진행된다. CentOS7은 기본 라이브러리 지원과 지속적인 레포 관리가 중단되어 dependency들의 버전선택을 잘 해주어야 한다. dependency의 dependency의 dependency가 꼬이면 답도 없다.
Airflow의 config파일은 Airflow를 첫 실행할 때 $AIRFLOW_HOME 경로에 생성된다. 파일이 제법 크기 떄문에 수정한 영역만 나타내었다.
#airflow.cfg
[core]
# config파일은 실행 초기에 자동으로 생성되기 때문에 환경변수인 $AIRFLOW_HOME만 설정하고 실행해주면 경로수정은 하지 않아도 된다.
dags_folder = /opt/airflow/airflow-2.0.0/dags
executor = CeleryExecutor
sql_alchemy_conn = mysql://airflow:airflow@192.168.0.29:3306/airflow
[webserver]
# 혹시 모를 내장 톰캣과의 충돌을 막기위해 port를 변경하였다.
base_url = http://localhost:5080
web_server_port = 5080
[celery]
# 리소스 부족으로 값을 낮추었다.
worker_concurrency = 2
#Queueing Service의 URL로, RabbitMQ를 사용하였다.
broker_url = amqp://guest:guest@172.16.238.4:5672/
result_backend = db+mysql://airflow:airflow@192.168.0.29:3306/airflow
Python
복사
해당 worker와 연결된 RabbitMQ의 queue 작업을 가져오면, worker는 작업 정보를 기반으로 자신이 가진 DAG를 탐색하여 수행해야 할 작업을 진행한다. 그런 이유로 dags와 logs 폴더는 모든 컨테이너에 마운트 하여 공유한다.
실제 프로덕션 환경에서는 git과 ansible 등을 이용하여 DAG를 공유한다고 한다.
Workflow 테스트
Metastore로 이용할 MySQL 사용자 등록을 해준다. 로컬의 MySQL Workbench에서도 확인하기 위해 서버의 ip와 로컬 ip를 등록해주었다.
mysql_native_password 설정을 해주지 않으면 Metastore 접근 시 (아마)SHA인증을 요구하기 때문에 설정해주어야 한다.
create schema airflow;
use airflow;
create user 'airflow'@192.168.0.14 IDENTIFIED BY 'airflow';
create user 'airflow'@192.168.0.29 IDENTIFIED BY 'airflow';
GRANT ALL PRIVILEGES ON airflow.* TO 'airflow'@192.168.0.14 with grant option;
GRANT ALL PRIVILEGES ON airflow.* TO 'airflow'@192.168.0.29 with grant option;
ALTER USER 'airflow'@192.168.0.14 IDENTIFIED WITH mysql_native_password BY 'airflow';
Plain Text
복사
RabbitMQ를 시작, 마스터 컨테이너에 webserver, scheduler, flower과 5개의 컨테이너에 worker를 띄운다. Metastore 초기화나 사용자 등록이 필요하다면 -D, —daemon 옵션으로 데몬 실행을 하는데 적용이 안돼서 &를 통해 백그라운드로 실행하였다.
[slave01 ~]$ rabbitmq-plugins enable rabbitmq_management
[slave01 ~]$ /sbin/rabbitmq-server -detached
// if not initialized
[slave01 ~]$ airflow db init
[slave01 ~]$ airflow webserver -D &
[slave01 ~]$ airflow scheduler -D &
[slave01 ~]$ airflow celery flower -D &
[slave01 ~]$ airflow celery worker -D -q queue-slave01 &
[master01 ~]$ airflow celery worker -D -q queue-master01 &
[master02 ~]$ airflow celery worker -D -q queue-master02 &
[slave02 ~]$ airflow celery worker -D -q queue-slave02 &
[slave03 ~]$ airflow celery worker -D -q queue-slave03 &
// if not created
[slave01 ~]$ airflow users create
--username guest
--firstname aa
--lastname bb
--role Admin
--email xx@xx.com
Plain Text
복사
Airflow Webserver에 접속하여 tutorial 로컬 작업을 Trigger하여 이상없이 작동 하는지 체크한다.
마운트 한 dags 폴더 내에 간단한 분산 작업을 넣고 테스트 한다.
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago
from textwrap import dedent
from airflow import DAG
from airflow.operators.bash import BashOperator
default_args = {
'start_date': days_ago(1),
}
dag = DAG(
'ClusterDAG_test',
default_args=default_args,
schedule_interval='@once'
)
t1 = BashOperator(
task_id='t1',
bash_command='sleep 1',
queue='queue-slave01',
dag=dag
)
t2 = BashOperator(
task_id='t2',
bash_command='sleep 1',
queue='queue-slave02',
dag=dag
)
t3 = BashOperator(
task_id='t3',
bash_command='sleep 1',
queue='queue-master01',
dag=dag
)
t1 >> [t2, t3]
Python
복사
RabbitMQ Web에서 Queue에 할당 된 메시지들을 확인할 수 있다.
Flower Web에서 worker노드 별 작업 수행 결과 및 로그를 모니터링할 수 있다.
개선방안
Airflow Scheduler는 SPOF(Single Point of Failure)이기 떄문에 High Availablity에 방해가 된다. Clairvoyant에서 제공하는 airflow-scheduler-failover-controller는 이를 극복할 수 있도록 돕는다.