728x90
반응형
실무: Apache Airflow 병렬 태스크 - 1
Apache Airflow에서 첫 번째 고객별 작업 처리한 후, 다시 고객별 작업을 추가하는 방법을 단계별로 설명하겠습니다
1. 기본 설정 및 라이브러리 임포트
먼저 필요한 라이브러리를 임포트하고 기본 설정을 정의합니다.
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime
import pandas as pd
import sqlalchemy
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
'retries': 1,
}
dag = DAG(
'customer_workflow',
default_args=default_args,
schedule_interval=None,
)
2. 대상 고객 추출 작업 정의
첫 번째 작업은 데이터베이스에서 대상 고객을 추출하는 작업입니다. 이 작업은 고객 ID 리스트를 반환합니다.
def fetch_customers(**kwargs):
engine = sqlalchemy.create_engine('postgresql://user:password@localhost/dbname')
query = "SELECT customer_id FROM customers WHERE target = TRUE"
df = pd.read_sql(query, engine)
customer_ids = df['customer_id'].tolist()
return customer_ids
fetch_customers_task = PythonOperator(
task_id='fetch_customers',
python_callable=fetch_customers,
provide_context=True,
dag=dag,
)
3. 첫 번째 고객별 병렬 작업 정의
고객별로 병렬로 실행될 첫 번째 작업을 정의합니다. 이 작업은 각 고객 ID를 받아서 처리합니다.
def process_customer_stage_1(customer_id, **kwargs):
# 첫 번째 고객별 처리 로직을 여기에 작성합니다.
print(f"Processing customer {customer_id} in stage 1")
def create_customer_tasks_stage_1(customer_ids, dag):
with TaskGroup(group_id='customer_tasks_stage_1', dag=dag) as customer_tasks_stage_1:
for customer_id in customer_ids:
PythonOperator(
task_id=f'process_customer_stage_1_{customer_id}',
python_callable=process_customer_stage_1,
op_kwargs={'customer_id': customer_id},
dag=dag,
)
return customer_tasks_stage_1
4. 두 번째 고객별 병렬 작업 정의
고객별로 병렬로 실행될 두 번째 작업을 정의합니다. 이 작업은 각 고객 ID를 받아서 처리합니다.
def process_customer_stage_2(customer_id, **kwargs):
# 두 번째 고객별 처리 로직을 여기에 작성합니다.
print(f"Processing customer {customer_id} in stage 2")
def create_customer_tasks_stage_2(customer_ids, dag):
with TaskGroup(group_id='customer_tasks_stage_2', dag=dag) as customer_tasks_stage_2:
for customer_id in customer_ids:
PythonOperator(
task_id=f'process_customer_stage_2_{customer_id}',
python_callable=process_customer_stage_2,
op_kwargs={'customer_id': customer_id},
dag=dag,
)
return customer_tasks_stage_2
5. 고객별 작업을 동적으로 생성하는 작업 정의
고객별 작업을 동적으로 생성하는 작업을 정의합니다. 이 작업은 fetch_customers
작업의 결과를 받아서 고객별 작업을 생성합니다.
def create_tasks(**kwargs):
ti = kwargs['ti']
customer_ids = ti.xcom_pull(task_ids='fetch_customers')
# 첫 번째 고객별 작업 생성
customer_tasks_stage_1 = create_customer_tasks_stage_1(customer_ids, dag)
# 두 번째 고객별 작업 생성
customer_tasks_stage_2 = create_customer_tasks_stage_2(customer_ids, dag)
# 첫 번째 작업이 완료된 후 두 번째 작업이 실행되도록 설정
customer_tasks_stage_1 >> customer_tasks_stage_2
return customer_tasks_stage_1, customer_tasks_stage_2
create_tasks_task = PythonOperator(
task_id='create_tasks',
python_callable=create_tasks,
provide_context=True,
dag=dag,
)
6. 작업 의존성 설정
마지막으로 작업 간의 의존성을 설정합니다. fetch_customers
작업이 완료된 후 create_tasks
작업이 실행되도록 설정합니다.
fetch_customers_task >> create_tasks_task
전체 코드
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime
import pandas as pd
import sqlalchemy
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
'retries': 1,
}
dag = DAG(
'customer_workflow',
default_args=default_args,
schedule_interval=None,
)
def fetch_customers(**kwargs):
engine = sqlalchemy.create_engine('postgresql://user:password@localhost/dbname')
query = "SELECT customer_id FROM customers WHERE target = TRUE"
df = pd.read_sql(query, engine)
customer_ids = df['customer_id'].tolist()
return customer_ids
fetch_customers_task = PythonOperator(
task_id='fetch_customers',
python_callable=fetch_customers,
provide_context=True,
dag=dag,
)
def process_customer_stage_1(customer_id, **kwargs):
# 첫 번째 고객별 처리 로직을 여기에 작성합니다.
print(f"Processing customer {customer_id} in stage 1")
def create_customer_tasks_stage_1(customer_ids, dag):
with TaskGroup(group_id='customer_tasks_stage_1', dag=dag) as customer_tasks_stage_1:
for customer_id in customer_ids:
PythonOperator(
task_id=f'process_customer_stage_1_{customer_id}',
python_callable=process_customer_stage_1,
op_kwargs={'customer_id': customer_id},
dag=dag,
)
return customer_tasks_stage_1
def process_customer_stage_2(customer_id, **kwargs):
# 두 번째 고객별 처리 로직을 여기에 작성합니다.
print(f"Processing customer {customer_id} in stage 2")
def create_customer_tasks_stage_2(customer_ids, dag):
with TaskGroup(group_id='customer_tasks_stage_2', dag=dag) as customer_tasks_stage_2:
for customer_id in customer_ids:
PythonOperator(
task_id=f'process_customer_stage_2_{customer_id}',
python_callable=process_customer_stage_2,
op_kwargs={'customer_id': customer_id},
dag=dag,
)
return customer_tasks_stage_2
def create_tasks(**kwargs):
ti = kwargs['ti']
customer_ids = ti.xcom_pull(task_ids='fetch_customers')
# 첫 번째 고객별 작업 생성
customer_tasks_stage_1 = create_customer_tasks_stage_1(customer_ids, dag)
# 두 번째 고객별 작업 생성
customer_tasks_stage_2 = create_customer_tasks_stage_2(customer_ids, dag)
# 첫 번째 작업이 완료된 후 두 번째 작업이 실행되도록 설정
customer_tasks_stage_1 >> customer_tasks_stage_2
return customer_tasks_stage_1, customer_tasks_stage_2
create_tasks_task = PythonOperator(
task_id='create_tasks',
python_callable=create_tasks,
provide_context=True,
dag=dag,
)
fetch_customers_task >> create_tasks_task
첫 번째 고객별 작업을 처리한 후, 다시 고객별 작업을 병렬로 실행하는 Airflow DAG를 구성할 수 있습니다.
728x90
반응형
'IT Best Practise > Airflow' 카테고리의 다른 글
실무: Apache Airflow 데이터 전달 - 3 (0) | 2024.11.15 |
---|---|
실무: Apache Airflow 병렬 태스크(Sqlite 기반 데이터 전달) - 2 (0) | 2024.11.15 |
Apache Airflow 핵심 메카니즘 (0) | 2024.11.13 |
10. Airflow - 실습 예제 (0) | 2024.11.12 |
09. 보안 (0) | 2024.11.12 |