본문 바로가기
프로그래밍/DevOps & MLops 관련정보

[Airflow] Scheduling

by 물박사의 저장공간 2025. 9. 6.

Airflow의 스케줄링은 "언제, 그리고 얼마나 자주" DAG를 실행할지를 정의하는 핵심 기능입니다. 하지만 처음 접할 때 가장 헷갈리는 부분이기도 합니다. 핵심은 "처리할 데이터 구간이 완전히 종료된 시점에 해당 구간에 대한 DAG 실행이 시작된다"는 점을 이해하는 것입니다.

 

DAG을 정의할 때 사용하는 스케줄링 관련 주요 파라미터는 다음과 같습니다.

1. schedule (또는 schedule_interval)

DAG의 실행 주기를 정의합니다.

  • Cron 프리셋: '@daily', '@hourly', '@weekly' 등 직관적인 문자열을 사용합니다. @daily는 매일 밤 00:00를 의미합니다.
  • Cron 표현식: 0 9 * * * 와 같이 유닉스 Cron과 동일한 형식으로 더 세밀하게 지정할 수 있습니다. (예: 매일 오전 9시)
  • timedelta: timedelta(minutes=30) 와 같이 시간 간격으로 지정할 수도 있습니다.

2. start_date

가장 중요한 파라미터 중 하나입니다. 이는 DAG가 처음 실행될 날짜가 아니라, 첫 번째 데이터 구간이 시작될 수 있는 날짜를 의미합니다. 이 날짜를 기준으로 모든 미래의 스케줄이 계산됩니다.

예를 들어 @daily 스케줄에 start_date를 2025-09-01로 설정하면,

  • 첫 번째 데이터 구간은 2025-09-01 00:00 ~ 2025-09-02 00:00 입니다.
  • 이 구간이 끝나야 실행되므로, 첫 번째 DAG 실행은 2025-09-02 00:00에 트리거됩니다.
  • 이 실행의 execution_date는 2025-09-01이 됩니다.

3. catchup

과거에 실행되지 못한 스케줄을 소급해서 실행할지 여부를 결정합니다.

  • catchup = True (기본값): 만약 오늘이 9월 5일인데, start_date가 9월 1일인 DAG를 처음 실행하면, Airflow는 9월 1일, 2일, 3일, 4일에 대한 DAG 실행을 순차적으로 모두 실행하려고 시도합니다. 이를 "백필(Backfill)"이라고 합니다.
  • catchup = False: 과거의 놓친 스케줄은 모두 무시하고, start_date 이후 가장 가까운 미래의 스케줄부터 실행합니다. 의도한 백필 작업이 아니라면, 불필요한 리소스 낭비를 막기 위해 보통 False로 설정하는 것을 권장합니다.

참고) Backfill

Backfill은 catchup 설정과 관계없이, 사용자가 CLI(명령어 라인)를 통해 특정 과거 기간의 DAG를 강제로 실행하도록 하는 수동 명령어입니다.

주로 다음과 같은 목적을 위해 사용됩니다.

  • 데이터 재처리: 기존에 처리했던 데이터에 버그가 있었거나, 로직을 변경하여 과거 데이터 전체를 새로운 로직으로 다시 처리하고 싶을 때
  • 데이터 마이그레이션: 특정 기간의 데이터를 다른 시스템으로 옮기는 일회성 작업을 수행할 때
  • 누락된 데이터 채우기: catchup=False로 운영하다가 특정 과거 기간의 데이터만 선별적으로 처리하고 싶을 때
airflow dags backfill \
    --start-date YYYY-MM-DD \
    --end-date YYYY-MM-DD \
    <dag_id>

 

 


Data Aware Scheduling

데이터 인식 스케줄링(Data-Aware Scheduling)은 Airflow 2.4 버전부터 도입된 현대적인 스케줄링 방식으로, 정해진 시간(Time-based)이 아니라 특정 데이터의 상태 변화(Event-based)에 따라 DAG 실행을 트리거하는 기능입니다. 즉, "매일 새벽 2시에 실행해"가 아니라, "A 테이블에 새로운 데이터가 들어오면 실행해" 와 같이 데이터의 흐름에 따라 파이프라인이 유기적으로 동작하게 만드는 것입니다.

 

데이터 인식 스케줄링은 데이터셋(Dataset) 이라는 객체를 통해 구현됩니다. 데이터셋(Dataset): 특정 데이터의 위치를 나타내는 가상의 포인터입니다. 보통 URI(Uniform Resource Identifier) 형식의 문자열로 정의합니다.

Notice: Airflow는 이 URI의 실제 데이터를 읽거나 확인하지 않습니다. 단지 이 문자열을 고유한 식별자로 사용하여 생산자와 소비자 DAG를 연결하는 "신호등" 역할만 합니다.

데이터 생산자 (Producer DAG)

데이터를 생성하거나 업데이트하는 DAG입니다. Task 레벨에서 outlets 파라미터에 데이터셋 객체를 지정합니다. 해당 Task가 성공적으로 완료되면, Airflow는 이 데이터셋이 "업데이트되었다"고 기록합니다.

데이터 소비자 (Consumer DAG)

특정 데이터셋이 업데이트되기를 기다리는 DAG입니다. DAG 레벨에서 schedule 파라미터에 Cron 표현식 대신 데이터셋 객체를 지정합니다.

 

예시)

# common/datasets.py
from airflow.datasets import Dataset

# 데이터셋 객체 생성. URI는 단순한 식별자.
CUSTOMER_TABLE_DATASET = Dataset("postgres://prod_db/customers")

 

# dags/dag_a_updates_customers.py
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
import pendulum
from common.datasets import CUSTOMER_TABLE_DATASET

with DAG(
    dag_id='dag_a_updates_customers',
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    schedule='@daily',  # 이 DAG 자체는 매일 실행됨
    catchup=False,
) as dag:
    update_customer_table = BashOperator(
        task_id='update_customer_table',
        bash_command='echo "New customer data has been inserted."',
        # 이 Task가 성공하면 CUSTOMER_TABLE_DATASET이 업데이트되었다고 알림
        outlets=[CUSTOMER_TABLE_DATASET],
    )

 

# dags/dag_b_summarizes_customers.py
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
import pendulum
from common.datasets import CUSTOMER_TABLE_DATASET

with DAG(
    dag_id='dag_b_summarizes_customers',
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    # schedule에 데이터셋 객체를 지정!
    schedule=[CUSTOMER_TABLE_DATASET],
    catchup=False,
) as dag:
    summarize_data = BashOperator(
        task_id='summarize_data',
        bash_command='echo "Summarizing customer data..."',
    )

 

'프로그래밍 > DevOps & MLops 관련정보' 카테고리의 다른 글

[Airflow] Task Group  (0) 2025.09.06
[Airflow] Flower로 모니터링 하기  (0) 2025.09.06
[Airflow] Hook  (0) 2025.09.05
[Airflow] Sensor  (0) 2025.09.05
[Airflow] Provider  (0) 2025.09.03