본문 바로가기
🔥 Data Engineer/Airflow

[Airflow] - Sensor의 Reschedule & poke & Deferrable Operator

by jyu_seo_ 2026. 4. 17.

서론

airflow 공식 블로그에서도 각 모드에 대해 어떻게 동작이 달라지는지 설명이 나와 있지 않아 실제 코드를 보며 설명을 해보겠습니다

 

Slot&Pool

Pool= 리소스를 보호하고 실행량을 통제하는 핵심 장치

겉으로는 쉬워 보이지만, 제대로 이해 못 하면 Queue 쌓이고 DAG 멈추는 원인 1순위가 된다.

Pool (이름)
 └─ Slots (최대 실행 개수)

 

Task가 실행될때

1. Pool 확인
2. Slot 남아있나?
   → YES → 실행 (Running)
   → NO → 대기 (Queued)

Slot = Worker가 동시에 실행할 수 있는 작업 개수 (리소스 단위)

Slot 하나 = CPU/Worker 실행 자리 하나라고 보면 됨

 

오른쪽 큰 박스 = Pool (슬롯 자원 풀)

그 안이 3가지 상태로 나뉨:

  • Running Slot → 현재 실행 중 (worker 점유)
  • Queued Slot → 실행 대기
  • Scheduled Slot → 아직 실행 준비 단계

poke 모드

Sensor에서 poke를 설정하고 해당 sensor가 실행되면 airflow UI에서 다음과 같은 로그를 볼 수 있습니다.

[2023-04-10, 11:22:17 UTC] {external_task.py:184} INFO - Poking for tasks None in dag  on 2023-04-10T10:00:00+00:00 ... 
[2023-04-10, 11:23:17 UTC] {external_task.py:184} INFO - Poking for tasks None in dag  on 2023-04-10T10:00:00+00:00 ...

 

poke모드

센서 Task가 실행되면 Airflow Worker 슬롯 하나를 계속 차지하고 있습니다.

poke_interval (초 단위) 간격으로 주기적으로 조건을 계속 확인합니다.

조건이 충족될 때까지 Worker는 다른 일을 하지 못하고 이 센서에만 매달려 있습니다.

조건 확인 간격이 짧고 즉각적인 반응이 필요할 때 좋습니다.

기다리는 시간이 길어지면 Worker 리소스를 낭비하게 됩니다. 장시간 실행되는 센서가 많아지면 모든 Worker가 센서를 기다리느라 다른 Task를 실행하지 못하는 교착 상태(Deadlock)에 빠질 수 있습니다.

  • Sensor가 활성 상태를 유지
  • 설정된 poke_interval 주기마다 조건 확인
  • Sensor가 대기 중인 동안 Worker 슬롯 점유
  • 기본 동작 모드

 

Reschedule 모드

센서 Task가 실행되어 조건을 한번 확인합니다.

조건이 충족되지 않으면, 자신의 상태를 'up_for_reschedule'로 바꾸고 Worker 슬롯을 즉시 반납(Free)합니다.

poke_interval 시간이 지난 후, 스케줄러에 의해 다시 스케줄링되어 새로운 Worker 슬롯을 할당받아 조건을 다시 확인합니다.

기다리는 동안 Worker 리소스를 낭비하지 않으므로, 장시간 대기하는 센서에 매우 효율적입니다.

스케줄러의 부하가 약간 증가할 수 있고, 조건 확인 주기가 poke 모드보다 덜 즉각적일 수 있습니다.

 

정리하면, 몇 분 이내의 짧은 대기는 poke 모드를, 수십 분에서 몇 시간 이상 길게 기다려야 하는 작업은 반드시 reschedule 모드를 사용하는 것이 좋습니다.

  • Sensor가 일정 시간동안 비활성 상태로 유지
  • poke_interval 간격으로 다시 활성화되어 조건 확인
  • Sensor가 대기 중일 때는 Worker 슬롯을 반환하여 다른 작업에 사용 가능
  • 리소스 효율적
wait_for_file = FileSensor(
    task_id='wait_for_file',
    filepath='/path/to/file.txt',
    poke_interval=10,  # 10초 간격으로 상태 확인
    timeout=600,  # 10분 동안 대기
    mode='reschedule'  # Reschedule 모드 활성화
)
비교 Poke Reschedule
원리 DAG이 수행되는 내내 Running Slot을 차지함. 다만 Slot 안에서 Sleep, active를 반복함 센서가 동작하는 시기에만 Slot을 차지함. 그 외는 Slot을 차지하지 않음
Wait에서 상태 running up_for_reschedule
유리한 적용 시점 짧은 센싱 간격 (interval, 초단위) 긴 센싱 간격, 주로 분단위 Reschedule 될때 스케줄러의 부하 발생

 

