Airflow 센서(Sensor)는 어떤 특정한 조건이 충족될 때까지 DAG의 실행을 잠시 멈추고 기다리는 역할을 하는 특별한 종류의 오퍼레이터(Operator)입니다. 마치 문 앞에 서서 특정 손님이 오거나, 특정 시간이 될 때까지 계속 문을 확인하며 기다리는 문지기와 같다고 생각할 수 있습니다. 조건이 충족되면 비로소 문을 열어주고 다음 단계(Downstream Task)가 진행되도록 합니다.
센서는 다른 시스템과의 의존성이 있을 때 매우 유용합니다. 즉, Airflow 외부의 어떤 작업이 끝나야 내 DAG 작업을 시작할 수 있을 때 사용됩니다.
- 파일 도착 대기: 데이터 파이프라인에서 FTP 서버나 AWS S3에 특정 파일이 업로드될 때까지 기다렸다가 처리를 시작해야 할 때
- API 응답 대기: 외부 API를 호출했을 때, 그 결과값이 특정 상태(예: 'Completed')가 될 때까지 기다려야 할 때
- 데이터베이스 상태 확인: 다른 팀의 배치 작업이 끝나고 데이터베이스의 특정 테이블에 'READY'라는 플래그가 기록될 때까지 기다려야 할 때
- 특정 시간 대기: 매일 오전 9시 30분과 같이 정확한 시간에 작업을 시작해야 할 때
1. Sensor의 2가지 모드
센서는 설정된 두 가지 방식으로 "기다리는" 행동을 수행합니다. 이는 Airflow 클러스터의 리소스를 어떻게 사용하느냐에 따라 중요한 차이를 만듭니다.
1) poke 모드 (Default)
센서 Task가 실행되면 Airflow Worker 슬롯 하나를 계속 차지하고 있습니다.
poke_interval (초 단위) 간격으로 주기적으로 조건을 계속 확인합니다.
조건이 충족될 때까지 Worker는 다른 일을 하지 못하고 이 센서에만 매달려 있습니다.
조건 확인 간격이 짧고 즉각적인 반응이 필요할 때 좋습니다.
기다리는 시간이 길어지면 Worker 리소스를 낭비하게 됩니다. 장시간 실행되는 센서가 많아지면 모든 Worker가 센서를 기다리느라 다른 Task를 실행하지 못하는 교착 상태(Deadlock)에 빠질 수 있습니다.
2) reschedule 모드
센서 Task가 실행되어 조건을 한번 확인합니다.
조건이 충족되지 않으면, 자신의 상태를 'up_for_reschedule'로 바꾸고 Worker 슬롯을 즉시 반납(Free)합니다.
poke_interval 시간이 지난 후, 스케줄러에 의해 다시 스케줄링되어 새로운 Worker 슬롯을 할당받아 조건을 다시 확인합니다.
기다리는 동안 Worker 리소스를 낭비하지 않으므로, 장시간 대기하는 센서에 매우 효율적입니다.
스케줄러의 부하가 약간 증가할 수 있고, 조건 확인 주기가 poke 모드보다 덜 즉각적일 수 있습니다.
정리하면, 몇 분 이내의 짧은 대기는 poke 모드를, 수십 분에서 몇 시간 이상 길게 기다려야 하는 작업은 반드시 reschedule 모드를 사용하는 것이 좋습니다.
2. Sensor의 주요 parameter
- mode: poke 또는 reschedule을 지정합니다.
- poke_interval: 조건을 다시 확인할 때까지의 대기 시간(초).
- timeout: 센서가 무한정 기다리는 것을 방지하기 위한 최대 대기 시간(초). 이 시간이 지나도 조건이 충족되지 않으면 센서는 실패(Fail)합니다.
- soft_fail: True로 설정하면, timeout이 발생했을 때 Task를 실패(Failed) 상태 대신 건너뛰기(Skipped) 상태로 만듭니다.
3. 예시
from airflow.sensors.filesystem import FileSensor
from airflow.operators.dummy import DummyOperator
from airflow.models.dag import DAG
import pendulum
with DAG(
dag_id='example_sensor_dag',
start_date=pendulum.datetime(2025, 9, 1, tz="Asia/Seoul"),
schedule=None,
catchup=False,
) as dag:
# '/opt/airflow/data/source.csv' 파일이 생길 때까지 최대 10분 동안 1분 간격으로 확인
# 대기하는 동안 Worker 리소스를 반납하는 reschedule 모드 사용
wait_for_file = FileSensor(
task_id='wait_for_file',
filepath='/opt/airflow/data/source.csv',
mode='reschedule', # 긴 대기를 위해 reschedule 모드 사용
timeout=60 * 10, # 10분 타임아웃
poke_interval=60, # 1분 간격으로 확인
fs_conn_id='fs_default' # Airflow Connection에 정의된 파일 시스템 경로
)
process_file = DummyOperator(
task_id='process_file'
)
wait_for_file >> process_file
'프로그래밍 > DevOps & MLops 관련정보' 카테고리의 다른 글
| [Airflow] Scheduling (0) | 2025.09.06 |
|---|---|
| [Airflow] Hook (0) | 2025.09.05 |
| [Airflow] Provider (0) | 2025.09.03 |
| [Linux 기초] 디렉토리 구조와 파일 찾기 (0) | 2025.09.02 |
| [Linux 기초] Shell vs. KERNEL (2) | 2025.09.01 |