Airflow로 파이프라인을 생성하여 기존에 만들었던 파이프라인을 airflow에 연동시키는 과정을 공부해봤다.
일단 wsl을 이용해서 db.py main.py queris.py 등등 wsl를 활용해 디렉토리에 저장해두고

airflow dags list 를 활용해서 해당 파이프

라인이 잘 들어갔는지 리스트도 체크해준다
실제로 들어가있는지 Airflow에 admin계정을 생성해서 dags에 있는지 체크


DAG를 연결할때 python에서 dags파일을 만든 과정이다
이코드의 목적은 매일 한번 지표를 계산/저장을 자동화하는 목적이다
- 매일 특정날짜 기준으로
- save_daily_metric(target_date)를 실행해서
- DB/파일/어딘가에 "일 단위 지표"를 저장하는 작업을 정해진 스케쥴로 자동 실행하게 만드는 것이다.
즉, 수동으로 매번 파이썬 실행하는 걸 Airflow에 맡기는 자동화 파이프라인이다
Airflow에서 “DAG”가 뭔데?
Airflow는 작업을 3단계 개념으로 본다
- DAG: 작업 흐름(워크플로우) 전체의 설계도
- Task: DAG 안의 개별 작업(한 단계)
- Operator: Task를 “어떤 방식으로 실행할지” 정해주는 실행 타입(예: PythonOperator)
코드의 흐름 Airflow 관점으로 해석
1.import & 함수 준비

2.default_args

3.run_daily_metric(**context)

4. DAG

5. Task

AIRFLOW - DAG 연결되는 과정의 실제 흐름
- Scheduler/DAG Processor가 DAG 폴더를 주기적으로 스캔
- 새 .py 파일을 발견하면 파이썬으로 import 해봄(= 파싱)
- 파싱에 성공하면 Airflow DB에 DAG 메타정보(dag_id, task 구조, 스케줄) 를 등록
- Scheduler가 스케줄(@daily)과 start_date/catchup을 보고
실행해야 할 “DAG Run(날짜 단위 실행 인스턴스)”을 생성 - 각 DAG Run마다 task instance를 만들고, 실행 큐에 넣음
- Worker가 task를 가져가서 PythonOperator로 run_daily_metric 실행
- run_daily_metric에서 context["ds"]를 꺼내서 save_daily_metric(ds) 수행
- 성공/실패 결과가 다시 Airflow DB에 기록되고 UI에서 확인 가능
'데이터 엔지니어링' 카테고리의 다른 글
| [DBT] - Data build tool (1) | 2025.12.29 |
|---|---|
| [Refactoring] - DAU·CTR·CVR·ARPU 배치 파이프라인 (0) | 2025.12.19 |
| [SQL + Python]Idempotent -Pipeline (0) | 2025.12.17 |
| [SQL + Python] CVR,ARPU (0) | 2025.12.16 |
| [SQL + Python] DAU,CTR (0) | 2025.12.15 |