IT Best Practise/Airflow

04. DAG 작성

GilliLab IT 2024. 11. 12. 14:04
728x90
반응형

4. DAG 작성

DAG 작성 기본

DAG 작성의 기본은 작업의 흐름을 정의하는 것입니다. DAG는 방향성이 있는 비순환 그래프로, 작업의 순서와 의존성을 나타냅니다.

예제:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime

# 기본 인자 설정
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
}

# DAG 객체 생성
dag = DAG(
    'basic_dag',
    default_args=default_args,
    schedule_interval='@daily',
)

# DummyOperator를 사용하여 시작과 끝 태스크 정의
start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)

# 태스크 간의 의존성 설정
start >> end

Task 간의 의존성 설정

Task 간의 의존성 설정은 DAG 내에서 작업의 순서를 정의하는 데 중요합니다. 의존성은 >> 연산자를 사용하여 설정할 수 있습니다.

예제:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime

# 기본 인자 설정
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
}

# DAG 객체 생성
dag = DAG(
    'dependency_dag',
    default_args=default_args,
    schedule_interval='@daily',
)

# DummyOperator를 사용하여 시작, 중간, 끝 태스크 정의
start = DummyOperator(task_id='start', dag=dag)
middle = DummyOperator(task_id='middle', dag=dag)
end = DummyOperator(task_id='end', dag=dag)

# 태스크 간의 의존성 설정
start >> middle >> end

다양한 Operator 사용법 (PythonOperator, BashOperator 등)

Airflow는 다양한 Operator를 제공하여 다양한 작업을 수행할 수 있습니다. 여기서는 PythonOperator와 BashOperator의 사용법을 살펴봅니다.

예제:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

# PythonOperator에서 실행할 함수 정의
def print_hello():
    print("Hello, World!")

# 기본 인자 설정
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
}

# DAG 객체 생성
dag = DAG(
    'operator_dag',
    default_args=default_args,
    schedule_interval='@daily',
)

# PythonOperator 태스크 정의
python_task = PythonOperator(
    task_id='python_task',
    python_callable=print_hello,
    dag=dag,
)

# BashOperator 태스크 정의
bash_task = BashOperator(
    task_id='bash_task',
    bash_command='echo "Hello, World!"',
    dag=dag,
)

# 태스크 간의 의존성 설정
python_task >> bash_task

Branching 및 SubDAG

Branching은 조건에 따라 다른 작업을 실행할 수 있게 합니다. SubDAG는 DAG 내에서 또 다른 DAG를 정의할 수 있게 합니다.

예제:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.subdag_operator import SubDagOperator
from datetime import datetime

# BranchPythonOperator에서 실행할 함수 정의
def choose_branch():
    return 'branch_a' if datetime.now().second % 2 == 0 else 'branch_b'

# SubDAG 정의 함수
def subdag(parent_dag_name, child_dag_name, args):
    dag_subdag = DAG(
        dag_id=f'{parent_dag_name}.{child_dag_name}',
        default_args=args,
        schedule_interval="@daily",
    )
    with dag_subdag:
        start = DummyOperator(task_id='start')
        end = DummyOperator(task_id='end')
        start >> end
    return dag_subdag

# 기본 인자 설정
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
}

# DAG 객체 생성
dag = DAG(
    'branching_subdag_dag',
    default_args=default_args,
    schedule_interval='@daily',
)

# 시작 태스크 정의
start = DummyOperator(task_id='start', dag=dag)

# BranchPythonOperator 태스크 정의
branch = BranchPythonOperator(
    task_id='branch',
    python_callable=choose_branch,
    dag=dag,
)

# Branch A와 Branch B 태스크 정의
branch_a = DummyOperator(task_id='branch_a', dag=dag)
branch_b = DummyOperator(task_id='branch_b', dag=dag)

# SubDagOperator 태스크 정의
subdag_task = SubDagOperator(
    task_id='subdag_task',
    subdag=subdag('branching_subdag_dag', 'subdag', default_args),
    dag=dag,
)

# 끝 태스크 정의
end = DummyOperator(task_id='end', dag=dag)

# 태스크 간의 의존성 설정
start >> branch
branch >> branch_a >> subdag_task >> end
branch >> branch_b >> end

이제 Airflow에서 DAG를 작성하는 기본 개념과 다양한 Operator, Task 간의 의존성 설정, Branching 및 SubDAG 사용법을 이해할 수 있습니다. 이러한 개념들을 활용하여 복잡한 워크플로우를 효율적으로 관리하고 자동화할 수 있습니다. Airflow의 강력한 기능을 통해 데이터 파이프라인을 더욱 효과적으로 구축해 보세요.

728x90
반응형