Xcom(Cross Communication)
모든 워크플로우 자동화 도구가 그렇듯, Apache Airflow에서도 테스크(Task)간에 데이터를 주고받아야 하는 상황이 빈번하게 발생합니다. Airflow에서는 이를 Xcom(Cross-Communication)이라는 메커니즘을 제공합니다.
Xcom은 Ariflow의 메타데이터베이스에 저장되는 작은 데이터 조각으로, 태스크간에 데이터를 공유할수 있게 해줍니다. 예를들어, 한 태스크에서 생성한 파일 경로, 처리된 데이터의 요약 정보, 또는 다음 태스크에 필요한 설정값 등을 전달할 수 있습니다.
- 작업(Task)간 데이터나 return 값 등 이전 작업의 Output을 이후 작업에 전달해주기 위한 메커니즘
- 동일한 Dag 작업 파이프라인 내에서만 데이터 공유가 가능
- 문자열, 숫자, 리스트 등 직렬화 가능한 소규모 데이터를 공유하도록 설계 (약 10Mb 이하)
- 'xcom_push()', 'xcom_pull()' 메소드를 활용하여 데이터 공유
- PythonOperator의 반환 값은 자동으로 Xcom에 저장

Xcom은 하나의 DAG에서 태스크가 연쇄적으로 돌아갈때 태스크들끼리 데이터 공유가 필요한 경우에 그 데이터가 저장되는 소규모 저장소가 XCOM입니다.
Xcom의 크기 제한
Xcom을 사용할때 주의해야 할 중요한점은 데이터 크기 제한입니다. 표준 Xcom 백엔드를 사용할 경우, Xcom의 크기 제한은 사용중인 메타데이터 데이터베이스에 따라 결정됩니다.
| 데이터베이스 | 크기 제한 |
| PostgreSQL | 1 GB |
| SQLite | 2 GB |
| MySQL | 64 KB |
보시다시피, 특히 MySQL의 경우 64KB라는 매우 작은 제한이 있어 큰 데이터를 전달하기에는 부적합합니다. 만약 Xcom을 통해서 전달하려는 데이터가 메타 데이터 베이스의 크기 제한을 초과할 가능성이 있다면, CustomXcom 저장소를 구축하여 활용하는 것을 고려해야 합니다.
**context 파라미터 활용하기
전통적인 PythonOperator 방식에서 호출되는 함수에 **context를 인자로 받아 ti(Task Instance)객체에 접근하는 방법입니다.
def push_to_xcom(**context):
message = "사과"
ti = context["ti"]
ti.xcom_push(
key='message',
value=message
)
return message # 'return_value'라는 키로도 자동 저장됨
def pull_from_xcom(**context):
ti = context["ti"]
xcom_value = ti.xcom_pull(
task_ids='py1',
key='message'
)
print("py1에서 전달받은 결과 : ", xcom_value)
- 장점: Airflow의 모든 컨텍스트 정보에 명시적으로 접근할 수 있습니다.
- 특징: ti.xcom_push를 사용할 때 고유한 key를 지정할수 있습니다.
**context 에 무엇이 들어있는가?
**context를 프린트 했을때 쏟아지는 방대한 양의 메타데이터에 당황하곤 합니다. 이 데이터들은 현재 실행중인 **DAG과 태스크의 '상태 정보'** 입니다.
주요 항목들을 표로 정리하면 다음과 같습니다.
| 키(Key) | 설명 | 예시 |
| ds | 태스크가 실행되는 논리적 날짜 (Date String) | '2024-01-14' |
| ds_nodash | 대시(-)가 제거된 날짜 문자열 | '20240114' |
| ti / task_instance | 현재 실행 중인 태스크 인스턴스 객체 | <TaskInstance: ...> |
| dag | 현재 태스크가 속한 DAG 객체 | <DAG: ...> |
| logical_date | 태스크의 논리적 실행 시점 (Pendulum 객체) | 2024-01-14T00:00:00+00:00 |
| run_id | 현재 DAG Run의 고유 식별자 | 'scheduled__2024-01-14T00:00:00+00:00' |
print(context) 실행 시 실제 출력 예시
{
'ds': '2024-01-14',
'ds_nodash': '20240114',
'logical_date': DateTime(2024, 1, 14, 0, 0, 0, tzinfo=Timezone('UTC')),
'dag': <DAG: xcom01_dag>,
'ti': <TaskInstance: xcom01_dag.py1 [running]>,
'run_id': 'scheduled__2024-01-14T00:00:00+00:00',
'params': {},
... (중략) ...
}
따라서 **context를 사용한다는것은, **"Airflow가 태스크를 실행하면서 들고있는 모든 보따리(메타데이터)를 다 넘겨줘"**라고 요청하는 것과 같습니다.