poke모드(기본값) : 센서가 태스크 슬롯을 계속 점유하면서 주기적으로 상태를 확인한다. 조건이 충족될때까지 태스크 슬롯을 잡고 있음.

reschedule 모드 : 센서가 상태를 확인한 후 태스크 슬롯을 반환하고, 다음 체크 시간에 다시 슬롯을 요청한다. 리소스가 효율적이다.

 


Deferrable Operator

Deferrable 의 의미

 

“Deferrable”은 “연기할 수 있는” 또는 “지연 가능한”이라는 의미를 갖고 있습니다.

Deferrable Operator는 기다리는 동안 Worker를 점유하지 않는 Operator입니다.

 

기존의 Operator: 작업이 대기 상태에 있을 때도 워커를 점유하면서 리소스를 사용하지만,
Deferrable Operator: 작업을 중지하고 필요할 때 다시 시작할 수 있어 워커 리소스를 절약합니다
이를 통해 시스템의 리소스 효율성을 높이고, 더 많은 작업을 동시에 처리할 수 있습니다.
Airflow에서 특정 Task를 수행한다는 것은 기본적으로 워커의 슬롯을 사용하는 것을 의미합니다.
하지만, Deferrable Operator는 워커에서 동작하는 것이 아닌 Triggerer라는 워커에서 동작합니다.
Triggerer 워커는 Airflow의 Optional Component로 Airflow의 구성에 꼭 필요한 컴포넌트는 아닙니다.

기존 vs Deferrable 비교

구분 기존 Operator Deferrable
대기 방식 Worker 점유 비동기 대기
리소스 많이 사용 적게 사용
확장성 낮음 높음
복잡도 단순 복잡

Poke vs Reshedule vs Deferable

구분 poke reschedule Deferrable
Worker 점유 계속 반환 반환
방식 반복 체크 재실행 이벤트
효율
Scheduler 부하 낮음 높음 낮음
최신성 구식 중간 최신

poke모드일때는 worker가 기다리고 Deferrable일때는 Trigger가 기다립니다.

👉 Worker가 기다림 = 리소스를 붙잡고 멈춰있음
👉 Trigger가 기다림 = 이벤트만 감시하고 리소스는 안 씀

Worker가 기다리는 방식

Worker
 └── Task 실행
       └── 계속 대기 (sleep 반복)
       
       
 조건 확인 → 아직 아님 → sleep → 다시 확인 → 반복

Trigger가 기다리는 방식

Worker
 └── Task 시작 → defer → 종료

Triggerer
 └── Trigger가 조건 감시

 

비유를하면 "직원이 자리를 떠나고, 알림이 울리면 다시돌아오는 구조"입니다.

Task 100개 대기 → Worker 100개 필요
Task 1000개 대기 → Triggerer 1개로 가능


Worker 대기 VS Trigger 대기

구분 Worker 대기 Trigger 대기
리소스 사용 계속 사용 ❌ 거의 없음 ✅
방식 polling (반복 체크) event 기반
확장성 낮음 매우 높음
비용 높음 낮음
구조 blocking async

 

Deferrable Operator를 사용해 워커에 영향을 주지 않도록 작성된 프로세스들이 비동기로 동작하기 위한 컴포넌트입니다. 따라서 Deferrable Operator를 사용하지 않는다면 낭비되는 리소스입니다.

아래의 실험을 위해 GCP에서 제공하는 Managed Airflow인 Composer를 구성했습니다. 이때 아래 사진과 같이 Triggered도 함께 생성했습니다.

 

Deferrable Operator 사용 및 작성

Airflow에서 기본적으로 제공하는 Operator 중에도 Deferrable Operator가 있습니다.
TimeSensorAsync, SqsSensor Operator와 같이 비동기로 동작할 수 있는 Sensor Operator들이 있습니다.

  • TimeSensorAsync의 경우 이름에서 알 수 있듯이 비동기로만 동작합니다. (<-> TimeSensor는 동기로 동작합니다.)
  • SqsSensor의 경우 동기 혹은 비동기를 선택할 수 있습니다. (argument 중 deferrable=True 혹은 False로 설정할 수 있습니다.)

TimeSensorAsync

import datetime
 
from airflow import DAG
from airflow.sensors.time_sensor import TimeSensorAsync
 
 
with DAG(
    "deferrable_dag",
    start_date=datetime.datetime(2024, 1, 1),
    catchup=False,
) as dag:
    t1 = TimeSensorAsync(
        task_id="wait_5_minutes",
        target_time=(
            datetime.datetime.now(tz=datetime.timezone.utc)
            + datetime.timedelta(minutes=5)
        ).time(),
    )

 

