본문 바로가기
🔥 Data Engineer/Airflow

[Airflow] - Airflow Xcom 5가지 방법

by jyu_seo_ 2026. 4. 8.

 

Xcom(Cross Communication)

모든 워크플로우 자동화 도구가 그렇듯, Apache Airflow에서도 테스크(Task)간에 데이터를 주고받아야 하는 상황이 빈번하게 발생합니다. Airflow에서는 이를 Xcom(Cross-Communication)이라는 메커니즘을 제공합니다.

 

Xcom은 Ariflow의 메타데이터베이스에 저장되는 작은 데이터 조각으로, 태스크간에 데이터를 공유할수 있게 해줍니다. 예를들어, 한 태스크에서 생성한 파일 경로, 처리된 데이터의 요약 정보, 또는 다음 태스크에 필요한 설정값 등을 전달할 수 있습니다.

  • 작업(Task)간 데이터나 return 값 등 이전 작업의 Output을 이후 작업에 전달해주기 위한 메커니즘
  • 동일한 Dag 작업 파이프라인 내에서만 데이터 공유가 가능
  • 문자열, 숫자, 리스트 등 직렬화 가능한 소규모 데이터를 공유하도록 설계 (약 10Mb 이하)
  • 'xcom_push()', 'xcom_pull()' 메소드를 활용하여 데이터 공유
  • PythonOperator의 반환 값은 자동으로 Xcom에 저장

Xcom은 하나의 DAG에서 태스크가 연쇄적으로 돌아갈때 태스크들끼리 데이터 공유가 필요한 경우에 그 데이터가 저장되는 소규모 저장소가 XCOM입니다.

 


Xcom의 크기 제한 

Xcom을 사용할때 주의해야 할 중요한점은 데이터 크기 제한입니다. 표준 Xcom 백엔드를 사용할 경우, Xcom의 크기 제한은 사용중인 메타데이터 데이터베이스에 따라 결정됩니다.

데이터베이스 크기 제한
PostgreSQL 1 GB
SQLite 2 GB
MySQL 64 KB

 

보시다시피, 특히 MySQL의 경우 64KB라는 매우 작은 제한이 있어 큰 데이터를 전달하기에는 부적합합니다. 만약 Xcom을 통해서 전달하려는 데이터가 메타 데이터 베이스의 크기 제한을 초과할 가능성이 있다면, CustomXcom 저장소를 구축하여 활용하는 것을 고려해야 합니다.

 


**context 파라미터 활용하기

전통적인 PythonOperator 방식에서 호출되는 함수에 **context를 인자로 받아 ti(Task Instance)객체에 접근하는 방법입니다.

def push_to_xcom(**context):
    message = "사과"
    ti = context["ti"]
    
    ti.xcom_push(
        key='message',
        value=message
    )
    return message # 'return_value'라는 키로도 자동 저장됨 

def pull_from_xcom(**context):
    ti = context["ti"]
    xcom_value = ti.xcom_pull(
        task_ids='py1',
        key='message'
    )
    print("py1에서 전달받은 결과 : ", xcom_value)
  • 장점: Airflow의 모든 컨텍스트 정보에 명시적으로 접근할 수 있습니다.
  • 특징: ti.xcom_push를 사용할 때 고유한 key를 지정할수 있습니다.

**context 에 무엇이 들어있는가?

**context를 프린트 했을때 쏟아지는 방대한 양의 메타데이터에 당황하곤 합니다. 이 데이터들은 현재 실행중인 **DAG과 태스크의 '상태 정보'** 입니다.

 

주요 항목들을 표로 정리하면 다음과 같습니다.

키(Key) 설명 예시
ds 태스크가 실행되는 논리적 날짜 (Date String) '2024-01-14'
ds_nodash 대시(-)가 제거된 날짜 문자열 '20240114'
ti / task_instance 현재 실행 중인 태스크 인스턴스 객체 <TaskInstance: ...>
dag 현재 태스크가 속한 DAG 객체 <DAG: ...>
logical_date 태스크의 논리적 실행 시점 (Pendulum 객체) 2024-01-14T00:00:00+00:00
run_id 현재 DAG Run의 고유 식별자 'scheduled__2024-01-14T00:00:00+00:00'

print(context) 실행 시 실제 출력 예시

{
    'ds': '2024-01-14',
    'ds_nodash': '20240114',
    'logical_date': DateTime(2024, 1, 14, 0, 0, 0, tzinfo=Timezone('UTC')),
    'dag': <DAG: xcom01_dag>,
    'ti': <TaskInstance: xcom01_dag.py1 [running]>,
    'run_id': 'scheduled__2024-01-14T00:00:00+00:00',
    'params': {},
    ... (중략) ...
}

 

