IT Best Practise/Airflow

05-02. 데이터 전달 방법(Task Flow 방식)

GilliLab IT 2024. 11. 12. 14:20
728x90
반응형

05-02. 데이터 전달 방법

전통적인 데이터 전달 방법

전통적인 데이터 전달 방법은 파일 시스템, 데이터베이스, 메시지 큐 등을 사용하여 Task 간에 데이터를 전달하는 방식입니다.

예제:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
import json

# 데이터를 파일에 저장하는 함수
def write_to_file(**kwargs):
    data = {'message': 'Hello from Task 1'}
    with open('/tmp/data.json', 'w') as f:
        json.dump(data, f)

# 파일에서 데이터를 읽는 함수
def read_from_file(**kwargs):
    with open('/tmp/data.json', 'r') as f:
        data = json.load(f)
    print(f"Received message: {data['message']}")

# 기본 인자 설정
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
}

# DAG 객체 생성
dag = DAG(
    'traditional_data_transfer_dag',
    default_args=default_args,
    schedule_interval='@daily',
)

# PythonOperator 태스크 정의
write_task = PythonOperator(
    task_id='write_task',
    python_callable=write_to_file,
    provide_context=True,
    dag=dag,
)

read_task = PythonOperator(
    task_id='read_task',
    python_callable=read_from_file,
    provide_context=True,
    dag=dag,
)

# 태스크 간의 의존성 설정
write_task >> read_task

Task Flow를 사용하는 방법

Task Flow API를 사용하면 Python 함수 간에 직접 데이터를 전달할 수 있습니다. 이는 더 간단하고 직관적인 데이터 전달 방법입니다.

예제:

from airflow.decorators import dag, task
from datetime import datetime

# 기본 인자 설정
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
}

# DAG 데코레이터 사용
@dag(default_args=default_args, schedule_interval='@daily', catchup=False)
def task_flow_dag():

    # Task 데코레이터 사용
    @task
    def extract():
        return {'message': 'Hello from Task 1'}

    @task
    def transform(data):
        data['message'] = data['message'] + ' - Transformed'
        return data

    @task
    def load(data):
        print(f"Received message: {data['message']}")

    # Task 간의 데이터 전달
    data = extract()
    transformed_data = transform(data)
    load(transformed_data)

# DAG 인스턴스 생성
dag_instance = task_flow_dag()

이제 Airflow에서 전통적인 데이터 전달 방법과 Task Flow API를 사용하는 방법을 이해할 수 있습니다. 이러한 방법들을 활용하여 Task 간의 데이터 전달을 효율적으로 관리하고 워크플로우를 더욱 간단하게 구성할 수 있습니다.

728x90
반응형