본문 바로가기
프로그래밍/DevOps & MLops 관련정보

[Airflow] Hook

by 물박사의 저장공간 2025. 9. 5.

Airflow 훅(Hook)은 외부 시스템(데이터베이스, 클라우드 스토리지 등)에 대한 연결 및 상호작용을 표준화하고 추상화한 저수준(low-level) 인터페이스입니다. 간단히 말해, 특정 외부 서비스와 "대화"하는 데 필요한 모든 복잡한 코드(인증, 세션 관리, API 호출 등)를 미리 포장해 놓은 파이썬 클래스입니다.

 

1. 훅(Hook)과 오퍼레이터(Operator)의 관계

이 둘의 관계를 이해하는 것이 매우 중요합니다.

  • 훅 (Hook) - "어떻게 (How)": 특정 시스템과 통신하는 방법을 알고 있는 전문가입니다. DB에 접속하고, 쿼리를 보내고, 파일을 업로드하는 등의 실질적인 기술을 구현합니다.
  • 오퍼레이터 (Operator) - "무엇을 (What)": 어떤 작업을 해야 하는지 정의하는 관리자입니다. "이 SQL 파일을 저 DB에 실행해" 또는 "이 파일을 저 S3 버킷에 올려" 와 같은 지시를 내립니다.

대부분의 오퍼레이터는 내부적으로 훅을 사용합니다. 예를 들어, PostgresOperator는 실행될 때 내부적으로 PostgresHook을 호출하여 데이터베이스 연결을 얻고 SQL을 실행합니다. 오퍼레이터는 훅을 사용하기 편하게 포장해 놓은 완제품이라고 볼 수 있습니다.

 

2. Hook을 직접 접근하여 사용하는 경우

오퍼레이터가 제공하는 기능 이상을 원할 때, 특히 PythonOperator 내에서 훅을 직접 사용하는 경우가 많습니다.

  • 데이터를 가져와 가공해야 할 때: DB에서 쿼리 결과를 가져온 후(SELECT), Airflow 환경에서 Python 코드로 데이터를 변환하고 다음 Task로 전달해야 할 때. (PostgresOperator는 주로 SQL 실행만 하고 결과를 반환하지 않습니다.)
  • 조건부 로직이 필요할 때: DB에서 특정 값을 읽어온 뒤, 그 값에 따라 다른 작업을 수행하도록 분기해야 할 때.
  • 여러 동작을 묶어야 할 때: 하나의 Task 안에서 DB에 접속해 데이터를 읽고, 그 데이터로 파일을 만들어 S3에 업로드하는 등 여러 시스템과 상호작용해야 할 때.

 

3. 예시

from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
import pendulum

# 1. 훅을 사용하여 DB 작업을 수행할 Python 함수 정의
def get_and_log_user_count(**kwargs):
    # postgres_conn_id에 Airflow UI에 등록한 Connection ID를 지정
    hook = PostgresHook(postgres_conn_id='my_postgres_connection')
    
    # hook.get_records()는 쿼리 결과를 리스트의 튜플 형태로 반환
    # 예: [ (120,) ]
    records = hook.get_records("SELECT COUNT(*) FROM public.users;")
    
    if records:
        user_count = records[0][0]
        print(f"Total number of users: {user_count}")
        
        # 조회한 값을 XCom에 저장하여 다른 Task가 사용할 수 있도록 함
        ti = kwargs['ti']
        ti.xcom_push(key='user_count', value=user_count)
    else:
        print("No records found.")


with DAG(
    dag_id='example_hook_dag',
    start_date=pendulum.datetime(2023, 1, 1, tz="Asia/Seoul"),
    schedule=None,
    catchup=False,
) as dag:
    
    # 2. PythonOperator가 위에서 정의한 함수를 실행하도록 설정
    query_user_count = PythonOperator(
        task_id='query_user_count',
        python_callable=get_and_log_user_count,
    )

 

'프로그래밍 > DevOps & MLops 관련정보' 카테고리의 다른 글

[Airflow] Flower로 모니터링 하기  (0) 2025.09.06
[Airflow] Scheduling  (0) 2025.09.06
[Airflow] Sensor  (0) 2025.09.05
[Airflow] Provider  (0) 2025.09.03
[Linux 기초] 디렉토리 구조와 파일 찾기  (0) 2025.09.02