본문 바로가기
프로그래밍/DevOps & MLops 관련정보

[Airflow] XCom

by 물박사의 저장공간 2025. 9. 6.

Airflow의 XCom(Cross-communication의 약자)은 Task 간에 작은 양의 데이터를 주고받기 위한 메시징 시스템입니다. DAG 안에서 한 Task가 실행된 결과를 다른 Task가 이어서 사용해야 할 때, XCom을 통해 그 값을 전달할 수 있습니다.

 

XCom은 이어달리기 경주에서 선수들이 주고받는 바통(Baton) 과 같습니다.

  • Task A (첫 번째 주자): 경주를 시작하고 열심히 달립니다.
  • XCom (바통): 첫 번째 주자가 다음 주자에게 넘겨줄 정보(예: 현재까지의 랩 타임)를 담고 있습니다.
  • Task B (두 번째 주자): 첫 번째 주자로부터 바통(XCom)을 건네받아, 그 안의 정보를 확인하고 다음 구간을 달리기 시작합니다.

 

1. XCom의 작동방식 : Push/Pull

XCom은 Airflow의 Metastore 데이터베이스를 통해 작동하며, key-value 형태로 데이터를 저장합니다. Task 간에 데이터를 주고받는 두 가지 핵심 동작이 있습니다.

1) XCom Push (데이터 밀어넣기)

한 Task가 다른 Task에서 사용할 수 있도록 데이터를 XCom에 기록하는 행위입니다.

  • 자동 Push: PythonOperator에서 함수가 값을 반환(return)하면, Airflow는 기본적으로 그 반환 값을 return_value라는 key로 XCom에 자동으로 push합니다.
  • 수동 Push: ti.xcom_push() 메서드를 사용하여 명시적으로 key와 value를 지정하여 데이터를 push할 수 있습니다. 여러 값을 전달해야 할 때 유용합니다.

2. XCom Pull (데이터 가져오기)

다른 Task가 push해 놓은 데이터를 XCom에서 가져오는 행위입니다.

ti.xcom_pull() 메서드를 사용하며, 어떤 Task(task_ids)의 어떤 key 값을 가져올지 지정할 수 있습니다. key를 지정하지 않으면 기본적으로 return_value를 가져옵니다.

 

from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
import pendulum

# 1. 값을 생성하고 반환(return)하는 함수 (자동 XCom Push)
def _push_function():
    print("Pushing a value to XCom!")
    # 이 값이 'return_value'라는 key로 XCom에 저장됩니다.
    return 42

# 2. XCom에서 값을 가져와 사용하는 함수
def _pull_function(**kwargs):
    ti = kwargs['ti']
    
    # 'push_task'가 반환한 값을 가져옵니다.
    # task_ids를 지정하고, key를 생략하면 'return_value'를 가져옵니다.
    pulled_value = ti.xcom_pull(task_ids='push_task')
    
    print(f"Pulled value from XCom: {pulled_value}")
    # 가져온 값으로 추가 작업 수행
    print(f"The doubled value is: {pulled_value * 2}")

with DAG(
    dag_id='example_xcom_dag',
    start_date=pendulum.datetime(2025, 9, 1, tz="Asia/Seoul"),
    schedule=None,
    catchup=False,
) as dag:
    
    push_task = PythonOperator(
        task_id='push_task',
        python_callable=_push_function,
    )

    pull_task = PythonOperator(
        task_id='pull_task',
        python_callable=_pull_function,
    )

    push_task >> pull_task

 

 

Notice: XCom은 대용량 데이터(예: 큰 DataFrame, 이미지 파일 등)를 전달하기 위해 만들어지지 않았습니다. XCom 데이터는 Airflow Metastore DB에 직접 저장되므로, 큰 데이터를 저장하면 DB에 심각한 부하를 줄 수 있습니다. 데이터베이스 종류에 따라 다르지만 보통 수 KB ~ 몇 MB 이내의 작은 데이터만 전달하는 용도로 사용해야 합니다.
파일 경로, 작은 설정 값, ID, 간단한 통계 수치 등 메타데이터 성격의 정보를 전달하는 데 사용하는 것이 가장 좋습니다. 대용량 데이터는 S3, GCS와 같은 외부 스토리지에 저장하고, 그 파일의 경로(주소)를 XCom으로 전달하는 것이 올바른 사용법입니다.