첫번째 함수는 사과라고 하는 메세지를 Xcom에 밀어넣는 함수입니다.
두번째 함수는 Xcom에 저장되어 있는 사과라고 하는 메세지를 가져와서 프린트 하는 역할입니다.
asterisks(*) 두개가 붙은 파라미터가 들어가있습니다. context로 ti라고 하는 key에 들어가 있는 value를 ti라는 변수에 저장을 하고, 그 변수를 기반으로 해서 xcom_push 명령어를 사용하고 있습니다.
Airflow Dag

airflow에서 xcom01_dag를 toggle로 실행해본결과 Logs를 확인해보면 task와 ti를 확인해볼수있습니다.
task_instance와 ti는 동일한 value입니다. 이 value를 기반으로 해서 xcom_push(), xcom_pull()같은 메소드를 활용할수 있습니다.
pprint(context)파라미터에 asterisk 두개로 넘겨주게 되고 그것을 출력하게 되면 DAG에 대한 메타 정보들이 출력되서 나오는데,
그 메타 정보중에서 ti라고 하는 key에 대한 value를 가져와서 메소드(xcom_push)들을 구현할 수 가 있습니다.

pull_from_xcom에서는 ti객체들을 받은것을 기반으로 xcom_pull 메소드를 적은내용 입니다.
xcom_pull을 가져오는데 어떤 task에 어떤 key가 저장된 value를 가져올것인지 라고 명시를 해둔것입니다.


처음 방식은 PythonOperator로 호출되는 Python 함수 파라미터에 **context라고 하는 Key를 넣어서 그 Key에서 메타정보를 뽑아오는데 그 메타정보 중에 ti라고 하는 값을 받아와서 그걸 기반으로 xcom_push하고 pull하는 작업을 실제로 해보았습니다.
python으로 어떤 코드든 함수화 시키게 되면 에어플로우에서 사용할수있습니다. 함수안에서 ti에서 파생된 메소드들을 활용해서
데이터 공유가 가능하다는것을 확인할수 있습니다.
get_current_context() 활용하기
함수의 인자로 **context를 넘기지 않더라도, get_current_context()를 통해 현재 실행 중인 태스크의 컨텍스트를 동적으로 가져올 수 있습니다.
from airflow.sdk import get_current_context
def push_to_xcom():
message = "사과"
context = get_current_context()
ti = context['ti']
ti.xcom_push(key='message', value=message)
return message
def pull_from_xcom():
context = get_current_context()
ti = context['ti']
xcom_value = ti.xcom_pull(task_ids='py1', key='message')
print("py1에서 전달받은 결과 : ", xcom_value)
첫번째 방식이랑 완전 동일한데, 첫번째 DAG에서는 ti라고 하는 객체를 **context에서 뽑아왔지만
두번째 방식에서는 get_current_context()라고 하는 메소드를 선언해서 가져오는 방식입니다.(이방식을 조금더 선호합니다.)
- 장점 : 함수의 시그니처를 깔끔하게 유지할 수 있습니다.
TaskFlow API에서 Return 사용하기(현대적인 방식)
Airflow 2.0 이상에서 권장되는 TaskFlow API를 사용하면, 복잡한 xcom_push/pull 코드 없이 함수의 return 값만으로 데이터를 전달할 수 있습니다.
@task(task_id="first")
def first_func(args):
join_list = ' '.join(args)
return join_list # 자동으로 XCom에 push됨
@task(task_id='second')
def second_func(message):
# 인자로 넘겨받은 message는 이전 태스크의 return 값 (XCom pull)
changed_list = '!' + message + '!'
return changed_list
# DAG 내에서 호출
message = first_func(['FLOWER', 'AIRFLOW'])
second_func(message)
- 장점: Python 함수를 호출하듯 직관적으로 태스크 간 데이터 흐름을 정의할 수 있습니다.
@task라고 하는 task decorator를 활용하면 함수를 따로 어떤 Operator에 적용시키지 않고 사용할수있습니다.




