IT Best Practise/Airflow
04. DAG 작성
GilliLab IT
2024. 11. 12. 14:04
728x90
반응형
4. DAG 작성
DAG 작성 기본
DAG 작성의 기본은 작업의 흐름을 정의하는 것입니다. DAG는 방향성이 있는 비순환 그래프로, 작업의 순서와 의존성을 나타냅니다.
예제:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
# 기본 인자 설정
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
'retries': 1,
}
# DAG 객체 생성
dag = DAG(
'basic_dag',
default_args=default_args,
schedule_interval='@daily',
)
# DummyOperator를 사용하여 시작과 끝 태스크 정의
start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)
# 태스크 간의 의존성 설정
start >> end
Task 간의 의존성 설정
Task 간의 의존성 설정은 DAG 내에서 작업의 순서를 정의하는 데 중요합니다. 의존성은 >>
연산자를 사용하여 설정할 수 있습니다.
예제:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
# 기본 인자 설정
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
'retries': 1,
}
# DAG 객체 생성
dag = DAG(
'dependency_dag',
default_args=default_args,
schedule_interval='@daily',
)
# DummyOperator를 사용하여 시작, 중간, 끝 태스크 정의
start = DummyOperator(task_id='start', dag=dag)
middle = DummyOperator(task_id='middle', dag=dag)
end = DummyOperator(task_id='end', dag=dag)
# 태스크 간의 의존성 설정
start >> middle >> end
다양한 Operator 사용법 (PythonOperator, BashOperator 등)
Airflow는 다양한 Operator를 제공하여 다양한 작업을 수행할 수 있습니다. 여기서는 PythonOperator와 BashOperator의 사용법을 살펴봅니다.
예제:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
# PythonOperator에서 실행할 함수 정의
def print_hello():
print("Hello, World!")
# 기본 인자 설정
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
'retries': 1,
}
# DAG 객체 생성
dag = DAG(
'operator_dag',
default_args=default_args,
schedule_interval='@daily',
)
# PythonOperator 태스크 정의
python_task = PythonOperator(
task_id='python_task',
python_callable=print_hello,
dag=dag,
)
# BashOperator 태스크 정의
bash_task = BashOperator(
task_id='bash_task',
bash_command='echo "Hello, World!"',
dag=dag,
)
# 태스크 간의 의존성 설정
python_task >> bash_task
Branching 및 SubDAG
Branching은 조건에 따라 다른 작업을 실행할 수 있게 합니다. SubDAG는 DAG 내에서 또 다른 DAG를 정의할 수 있게 합니다.
예제:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.subdag_operator import SubDagOperator
from datetime import datetime
# BranchPythonOperator에서 실행할 함수 정의
def choose_branch():
return 'branch_a' if datetime.now().second % 2 == 0 else 'branch_b'
# SubDAG 정의 함수
def subdag(parent_dag_name, child_dag_name, args):
dag_subdag = DAG(
dag_id=f'{parent_dag_name}.{child_dag_name}',
default_args=args,
schedule_interval="@daily",
)
with dag_subdag:
start = DummyOperator(task_id='start')
end = DummyOperator(task_id='end')
start >> end
return dag_subdag
# 기본 인자 설정
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
'retries': 1,
}
# DAG 객체 생성
dag = DAG(
'branching_subdag_dag',
default_args=default_args,
schedule_interval='@daily',
)
# 시작 태스크 정의
start = DummyOperator(task_id='start', dag=dag)
# BranchPythonOperator 태스크 정의
branch = BranchPythonOperator(
task_id='branch',
python_callable=choose_branch,
dag=dag,
)
# Branch A와 Branch B 태스크 정의
branch_a = DummyOperator(task_id='branch_a', dag=dag)
branch_b = DummyOperator(task_id='branch_b', dag=dag)
# SubDagOperator 태스크 정의
subdag_task = SubDagOperator(
task_id='subdag_task',
subdag=subdag('branching_subdag_dag', 'subdag', default_args),
dag=dag,
)
# 끝 태스크 정의
end = DummyOperator(task_id='end', dag=dag)
# 태스크 간의 의존성 설정
start >> branch
branch >> branch_a >> subdag_task >> end
branch >> branch_b >> end
이제 Airflow에서 DAG를 작성하는 기본 개념과 다양한 Operator, Task 간의 의존성 설정, Branching 및 SubDAG 사용법을 이해할 수 있습니다. 이러한 개념들을 활용하여 복잡한 워크플로우를 효율적으로 관리하고 자동화할 수 있습니다. Airflow의 강력한 기능을 통해 데이터 파이프라인을 더욱 효과적으로 구축해 보세요.
728x90
반응형