따라서 **context를 사용한다는것은, **"Airflow가 태스크를 실행하면서 들고있는 모든 보따리(메타데이터)를 다 넘겨줘"**라고 요청하는 것과 같습니다.

 

첫번째 함수는 사과라고 하는 메세지를 Xcom에 밀어넣는 함수입니다.

두번째 함수는 Xcom에 저장되어 있는 사과라고 하는 메세지를 가져와서 프린트 하는 역할입니다.

asterisks(*) 두개가 붙은 파라미터가 들어가있습니다. context로 ti라고 하는 key에 들어가 있는 value를 ti라는 변수에 저장을 하고, 그 변수를 기반으로 해서 xcom_push 명령어를 사용하고 있습니다.

Airflow Dag

 

airflow에서 xcom01_dag를 toggle로 실행해본결과 Logs를 확인해보면 task와 ti를 확인해볼수있습니다.

task_instance와 ti는 동일한 value입니다. 이 value를 기반으로 해서 xcom_push(), xcom_pull()같은 메소드를 활용할수 있습니다.

 

pprint(context)파라미터에 asterisk 두개로 넘겨주게 되고 그것을 출력하게 되면 DAG에 대한 메타 정보들이 출력되서 나오는데,

그 메타 정보중에서 ti라고 하는 key에 대한 value를 가져와서 메소드(xcom_push)들을 구현할 수 가 있습니다.

 

pull_from_xcom에서는 ti객체들을 받은것을 기반으로 xcom_pull 메소드를 적은내용 입니다.

xcom_pull을 가져오는데 어떤 task에 어떤 key가 저장된 value를 가져올것인지 라고 명시를 해둔것입니다.

사과를 가져오는 과정

처음 방식은 PythonOperator로 호출되는 Python 함수 파라미터에 **context라고 하는 Key를 넣어서 그 Key에서 메타정보를 뽑아오는데 그 메타정보 중에 ti라고 하는 값을 받아와서 그걸 기반으로 xcom_push하고 pull하는 작업을 실제로 해보았습니다.

python으로 어떤 코드든 함수화 시키게 되면 에어플로우에서 사용할수있습니다. 함수안에서 ti에서 파생된 메소드들을 활용해서

데이터 공유가 가능하다는것을 확인할수 있습니다.


get_current_context() 활용하기

함수의 인자로 **context를 넘기지 않더라도, get_current_context()를 통해 현재 실행 중인 태스크의 컨텍스트를 동적으로 가져올 수 있습니다.

from airflow.sdk import get_current_context

def push_to_xcom():
    message = "사과"
    context = get_current_context()
    ti = context['ti']
    
    ti.xcom_push(key='message', value=message)
    return message

def pull_from_xcom():
    context = get_current_context()
    ti = context['ti']
    
    xcom_value = ti.xcom_pull(task_ids='py1', key='message')
    print("py1에서 전달받은 결과 : ", xcom_value)

 

첫번째 방식이랑 완전 동일한데, 첫번째 DAG에서는 ti라고 하는 객체를 **context에서 뽑아왔지만 

두번째 방식에서는 get_current_context()라고 하는 메소드를 선언해서 가져오는 방식입니다.(이방식을 조금더 선호합니다.)

  • 장점 : 함수의 시그니처를 깔끔하게 유지할 수 있습니다.

TaskFlow API에서 Return 사용하기(현대적인 방식)

Airflow 2.0 이상에서 권장되는 TaskFlow API를 사용하면, 복잡한 xcom_push/pull 코드 없이 함수의 return 값만으로 데이터를 전달할 수 있습니다.

@task(task_id="first")
def first_func(args):
    join_list = ' '.join(args)
    return join_list # 자동으로 XCom에 push됨

@task(task_id='second')
def second_func(message):
    # 인자로 넘겨받은 message는 이전 태스크의 return 값 (XCom pull)
    changed_list = '!' + message + '!'
    return changed_list

# DAG 내에서 호출
message = first_func(['FLOWER', 'AIRFLOW'])
second_func(message)
  • 장점: Python 함수를 호출하듯 직관적으로 태스크 간 데이터 흐름을 정의할 수 있습니다.

@task라고 하는 task decorator를 활용하면 함수를 따로 어떤 Operator에 적용시키지 않고 사용할수있습니다.

