데이터 엔지니어링

[Airflow] - Dag 생성

jyu_seo_ 2025. 12. 18. 23:49

Airflow로 파이프라인을 생성하여 기존에 만들었던 파이프라인을 airflow에 연동시키는 과정을 공부해봤다.

일단 wsl을 이용해서 db.py main.py queris.py 등등 wsl를 활용해 디렉토리에 저장해두고

 

해당 내용은 ls root를 활용해서 py파일들이 저장되어있는지 확인하는 과정이다.

airflow dags list 를 활용해서 해당 파이프

라인이 잘 들어갔는지 리스트도 체크해준다

 

실제로 들어가있는지 Airflow에 admin계정을 생성해서 dags에 있는지 체크

Airflow에 파이프라인이 들어가있는걸 확인할수 있다.

 

 

 

DAG를 연결할때 python에서 dags파일을 만든 과정이다

 

이코드의 목적은 매일 한번 지표를 계산/저장을 자동화하는 목적이다

- 매일 특정날짜 기준으로

- save_daily_metric(target_date)를 실행해서

- DB/파일/어딘가에 "일 단위 지표"를 저장하는 작업을 정해진 스케쥴로 자동 실행하게 만드는 것이다.

 

즉, 수동으로 매번 파이썬 실행하는 걸 Airflow에 맡기는 자동화 파이프라인이다

 

Airflow에서 “DAG”가 뭔데?

Airflow는 작업을 3단계 개념으로 본다

  • DAG: 작업 흐름(워크플로우) 전체의 설계도
  • Task: DAG 안의 개별 작업(한 단계)
  • Operator: Task를 “어떤 방식으로 실행할지” 정해주는 실행 타입(예: PythonOperator)

 

 

 

코드의 흐름 Airflow 관점으로 해석

 

1.import & 함수 준비

실제 비즈니스 로직(지표 저장)은 metrics.py에 있고, DAG 파일은 “언제/어떻게 실행할지”만 정의한다.


2.default_args

실패했을 때 재시도 정책 같은 “공통 실행 설정”이다. depends_on_past=False → 어제 실패했다고 오늘까지 막지 않게한다.

 

3.run_daily_metric(**context)

Airflow가 태스크 실행 시 컨텍스트(context) 를 넘김 ds는 Airflow에서 자주 쓰는 “실행 날짜(YYYY-MM-DD)” 문자열 그래서 오늘 실행이 아니라, ‘이 DAG Run이 대표하는 날짜’를 기준으로 지표를 계산함

 

4. DAG

dag_id: UI에서 보이는 DAG 이름(검색 키) schedule="@daily": 매일 실행 start_date: 스케줄의 기준 시작점 catchup=True: start_date부터 “밀린 날짜들”을 과거부터 차례로 실행할 수 있게 함 과거 데이터를 채우려면 켜고, “오늘부터만”이면 catchup=False가 편함.

5. Task

task_id는 UI에서 task 노드 이름 python_callable에 등록된 함수를 Airflow worker가 실행

 

AIRFLOW - DAG 연결되는 과정의 실제 흐름

 

 

  • Scheduler/DAG Processor가 DAG 폴더를 주기적으로 스캔
    • 새 .py 파일을 발견하면 파이썬으로 import 해봄(= 파싱)
  • 파싱에 성공하면 Airflow DB에 DAG 메타정보(dag_id, task 구조, 스케줄) 를 등록
  • Scheduler가 스케줄(@daily)과 start_date/catchup을 보고
    실행해야 할 “DAG Run(날짜 단위 실행 인스턴스)”을 생성
  • 각 DAG Run마다 task instance를 만들고, 실행 큐에 넣음
  • Worker가 task를 가져가서 PythonOperator로 run_daily_metric 실행
  • run_daily_metric에서 context["ds"]를 꺼내서 save_daily_metric(ds) 수행
  • 성공/실패 결과가 다시 Airflow DB에 기록되고 UI에서 확인 가능

 

 

'데이터 엔지니어링' 카테고리의 다른 글

[DBT] - Data build tool  (1) 2025.12.29
[Refactoring] - DAU·CTR·CVR·ARPU 배치 파이프라인  (0) 2025.12.19
[SQL + Python]Idempotent -Pipeline  (0) 2025.12.17
[SQL + Python] CVR,ARPU  (0) 2025.12.16
[SQL + Python] DAU,CTR  (0) 2025.12.15