IT Best Practise/Airflow
Apache Airflow - Custom SQLAlchemy Operator
GilliLab IT
2024. 11. 15. 12:32
728x90
반응형
Apache Airflow - Custom SQLAlchemy Operator
Apache Airflow에서 SQLAlchemy를 사용하여 데이터베이스 작업을 수행하는 방법을 알아보겠습니다. 이 글에서는 SQLAlchemy Operator를 정의하고 사용하는 방법을 설명합니다.
Apache Airflow Connections 가져오기
먼저, my_prefix_
로 시작하는 Apache Airflow Connections를 가져오는 예제를 살펴보겠습니다.
from airflow import settings
from airflow.models import Connection
session = settings.Session()
try:
conns = (
session.query(Connection.conn_id)
.filter(Connection.conn_id.ilike('my_prefix_%'))
.all()
)
conn_ids = [conn.conn_id for conn in conns]
finally:
session.commit()
SQLAlchemy 세션 가져오기
PostgresHook을 사용하여 SQLAlchemy 세션을 가져오는 예제입니다.
from airflow.hooks.postgres_hook import PostgresHook
from sqlalchemy.orm import sessionmaker
hook = PostgresHook(postgres_conn_id='my_conn_id')
engine = hook.get_sqlalchemy_engine()
session = sessionmaker(bind=engine)()
SQLAlchemy Operator 정의
Postgres를 위한 SQLAlchemy Operator를 정의하는 예제입니다.
from airflow.operators.python_operator import PythonOperator
from airflow.utils.decorators import apply_defaults
from sqlalchemy.orm import sessionmaker, Session
from airflow.hooks.postgres_hook import PostgresHook
def get_session(conn_id: str) -> Session:
hook = PostgresHook(postgres_conn_id=conn_id)
engine = hook.get_sqlalchemy_engine()
return sessionmaker(bind=engine)()
class SQLAlchemyOperator(PythonOperator):
"""
PythonOperator with SQLAlchemy session management - creates session for the Python callable
and commit/rollback it afterwards.
Set `conn_id` with your DB connection.
Pass `session` parameter to the python callable.
"""
@apply_defaults
def __init__(self, conn_id: str, *args, **kwargs):
self.conn_id = conn_id
super().__init__(*args, **kwargs)
def execute_callable(self):
session = get_session(self.conn_id)
try:
result = self.python_callable(*self.op_args, session=session, **self.op_kwargs)
except Exception:
session.rollback()
raise
session.commit()
return result
사용 방법
정의한 SQLAlchemy Operator를 사용하는 예제입니다.
from airflow import DAG
from datetime import datetime
from sqlalchemy.orm import Session
dag = DAG(
dag_id='SQAlchemyDAG',
schedule_interval='0 2 1 * *', # 매월 1일 오전 2시에 실행
start_date=datetime(2020, 8, 1),
)
def sqlalchemy_task(session: Session, **kwargs):
session.query(YourSQLAlchemyModel)
request_count = SQLAlchemyOperator(
dag=dag,
task_id='sqlalchemy',
conn_id='my_db',
python_callable=sqlalchemy_task,
provide_context=True,
)
이 글을 통해 Apache Airflow에서 SQLAlchemy를 사용하여 데이터베이스 작업을 수행하는 방법을 배웠습니다. SQLAlchemy Operator를 정의하고 사용하는 방법을 이해하고, 이를 통해 데이터 파이프라인을 효율적으로 관리할 수 있습니다.
728x90
반응형