IT Best Practise/Airflow
05-02. 데이터 전달 방법(Task Flow 방식)
GilliLab IT
2024. 11. 12. 14:20
728x90
반응형
05-02. 데이터 전달 방법
전통적인 데이터 전달 방법
전통적인 데이터 전달 방법은 파일 시스템, 데이터베이스, 메시지 큐 등을 사용하여 Task 간에 데이터를 전달하는 방식입니다.
예제:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
import json
# 데이터를 파일에 저장하는 함수
def write_to_file(**kwargs):
data = {'message': 'Hello from Task 1'}
with open('/tmp/data.json', 'w') as f:
json.dump(data, f)
# 파일에서 데이터를 읽는 함수
def read_from_file(**kwargs):
with open('/tmp/data.json', 'r') as f:
data = json.load(f)
print(f"Received message: {data['message']}")
# 기본 인자 설정
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
'retries': 1,
}
# DAG 객체 생성
dag = DAG(
'traditional_data_transfer_dag',
default_args=default_args,
schedule_interval='@daily',
)
# PythonOperator 태스크 정의
write_task = PythonOperator(
task_id='write_task',
python_callable=write_to_file,
provide_context=True,
dag=dag,
)
read_task = PythonOperator(
task_id='read_task',
python_callable=read_from_file,
provide_context=True,
dag=dag,
)
# 태스크 간의 의존성 설정
write_task >> read_task
Task Flow를 사용하는 방법
Task Flow API를 사용하면 Python 함수 간에 직접 데이터를 전달할 수 있습니다. 이는 더 간단하고 직관적인 데이터 전달 방법입니다.
예제:
from airflow.decorators import dag, task
from datetime import datetime
# 기본 인자 설정
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
'retries': 1,
}
# DAG 데코레이터 사용
@dag(default_args=default_args, schedule_interval='@daily', catchup=False)
def task_flow_dag():
# Task 데코레이터 사용
@task
def extract():
return {'message': 'Hello from Task 1'}
@task
def transform(data):
data['message'] = data['message'] + ' - Transformed'
return data
@task
def load(data):
print(f"Received message: {data['message']}")
# Task 간의 데이터 전달
data = extract()
transformed_data = transform(data)
load(transformed_data)
# DAG 인스턴스 생성
dag_instance = task_flow_dag()
이제 Airflow에서 전통적인 데이터 전달 방법과 Task Flow API를 사용하는 방법을 이해할 수 있습니다. 이러한 방법들을 활용하여 Task 간의 데이터 전달을 효율적으로 관리하고 워크플로우를 더욱 간단하게 구성할 수 있습니다.
728x90
반응형