본문 바로가기

Airflow 개념 본문

개발/Airflow

Airflow 개념

Louisus 2021. 8. 6. 11:19
728x90

Airflow란?

workflow 스케쥴링, 모니터링 도구
DAG(Directed Acyclic Graph)라는 개념으로 동작, python으로 DAG를 작성하고 순서를 정의
Airflow pipeline(동작 순서, 방식)을 python을 이용해 구성하기 때문에 동적인 구성 가능
Airflow는 각 태스크에서 오류가 발생할 때마다 여러 번 재 실행하기에 매우 회복성 높은 설계를 이끌어낸다.
Airflow를 완전히 멈췄다가 미 완료 태스크를 재시작하면서 실행 중이던 업무 흐름으로 되돌아갈 수 있다.

Airflow 구성

=> Airflow는 개발자가 작성한 Python DAG를 읽고, 거기에 맞춰 Scheduler가 Task를 스케줄링하면, Worker가 Task를 가져가 실행합니다. Task의 실행 상태는 Database에 저장되고, 사용자는 UI를 통해서 각 Task의 실행 상태, 성공 여부 등을 확인

WebServer - 웹 UI를 표현하고, workflow 상태를 표시하고 실행/재시작, 로그 확인 기능
Scheduler - 모든 DAG와 Task에 대하여 모니터링 및 관리하고, 실행해야 할 Task를 스케줄링.
스케쥴러는 각 DAG의 시작 시간(start_date)와 주기(schedule_interval)를 통해 어떤 DAG를 언제 실행할 지 결정. 이 때 시작 시간이 과거로 설정되어 있으면 스케줄러는 과거 시간부터 현재까지 실행되었어야 하는 DAG들을 모조리 실행 한다. 이를 backfill(Context Variable이나 Jinja Template의 ds를 사용해 Airflow에서 날짜를 컨트롤하는 경우 Backfill 사용 가능. 과거값 기준으로 재실행. backfill은 기본적으로 실행되지 않은 Task들만 실행)이라고 부르며, DAG 옵션인 catchup을 False로 설정함으로써 비활성화 가능.

execution_date - execution_date을 넣지 않으면 task를 실행할 수 없다. job이 실행(execution)된 id가 필요해서 이 id는 마치 쇼핑할 때의 주문 번호(run id) 처럼 웹 UI에 로깅되는 기준의 id로 사용되기도 하며, 스케줄러가 실행하는 transaction id. 예약 시간이 아니라 "예약을 잡으려고 시도한 시간"

& 주의 사항 - execution_date 는 실제 현재 시간과 다름.

https://m.blog.naver.com/gyrbsdl18/221561318823

https://it-sunny-333.tistory.com/157

DB - DAG, Task 정의, DAG run, Task 관리
Worker - 실제 Task를 실행하는 주체. Executor 종류에 따라 동작 방식이 다양함
Task - dag 안에 하나의 일의 단위
Task instance - 각 Task의 개별 상태를 의미하며 ‘running’, ‘success’, ‘failed’, ‘skipped’, ‘up for retry’ 등의 상태 값 보유
DAG - 실행하고 싶은 일(task)들의 실행 순서롤 구조화하고 실행 context를 제공. DAG에 대한 전체 정보를 저장 및 관리. Task들의 dependency를 정의. Airflow의 DAG는 실행하고 싶은 Task들의 관계와 dependency를 표현하고 있는 Task들의 모음. 어떤 순서와 어떤 dependency로 실행할지, 어떤 스케줄로 실행할지 등의 정보를 가지고 있습니다. 따라서 DAG를 정확하게 설정해야, Task를 원하는 대로 스케쥴링
& How to create DAG: https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html
& Parameters: https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/index.html
dag_id : dag 이름
schedule_interval : '*/5 0 * * *' (매 5분마다 실행) 과 같이 cron 명령같이 정의할 수도 있고 '@daily', timedelta(days=1) 과같이 사용 가능
start_date : datetime(2021,05,10)과 같이 DAG를 언제부터 시작할 것인지 지정. DAG 구동의 기준점이 될 시간, 이 때 시작 되는게 아님을 주의!! start_date는 schedule_interval에 의해 처음 구동 될 시간을 정하게 된다.
https://forum.astronomer.io/t/airflow-start-date-concepts/393

https://medium.com/nerd-for-tech/airflow-catchup-backfill-demystified-355def1b6f92

https://it-sunny-333.tistory.com/157

https://monkeydev.tistory.com/2

  4. catchup: Scheduler는 DAG의 전체 Lifetime을 확인하면서 실행되지 않은 DAG를 실행. 이렇게 정해진 시간에 실행되지 못한 DAG를 늦게라도 실행하는 것. 만약 과거의 작업은 중요하지 않고, 현재 시점의 이후 DAG만 실행되어야 한다면 False로 설정 필요.

  5. dagrun_timeout (datetime.timedelta) – specify how long a DagRun should be up before timing out / failing, so that new DagRuns can be created

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 12, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'schedule_interval': '@hourly',
}

