개요
작업 의존성을 Airflow에서 정의하는 방법과 이러한 기능을 사용하여 조건부 Task, 분기 및 조인을 비롯한 보다 복잡한 패턴을 구현하는 방법에 대해 알아보고, XCom을 이용한 Task 사이의 상태 공유 방법, Airflow 2의 새로운 API인 Taskflow API를 통해, 파이썬 작업과 XCom을 많이 사용하는 DAG를 단순화하는 방법에 대해서 알아보겠습니다.
기본 의존성 유형
1. 선형 의존성 유형
책에서 제공하는 로켓 사진 가져오기 DAG의 Task를 기반으로 선형 의존성 유형을 설명드리겠습니다.
1.1 로켓 사진 가져오기 Task Chain
download_launches=BashOperator(...)
get_pictures=PythonOperator(...)
notify=BashOperator(...)

로켓 사진 가져오기 DAG의 워크플로우를 보면 다음과 같습니다. 이미지를 다운로드 하고, 다운로드한 이미지를 가져옵니다. 전체 프로세스가 완료되면 알려주는 총 3가지의 Task로 이루어져 있습니다.
위 유형의 DAG에서는 앞선 Task의 결과가 다음 Task의 입력 값으로 사용되기 때문에, 다음 Task로 이동하려면 앞선 Task를 Issue 없이 완료하여야 합니다.
1.2 로켓 가져오기 (시프트 연산자)
download_launches >> get_pictures
get_pictures >> notify
download_launches >> get_pictures >> notify
이러한 의존 관계를 시프트 연산자(>>)를 사용해 나타낼 수 있습니다. 위 방법을 보시면 두가지의 방식으로 의존성 여부를 나타내었는데 첫번째 경우에는 작업 의존성을 각각 설정한 경우이고, 두번째 경우에는 여러 개의 의존성을 설정할 수 있습니다. (딱히 크게 상관은 없음)
다음과 같이 Task 의존성을 명시적으로 지정하면 Task의 순서가 명확하게 정의된다는 이점이 있습니다.
모든 Issue는 Airflow에 의해 다운스트림 태스크로 전달되어 실행을 지연시키는 특징을 가지고 있습니다. 예를 들어 download_launches Task가 실패한 경우, Airflow는 download_launches Task가 정상적으로 실행될 때 까지 get_pictures Task를 실행시키지 않습니다.
2. 팬인/팬아웃 (fan-in/fan-out) 의존성
위의 설명드린 선형 체인 말고도 Airflow의 Task 의존성을 사용하여 Task 간 복잡한 의존성 구조를 만들 수 있습니다. 책에서 제공하는 우산 판매 예측 모델을 기반으로 팬인/팬아웃 의존성을 설명드리겠습니다.
2.1 우산 판매 예측 모델 Task Chain