위 코드는 5분간 대기하는 Task입니다. 해당 코드가 Sync로 동작한다면 워커의 Slot을 점유하면서 5분간 기다릴 것입니다.

아래는 동기로 동작하는 TimeSensor의 코드를 일부 가져왔습니다.

class TimeSensor(BaseSensorOperator):
def __init__(self, *, target_time: datetime.time, **kwargs) -> None:
        super().__init__(**kwargs)
        self.target_time = target_time
 
def poke(self, context: Context) -> bool:
        self.log.info("Checking if the time (%s) has come", self.target_time)
        return timezone.make_naive(timezone.utcnow(), self.dag.timezone).time() > self.target_time

 

poke 메서드는 현재 시간과 target_time을 비교해서 True 혹은 False를 반환합니다.

airflow는 Operator는 execute 메서드를 통해 작업을 수행하는데 BaseSensorOperator의 execute 메서드의 일부는 아래와 같습니다.

def execute(self, context: Context) -> Any:
    while True:
        try:
            poke_return = self.poke(context)
 
        if poke_return:
            break

 

무한루프를 돌면서 앞서 구현된 poke 메서드를 실행해 특정 시간이 되었는지 확인합니다. -> 리소스를 점유 ❗

하지만, TimeSensorAsync는 워커 노드에서 동작하는 것이 아닌 Triggered에서 실행되며 비동기로 동작합니다.

TimeSensorAsync의 execute 메서드부터 천천히 코드를 살펴보겠습니다.

 def execute(self, context: Context) -> NoReturn:
        self.defer(
            trigger=DateTimeTrigger(moment=self.target_datetime, end_from_trigger=self.end_from_trigger),
            method_name="execute_complete",
        )

 

self.defer 메서드는 해당 Task를 deferred 상태로 변경하고 작업의 실행을 중단시킵니다. 인자로 넘긴 Trigger가 특정 이벤트를 발생시킬 때까지 대기합니다.

그리고 정의된 DateTimeTrigger는 async def run으로 비동기 메서드가 정의되었고 이를 Triggered에서 실행합니다.

async def run(self) -> AsyncIterator[TriggerEvent]:
        for step in 3600, 60, 10:
            seconds_remaining = (self.moment - pendulum.instance(timezone.utcnow())).total_seconds()
            while seconds_remaining > 2 * step:
                self.log.info("%d seconds remaining; sleeping %s seconds", seconds_remaining, step)
                await asyncio.sleep(step)
                seconds_remaining = (self.moment - pendulum.instance(timezone.utcnow())).total_seconds()
        ...

특정 시간임을 알기 위해선 시간을 계속해서 검사할 수밖에 없긴 하지만, 지수 백오프 기법을 사용해 대기 시간을 줄이고 있고 해당 대기 또한 비동기로 동작합니다.

해당 메서드는 Custom Deferrable Operator 정의 시 직접 구현해줘야 하는 부분입니다.

그리고 Triggered 프로세스에서 실행을 확인하기 위해 해당 Pod의 로그를 확인해 보겠습니다.

저의 경우 ex) kubectl logs -f -n composer-2-9-7-airflow-2-9-3-1a3758bc airflow-triggerer-7bcc7bb8f-r56lb

 

한 가지 더 알고 있으면 좋은 점은 Custom Deferrable Operator를 작성할 때 Triggered Pod에서는 DAG Folder를 마운트 하지 않는다는 점입니다. 따라서 정의한 Trigger 클래스에서 Dag folder 내의 파일을 참조한다면 참조 에러가 발생합니다. 이는 Triggered에서 독립적으로 일관성 있게 작업을 수행하기 위한 제약입니다.

airflow에서 SqsSensor를 사용하면서 궁금했던 Deferrable에 대해 알아보았습니다. 사실 위 내용을 작성하면서도 정확하게 이해하지 못하고 있는 부분은 있습니다.

  • def serialize 메서드를 통해 직렬화되어 Triggered에 전달될 텐데 어떻게 전달되는지?
  • worker에서도 비동기로 실행되게끔 할 수 있을 텐데, 꼭 Triggered로 분리할 필요가 있는지?

주의사항

  • 장시간 대기 시
    • poke_interval, timeout 값의 적절한 조정 필요
    • Sensor의 과도한 사용으로 인해 Scheduler 성능에 영향 발생 가능
  • 리소스 효율성
    • 리소스 효율을 위해 poke가 아닌 reschedule 모드 권장

References

https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html#writing-deferrable-operators

 

Deferrable Operators & Triggers — Airflow 3.2.0 Documentation

 

airflow.apache.org