IT Best Practise/Airflow

Apache Airflow - Custom SQLAlchemy Operator

GilliLab IT 2024. 11. 15. 12:32
728x90
반응형

Apache Airflow - Custom SQLAlchemy Operator

Apache Airflow에서 SQLAlchemy를 사용하여 데이터베이스 작업을 수행하는 방법을 알아보겠습니다. 이 글에서는 SQLAlchemy Operator를 정의하고 사용하는 방법을 설명합니다.

Apache Airflow Connections 가져오기

먼저, my_prefix_로 시작하는 Apache Airflow Connections를 가져오는 예제를 살펴보겠습니다.

from airflow import settings
from airflow.models import Connection

session = settings.Session()
try:
    conns = (
        session.query(Connection.conn_id)
        .filter(Connection.conn_id.ilike('my_prefix_%'))
        .all()
    )
    conn_ids = [conn.conn_id for conn in conns]
finally:
    session.commit()

SQLAlchemy 세션 가져오기

PostgresHook을 사용하여 SQLAlchemy 세션을 가져오는 예제입니다.

from airflow.hooks.postgres_hook import PostgresHook
from sqlalchemy.orm import sessionmaker

hook = PostgresHook(postgres_conn_id='my_conn_id')
engine = hook.get_sqlalchemy_engine()
session = sessionmaker(bind=engine)()

SQLAlchemy Operator 정의

Postgres를 위한 SQLAlchemy Operator를 정의하는 예제입니다.

from airflow.operators.python_operator import PythonOperator
from airflow.utils.decorators import apply_defaults
from sqlalchemy.orm import sessionmaker, Session
from airflow.hooks.postgres_hook import PostgresHook

def get_session(conn_id: str) -> Session:
    hook = PostgresHook(postgres_conn_id=conn_id)
    engine = hook.get_sqlalchemy_engine()
    return sessionmaker(bind=engine)()

class SQLAlchemyOperator(PythonOperator):
    """
    PythonOperator with SQLAlchemy session management - creates session for the Python callable
    and commit/rollback it afterwards.

    Set `conn_id` with your DB connection.

    Pass `session` parameter to the python callable.
    """
    @apply_defaults
    def __init__(self, conn_id: str, *args, **kwargs):
        self.conn_id = conn_id
        super().__init__(*args, **kwargs)

    def execute_callable(self):
        session = get_session(self.conn_id)
        try:
            result = self.python_callable(*self.op_args, session=session, **self.op_kwargs)
        except Exception:
            session.rollback()
            raise
        session.commit()
        return result

사용 방법

정의한 SQLAlchemy Operator를 사용하는 예제입니다.

from airflow import DAG
from datetime import datetime
from sqlalchemy.orm import Session

dag = DAG(
    dag_id='SQAlchemyDAG',
    schedule_interval='0 2 1 * *',  # 매월 1일 오전 2시에 실행
    start_date=datetime(2020, 8, 1),
)

def sqlalchemy_task(session: Session, **kwargs):
    session.query(YourSQLAlchemyModel)

request_count = SQLAlchemyOperator(
    dag=dag,
    task_id='sqlalchemy',
    conn_id='my_db',
    python_callable=sqlalchemy_task,
    provide_context=True,
)

이 글을 통해 Apache Airflow에서 SQLAlchemy를 사용하여 데이터베이스 작업을 수행하는 방법을 배웠습니다. SQLAlchemy Operator를 정의하고 사용하는 방법을 이해하고, 이를 통해 데이터 파이프라인을 효율적으로 관리할 수 있습니다.

728x90
반응형