본문 바로가기
프로그래밍/DevOps & MLops 관련정보

[Airflow] Sensor

by 물박사의 저장공간 2025. 9. 5.

Airflow 센서(Sensor)는 어떤 특정한 조건이 충족될 때까지 DAG의 실행을 잠시 멈추고 기다리는 역할을 하는 특별한 종류의 오퍼레이터(Operator)입니다. 마치 문 앞에 서서 특정 손님이 오거나, 특정 시간이 될 때까지 계속 문을 확인하며 기다리는 문지기와 같다고 생각할 수 있습니다. 조건이 충족되면 비로소 문을 열어주고 다음 단계(Downstream Task)가 진행되도록 합니다.

 

센서는 다른 시스템과의 의존성이 있을 때 매우 유용합니다. 즉, Airflow 외부의 어떤 작업이 끝나야 내 DAG 작업을 시작할 수 있을 때 사용됩니다.

  • 파일 도착 대기: 데이터 파이프라인에서 FTP 서버나 AWS S3에 특정 파일이 업로드될 때까지 기다렸다가 처리를 시작해야 할 때
  • API 응답 대기: 외부 API를 호출했을 때, 그 결과값이 특정 상태(예: 'Completed')가 될 때까지 기다려야 할 때
  • 데이터베이스 상태 확인: 다른 팀의 배치 작업이 끝나고 데이터베이스의 특정 테이블에 'READY'라는 플래그가 기록될 때까지 기다려야 할 때
  • 특정 시간 대기: 매일 오전 9시 30분과 같이 정확한 시간에 작업을 시작해야 할 때

 

1. Sensor의 2가지 모드

센서는 설정된 두 가지 방식으로 "기다리는" 행동을 수행합니다. 이는 Airflow 클러스터의 리소스를 어떻게 사용하느냐에 따라 중요한 차이를 만듭니다.

1) poke 모드 (Default)

센서 Task가 실행되면 Airflow Worker 슬롯 하나를 계속 차지하고 있습니다.

poke_interval (초 단위) 간격으로 주기적으로 조건을 계속 확인합니다.

조건이 충족될 때까지 Worker는 다른 일을 하지 못하고 이 센서에만 매달려 있습니다.

조건 확인 간격이 짧고 즉각적인 반응이 필요할 때 좋습니다.

기다리는 시간이 길어지면 Worker 리소스를 낭비하게 됩니다. 장시간 실행되는 센서가 많아지면 모든 Worker가 센서를 기다리느라 다른 Task를 실행하지 못하는 교착 상태(Deadlock)에 빠질 수 있습니다.

2) reschedule 모드

센서 Task가 실행되어 조건을 한번 확인합니다.

조건이 충족되지 않으면, 자신의 상태를 'up_for_reschedule'로 바꾸고 Worker 슬롯을 즉시 반납(Free)합니다.

poke_interval 시간이 지난 후, 스케줄러에 의해 다시 스케줄링되어 새로운 Worker 슬롯을 할당받아 조건을 다시 확인합니다.

기다리는 동안 Worker 리소스를 낭비하지 않으므로, 장시간 대기하는 센서에 매우 효율적입니다.

스케줄러의 부하가 약간 증가할 수 있고, 조건 확인 주기가 poke 모드보다 덜 즉각적일 수 있습니다.

 

정리하면, 몇 분 이내의 짧은 대기는 poke 모드를, 수십 분에서 몇 시간 이상 길게 기다려야 하는 작업은 반드시 reschedule 모드를 사용하는 것이 좋습니다.

 

2. Sensor의 주요 parameter

 

  • mode: poke 또는 reschedule을 지정합니다.
  • poke_interval: 조건을 다시 확인할 때까지의 대기 시간(초).
  • timeout: 센서가 무한정 기다리는 것을 방지하기 위한 최대 대기 시간(초). 이 시간이 지나도 조건이 충족되지 않으면 센서는 실패(Fail)합니다.
  • soft_fail: True로 설정하면, timeout이 발생했을 때 Task를 실패(Failed) 상태 대신 건너뛰기(Skipped) 상태로 만듭니다.

 

3. 예시

from airflow.sensors.filesystem import FileSensor
from airflow.operators.dummy import DummyOperator
from airflow.models.dag import DAG
import pendulum

with DAG(
    dag_id='example_sensor_dag',
    start_date=pendulum.datetime(2025, 9, 1, tz="Asia/Seoul"),
    schedule=None,
    catchup=False,
) as dag:
    
    # '/opt/airflow/data/source.csv' 파일이 생길 때까지 최대 10분 동안 1분 간격으로 확인
    # 대기하는 동안 Worker 리소스를 반납하는 reschedule 모드 사용
    wait_for_file = FileSensor(
        task_id='wait_for_file',
        filepath='/opt/airflow/data/source.csv',
        mode='reschedule',  # 긴 대기를 위해 reschedule 모드 사용
        timeout=60 * 10,     # 10분 타임아웃
        poke_interval=60,    # 1분 간격으로 확인
        fs_conn_id='fs_default' # Airflow Connection에 정의된 파일 시스템 경로
    )

    process_file = DummyOperator(
        task_id='process_file'
    )

    wait_for_file >> process_file

 

 

'프로그래밍 > DevOps & MLops 관련정보' 카테고리의 다른 글

[Airflow] Scheduling  (0) 2025.09.06
[Airflow] Hook  (0) 2025.09.05
[Airflow] Provider  (0) 2025.09.03
[Linux 기초] 디렉토리 구조와 파일 찾기  (0) 2025.09.02
[Linux 기초] Shell vs. KERNEL  (2) 2025.09.01