본문 바로가기
카테고리 없음

[Airflow] Queue

by 물박사의 저장공간 2025. 9. 6.

역시 오늘도 gemini의 도움을 받아 Airflow에 대해 간단히 정리해보겠습니다. Airflow 큐(Queue)는 Celery와 같은 분산 Executor를 사용할 때, Task들을 성격에 맞게 분류하고 특정 Worker 그룹에 할당하기 위한 가상의 대기열입니다. 스케줄러가 실행을 지시한 Task들을 바로 Worker에게 보내는 것이 아니라, 먼저 정해진 큐에 넣고, 해당 큐를 구독(listening)하도록 설정된 Worker가 그 Task를 가져가 처리하도록 하는 라우팅(routing) 시스템입니다.

 

Airflow의 큐 시스템은 대형 마트의 계산대와 똑같습니다.

  • Task (작업): 계산을 기다리는 손님들
  • Worker (작업자): 계산원
  • Queue (큐): 계산대 줄

마트에는 여러 종류의 계산대가 있습니다.

  • 일반 계산대 (Default Queue): 모든 손님이 이용할 수 있는 기본 줄입니다.
  • 소량 계산대 (Lightweight Queue): 10개 이하의 물건을 가진 손님만 이용할 수 있는 빠른 줄입니다.
  • 셀프 계산대 (GPU/High-Memory Queue): 특별한 장비(셀프 계산 기계)가 필요한 손님을 위한 줄입니다.

손님(Task)은 자신의 상황에 맞는 계산대(Queue)에 줄을 서고, 계산원(Worker)은 자신이 맡은 계산대(Queue)의 손님만 처리합니다. 이렇게 함으로써 마트 전체의 계산 흐름이 훨씬 효율적으로 관리됩니다.

 

1. Queue를 사용하는 이유

 

리소스 분리 (Resource Segregation)

  • 가벼운 작업(Lightweight): 간단한 SQL 쿼리, Bash 명령어 등 금방 끝나는 작업
  • 무거운 작업(Heavyweight): 머신러닝 모델 학습, 대용량 데이터 처리(Spark) 등 많은 CPU/메모리를 사용하는 작업
  • 만약 모든 작업을 하나의 큐에 넣으면, 무거운 작업 몇 개가 모든 Worker를 독차지해서 가벼운 작업들이 하염없이 기다리는 "자원 기아(Starvation)" 현상이 발생할 수 있습니다. 큐를 분리하면 이런 병목 현상을 막을 수 있습니다.

우선순위 제어 (Prioritization)

  • 중요도가 높은 DAG의 Task는 더 많은 Worker가 할당된 우선순위 큐(high-priority queue)로 보내 빠르게 처리하고, 중요도가 낮은 Task는 적은 Worker가 할당된 큐로 보내 천천히 처리할 수 있습니다.

특수 작업자 할당 (Specialized Workers)

  • GPU가 필요한 머신러닝 Task는 GPU가 장착된 Worker에만 할당된 큐(gpu_queue)로 보내야 합니다. 이렇게 하면 특정 하드웨어나 라이브러리가 필요한 작업을 정확한 환경에서 실행할 수 있습니다.

 

2. Queue 사용방법

Queue 시스템은 Task를 특정 큐에 할당하고, Worker가 특정 큐를 바라보도록 설정하는 두 가지 과정으로 이루어집니다. 

1) DAG에서 Task에 큐 할당하기

오퍼레이터를 정의할 때 queue 파라미터를 사용하여 보낼 큐의 이름을 지정합니다. 지정하지 않으면 default 큐로 보내집니다.

from airflow.operators.bash import BashOperator

# 이 Task는 'lightweight_queue'로 보내집니다.
run_simple_query = BashOperator(
    task_id='run_simple_query',
    bash_command='echo "Running a quick query..."',
    queue='lightweight_queue',  # 큐 지정
    dag=dag,
)

# 이 Task는 'heavy_spark_queue'로 보내집니다.
run_spark_job = BashOperator(
    task_id='run_spark_job',
    bash_command='spark-submit ...',
    queue='heavy_spark_queue',  # 큐 지정
    dag=dag,
)

# queue 파라미터가 없으므로 'default' 큐로 보내집니다.
run_default_task = BashOperator(
    task_id='run_default_task',
    bash_command='echo "I am a default task."',
    dag=dag,
)

 

 

2) 특정 큐를 처리할 Worker 실행하기

터미널에서 Celery Worker를 시작할 때 -q 옵션으로 처리할 큐의 이름을 지정합니다.

# 이 Worker는 'lightweight_queue'에 들어온 Task만 처리합니다.
# 보통 CPU/메모리가 낮은 일반 사양의 서버에서 실행합니다.
airflow celery worker -q lightweight_queue

# 이 Worker는 'heavy_spark_queue'에 들어온 Task만 처리합니다.
# 보통 고사양 서버에서 별도로 실행합니다.
airflow celery worker -q heavy_spark_queue

# 'default' 큐를 처리하는 Worker
airflow celery worker -q default