Umbella DAG의 주요 목적은 서로 다른 두 소스에서 매일 날씨 및 판매 데이터를 가져와서 두 데이터 세트를 데이터 세트로 결합하여 모델을 학습시키는 것입니다.
위에서 보시는 바와 같이 fetch_weather (날씨 데이터 가져오기), clean_weather (날씨 데이터 정제하기) Task는 서로 선형 의존성을 가지고 있지만 fetch_sales (판매 데이터 가져오기), clean_sales (판매 데이터 정제하기) Task와는 서로 의존성을 가지고 있지 않습니다.
2.2 우산 판매 예측 모델 (시프트 연산자)
fetch_weather >> clean_weather
fetch_sales >> clean_sales
따라서, 시프트 연산자를 통해 위 DAG를 표현하면 다음과 같습니다. (병렬로 실행되는 선형 의존성)
2.3 팬아웃(Fan-out)
여러 개의 입력 Task 연결 수를 제한하는 것을 팬아웃(fan-out) 이라고 합니다.
fetch_weather, fetch_sales Task의 업스트림에 DAG의 시작을 나타내는 start 라는 Dummy Task를 추가할 수도 있습니다. Dummy Task는 반드시 필요하지는 않지만 fetch_weather, fetch_sales Task가 DAG 시작 시 발생하는 암묵적인 팬아웃을 설명하는데 도움이 됩니다.
2.3.1 팬아웃 (fan-out) 종속성 정의
from airflow.operators.dummy import DummyOperator
start=DummyOperator(task_id="start") # 더미 시작 태스크 생성
start >> [fetch_weather, fetch_sales] # 팬아웃(일 대 다) 의존성 태스크 생성
하나의 Task를 여러 다운스트림 Task에 연결하는 것을 팬아웃 (fan-out) 종속성이라고 합니다.
2.4 팬인 (fan-in)
단일 다운스트림 Task가 여러 업스트림 Task에 의존성을 갖는데 이런 구조를 팬인 (fan-in) 이라고 합니다.
2.4-1 팬인 (fan-in) 의존성 정의
[clean_weather, clean_sales] >> join_datasets
팬인 (fan-in) (다 대 일) 의존성을 시프트 연산자로 나타낸 것입니다.
위 팬아웃 구조와 달리 결합된 데이터 세트를 만들기 위해서는 clean_weather, clean_sales Task가 정상적으로 이루어 져야 join_datasets Task가 실행됩니다. 즉 join_datasets Task는 clean_weather, clean_sales Task 업스트림에 종속성을 가지고 있습니다. 이러한 구조를 팬인 구조라고 합니다.
2.5 나머지 Task에 대한 시프트 연산자
join_datasets >> train_model >> deploy_model
차례대로 데이터 셋 결합하기, 머신러닝 모델 학습하기, 머신러닝 모델 배포하기의 순서로 Task가 선형 의존성을 가지고 있습니다.
2.6 Umbella DAG 실행 실습 예제 코드
import airflow.utils.dates
from airflow import DAG
from airflow.operators.dummy import DummyOperator
dag = DAG(
dag_id="01_umbrella",
description="Umbrella example with DummyOperators.",
start_date=airflow.utils.dates.days_ago(5),
schedule_interval="@daily",
)
start = DummyOperator(task_id="start")
fetch_weather = DummyOperator(task_id="fetch_weather", dag=dag)
fetch_sales = DummyOperator(task_id="fetch_sales", dag=dag)
clean_weather = DummyOperator(task_id="clean_weather", dag=dag)
clean_sales = DummyOperator(task_id="clean_sales", dag=dag)
join_datasets = DummyOperator(task_id="join_datasets", dag=dag)
train_model = DummyOperator(task_id="train_model", dag=dag)
deploy_model = DummyOperator(task_id="deploy_model", dag=dag)
start >> [fetch_weather, fetch_sales]
fetch_weather >> clean_weather
fetch_sales >> clean_sales
[clean_weather, clean_sales] >> join_datasets
join_datasets >> train_model >> deploy_model
2.6.1 Umbella DAG 구성도
팬아웃/팬인 구조를 설명드리면서 실습한 Umbella DAG의 구성도입니다.


다음과 같이 Task가 정상적으로 실행된 것을 볼 수 있습니다.
보시는 바와 같이 DAG를 실행하면 Airflow가 먼저 start Task를 실행시키고, 병렬로 sales, weather Task가 실행 됩니다. 이 후 업스트림이 Issue 없이 정상적으로 실행된다면 join_datasets Task가 실행되고, 이 후 train_model, deploy_model이 실행됩니다.
Task 분기처리는 왜필요한가?
조건에 따라 실행을 선택해서, 비용·성능·안정성을 동시에 잡기 위해 필요합니다.
Task 1 다음에 이어지는 3가지의 Task 2-1, 2-2, 2-3이 있다고 가정해보자. task 1이 먼저 수행된 다음, 3가지의 task가 동시에 돌아가는 것이 지금까지 배워온 결과입니다 (task 1 >> [task2-1, task2-2, task2-3])
그러나, task1의 결과에 따라 task 2-1,2-2,2-3 중 한 가지만 수행하도록 설계해야 하는 경우에는 어떻게 해야할까? 그럴 때 필요한 것이 task 분기 처리입니다.

Airflow에서 task를 분기처리 하는 방법
Apache Airflow에서 Task를 분기 처리하는 방법은 몇 가지 패턴이 있는데,
실무에서는 3가지 방식이 핵심이다.
1. BranchPythonOperator (가장 많이 씀)
2. BaseBranchOperator (재사용/고급)
3. ShortCircuitOperator (전체 skip)
BranchPythonOperator 사용하는 방법(전통방식)
def select_random():
import random
item_list = ['a', 'b', 'c']
selected_item = random.choice(item_list)
if selected_item == 'a':
return 'task_a' # task_a 실행(str으로 반환)
elif selected item in ['b','c']:
return ['task_b', 'task_c'] # task_b, task_c 실행 (list로 반환)
python_branch_task = BranchPythonOperator(
task_id='python_branch_task',
python_callable=selected_random # 해당 함수의 리턴값이 해당 task의 후행으로 올 task를 의미함
)
python_branch_task >> [task_a, task_b, task_c]
BranchPythonOperator를 사용하는 방법은 크게 어렵지 않다. BranchPythonOperator에서 호출하는 함수에 상황에 따라 실행할 task의 id를 스트링(1개일 때) 또는 리스트(2개 이상일 때)로 반환시켜주면 된다.
위 함수를 가진 DAG을 제작해 실행해 보았다.(task 1,2,3은 selected_item의 결과를 출력하는 함수를 가진 task로 임의 제작)

