Step2 ㅡ> Airflow로 일일 스냅샷 만들기: Airflow로 일일 스냅샷 만들기
왜 “스냅샷 테이블”이 필요한가?
https://docs.cloud.google.com/composer/docs/composer-3/save-load-snapshots?hl=ko
환경 스냅샷 저장 및 로드 | Cloud Composer | Google Cloud Documentation
2026년 9월 15일에 모든 Cloud Composer 1 버전과 Cloud Composer 2의 버전 2.0.x가 지원 종료 예정일에 도달합니다. 이러한 버전에서는 환경을 사용할 수 없습니다. Cloud Composer 3으로 마이그레이션하는 것이
docs.cloud.google.com
Airflow 스냅샷은 주로 GCP의 Cloud Composer 환경에서 제공하는 기능으로, 환경의 현재 상태(DAG, 설정, DB 등)를 저장하고 복원하여 백업, 마이그레이션, 테스트 등에 활용된다. 스냅샷은 Airflow UI 또는 gcloud 명령어로 생성하며, 생성 시 DAG 실행 기록이 복원되어 중복 실행 가능성 등 주의가 필요하며, 이는 환경의 안정적인 관리와 빠른 복구를 위한 중요한 기능이다.
실시간 재고(inventory_current)는 “지금 값”만 들고 있다
- 어제 재고가 얼마였는지
- 일주일 동안 어떻게 변했는지
같은 히스토리 분석이 어렵다.
그래서 매일 1번
- 이벤트 원장(rfid_events)을 기준으로
- “해당 날짜까지 누적 재고”를 계산해서
- inventory_snapshot_daily에 저장한다.
즉
- rfid_events = 진실(append-only)
- inventory_current = 실시간(online)
- inventory_snapshot_daily = 분석용(history)
스냅샷을 만드는 SQL 아이디어
insert into inventory_snapshot_daily(date, sku, location, qty_on_hand)
select
date(event_ts) as date,
sku,
location,
sum(
case
when event_type='IN' then qty
when event_type='OUT' then -qty
else qty
end
) as qty_on_hand
from rfid_events
where date(event_ts) <= date('{{ ds }}')
group by 1,2,3
on conflict (date, sku, location) do update
set qty_on_hand = excluded.qty_on_hand,
computed_at = now();
IN은 +, OUT은 -, ADJUST는 그대로 반영
Airflow DAG로 묶기(일일 배치)
Airflow에서 이 SQL을 매일 실행하도록 DAG로 만든다.
- DAG 이름: inventory_snapshot_daily
- 스케줄: 하루 한 번 (예: 새벽 1시 10분)
- 태스크: snapshot_daily 하나(=SQL 실행)
POINT: Airflow는 “정해진 시간마다” DB 변환 작업을 안정적으로 반복하는 데 강하다.
내가 실제로 겪은 트러블슈팅 (Airflow)
1) DAG 파일을 어디에 넣어야 하나?
컨테이너 안 /opt/airflow/dags는 사실 호스트 폴더가 마운트된 경로였다.
컨테이너 내부에서 파일을 작성하려다 Permission denied가 발생했다.
해결
- docker inspect로 마운트된 호스트 경로를 확인하고
- 그 호스트 경로에 DAG 파일을 생성해야 한다.
2) days_ago ImportError(Airflow 3)
Airflow 3 환경에서 아래 import가 실패했다.
from airflow.utils.dates import days_ago
해결
- pendulum 기반 start_date로 변경하는 방식이 안전하다.
Airflow로 출고 예측 만들기: shipment_forecast_daily (MA7)
왜 예측 테이블이 필요한가?
RFID 이벤트는 과거 데이터다.
하지만 운영에서는 “내일 출고량이 어느 정도 될지”가 중요하다.
그래서 우선 가장 쉬운 예측 모델인MA7(최근 7일 이동평균) 으로 오늘 기준 예측값을 저장한다.
MA7 예측 로직(개념)
- rfid_events에서 event_type = 'OUT'만 뽑는다
- 날짜별 OUT 합계를 만든다
- 최근 7일 평균을 계산한다
- 오늘 날짜 예측으로 shipment_forecast_daily에 upsert 한다
예측 결과는 “정답”이라기보다
- 파이프라인이 예측 산출 → 적재 → 시각화까지 되는지 확인하는 목적이 크다.
예측값이 0으로 나오는 이유(내 케이스)
나는 OUT 이벤트가 1건뿐이라
7일 평균이 0.xx가 되었고 integer로 저장하면서 0이 되었다.
이건 버그가 아니라 데이터가 적어서 생기는 자연스러운 결과다.
Metabase로 시각화하기 (DB 연결 + 대시보드)

