Apache Airflow 핵심 메카니즘
Apache Airflow 핵심 메카니즘
Apache Airflow에서 데이터 전달을 위한 설정이나 인자에 대해 자세히 설명하겠습니다. Airflow는 작업 간에 데이터를 전달하기 위해 다양한 메커니즘을 제공합니다. 주요 개념으로는 TaskInstance
(ti), XCom
, provide_context
등이 있습니다.
1. TaskInstance (ti)
TaskInstance
는 Airflow에서 각 작업의 실행 인스턴스를 나타냅니다. TaskInstance
객체는 작업의 상태, 실행 시간, 컨텍스트 정보 등을 포함합니다. TaskInstance
는 작업 간에 데이터를 전달하는 데 중요한 역할을 합니다.
예시
def my_task(**kwargs):
ti = kwargs['ti']
# TaskInstance 객체를 사용하여 XCom에 접근할 수 있습니다.
value = ti.xcom_pull(task_ids='previous_task')
print(f"Pulled value: {value}")
2. XCom (Cross-Communication)
XCom
은 작업 간에 데이터를 전달하기 위한 Airflow의 내장 메커니즘입니다. XCom
은 작은 크기의 데이터를 저장하고 검색하는 데 사용됩니다. 각 XCom
은 키-값 쌍으로 저장되며, 특정 작업에서 데이터를 푸시(push)하고 다른 작업에서 이를 풀(pull)할 수 있습니다.
XCom 푸시
def push_function(**kwargs):
ti = kwargs['ti']
ti.xcom_push(key='my_key', value='my_value')
XCom 풀
def pull_function(**kwargs):
ti = kwargs['ti']
value = ti.xcom_pull(key='my_key', task_ids='push_task')
print(f"Pulled value: {value}")
3. provide_context
provide_context
는 PythonOperator에서 사용되는 매개변수로, 이를 True
로 설정하면 Airflow는 작업 함수에 실행 컨텍스트를 전달합니다. 실행 컨텍스트는 DAG 실행과 관련된 다양한 정보를 포함하는 사전(dictionary)입니다. 이 사전에는 execution_date
, ds
, next_execution_date
, prev_execution_date
, dag
, task
, task_instance
(ti) 등이 포함됩니다.
예시
def my_task(**kwargs):
execution_date = kwargs['execution_date']
print(f"Execution date: {execution_date}")
my_task_operator = PythonOperator(
task_id='my_task',
python_callable=my_task,
provide_context=True,
dag=dag,
)
4. op_kwargs
op_kwargs
는 PythonOperator에서 사용되는 매개변수로, 작업 함수에 전달할 인수를 사전 형태로 정의합니다. 이 사전의 키는 작업 함수의 매개변수 이름과 일치해야 합니다.
예시
def my_task(param1, param2, **kwargs):
print(f"param1: {param1}, param2: {param2}")
my_task_operator = PythonOperator(
task_id='my_task',
python_callable=my_task,
op_kwargs={'param1': 'value1', 'param2': 'value2'},
dag=dag,
)
전체 예시
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
'retries': 1,
}
dag = DAG(
'data_transfer_example',
default_args=default_args,
schedule_interval=None,
)
def push_function(**kwargs):
ti = kwargs['ti']
ti.xcom_push(key='my_key', value='my_value')
print("Pushed value: my_value")
def pull_function(**kwargs):
ti = kwargs['ti']
value = ti.xcom_pull(key='my_key', task_ids='push_task')
print(f"Pulled value: {value}")
push_task = PythonOperator(
task_id='push_task',
python_callable=push_function,
provide_context=True,
dag=dag,
)
pull_task = PythonOperator(
task_id='pull_task',
python_callable=pull_function,
provide_context=True,
dag=dag,
)
push_task >> pull_task
push_task
가 XCom
을 사용하여 데이터를 푸시하고, pull_task
가 이를 풀하여 출력합니다. provide_context=True
를 사용하여 실행 컨텍스트를 작업 함수에 전달하고 있고, op_kwargs
를 사용하여 작업 함수에 인수를 전달하고 있습니다.