본문 바로가기
프로그래밍/DevOps & MLops 관련정보

[Airflow] Branch Operator/Trigger Rule

by 물박사의 저장공간 2025. 9. 8.

Airflow의 브랜치 오퍼레이터(Branch Operator)는 특정 조건에 따라 DAG의 실행 경로를 동적으로 선택하게 해주는 도구입니다. 코드의 if-else 문처럼, 워크플로우 내에서 분기점을 만들어 조건에 맞는 Task는 실행하고, 맞지 않는 Task는 건너뛰게(Skip) 합니다. Langgraph의 Conditional Edge 느낌이라고 해야할까요?

 

1. Branch 를 사용하는 이유

브랜치를 사용하면 고정된 워크플로우가 아닌, 상황에 따라 유연하게 대처하는 동적인 파이프라인을 만들 수 있습니다.

  • 요일별 다른 작업 수행: 평일에는 일일 보고서를 만들고, 주말에는 주간 보고서를 만드는 작업을 하나의 DAG에서 처리할 때
  • A/B 테스트: 랜덤하게 두 개의 다른 데이터 처리 방식 중 하나를 선택하여 실행하고 싶을 때
  • 데이터 품질 검사: 데이터 검증 Task의 결과가 '성공'이면 다음 처리 단계를 실행하고, '실패'이면 데이터 정제 Task를 실행하도록 분기할 때
  • 작업 성공/실패 처리: 특정 Task의 성공 여부에 따라 성공 알림 Task 또는 실패 알림 Task를 선택적으로 실행할 때

 

2. 사용 방법

1) Branch 함수 작성: 어떤 경로로 갈지 결정하는 Python 함수를 만듭니다. 이 함수는 반드시 다음에 실행할 Task의 ID (task_id)를 문자열로 반환해야 합니다(이것조차도 langgraph랑 비슷하군요..)

2) 의존성 설정: 브랜치 오퍼레이터 뒤에 모든 가능한 분기 Task들을 연결합니다.

 

분기된 경로를 마지막에 다시 하나의 Task로 합치고 싶을 때가 많습니다. 이때 중요한 것이 trigger_rule 입니다. 기본 trigger_rule은 all_success이므로, 한쪽 경로는 skipped 상태가 되어 모든 부모 Task가 성공한 것이 아니게 됩니다. 따라서 합류 지점의 Task는 실행되지 않습니다. 이 문제를 해결하기 위해 합류 Task의 trigger_rule을 none_failed 또는 all_done 으로 설정해야 합니다.

  • all_success: "조원 A, B, C 모두가 자기 파트를 끝내야만 과제를 제출한다." (기본 규칙)
  • one_failed: "조원 중 한 명이라도 못하겠다고 하면, 그냥 지금까지 한 것만이라도 제출해버린다."
  • all_done: "조원들이 숙제를 다 했든, 못 했든, 그냥 마감 시간 되면 무조건 제출한다."

 

3. 예시

오늘이 수요일이면 run_on_wednesday Task를, 다른 요일이면 run_on_other_days Task를 실행한 뒤, 마지막에 end Task로 합류하는 예제입니다.

from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator

# 1. 브랜치 함수: 실행할 task_id를 반환
def _branch_on_day_of_week(**kwargs):
    execution_date = kwargs["data_interval_start"]
    day_of_week = execution_date.weekday()  # 월요일=0, 일요일=6
    
    if day_of_week == 2:  # 수요일(Wednesday)
        return "run_on_wednesday"
    else:
        return "run_on_other_days"

with DAG(
    dag_id="example_branch_operator_dag",
    start_date=pendulum.datetime(2025, 9, 1, tz="Asia/Seoul"),
    catchup=False,
    schedule="@daily",
) as dag:
    start = BashOperator(task_id="start", bash_command='echo "start"')

    # 2. BranchPythonOperator 정의
    branching = BranchPythonOperator(
        task_id="branching",
        python_callable=_branch_on_day_of_week,
    )

    # 3. 분기될 Task들 정의
    run_on_wednesday = BashOperator(
        task_id="run_on_wednesday",
        bash_command='echo "Today is Wednesday!"',
    )
    
    run_on_other_days = BashOperator(
        task_id="run_on_other_days",
        bash_command='echo "Today is not Wednesday."',
    )

    # 4. 합류 Task 정의 (trigger_rule 설정이 핵심!)
    end = BashOperator(
        task_id="end",
        bash_command='echo "end"',
        trigger_rule="none_failed",  # 중요!
    )
    
    # 5. 의존성 설정
    start >> branching >> [run_on_wednesday, run_on_other_days]
    [run_on_wednesday, run_on_other_days] >> end