dag = DAG('tutorial', catchup=False, default_args=default_args)

Operator - DAG안에서 정의되는 작업 함수. BashOperator, PythonOperator 등 여러 오퍼레이터 제공, 아래는 PythonOperator 속성 설명
https://airflow.apache.org/docs/apache-airflow/stable/concepts/operators.html#concepts-jinja-templating
Python Operator : https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/python/index.html#airflow.operators.python.PythonOperator
LastestOnlyOperator : 가장 최근에 스케줄된 dag만 실행. 현재 시간이 execution_time 과 다음 스케줄 된 execution_time 사이가 아닌 경우 LatestOnlyOperator는 모든 하위 태스크를 스킵
task_id : task 이름
python_callable : 실행할 함수명 설정
dag : dag 객체 지정
execution_timeout (datetime.timedelta) – max time allowed for the execution of this task instance, if it goes beyond it will raise and fail.

참고

Jinja template
https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html?highlight=macro\

Hooks - 외부 플랫폼(aws, slack 등) or 데이터베이스(mysql, hive 등)들을 쉽게 사용할 수 있게 정의해둔 오퍼레이터의 한 종류

Sensor - 지정된 행동이 성공했는지 주기적으로 확인하는데 쓰이는 오퍼레이터

Pools - 시스템에서 한 순간 많은 프로세스가 작동하면 부하가 심해짐. 때문에 병렬 실행되는 태스크의 수를 제한하는 역할. airflow.cfg 설정에서 변경 가능

Connections - 외부 시스템과 연결이 필요한 정보는 Airflow metastore DB에 저장. Airflow pipeline은 conn_id를 통해서 중앙 관리되는 연결 정보를 얻음.

SubDAGs - 반복되는 패턴을 정의하기 적절.

병렬로 실행되는 task-* operator들을 하나의 subDAG로 정의 가능

#dags/subdag.py
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

Dag is returned by a factory method

def sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
dag = DAG(
'%s.%s' % (parent_dag_name, child_dag_name),
schedule_interval=schedule_interval,
start_date=start_date,
)

dummy_operator = DummyOperator(
task_id='dummy_task',
dag=dag,
)

return dag

main_dag.py

from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.subdag_operator import SubDagOperator
from dags.subdag import sub_dag

PARENT_DAG_NAME = 'parent_dag'
CHILD_DAG_NAME = 'child_dag'

main_dag = DAG(
dag_id=PARENT_DAG_NAME,
schedule_interval=timedelta(hours=1),
start_date=datetime(2016, 1, 1)
)

sub_dag = SubDagOperator(
subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, main_dag.start_date,
main_dag.schedule_interval),
task_id=CHILD_DAG_NAME,
dag=main_dag,
)

XCom(Cross Communication) - Airflow task는 독립적으로 실행되기 때문에 기본적으로 서로 통신할 수단이 없음. 하지만 작업 흐름을 만들다 보면 이전 작업의 결과, 요소 등을 다음 작업에 전달하는 경우 발생. 이때 XCom을 이용해 메세지를 교환.
=> Task 간의 통신을 위한 메모 정도의 목적으로 설계, 대용량 파일 전송 등의 용도로는 적합하지 않음
=> XCom을 적용하기 위해서는 "provide_context":True 설정이 꼭 있어야 함

https://it-sunny-333.tistory.com/160?category=458977

https://it-sunny-333.tistory.com/160

https://moons08.github.io/programming/airflow-xcom/

dag arguments

args = {
"owner": "user",
"depends_on_past": False,
"start_date": days_ago(2),
"provide_context": True
}

dag = DAG (
"xcom_test",
default_args = args,
schedule_interval="@once",
)

push

def push_func(**context):
return 'xcom_test'

def push_by_xcom_push(**context):
context['task_instance'].xcom_push(key='pushed_value', value='xcom_push')

원하는 결과값 바로 리턴

push_info = PythonOperator(
task_id = 'push_info',
python_callable=push_func,
dag=dag
)

키-값 지정해서 메소드로 전달

push_by_xcom = PythonOperator(
task_id='push_by_xcom',
python_callable=push_by_xcom_push,
dag=dag
)

pull

def pull_function(**context):
# ti는 task_instance의 줄임
value = context['ti'].xcom_pull(task_ids='push_info')
print(value)

pull_1 = PythonOperator(
task_id='pull_info_1',
python_callable=pull_function,
dag=dag,
)

pull_2 = BashOperator(
task_id='pull_info_2',
bash_command='echo "{{ ti.xcom_pull(key="pushed_value") }}"', # .sh 파일 안에서도 사용 가능!
dag=dag,
)

subdag

def xcom_subdag(parent_dag_name, child_dag_name, args):

def pull_and_push(**context):
    values = context['ti'].xcom_pull(dag_id=parent_dag_name) # pull from parent
    val1 = context['ti'].xcom_pull(dag_id=parent_dag_name, task_ids='push_info')
    val2 = context['ti'].xcom_pull(dag_id=parent_dag_name,
                                    task_ids='push_by_xcom', key='pushed_value')

    context['ti'].xcom_push(key='val0', value=values)# push to child
    context['ti'].xcom_push(key='val1', value=val1)
    context['ti'].xcom_push(key='val2', value=val2)

    print(values)
    return values

