리팩토링을 한 기준
1. 일일 지표(DAU, CTR, CVR, ARPU)를 계산하는 배치 파이프라인을 만들면서
단순히 “지표를 계산하는 것”보다 다음을 더 중요하게 생각했다
- 재실행해도 안전한가?
- Airflow와 로컬 실행에서 동일하게 동작하는가?
- 지표가 늘어나도 구조를 크게 바꾸지 않아도 되는가?
초기 구현은 동작은 했지만,
Airflow에 바로 얹기에는 몇 가지 구조적인 문제가 있었다.
그래서 지표 계산 로직을 중심으로 리팩토링을 진행했다.
2. 지표 SQL 쿼리 설계
각 지표는 하루에 하나의 row만 반환하도록 설계했다.
공통 원칙은 다음과 같다.
- 모든 쿼리는 target_date 파라미터를 사용
- 결과 컬럼을 event_date + metric 값 형태로 통일
- 분모가 0이 될 수 있는 지표는 NULLIF, COALESCE로 방어
예를 들어 CTR, CVR, ARPU는 다음 특징을 가진다.
- CTR
- impression이 0일 경우 → 0 반환
- CVR
- 클릭 유저가 없을 경우 → 0 반환
- ARPU
- 활성 유저가 없을 경우 → 0 반환
이렇게 하면
SQL 단에서 에러를 최대한 제거하고,
Python에서는 단순 병합만 하면 된다.
3. 메트릭 집계 Python 로직 리팩토링
Python에서는 각 지표 SQL을 개별 실행한 뒤
event_date 기준으로 병합하는 구조를 사용했다.
리팩토링의 핵심 포인트는 다음이다.
(1) 날짜 처리 통일
- date → YYYY-MM-DD 문자열 변환을 단일 함수로 분리
- Airflow execution date / CLI date 모두 동일하게 처리
(2) pandas + SQLAlchemy 안정성 개선
pandas.read_sql_query() 실행 시
SQLAlchemy 엔진 인식 문제를 피하기 위해
engine.raw_connection()을 사용했다.
이로 인해:
- 환경 차이로 인한 실행 실패를 줄일 수 있었고
- Airflow worker에서도 동일하게 동작한다.
(3) idempotent한 저장 구조
지표 저장 시 다음 순서를 따른다.
- 해당 날짜의 기존 데이터 DELETE
- 새 지표 INSERT
DELETE는 트랜잭션으로 감싸
중간 실패 시 데이터가 꼬이지 않도록 했다.
이 구조 덕분에:
- Airflow retry
- 특정 날짜 재실행
이 모두 안전해졌다.
4. main.py 리팩토링 (CLI 실행용)
main.py는 실행 진입점 역할만 하도록 단순화했다.
- argparse에서 날짜 형식 검증
- 잘못된 날짜는 로직 실행 전에 실패
- 실제 비즈니스 로직은 save_daily_metric()에 위임
이렇게 분리함으로써:
- CLI 실행
- Airflow PythonOperator
에서 완전히 동일한 메트릭 로직을 재사용할 수 있다.
5. Airflow DAG 적용
Airflow DAG에서는 다음 원칙을 지켰다.
- DAG는 orchestration만 담당
- 지표 계산 로직은 Python 모듈에 위치
- Airflow context에서 날짜만 전달
즉,
Airflow는 “언제 실행할지”만 알고
“무엇을 계산하는지”는 모른다
라는 구조를 만들었다.
이 방식은:
- 테스트 용이성
- 로컬 디버깅
- 장기적인 유지보수
측면에서 큰 장점이 있다.
Metrics.py

Main.py

'데이터 엔지니어링' 카테고리의 다른 글
| [SQL + PYTHON] - RFID 실시간/배치 파이프라인 01 (0) | 2026.01.03 |
|---|---|
| [DBT] - Data build tool (1) | 2025.12.29 |
| [Airflow] - Dag 생성 (0) | 2025.12.18 |
| [SQL + Python]Idempotent -Pipeline (0) | 2025.12.17 |
| [SQL + Python] CVR,ARPU (0) | 2025.12.16 |