IT Best Practise/Airflow

Apache Airflow 핵심 메카니즘

GilliLab IT 2024. 11. 13. 23:34
728x90
반응형

Apache Airflow 핵심 메카니즘

Apache Airflow에서 데이터 전달을 위한 설정이나 인자에 대해 자세히 설명하겠습니다. Airflow는 작업 간에 데이터를 전달하기 위해 다양한 메커니즘을 제공합니다. 주요 개념으로는 TaskInstance(ti), XCom, provide_context 등이 있습니다.

1. TaskInstance (ti)

TaskInstance는 Airflow에서 각 작업의 실행 인스턴스를 나타냅니다. TaskInstance 객체는 작업의 상태, 실행 시간, 컨텍스트 정보 등을 포함합니다. TaskInstance는 작업 간에 데이터를 전달하는 데 중요한 역할을 합니다.

예시

def my_task(**kwargs):
    ti = kwargs['ti']
    # TaskInstance 객체를 사용하여 XCom에 접근할 수 있습니다.
    value = ti.xcom_pull(task_ids='previous_task')
    print(f"Pulled value: {value}")

2. XCom (Cross-Communication)

XCom은 작업 간에 데이터를 전달하기 위한 Airflow의 내장 메커니즘입니다. XCom은 작은 크기의 데이터를 저장하고 검색하는 데 사용됩니다. 각 XCom은 키-값 쌍으로 저장되며, 특정 작업에서 데이터를 푸시(push)하고 다른 작업에서 이를 풀(pull)할 수 있습니다.

XCom 푸시

def push_function(**kwargs):
    ti = kwargs['ti']
    ti.xcom_push(key='my_key', value='my_value')

XCom 풀

def pull_function(**kwargs):
    ti = kwargs['ti']
    value = ti.xcom_pull(key='my_key', task_ids='push_task')
    print(f"Pulled value: {value}")

3. provide_context

provide_context는 PythonOperator에서 사용되는 매개변수로, 이를 True로 설정하면 Airflow는 작업 함수에 실행 컨텍스트를 전달합니다. 실행 컨텍스트는 DAG 실행과 관련된 다양한 정보를 포함하는 사전(dictionary)입니다. 이 사전에는 execution_date, ds, next_execution_date, prev_execution_date, dag, task, task_instance(ti) 등이 포함됩니다.

예시

def my_task(**kwargs):
    execution_date = kwargs['execution_date']
    print(f"Execution date: {execution_date}")

my_task_operator = PythonOperator(
    task_id='my_task',
    python_callable=my_task,
    provide_context=True,
    dag=dag,
)

4. op_kwargs

op_kwargs는 PythonOperator에서 사용되는 매개변수로, 작업 함수에 전달할 인수를 사전 형태로 정의합니다. 이 사전의 키는 작업 함수의 매개변수 이름과 일치해야 합니다.

예시

def my_task(param1, param2, **kwargs):
    print(f"param1: {param1}, param2: {param2}")

my_task_operator = PythonOperator(
    task_id='my_task',
    python_callable=my_task,
    op_kwargs={'param1': 'value1', 'param2': 'value2'},
    dag=dag,
)

전체 예시

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
}

dag = DAG(
    'data_transfer_example',
    default_args=default_args,
    schedule_interval=None,
)

def push_function(**kwargs):
    ti = kwargs['ti']
    ti.xcom_push(key='my_key', value='my_value')
    print("Pushed value: my_value")

def pull_function(**kwargs):
    ti = kwargs['ti']
    value = ti.xcom_pull(key='my_key', task_ids='push_task')
    print(f"Pulled value: {value}")

push_task = PythonOperator(
    task_id='push_task',
    python_callable=push_function,
    provide_context=True,
    dag=dag,
)

pull_task = PythonOperator(
    task_id='pull_task',
    python_callable=pull_function,
    provide_context=True,
    dag=dag,
)

push_task >> pull_task

push_taskXCom을 사용하여 데이터를 푸시하고, pull_task가 이를 풀하여 출력합니다. provide_context=True를 사용하여 실행 컨텍스트를 작업 함수에 전달하고 있고, op_kwargs를 사용하여 작업 함수에 인수를 전달하고 있습니다.

728x90
반응형