데이터 엔지니어링

[Refactoring] - DAU·CTR·CVR·ARPU 배치 파이프라인

jyu_seo_ 2025. 12. 19. 22:11

리팩토링을 한 기준 

 

1. 일일 지표(DAU, CTR, CVR, ARPU)를 계산하는 배치 파이프라인을 만들면서
단순히 “지표를 계산하는 것”보다 다음을 더 중요하게 생각했다

 

 

  • 재실행해도 안전한가?
  • Airflow와 로컬 실행에서 동일하게 동작하는가?
  • 지표가 늘어나도 구조를 크게 바꾸지 않아도 되는가?

초기 구현은 동작은 했지만,
Airflow에 바로 얹기에는 몇 가지 구조적인 문제가 있었다.
그래서 지표 계산 로직을 중심으로 리팩토링을 진행했다.

 

2. 지표 SQL 쿼리 설계

각 지표는 하루에 하나의 row만 반환하도록 설계했다.

공통 원칙은 다음과 같다.

  • 모든 쿼리는 target_date 파라미터를 사용
  • 결과 컬럼을 event_date + metric 값 형태로 통일
  • 분모가 0이 될 수 있는 지표는 NULLIF, COALESCE로 방어

예를 들어 CTR, CVR, ARPU는 다음 특징을 가진다.

  • CTR
    • impression이 0일 경우 → 0 반환
  • CVR
    • 클릭 유저가 없을 경우 → 0 반환
  • ARPU
    • 활성 유저가 없을 경우 → 0 반환

이렇게 하면
SQL 단에서 에러를 최대한 제거하고,
Python에서는 단순 병합만 하면 된다.

 

3. 메트릭 집계 Python 로직 리팩토링

Python에서는 각 지표 SQL을 개별 실행한 뒤
event_date 기준으로 병합하는 구조를 사용했다.

리팩토링의 핵심 포인트는 다음이다.

(1) 날짜 처리 통일

  • date → YYYY-MM-DD 문자열 변환을 단일 함수로 분리
  • Airflow execution date / CLI date 모두 동일하게 처리

(2) pandas + SQLAlchemy 안정성 개선

pandas.read_sql_query() 실행 시
SQLAlchemy 엔진 인식 문제를 피하기 위해
engine.raw_connection()을 사용했다.

이로 인해:

  • 환경 차이로 인한 실행 실패를 줄일 수 있었고
  • Airflow worker에서도 동일하게 동작한다.

(3) idempotent한 저장 구조

지표 저장 시 다음 순서를 따른다.

  1. 해당 날짜의 기존 데이터 DELETE
  2. 새 지표 INSERT

DELETE는 트랜잭션으로 감싸
중간 실패 시 데이터가 꼬이지 않도록 했다.

이 구조 덕분에:

  • Airflow retry
  • 특정 날짜 재실행
    이 모두 안전해졌다.

4. main.py 리팩토링 (CLI 실행용)

main.py는 실행 진입점 역할만 하도록 단순화했다.

  • argparse에서 날짜 형식 검증
  • 잘못된 날짜는 로직 실행 전에 실패
  • 실제 비즈니스 로직은 save_daily_metric()에 위임

이렇게 분리함으로써:

  • CLI 실행
  • Airflow PythonOperator
    에서 완전히 동일한 메트릭 로직을 재사용할 수 있다.

5. Airflow DAG 적용

Airflow DAG에서는 다음 원칙을 지켰다.

  • DAG는 orchestration만 담당
  • 지표 계산 로직은 Python 모듈에 위치
  • Airflow context에서 날짜만 전달

즉,

Airflow는 “언제 실행할지”만 알고
“무엇을 계산하는지”는 모른다

라는 구조를 만들었다.

이 방식은:

  • 테스트 용이성
  • 로컬 디버깅
  • 장기적인 유지보수
    측면에서 큰 장점이 있다.

 

Metrics.py

metrics.py에서 airflow/ cli 테스트코드에서 date, datetime, str이 섞여 들어올수있는 방지 및, SQL 파라미터 타입 불일치로 인한 에러방지

 

 

Main.py

main.py는 실행 진입점 역할만 하도록 단순화 하였다.

 

'데이터 엔지니어링' 카테고리의 다른 글

[SQL + PYTHON] - RFID 실시간/배치 파이프라인 01  (0) 2026.01.03
[DBT] - Data build tool  (1) 2025.12.29
[Airflow] - Dag 생성  (0) 2025.12.18
[SQL + Python]Idempotent -Pipeline  (0) 2025.12.17
[SQL + Python] CVR,ARPU  (0) 2025.12.16