Airflow에서의 센서란?
센서는 오퍼레이터의 일종으로 특정 조건을 파악하는 것에 특화된 오퍼레이터라고 할 수 있다.미리 설정한 조건이 만족되기를 기다리고, 만족된다면 True를 반환하는 Task를 가지고 있다.
모든 센서는 BaseSensorOperator를 상속해 구현되며, 상속 시에는 __init()__ 함수와 poke(context) 함수의 재정의가 필요하다. 이 중 센싱(Sensing)하는 로직은 poke 함수에 정의되어야 한다.
Airflow센서 공식문서 참조
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/sensors.html
Sensors — Airflow 3.2.0 Documentation
airflow.apache.org
BaseSensorOperator 기준 파라미터
| soft_fail (bool) | Timeout으로 인해 failure 처리 대신 skipped 처리 여부 |
| poke_interval (float) | sensor task를 작동시키는 주기(특정 조건 달성 확인 주기) (초 단위) |
| timeout (float) | 해당 sensor task가 fail로 처리될 최대 시간 (초 단위) DAG이 하루 단위일 경우 해당 값도 하루 단위로 넣는 경우가 다수 |
| mode (str) | 'poke' 또는 'reschedule' 중에 하나 선택 가능 poke: DAG 수행 내내 Running Slot을 차지함 (짧은 센싱 간격에 유리 / 초 단위) reschedule: 센서가 동작할 때에만 Slot을 차지 (긴 센싱 간격에 유리 / 분 단위) |
| exponential_backoff (bool) | True로 전달 시 task 주기가 2의 지수승 만큼 늘어남 (초반에는 자주, 후반에는 가끔) |
| max_wait (timedelta | float | None) | exponential_backoff 활성화에만 사용 가능 체크하는 주기의 상한선을 설정할 수 있음 (timedelta or 초 단위 입력) |
BashSensor

BashSensor는 Apache Airflow에서 제공하는 Sensor(감지용 태스크) 중 하나로,
특정 Bash 명령어가 “성공 상태(0 exit code)”를 반환할 때까지 계속 확인(polling)하는 역할을 합니다.
BashSensor는 보통 외부 상태 체크에 사용됩니다.
- 특정 파일이 생성될 때까지 대기
- 서버 상태 확인 (curl, ping)
- 특정 프로세스 완료 여부 확인
- DB 쿼리 결과 조건 만족 여부 체크
동작 방식
- 지정한 Bash 명령어 실행
- exit code 확인
- 0 → 성공 → 다음 Task 진행
- 0이 아님 → 실패 → 일정 시간 후 재시도
- 조건 만족할 때까지 반복
from airflow.sensors.bash import BashSensor
check_file = BashSensor(
task_id="check_file_exists",
bash_command="test -f /data/input/file.csv",
poke_interval=30, # 30초마다 체크
timeout=600 # 최대 10분 대기
)
- bash_command : 실행할 쉘 명령어
- poke_interval : 몇 초마다 체크할지
- timeout : 최대 대기 시간
- mode :
- poke → 계속 worker 점유 (기본)
- reschedule → 체크할 때만 실행 (리소스 절약)
BashSensor docs를 확인해보면 파라미터가 BashOperator와 비슷한 것을 알 수 있다. Sensor 또한 BaseOperator를 상속받아 만든 것이기 때문이다.
또한 Return값을 0으로 했을 때 True로 받아들인다고 한다. 다른 값이 출력되면 False 처리가 될 것이다.
파이썬에서 True 값을 줄 때 return True로 코드를 작성하듯이, 쉘 스크립트에서는 exit 0(0~255까지의 STATUS 값이 있지만 0만 정상의 의미)이 이와 같은 의미이다. 쉘 스크립트에서 마지막 명령 수행의 STATUS를 확인하려면 'echo $?'로 확인 가능하다.
BashSensor vs BashOperator
| 구분 | BashSensor | BashOperator |
| 목적 | 조건 대기 | 명령 실행 |
| 실행 방식 | 반복 체크 | 1회 실행 |
| 결과 기준 | exit code 0 될 때까지 기다림 | 실행 결과 그대로 |
FileSensor
FileSensor는 Apache Airflow에서 제공하는 Sensor 중 하나로,
특정 파일이 “존재할 때까지 기다리는” 감지용 태스크입니다.
데이터 파이프라인에서 파일 기반 트리거에 가장 많이 사용됩니다.
예시:
- ETL 전 단계에서 CSV/Parquet 파일 도착 대기
- 외부 시스템(SFTP, 로그 서버)에서 파일 생성 확인
- 배치 결과 파일 생성 후 다음 단계 실행