xcom_push,pull하지 않아도 자동적으로 함수들끼리의 데이터 공유도 가능하다.
공백기준으로 split해서 list로 반환해주는걸 확인해볼수 있습니다.


TaskFlow API 옵션 활용 (multiple_outputs)

태스크가 여러 개의 결과값을 딕셔너리 형태로 반환할 때, multiple_outputs=True 옵션을 주면 각각의 키 값이 개별 XCom 항목으로 저장됩니다.

@task(task_id="first", do_xcom_push=True, multiple_outputs=True)
def first_func(args):
    join_list = ' '.join(args)
    return {"key1": join_list} # 'key1'이라는 이름으로 XCom에 저장됨
  • 특징: 반환된 딕셔너리의 키를 통해 특정 데이터만 Pull 할 수 있어 관리가 용이합니다.

위에 코드와 거의 동일하지만 결국에는 리턴을 해야지만 Xcom에 들어가기 때문에, key가 return_value 하나로만 고정이 되게 됩니다. 하지만 key가 return_value말고 다른 key로 넣고싶을때, ti 객체를 import한 다음에 xcom_push 이런식으로 구현해도 되는데,그렇게 되면 코드도 더 늘어나고 첫번째 두번째 했던것들이랑 크게 차이가 없기 때문에 다른방식으로 return_value 말고 다른 key 이름으로 Xcom에 저장할수 있는 방법을 알아보도록 하겠습니다.

 

multiple_outputs는 기본설정값이 false로 설정이 되어있습니다.

 

첫번째 task만 가져왔는 옵션들을 2개를 주었고, join한 다음에 return을 하게 되는데 key1이라고 하는이름으로 들어갈수 있게 return을 시켜주는 과정입니다.

 

이런식으로 저장하게되면 return_value는 동일하게 들어와있지만 다른이름으로 key를 저장하고싶을때 사용하는 방식입니다.


다른 Operator에서 Jinja 템플릿 활용하기

Python이 아닌 다른 오퍼레이터(예: BashOperator)에서 XCom 데이터를 사용하고 싶을 때는 Jinja 템플릿 형식을 사용합니다.

t1 = PythonOperator(
    task_id="make_dirname",
    python_callable=make_dirname, # 내부에서 ti.xcom_push(key="dir_path", ...) 수행
)

t2 = BashOperator(
    task_id="make_dir",
    bash_command="mkdir -p {{ ti.xcom_pull(task_ids='make_dirname', key='dir_path') }}"
)
  • 장점: 서로 다른 언어나 환경을 사용하는 오퍼레이터 간의 협업이 가능해집니다.

지금까지는 PythonOperator를 대상으로만 Xcom공유를 진행해봤다면, Jinja 템플릿을 사용하는 방식인데요

 

첫번째 테스크로 PythonOperator를 실행했는데, 두번째 테스크로 BashOperator (특정 서버에 리눅스 명령어를 내릴수가 있는 Operator입니다.) 이 Operator에서 첫번째 Task에 나온 결과를 받아와야 한다. 그런경우에는 이 오퍼레이터가 파이썬 함수를 실행하는 오퍼레이터가 아니기 때문에 Xcom에 데이터를 넣고 빼고 하는 과정을 개발해가지고 그걸 호출하고는 할수 없습니다.

그런 경우에는 Jinja 탬플릿을 활용하게 됩니다.

bash command 쓰는곳이 있습니다. mkdir 즉 디렉토리를 만들고, 그 이름을 t1에서 받아오는 방식입니다. 중괄호 두개를 활용한 JinJa 템플릿을 활용하는 방법입니다. jinja 템플릿을 활용하는것은 Xcom_pull만 가능하고 Push는 불가능합니다.


요약

방식 특징 추천 상황
**context 명시적 객체 전달 복잡한 컨텍스트 제어가 필요할 때
get_current_context 깔끔한 함수 시그니처 함수 인자를 단순하게 유지하고 싶을 때
TaskFlow return 가장 간결하고 직관적 일반적인 Python 기반 태스크 연결 시
multiple_outputs 딕셔너리 자동 분리 여러 결과값을 개별적으로 전달할 때
Jinja 템플릿 오퍼레이터 간 통합 Bash, SQL 등 다른 오퍼레이터로 전달할 때

참고 문서

 

XComs — Airflow 3.2.0 Documentation

 

airflow.apache.org

 

Pass data between tasks | Astronomer Documentation

Learn more about the most common methods to implement data sharing between your Airflow tasks, including an in-depth explanation of XCom.

www.astronomer.io