Metabase를 왜 쓰나?
- Postgres에 쌓인 데이터를 “비개발자도” 쉽게 볼 수 있게 해준다.
- SQL을 직접 입력해도 되고,
- GUI로 클릭해서 차트를 만들 수도 있다.
Postgres 연결 정보(내 환경 기준)
Metabase가 도커 컨테이너라면 Postgres 연결은 보통 2가지 방식이 있다.
(권장) 컨테이너 네트워크 내부로 붙기
- Host: postgres (docker compose 서비스명)
- Port: 5432
- DB: warehouse
- User: app
- Password: app_pw
(대안) host.docker.internal
컨테이너에서 호스트 포트를 타고 접근하는 방식.
- Host: host.docker.internal
- Port: 5432
포트폴리오 관점에서 4가지 카드를 만들어보았다.
카드 1) 현재 재고(Inventory Current)
- 테이블: inventory_current
- 질문: SKU/Location별 qty_on_hand
카드 2) 이벤트 원장(Events Ledger)
- 테이블: rfid_events
- 질문: event_type별 건수, 시간순 리스트
카드 3) 일일 스냅샷(History)
- 테이블: inventory_snapshot_daily
- 질문: 날짜별 재고 추이(라인/막대)
카드 4) 예측 vs 실제 비교
- 실제: OUT 합계(일별)
- 예측: MA7 예측값
- 한 화면에 두 개를 같이 보여주면 “예측 파이프라인”이 완성됨

정리: 지금까지 만든 것들을 요약해보겠다.
1.Kafka 토픽으로 RFID 이벤트를 받는다.
2.Python Consumer가 이벤트를 읽고 Postgres에 원장으로 저장한다.
3.동시에 현재 재고 테이블을 실시간으로 업데이트한다.
4.Airflow가 매일 스냅샷과 예측 결과를 생성한다.
5.Metabase로 실시간/배치 결과를 한눈에 모니터링한다.
트러블슈팅 모음 및 이슈
- WSL에서 docker 명령어 안됨
→ Docker Desktop WSL Integration 켜기 - Kafka bitnami 태그 pull 실패
→ confluentinc 이미지로 변경 - zookeeper 컨테이너 이름 충돌
→ 기존 컨테이너 삭제/compose down - docker compose exec “no configuration file”
→ compose 파일 있는 폴더에서 실행 - Airflow days_ago import 에러
→ pendulum start_date로 변경 - Metabase 차트가 비어 보임
→ 데이터 포인트 수 / 날짜필터 / sync 문제 확인
다음에 연습해볼 내용은
- enable.auto.commit=False + DB commit 후 수동 commit으로 Step3 안정화(실무형)
- FastAPI로 “현재 재고 조회 API” 만들고 Redis 캐시 붙이기
- dbt로 스냅샷/예측 SQL을 모델링해서 “변환 관리”까지 확장
이정도 생각하고있다. 사실 포트폴리오기준이라 구글링과 안될때 에러들을 찾아보고 어떤게 이슈가있고 문제가 되는지
적어가면서 하고있는데 생각보다 wsl쪽에서 많이 약한거같다. 좀더 복습하고 연습해서 같은실수를 번복하지 말아야겠다.
Airflow에서 Dag생성시 bash에서 느꼈던 점들을 적어보겠다.
'데이터 엔지니어링' 카테고리의 다른 글
| [Redis] - RFID 기반 실시간 재고 조회 API 구축 (0) | 2026.01.10 |
|---|---|
| [WSL + Airflow] RFID 실시간/배치 파이프라인 03 - Dag 코드(스냅샷/예측) (0) | 2026.01.03 |
| [SQL + PYTHON] - RFID 실시간/배치 파이프라인 01 (0) | 2026.01.03 |
| [DBT] - Data build tool (1) | 2025.12.29 |
| [Refactoring] - DAU·CTR·CVR·ARPU 배치 파이프라인 (0) | 2025.12.19 |