TaskFlow API 옵션 활용 (multiple_outputs)
태스크가 여러 개의 결과값을 딕셔너리 형태로 반환할 때, multiple_outputs=True 옵션을 주면 각각의 키 값이 개별 XCom 항목으로 저장됩니다.
@task(task_id="first", do_xcom_push=True, multiple_outputs=True)
def first_func(args):
join_list = ' '.join(args)
return {"key1": join_list} # 'key1'이라는 이름으로 XCom에 저장됨
- 특징: 반환된 딕셔너리의 키를 통해 특정 데이터만 Pull 할 수 있어 관리가 용이합니다.
위에 코드와 거의 동일하지만 결국에는 리턴을 해야지만 Xcom에 들어가기 때문에, key가 return_value 하나로만 고정이 되게 됩니다. 하지만 key가 return_value말고 다른 key로 넣고싶을때, ti 객체를 import한 다음에 xcom_push 이런식으로 구현해도 되는데,그렇게 되면 코드도 더 늘어나고 첫번째 두번째 했던것들이랑 크게 차이가 없기 때문에 다른방식으로 return_value 말고 다른 key 이름으로 Xcom에 저장할수 있는 방법을 알아보도록 하겠습니다.



첫번째 task만 가져왔는 옵션들을 2개를 주었고, join한 다음에 return을 하게 되는데 key1이라고 하는이름으로 들어갈수 있게 return을 시켜주는 과정입니다.

이런식으로 저장하게되면 return_value는 동일하게 들어와있지만 다른이름으로 key를 저장하고싶을때 사용하는 방식입니다.
다른 Operator에서 Jinja 템플릿 활용하기
Python이 아닌 다른 오퍼레이터(예: BashOperator)에서 XCom 데이터를 사용하고 싶을 때는 Jinja 템플릿 형식을 사용합니다.
t1 = PythonOperator(
task_id="make_dirname",
python_callable=make_dirname, # 내부에서 ti.xcom_push(key="dir_path", ...) 수행
)
t2 = BashOperator(
task_id="make_dir",
bash_command="mkdir -p {{ ti.xcom_pull(task_ids='make_dirname', key='dir_path') }}"
)
- 장점: 서로 다른 언어나 환경을 사용하는 오퍼레이터 간의 협업이 가능해집니다.
지금까지는 PythonOperator를 대상으로만 Xcom공유를 진행해봤다면, Jinja 템플릿을 사용하는 방식인데요
첫번째 테스크로 PythonOperator를 실행했는데, 두번째 테스크로 BashOperator (특정 서버에 리눅스 명령어를 내릴수가 있는 Operator입니다.) 이 Operator에서 첫번째 Task에 나온 결과를 받아와야 한다. 그런경우에는 이 오퍼레이터가 파이썬 함수를 실행하는 오퍼레이터가 아니기 때문에 Xcom에 데이터를 넣고 빼고 하는 과정을 개발해가지고 그걸 호출하고는 할수 없습니다.
그런 경우에는 Jinja 탬플릿을 활용하게 됩니다.
bash command 쓰는곳이 있습니다. mkdir 즉 디렉토리를 만들고, 그 이름을 t1에서 받아오는 방식입니다. 중괄호 두개를 활용한 JinJa 템플릿을 활용하는 방법입니다. jinja 템플릿을 활용하는것은 Xcom_pull만 가능하고 Push는 불가능합니다.

요약
| 방식 | 특징 | 추천 상황 |
| **context | 명시적 객체 전달 | 복잡한 컨텍스트 제어가 필요할 때 |
| get_current_context | 깔끔한 함수 시그니처 | 함수 인자를 단순하게 유지하고 싶을 때 |
| TaskFlow return | 가장 간결하고 직관적 | 일반적인 Python 기반 태스크 연결 시 |
| multiple_outputs | 딕셔너리 자동 분리 | 여러 결과값을 개별적으로 전달할 때 |
| Jinja 템플릿 | 오퍼레이터 간 통합 | Bash, SQL 등 다른 오퍼레이터로 전달할 때 |
참고 문서
XComs — Airflow 3.2.0 Documentation
airflow.apache.org
Pass data between tasks | Astronomer Documentation
Learn more about the most common methods to implement data sharing between your Airflow tasks, including an in-depth explanation of XCom.
www.astronomer.io
'🔥 Data Engineer > Airflow' 카테고리의 다른 글
| [Airflow] - Sensor(BashSensor, FileSensor, ExternalTaskSensor, TriggerDagRunOperator) (0) | 2026.04.14 |
|---|---|
| [Airflow] - Custom Operator (0) | 2026.04.14 |
| [Airflow] - Airflow Connection (0) | 2026.04.08 |
| [Airflow] - Airflow (1) | 2026.04.05 |
| [Airflow] - Architecture on Celery (0) | 2026.01.27 |