Airflow의 브랜치 오퍼레이터(Branch Operator)는 특정 조건에 따라 DAG의 실행 경로를 동적으로 선택하게 해주는 도구입니다. 코드의 if-else 문처럼, 워크플로우 내에서 분기점을 만들어 조건에 맞는 Task는 실행하고, 맞지 않는 Task는 건너뛰게(Skip) 합니다. Langgraph의 Conditional Edge 느낌이라고 해야할까요?
1. Branch 를 사용하는 이유
브랜치를 사용하면 고정된 워크플로우가 아닌, 상황에 따라 유연하게 대처하는 동적인 파이프라인을 만들 수 있습니다.
- 요일별 다른 작업 수행: 평일에는 일일 보고서를 만들고, 주말에는 주간 보고서를 만드는 작업을 하나의 DAG에서 처리할 때
- A/B 테스트: 랜덤하게 두 개의 다른 데이터 처리 방식 중 하나를 선택하여 실행하고 싶을 때
- 데이터 품질 검사: 데이터 검증 Task의 결과가 '성공'이면 다음 처리 단계를 실행하고, '실패'이면 데이터 정제 Task를 실행하도록 분기할 때
- 작업 성공/실패 처리: 특정 Task의 성공 여부에 따라 성공 알림 Task 또는 실패 알림 Task를 선택적으로 실행할 때
2. 사용 방법
1) Branch 함수 작성: 어떤 경로로 갈지 결정하는 Python 함수를 만듭니다. 이 함수는 반드시 다음에 실행할 Task의 ID (task_id)를 문자열로 반환해야 합니다(이것조차도 langgraph랑 비슷하군요..)
2) 의존성 설정: 브랜치 오퍼레이터 뒤에 모든 가능한 분기 Task들을 연결합니다.
분기된 경로를 마지막에 다시 하나의 Task로 합치고 싶을 때가 많습니다. 이때 중요한 것이 trigger_rule 입니다. 기본 trigger_rule은 all_success이므로, 한쪽 경로는 skipped 상태가 되어 모든 부모 Task가 성공한 것이 아니게 됩니다. 따라서 합류 지점의 Task는 실행되지 않습니다. 이 문제를 해결하기 위해 합류 Task의 trigger_rule을 none_failed 또는 all_done 으로 설정해야 합니다.
- all_success: "조원 A, B, C 모두가 자기 파트를 끝내야만 과제를 제출한다." (기본 규칙)
- one_failed: "조원 중 한 명이라도 못하겠다고 하면, 그냥 지금까지 한 것만이라도 제출해버린다."
- all_done: "조원들이 숙제를 다 했든, 못 했든, 그냥 마감 시간 되면 무조건 제출한다."
3. 예시
오늘이 수요일이면 run_on_wednesday Task를, 다른 요일이면 run_on_other_days Task를 실행한 뒤, 마지막에 end Task로 합류하는 예제입니다.
from __future__ import annotations
import pendulum
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator
# 1. 브랜치 함수: 실행할 task_id를 반환
def _branch_on_day_of_week(**kwargs):
execution_date = kwargs["data_interval_start"]
day_of_week = execution_date.weekday() # 월요일=0, 일요일=6
if day_of_week == 2: # 수요일(Wednesday)
return "run_on_wednesday"
else:
return "run_on_other_days"
with DAG(
dag_id="example_branch_operator_dag",
start_date=pendulum.datetime(2025, 9, 1, tz="Asia/Seoul"),
catchup=False,
schedule="@daily",
) as dag:
start = BashOperator(task_id="start", bash_command='echo "start"')
# 2. BranchPythonOperator 정의
branching = BranchPythonOperator(
task_id="branching",
python_callable=_branch_on_day_of_week,
)
# 3. 분기될 Task들 정의
run_on_wednesday = BashOperator(
task_id="run_on_wednesday",
bash_command='echo "Today is Wednesday!"',
)
run_on_other_days = BashOperator(
task_id="run_on_other_days",
bash_command='echo "Today is not Wednesday."',
)
# 4. 합류 Task 정의 (trigger_rule 설정이 핵심!)
end = BashOperator(
task_id="end",
bash_command='echo "end"',
trigger_rule="none_failed", # 중요!
)
# 5. 의존성 설정
start >> branching >> [run_on_wednesday, run_on_other_days]
[run_on_wednesday, run_on_other_days] >> end
'프로그래밍 > DevOps & MLops 관련정보' 카테고리의 다른 글
| [Airflow] Airflow CLI 기본 명령어 (0) | 2025.10.19 |
|---|---|
| [Google Cloud] Virtual Machine 만들고 사용해보기 (0) | 2025.10.19 |
| [Airflow] XCom (0) | 2025.09.06 |
| [Airflow] Task Group (0) | 2025.09.06 |
| [Airflow] Flower로 모니터링 하기 (0) | 2025.09.06 |