dag = DAG(
    dag_id='%s.%s' % (parent_dag_name, child_dag_name),
    default_args=args,
    schedule_interval=None,
)

pull_from_parent = PythonOperator(
    task_id='%s-pull_from_parent' % (child_dag_name),
    python_callable=pull_and_push,
    dag=dag,
    )
return dag

Trigger Rules
일반적으로 task는 상위 task가 성공할 때만 실행됩니다. 이보다 복잡한 의존성 설정을 위한 trigger rule들이 존재

all_success: (default) all parents have succeeded
all_failed: all parents are in a failed or upstream_failed state
all_done: all parents are done with their execution
one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done
one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done
none_failed: all parents have not failed (failed or upstream_failed) i.e. all parents have succeeded or been skipped
none_failed_or_skipped: all parents have not failed (failed or upstream_failed) and at least one parent has succeeded.
none_skipped: no parent is in a skipped state, i.e. all parents are in a success, failed, or upstream_failed state
dummy: dependencies are just for show, trigger at will

Dag 예외 처리
https://www.astronomer.io/guides/error-notifications-in-airflow

https://medium.com/unruly-engineering/quick-easy-alerting-for-apache-airflow-53c3f1ba2ca

=> Context key-value 값

{'conf': <airflow.configuration.AirflowConfigParser object at 0x7f58c9075908>,
'dag': <DAG: workschedulerTest>,
'dag_run': <DagRun workschedulerTest @ 2021-06-28 03:58:00+00:00: scheduled__2021-06-28T03:58:00+00:00, externally triggered: False>,
'ds': '2021-06-28',
'ds_nodash': '20210628',
'execution_date': DateTime(2021, 6, 28, 3, 58, 0, tzinfo=Timezone('+00:00')),
'inlets': [],
'macros': <module 'airflow.macros' from '/usr/local/lib/python3.6/site-packages/airflow/macros/init.py'>,
'next_ds': '2021-06-28',
'next_ds_nodash': '20210628',
'next_execution_date': DateTime(2021, 6, 28, 3, 59, 0, tzinfo=Timezone('UTC')),
'outlets': [],
'params': {},
'prev_ds': '2021-06-28',
'prev_ds_nodash': '20210628',
'prev_execution_date': DateTime(2021, 6, 28, 3, 57, 0, tzinfo=Timezone('UTC')),
'prev_execution_date_success': <Proxy at 0x7f58362f3ac8 with factory <function TaskInstance.get_template_context.. at 0x7f58363616a8>>,
'prev_start_date_success': <Proxy at 0x7f58362f3708 with factory <function TaskInstance.get_template_context.. at 0x7f5836358b70>>,
'run_id': 'scheduled__2021-06-28T03:58:00+00:00',
'task': <Task(PythonOperator): task_1>,
'task_instance': <TaskInstance: workschedulerTest.task_1 2021-06-28T03:58:00+00:00 [failed]>,
'task_instance_key_str': 'workschedulerTest__task_1__20210628',
'test_mode': False,
'ti': <TaskInstance: workschedulerTest.task_1 2021-06-28T03:58:00+00:00 [failed]>,
'tomorrow_ds': '2021-06-29',
'tomorrow_ds_nodash': '20210629',
'ts': '2021-06-28T03:58:00+00:00',
'ts_nodash': '20210628T035800',
'ts_nodash_with_tz': '20210628T035800+0000',
'var': {'json': None, 'value': None},
'yesterday_ds': '2021-06-27',
'yesterday_ds_nodash': '20210627',
'exception': FileNotFoundError(2, 'No such file or directory')}

=> Component for configuring

dag is the name of our DAG

task is the name of our Task

ts is the timestamp of when our Task was scheduled to run

log_url is the location of the logs for the instance of this Task on our Airflow instance

Timezone
airflow 라이브러리 내에서 taskinstance를 항상 utc timezone으로 고정, 때문에 start_date ~ timezone을 변경해도 내부 라이브러리에서 다시 utc로 변경하기 때문에 start_date, end_date, execution_date 등의 요소가 utc 기준으로 로그 기록이나 DB에 저장됨.

https://yahwang.github.io/posts/87

https://yahwang.github.io/posts/airflow

Airflow 디렉토리 구조

airflow
airflow.cfg : airflow 환경설정 파일
airflow.db : 데이터베이스(SQLite)파일
dags : dag들을 저장하는 디렉토리
xxx.py : dag 정의 파이썬 파일
logs : 로그파일을 저장하는 디렉토리

Airflow 명령어

airflow dags list : dag 리스트 출력
airflow db init : 데이터베이스 초기화
airflow tasks test "dag id 명" "task id 명" "인수" : task별로 실행

'개발 > Airflow' 카테고리의 다른 글

Airflow 도커로 세팅  (0) 2021.08.06
Airflow 세팅  (0) 2021.08.06
Comments