파라미터는 fs_conn_id, filepath, recursive, deferrable 총 4가지가 있다. fs_conn_id는 해당 파일에 대한 connection id를 말하는 것으로, FileSensor를 사용하기 전에 Connections에 등록이 필요한 것을 알 수 있다.
filepath는 센싱(체크)할 파일(폴더)의 경로(Connection에 입력한 Base 경로 뒤의 상대경로로 입력), recursive는 glob을 이용한 하위 파일 경로들의 list를 확인해 해당 파일이 해당 경로에 제대로 들어갔는지를 확인해줄 지의 여부를 뜻한다.
* File Connections 등록
from airflow.sensors.filesystem import FileSensor
wait_for_file = FileSensor(
task_id="wait_for_file",
filepath="/data/input/file.csv",
poke_interval=60, # 1분마다 체크
timeout=3600, # 최대 1시간 대기
mode="reschedule"
- /data/input/file.csv 파일이 생길 때까지 대기
- 1분마다 체크
- 1시간 안에 안 생기면 실패
주요 옵션
- filepath : 확인할 파일 경로
- poke_interval : 체크 주기 (초)
- timeout : 최대 대기 시간
- mode
- poke → 계속 worker 점유
- reschedule → 체크할 때만 실행 (추천)
ExternalTaskSensor
ExternalTaskSensor는 Apache Airflow에서 제공하는 Sensor로,
다른 DAG 또는 다른 Task가 끝날 때까지 기다리는 센서입니다.
“외부 DAG/Task의 완료 상태를 감지해서 의존성을 만드는 Sensor”
ExternalTaskSensor은 Logical Date를 기준으로 upstream DAG의 수행을 판단한다. 스케줄링된 시간이 다르거나, 수동으로 DAG를 트리거 한 경우에는 Sensor가 제대로 동작하지 않게 된다.
아래와 같이 DAG의 Logical Date가 동일한 경우에만 ExternalTaskSensor는 upstream DAG가 수행된 것으로 판단한다는 점에 주의하자.


언제 쓰냐 (핵심 용도)
Airflow에서 DAG을 분리해서 운영할 때 필수입니다.
예시:
- A DAG (데이터 수집) → B DAG (데이터 처리)
- 팀별로 DAG 나눠서 운영할 때
- 공통 파이프라인 완료 후 후속 작업 실행
동작 방식
- 다른 DAG 또는 Task 상태 조회
- 상태 체크
- success → 다음 Task 실행
- failed → 실패 처리
- running → 계속 대기
- 조건 만족할 때까지 반복
from airflow.sensors.external_task import ExternalTaskSensor
wait_for_task = ExternalTaskSensor(
task_id="wait_for_external_task",
external_dag_id="data_ingestion_dag",
external_task_id="load_data",
allowed_states=["success"],
poke_interval=60,
timeout=3600,
mode="reschedule"
)
- data_ingestion_dag의 load_data Task가 성공할 때까지 대기
- external_dag_id : 기다릴 DAG 이름
- external_task_id : 특정 Task (없으면 DAG 전체)
- allowed_states : 성공으로 인정할 상태 (기본: success)
- failed_states : 실패로 간주할 상태
- execution_date_fn : 실행 시간 매핑 (매우 중요 포인트)

그렇다면 각 DAG의 스케줄이 달라 Logical Date이 다른 경우에는 어떻게 해야 할까?
이때는 execution_delta를 사용해 Logical Date의 간격을 지정할 수 있다.
아래 예제에서는 execution_delta=timedelta(hours=1)를 적용해 1시간 전에 dag1이 수행되었는 지 확인한다.


execution_delta
import pendulum
from airflow.decorators import dag, task
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import timedelta
@dag(
schedule="@once",
start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
catchup=False,
tags=["example", "sensor"],
)
def external_task_sensor():
external_sensor = ExternalTaskSensor(task_id='wait_for_dag1',
external_dag_id='dag1',
external_task_id='task1',
mode='reschedule',
poke_interval=10,
timeout=60,
execution_delta=timedelta(hours=1)
)
external_sensor
external_task_sensor()
아래 그림은 ExternalTaskSensor에 execution_delta를 적용했을 때를 표현한 그림이다.

핵심 개념
1. 실행 시간 (execution_date) 매칭
ExternalTaskSensor는 단순히 “끝났냐”가 아니라
👉 같은 execution_date 기준으로 체크
예:
B DAG (00:05 실행)
→ B는 A의 **같은 날짜 실행(run)**을 기다림
2. Task vs DAG 레벨
- external_task_id 지정 → 특정 Task만 기다림
- 미지정 → DAG 전체 완료 기다림
- File/Bash → 외부 환경
- ExternalTaskSensor → Airflow 내부 의존성
TriggerDagRunOperator
DAG간 의존성이 존재할 경우에 사용되는 Operator로 다른 DAG를 트리거한다.
트리거된 DAG는 원래 스케줄과 별개로 실행된다.
Trigger되는 DAG는 아래와 같이 스케줄링이 활성화 되어 있어야 한다. 그렇지 않으면 Task가 실행되지 않는다.


아래는 TriggerDagRunOperator를 사용해 dag1를 실행시키는 예제이다. 사용 방법은 매우 간단하다.
trigger_dag_id에 트리거할 dag의 id만 입력해주면 된다.
아래 코드에서는 wait_for_completion을 True로 설정해 dag1이 종료되어야 trigger_dag1 task가 success 되도록 하여, dag1_finish_notify가 dag1 수행 종료 후에 수행되도록 설정했다. poke_interval은 dag1이 완료되었는 지를 확인하는 시간 간격이다.
import pendulum
from airflow.decorators import dag, task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from datetime import timedelta
@dag(
schedule="@once",
start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
catchup=False,
tags=["example", "trigger"],
)
def trigger_dag():
trigger_dag1 = TriggerDagRunOperator(
task_id="trigger_dag1",
trigger_dag_id="dag1",
wait_for_completion=True,
poke_interval=10,
)
@task
def dag1_finish_notify():
print("dag1 finished")
trigger_dag1 >> dag1_finish_notify()
trigger_dag()
@dag(
schedule=None, # trigger되는 DAG는 schedule이 필요없다.
start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
catchup=False,
tags=["example", "trigger"],
)
def dag1():
@task
def task1():
print("task1")
task1()
dag1()
wait_for_completion이 True인 경우에는 아래와 같이 동작합니다.

wait_for_completion이 True인 경우에는 아래와 같이 동작합니다.

DAG 1이 수행된 후에 DAG 2 와 DAG 3이 수행되어야 하는 경우에는 TriggerDagRunOperator 사용이 적절하고,
DAG 1과 DAG 2가 수행된 후에 DAG 3이 수행되어야 하는 경우에는 ExternalTaskSensor 사용이 적절하다.

TriggerDagRunOperator 파라미터들 (다른 DAG 실행시키는 오퍼레이터)
| trigger_dag_id (str) | 어떤 DAG를 실행할지 지정 |
| trigger_run_id (str | None) | 실행될 DAG Run의 고유 ID (중복 실행 제어 / 추적용) |
| execution_date (str | datetime | None) | 실행 기준 시간 (logical date) |
| conf (dict | None) | 실행 설정 전달 (트리거되는 DAG에 파라미터 전달) |
| reset_dag_run (bool) | 기존 실행 처리 (동일 run_id가 이미 있을 때 처리 방식) |
| wait_for_completion (bool) | 트리거 후 끝날 때까지 기다릴지 여부 (완료 대기 옵션) |
| poke_interval |
상태 체크 주기 (초)
wait_for_completion=True일 때만 사용 poke_interval=60 # 1분마다 체크 |
| allowed_states (list[str]) | 성공/실패 조건 이 상태가 되면 성공 처리 ['success'] |
| failed_states (list[str]) | 실패로 간주할 상태 ['failed', 'upstream_failed'] |
| deferrable (bool) | 대기 방식 선택 (리소스 최적화 핵심) False = Worker가 계속 점유 True = Triggerer로 넘겨서 슬롯 해제 |

'🔥 Data Engineer > Airflow' 카테고리의 다른 글
| [Airflow] - DAG CI/CD (0) | 2026.04.15 |
|---|---|
| [Airflow] - Task 분기처리방법(@task.branch,BranchPythonOperator ,BaseBranchOperator) (0) | 2026.04.14 |
| [Airflow] - Custom Operator (0) | 2026.04.14 |
| [Airflow] - Airflow Xcom 5가지 방법 (0) | 2026.04.08 |
| [Airflow] - Airflow Connection (0) | 2026.04.08 |