728x90
반응형

실무: Apache Airflow 병렬 태스크 - 1

Apache Airflow에서 첫 번째 고객별 작업 처리한 후, 다시 고객별 작업을 추가하는 방법을 단계별로 설명하겠습니다

1. 기본 설정 및 라이브러리 임포트

먼저 필요한 라이브러리를 임포트하고 기본 설정을 정의합니다.

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime
import pandas as pd
import sqlalchemy

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
}

dag = DAG(
    'customer_workflow',
    default_args=default_args,
    schedule_interval=None,
)

2. 대상 고객 추출 작업 정의

첫 번째 작업은 데이터베이스에서 대상 고객을 추출하는 작업입니다. 이 작업은 고객 ID 리스트를 반환합니다.

def fetch_customers(**kwargs):
    engine = sqlalchemy.create_engine('postgresql://user:password@localhost/dbname')
    query = "SELECT customer_id FROM customers WHERE target = TRUE"
    df = pd.read_sql(query, engine)
    customer_ids = df['customer_id'].tolist()
    return customer_ids

fetch_customers_task = PythonOperator(
    task_id='fetch_customers',
    python_callable=fetch_customers,
    provide_context=True,
    dag=dag,
)

3. 첫 번째 고객별 병렬 작업 정의

고객별로 병렬로 실행될 첫 번째 작업을 정의합니다. 이 작업은 각 고객 ID를 받아서 처리합니다.

def process_customer_stage_1(customer_id, **kwargs):
    # 첫 번째 고객별 처리 로직을 여기에 작성합니다.
    print(f"Processing customer {customer_id} in stage 1")

def create_customer_tasks_stage_1(customer_ids, dag):
    with TaskGroup(group_id='customer_tasks_stage_1', dag=dag) as customer_tasks_stage_1:
        for customer_id in customer_ids:
            PythonOperator(
                task_id=f'process_customer_stage_1_{customer_id}',
                python_callable=process_customer_stage_1,
                op_kwargs={'customer_id': customer_id},
                dag=dag,
            )
    return customer_tasks_stage_1

4. 두 번째 고객별 병렬 작업 정의

고객별로 병렬로 실행될 두 번째 작업을 정의합니다. 이 작업은 각 고객 ID를 받아서 처리합니다.

def process_customer_stage_2(customer_id, **kwargs):
    # 두 번째 고객별 처리 로직을 여기에 작성합니다.
    print(f"Processing customer {customer_id} in stage 2")

def create_customer_tasks_stage_2(customer_ids, dag):
    with TaskGroup(group_id='customer_tasks_stage_2', dag=dag) as customer_tasks_stage_2:
        for customer_id in customer_ids:
            PythonOperator(
                task_id=f'process_customer_stage_2_{customer_id}',
                python_callable=process_customer_stage_2,
                op_kwargs={'customer_id': customer_id},
                dag=dag,
            )
    return customer_tasks_stage_2

5. 고객별 작업을 동적으로 생성하는 작업 정의

고객별 작업을 동적으로 생성하는 작업을 정의합니다. 이 작업은 fetch_customers 작업의 결과를 받아서 고객별 작업을 생성합니다.

def create_tasks(**kwargs):
    ti = kwargs['ti']
    customer_ids = ti.xcom_pull(task_ids='fetch_customers')

    # 첫 번째 고객별 작업 생성
    customer_tasks_stage_1 = create_customer_tasks_stage_1(customer_ids, dag)

    # 두 번째 고객별 작업 생성
    customer_tasks_stage_2 = create_customer_tasks_stage_2(customer_ids, dag)

    # 첫 번째 작업이 완료된 후 두 번째 작업이 실행되도록 설정
    customer_tasks_stage_1 >> customer_tasks_stage_2

    return customer_tasks_stage_1, customer_tasks_stage_2

create_tasks_task = PythonOperator(
    task_id='create_tasks',
    python_callable=create_tasks,
    provide_context=True,
    dag=dag,
)

6. 작업 의존성 설정

마지막으로 작업 간의 의존성을 설정합니다. fetch_customers 작업이 완료된 후 create_tasks 작업이 실행되도록 설정합니다.

fetch_customers_task >> create_tasks_task

전체 코드

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime
import pandas as pd
import sqlalchemy

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
}

dag = DAG(
    'customer_workflow',
    default_args=default_args,
    schedule_interval=None,
)

def fetch_customers(**kwargs):
    engine = sqlalchemy.create_engine('postgresql://user:password@localhost/dbname')
    query = "SELECT customer_id FROM customers WHERE target = TRUE"
    df = pd.read_sql(query, engine)
    customer_ids = df['customer_id'].tolist()
    return customer_ids

fetch_customers_task = PythonOperator(
    task_id='fetch_customers',
    python_callable=fetch_customers,
    provide_context=True,
    dag=dag,
)

def process_customer_stage_1(customer_id, **kwargs):
    # 첫 번째 고객별 처리 로직을 여기에 작성합니다.
    print(f"Processing customer {customer_id} in stage 1")

def create_customer_tasks_stage_1(customer_ids, dag):
    with TaskGroup(group_id='customer_tasks_stage_1', dag=dag) as customer_tasks_stage_1:
        for customer_id in customer_ids:
            PythonOperator(
                task_id=f'process_customer_stage_1_{customer_id}',
                python_callable=process_customer_stage_1,
                op_kwargs={'customer_id': customer_id},
                dag=dag,
            )
    return customer_tasks_stage_1

def process_customer_stage_2(customer_id, **kwargs):
    # 두 번째 고객별 처리 로직을 여기에 작성합니다.
    print(f"Processing customer {customer_id} in stage 2")

def create_customer_tasks_stage_2(customer_ids, dag):
    with TaskGroup(group_id='customer_tasks_stage_2', dag=dag) as customer_tasks_stage_2:
        for customer_id in customer_ids:
            PythonOperator(
                task_id=f'process_customer_stage_2_{customer_id}',
                python_callable=process_customer_stage_2,
                op_kwargs={'customer_id': customer_id},
                dag=dag,
            )
    return customer_tasks_stage_2

def create_tasks(**kwargs):
    ti = kwargs['ti']
    customer_ids = ti.xcom_pull(task_ids='fetch_customers')

    # 첫 번째 고객별 작업 생성
    customer_tasks_stage_1 = create_customer_tasks_stage_1(customer_ids, dag)

    # 두 번째 고객별 작업 생성
    customer_tasks_stage_2 = create_customer_tasks_stage_2(customer_ids, dag)

    # 첫 번째 작업이 완료된 후 두 번째 작업이 실행되도록 설정
    customer_tasks_stage_1 >> customer_tasks_stage_2

    return customer_tasks_stage_1, customer_tasks_stage_2

create_tasks_task = PythonOperator(
    task_id='create_tasks',
    python_callable=create_tasks,
    provide_context=True,
    dag=dag,
)

fetch_customers_task >> create_tasks_task

첫 번째 고객별 작업을 처리한 후, 다시 고객별 작업을 병렬로 실행하는 Airflow DAG를 구성할 수 있습니다.

728x90
반응형

+ Recent posts