DAG을 실행시킨 뒤 Graph 탭을 확인해보니 task_a가 분홍색 테두리로 감싸져 있다. 이는 해당 task가 skipped 처리되었다는 의미이다. select_random 함수에서 'B' 또는 'C'가 선택된 듯하다. 따라서 task_b와 task_c는 초록색으로 감싸진 'success' 처리가 되었다는 것을 볼 수 있다.)

그리고 BranchPythonOperator로 실행 시킨 task의 XCom 정보를 보면 skipmixin_key라는 새로운 항목을 발견할 수 있다. 이후에 XCom을 통해서 어떤 task가 선택되었는지 확인할 수 있다.
@task.branch 데코레이터로 분기 처리하는 방법
from airflow.decorators import task
@task.branch(task_id='python_branch_task')
def select_random():
import random
item_list = ['a', 'b', 'c']
selected_item = random.choice(item_list)
if selected_random == 'a':
return 'task_a'
elif selected_item in ['b','c']:
return ['task_b', 'task_c']
select_random() >> ['task_a', 'task_b', 'task_c']
BranchPythonOperator를 쓸 때처럼 간단히 사용할 수 있다. 달라진 점은 임포트하는 라이브러리가 달라졌다는 점과 더불어 @task.branch라는 데코레이터를 함수 위에 붙여 사용했다는 것이다. 이후 데코레이터가 붙은 함수를 실행시켜 flow를 연결 시켜주기만 하면 BranchPythonOperator를 사용한 것과 동일한 결과가 나온다
BaseBranchOperator로 분기처리 하는 방법 (클래스 상속)
from airflow.operators.branch import BaseBranchOperator
with DAG(...
) as dag:
class CustomBranchOperator(BaseBranchOperator): # 오버라이딩(클래스 상속, 클래스 이름은 상관없음)
def choose_branch(self, context): # BaseBranchOperator 상속시 choose_branch 함수를 구현해줘야 함.
import random
item_list = ['a', 'b', 'c']
selected_item = random.choice(item_list)
if selected_item == 'A':
retrun 'task_a'
elif selected_item in ['b', 'c']:
return ['task_b', 'task_c']
custom_branch_operator = CustomBranchOperator(task_id='python_branch_task')
custom_branch_operator >> [task_a, task_b, task_c]
Airflow에서 BaseBranchOperator를 통해 분기 처리를 하기 위해서는 BaseBranchOperator를 상속받는 클래스를 만들고, 그 아래에 choose_branch()라는 함수를 만들어 분기 처리할 task를 설정해주어야 한다.
Airflow 공식 홈페이지에 따르면, BaseBranchOperator에 대한 설명이 아래와 같이 적혀있다.
A base class for creating operators with branching functionality, like to BranchPythonOperator.
Users should create a subclass from this operator and implement the function choose_branch(self, context). This should run whatever business logic is needed to determine the branch, and return either the task_id for a single task (as a str) or a list of task_ids.
The operator will continue with the returned task_id(s), and all other tasks directly downstream of this operator will be skipped.
(번역) BranchPython Operator와 같이 분기 기능을 가진 연산자를 만들기 위한 기본 클래스입니다. 사용자는 이 연산자에서 하위 클래스를 생성하고 choose_branch(자체, 컨텍스트) 함수를 구현해야 합니다. 이 작업은 분기를 결정하는 데 필요한 모든 비즈니스 로직을 실행하고 단일 작업에 대한 task_id 또는 task_id 목록을 반환해야 합니다. 연산자는 반환된 task_id(s)를 계속하고 이 연산자의 바로 아래에 있는 다른 모든 작업은 건너뜁니다.
위에서 사용한 BranchPythonOperator, @task.branch와 같이 return값에 후행에 실행한 task를 반환해주면 된다.

