IT Best Practise/Airflow

05. 고급 기능

GilliLab IT 2024. 11. 12. 14:08
728x90
반응형

5. 고급 기능

XCom을 이용한 Task 간 데이터 교환

XCom은 Task 간에 데이터를 교환하는 데 사용됩니다. XCom은 pushpull 메서드를 사용하여 데이터를 주고받을 수 있습니다.

예제:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

# Task 1에서 데이터를 XCom으로 push하는 함수
def push_function(**kwargs):
    kwargs['ti'].xcom_push(key='message', value='Hello from Task 1')

# Task 2에서 XCom 데이터를 pull하는 함수
def pull_function(**kwargs):
    message = kwargs['ti'].xcom_pull(key='message', task_ids='push_task')
    print(f"Received message: {message}")

# 기본 인자 설정
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
}

# DAG 객체 생성
dag = DAG(
    'xcom_dag',
    default_args=default_args,
    schedule_interval='@daily',
)

# PythonOperator 태스크 정의
push_task = PythonOperator(
    task_id='push_task',
    python_callable=push_function,
    provide_context=True,
    dag=dag,
)

pull_task = PythonOperator(
    task_id='pull_task',
    python_callable=pull_function,
    provide_context=True,
    dag=dag,
)

# 태스크 간의 의존성 설정
push_task >> pull_task

Task Instance 및 DAG Run 관리

Task Instance 및 DAG Run 관리는 특정 Task와 DAG의 실행 상태를 모니터링하고 관리하는 데 사용됩니다.

예제:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

# Task Instance 정보를 출력하는 함수
def print_task_instance_info(**kwargs):
    ti = kwargs['ti']
    print(f"Task Instance: {ti}")

# 기본 인자 설정
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
}

# DAG 객체 생성
dag = DAG(
    'task_instance_dag',
    default_args=default_args,
    schedule_interval='@daily',
)

# DummyOperator 태스크 정의
start = DummyOperator(task_id='start', dag=dag)

# PythonOperator 태스크 정의
print_info = PythonOperator(
    task_id='print_info',
    python_callable=print_task_instance_info,
    provide_context=True,
    dag=dag,
)

# 태스크 간의 의존성 설정
start >> print_info

SLA (Service Level Agreement) 설정

SLA는 특정 Task가 정해진 시간 내에 완료되지 않을 경우 경고를 발생시키는 데 사용됩니다.

예제:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta

# 기본 인자 설정
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'sla': timedelta(minutes=30),  # SLA 설정
}

# DAG 객체 생성
dag = DAG(
    'sla_dag',
    default_args=default_args,
    schedule_interval='@daily',
)

# DummyOperator 태스크 정의
task = DummyOperator(task_id='task_with_sla', dag=dag)

Task Retry 및 Timeout 설정

Task Retry 및 Timeout 설정은 Task가 실패했을 때 재시도 횟수와 시간 제한을 설정하는 데 사용됩니다.

예제:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

# 실패를 유도하는 함수
def fail_function():
    raise Exception("This task is designed to fail.")

# 기본 인자 설정
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 3,  # 재시도 횟수 설정
    'retry_delay': timedelta(minutes=5),  # 재시도 간격 설정
    'execution_timeout': timedelta(minutes=10),  # 실행 시간 제한 설정
}

# DAG 객체 생성
dag = DAG(
    'retry_timeout_dag',
    default_args=default_args,
    schedule_interval='@daily',
)

# PythonOperator 태스크 정의
retry_task = PythonOperator(
    task_id='retry_task',
    python_callable=fail_function,
    dag=dag,
)

이제 Airflow에서 XCom을 이용한 Task 간 데이터 교환, Task Instance 및 DAG Run 관리, SLA 설정, Task Retry 및 Timeout 설정과 같은 고급 기능을 이해할 수 있습니다. 이러한 기능들을 활용하여 더욱 강력하고 유연한 워크플로우를 구축해 보세요.

728x90
반응형