실행한 결과 이번엔 task_a가 선택되어 task_b와 task_c가 skipped된 것을 볼 수 있다.

XCom을 확인했을 때에도 의도한 대로 skipmixin_key에 task_a가 선택되어 저장된 것을 알 수 있다.
조건부 Task
Airflow는 특정 조건에 따라 DAG에서 특정 Task를 건너뛸 수 있는 다른 방법도 제공합니다.
Task 내에서 조건
PythonOperator를 사용하여 배포를 구현하고 배포 함수 내에서 DAG의 실행 날짜를 명시적으로 확인하여 모델의 특정 버전에 대해서 배포가 가능합니다.
Task 내에서 조건 구현
def _deploy(**context):
if context["execution_date"] == ...:
deploy_model()
deploy=PythonOperator(
task_id="deploy_model",
python_callable=_deploy,
)
조건부 Task 만들기
배포 Task 자체를 조건부화하는 방법이 있습니다. 이 방법은 미리 정의된 조건에 따라서만 실행됩니다. 예를 들어 Airflow에서 해당 조건을 테스트하고 조건이 실패할 경우 모든 다운스트림 작업을 건너뛰는 Task를 DAG에 추가하여 Task를 조건부화 할 수 있습니다.
DAG에서 조건부 빌드하기
def _latest_only(**context):
...
latest_only=PythonOperator(
task_id="latest_only",
python_callable=_latest_only
)
latest_only >> deploy_model
조건부 배포가 포함된 DAG 구현

조건부 배포가 포함된 umbrella DAG의 구현입니다. 조건이 DAG Task에 포함되어 이전 구현보다 훨씬 더 명확해진 것을 볼 수 있습니다.
latest_only 함수 작성
execution_date가 가장 최근 실행에 속하지 않는 경우, 다운스트림 작업을 건너뛰도록 _latest_only 함수를 작성합니다.이를 위해 실행 날짜를 확인하고, 필요한 경우 AirflowSkipException 함수를 실행합니다.
* AirflowSkipException 란?
Airflow가 조건과 모든 다운스트림 Task를 건너뛰라는 것을 나타내는 함수입니다.
latest_only 조건 함수 구현
from airflow.exceptions import AirflowSkipException
def _latest_only(**context):
left_window=context["dag"].following_schedule(context["execution_date"]) # 실행 윈도우에서 경계를 확인
right_window=context["dag"].following_schedule(left_window)
now=pendulum.now("UTC") # 현재 시간이 윈도우 안에 있는지 확인
if not left_window < now <= right_window:
raise AirflowSkipException("Not the most recent run!")
latest_only 조건을 적용한 결과

내장 Operator 사용
Airflow의 내장 클래스인 LastOnlyOperator 클래스를 사용하여 가장 최근 실행한 DAG만 실행하는 예를 구현할 수 있습니다.
내장 LatestOnlyOperator사용하기
from airflow.operators.latest_only import LatestOnlyOperator
latest_only=LatestOnlyOperator(
task_id="latest_only",
dag=dag
)
join_datasets >> train_model >> deploy_model
latest_only >> deploy_model
가장 최근 실행한 DAG만 실행하는 부분을 구현한 예입니다. Airflow의 내장 클래스인 LastOnlyOperator 클래스를 사용하였고 이 Operator는 PythonOperator를 기반으로 동일한 작업을 가능하게 합니다.
따라서, LatestOnlyOperator를 사용하면 조건부 배포를 구현하기 위해 복잡한 로직을 작성할 필요가 없습니다.
Task 간 Data 공유
Airflow의 XCom을 사용하여 Task 간에 작은 Data를 공유할 수 있습니다.
*XCom이란?
기본적으로 Task 간에 Message를 교환하여 특정 상태를 공유할 수 있게 해줍니다.
XCom을 사용하여 Data 공유하기 (push)
def _train_model(**context):
model_id=str(uuid.uuid4())
context["task_instance"].xcom_push(key="model_id", value=model_id)
train_model=PythonOperator(
task_id="train_model",
python_callable=_train_model
)
다음은 XCom을 사용하여 train_model 및 deploy_model 작업 간에 모델 식별자를 고유하는 것입니다.
- train_model Task는 다른 Task에서 XCom 값을 사용할 수 있도록 XCom에 모델 식별자 값을 보냅니다.
- Airflow 컨택스트의 태스크 인스턴스의 xcom_push 메서드를 사용하여 값을 게시할 수 있습니다.
XCom값 확인

XCom을 사용하여 Data공유(Pull)
def _deploy_model(**context):
model_id=context["task_instance"].xcom_pull(
task_ids="train_model", key="model_id"
)
print(f"Deploying model {model_id}")
deploy_model=PythonOperator(
task_id="deploy_model",
python_callable=_deploy_model
)

xcom_pull 메서드를 사용하여 다른 Task에서 XCom 값을 확인할 수 있습니다.
XCom 사용 시 고려사항
XCom은 작업 간에 상태를 공유하는 데 매우 유용해 보일 수 있지만, 몇 가지 단점이 존재합니다.
1) 의존성 문제
pull Task는 묵시적인 의존성이 필요하고, DAG에 표시되지 않으며 Task Schedule시에 고려되지 않습니다. 따라서 숨겨진 의존성은 서로 다른 DAG에서 실행 날짜 사이에 XCom 값을 공유할 매우 복잡해지기 때문에 권장하지 않습니다.
2) 원자성을 무너뜨리는 패턴 문제
예를 들어 Token 값을 가지고 API에 접근할 때 Token 값이 만료 되어 다음 Task를 재실행 하지 못할 수 있습니다.
3) XCom이 저장하는 모든 값은 직렬화를 지원해야 하는 문제
람다 또는 다중 멀티프로세스 관련 클래스 같은 일부 파이썬 유형은 XCom에 저장할 수 없습니다.
4) 백엔드에 의해 XCom 값의 저장 크기가 제한되는 문제
기본적으로 XCom은 Airflow의 메타스토어에 저장되며 다음과 같이 크기가 제한됩니다.
| SQLite | BLOB 유형으로 저장, 2GB 제한 |
| PostgreSQL | BYTEA 유형으로 저장, 1GB 제한 |
| MySQL | BLOB 유형으로 저장, 64KB 제한 |
커스텀 XCom 백엔드 사용하기
Airflow 메타스토어를 사용하여 XCom을 저장 시에 제한 사항은 큰 데이터 볼륨을 저장할 때 확장할 수 없다는 점입니다. XCom은 일반적으로 작은 값이나 결과값을 저장하는데 사용되고, 큰 데이터 세트를 저장시에는 사용하지 않습니다.
커스텀 XCom 백엔드를 위한 구조
from typing import Any
from airflow.models.xcom import BaseXCom
class CustomXComBackend(BaseXCom):
@staticmethod
def serialize_value(value: Any):
...
@staticmethod
def deserizlize_value(result) -> Any:
...
Airflow 2 에서는 XCom을 좀 더 유연하게 활용하기 위해 XCom 백엔드를 지정할 수 있는 옵션이 있습니다. 이 옵션을 사용하면 커스텀 클래스를 정의하여 XCom을 저장, 검색할 수 있습니다.
- 다음 클래스를 사용하기 위해서는 'BaseXCom 기본 클래스가 상속되어야 합니다.
- 값을 직렬화 및 역직렬화 하기 위해 두 가지 정적 매서드를 각각 구현해야 합니다.
- 커스텀 백엔드 클래스에서 직렬화 메서드는 XCom 값이 Operator 내에서 게시될 때마다 호출됩니다.
- 역직렬화 메서드는 XCom 값이 백엔드에서 가져올 때 호출됩니다.
- 원하는 백엔드 클래스가 있으면 Airflow 구성에서 xcom_backend 매개변수를 사용해 클래스를 사용하도록 Airflow를 구성할 수 있습니다.
커스텀 XCom 백엔드는 XCom 값 저장 선택을 다양하게 하며 상대적으로 저렴하고 확장 가능한 클라우드 스토리지에 더 큰 XCom 값의 저장이 가능합니다. 따라서 클라우드 서비스를 위한 커스텀 백엔드를 구현할 수 있습니다.
'🔥 Data Engineer > Airflow' 카테고리의 다른 글
| [Airflow] - Sensor의 Reschedule & poke & Deferrable Operator (0) | 2026.04.17 |
|---|---|
| [Airflow] - DAG CI/CD (0) | 2026.04.15 |
| [Airflow] - Sensor(BashSensor, FileSensor, ExternalTaskSensor, TriggerDagRunOperator) (0) | 2026.04.14 |
| [Airflow] - Custom Operator (0) | 2026.04.14 |
| [Airflow] - Airflow Xcom 5가지 방법 (0) | 2026.04.08 |