Airflow DAG 코드(스냅샷/예측) 환경(Airflow 3, days_ago 이슈 반영)기준으로 정리해볼려한다.
이번글은 wsl에서 너무 헤매고 했어서 기록용으로 남겨둘려고한다.
Airflow Connection 생성(컨테이너 안에서 1회)
Postgres가 docker compose로 0.0.0.0:5432->5432 열려 있으니,
Airflow 컨테이너에서는 host.docker.internal:5432로 붙는 게 가장 간단함.
docker exec -it --user airflow root-airflow-scheduler-1 bash -lc \
"airflow connections add rfid_postgres \
--conn-type postgres \
--conn-host host.docker.internal \
--conn-port 5432 \
--conn-schema warehouse \
--conn-login app \
--conn-password app_pw"
확인 코드
docker exec -it --user airflow root-airflow-scheduler-1 bash -lc \
"airflow connections get rfid_postgres"
DAG 파일을 어디에 넣어야 하는가?
- 호스트(WSL): /root/dags
- 컨테이너: /opt/airflow/dags
확인용
docker inspect root-airflow-dag-processor-1 \
--format '{{range .Mounts}}{{println .Source " -> " .Destination}}{{end}}' | grep dags
DAG 파일은 WSL에서 /root/dags에 생성하면 된다.
sudo mkdir -p /root/dags
DAG #1: 일일 재고 스냅샷 inventory_snapshot_daily.py
- 이벤트 원장 rfid_events를 기반으로
- 해당 날짜(=Airflow logical date, ds) 기준 재고를 계산해서
- inventory_snapshot_daily에 upsert(있으면 갱신)한다.
sudo tee /root/dags/inventory_snapshot_daily.py > /dev/null <<'PY'
import pendulum
from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
# DAG 타임존을 KST로 두면 ds가 한국 날짜로 떨어짐
KST = pendulum.timezone("Asia/Seoul")
# Airflow Connection ID (미리 만들어둔 rfid_postgres)
CONN_ID = "rfid_postgres"
SNAPSHOT_SQL = """
/*
inventory_snapshot_daily:
- ds(논리 실행일) 기준으로, 그날까지의 이벤트를 누적해 재고를 계산
- IN : +qty
- OUT: -qty
- ADJUST: qty 그대로 반영(음수면 감소)
*/
insert into inventory_snapshot_daily(date, sku, location, qty_on_hand)
select
date('{{ ds }}') as date,
sku,
location,
coalesce(sum(
case
when event_type='IN' then qty
when event_type='OUT' then -qty
else qty
end
), 0)::int as qty_on_hand
from rfid_events
where date(event_ts) <= date('{{ ds }}')
group by 1,2,3
on conflict (date, sku, location) do update
set qty_on_hand = excluded.qty_on_hand,
computed_at = now();
"""
with DAG(
dag_id="inventory_snapshot_daily",
start_date=pendulum.datetime(2026, 1, 1, tz=KST),
schedule="10 1 * * *", # 매일 01:10 (원하면 변경)
catchup=False, # 과거 날짜 쌓아 올리는 backfill은 일단 끔
max_active_runs=1,
default_args={
"owner": "airflow",
"retries": 1,
},
tags=["rfid", "snapshot", "postgres"],
) as dag:
snapshot_daily = SQLExecuteQueryOperator(
task_id="snapshot_daily",
conn_id=CONN_ID,
sql=SNAPSHOT_SQL,
)
PY
DAG #2: 출고 예측(MA7) shipment_forecast_daily_ma7.py
- rfid_events에서 event_type='OUT'만 가져와 일별 출고량을 만들고
- 최근 7일 평균(MA7)을 계산해서
- shipment_forecast_daily에 upsert 한다.
예측은 ds(오늘) 기준으로 “이전 7일(ds-7 ~ ds-1)”을 보고, ds일 예측값을 만든다.
(데이터가 적으면 0이 나오는 게 정상일 수 있음)
sudo tee /root/dags/shipment_forecast_daily_ma7.py > /dev/null <<'PY'
import pendulum
from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
KST = pendulum.timezone("Asia/Seoul")
CONN_ID = "rfid_postgres"
FORECAST_SQL = """
/*
shipment_forecast_daily (MA7):
- ds(오늘) 예측값 = 이전 7일(ds-7 ~ ds-1)의 일별 OUT 합계 평균
- OUT 이벤트가 없는 날은 0으로 취급되도록 generate_series로 날짜 골격 생성
*/
with daily_out as (
select
date(event_ts) as d,
sku,
location,
sum(qty)::int as out_qty
from rfid_events
where event_type = 'OUT'
and date(event_ts) >= date('{{ macros.ds_add(ds, -7) }}')
and date(event_ts) <= date('{{ macros.ds_add(ds, -1) }}')
group by 1,2,3
),
date_spine as (
select generate_series(
date('{{ macros.ds_add(ds, -7) }}'),
date('{{ macros.ds_add(ds, -1) }}'),
interval '1 day'
)::date as d
),
sku_loc as (
-- 최근 7일간 OUT가 한 번이라도 있었던 sku/location 조합만 예측 대상으로 삼음(원하면 확장 가능)
select distinct sku, location from daily_out
),
filled as (
select
s.sku,
s.location,
ds.d,
coalesce(o.out_qty, 0)::int as out_qty
from sku_loc s
cross join date_spine ds
left join daily_out o
on o.sku = s.sku and o.location = s.location and o.d = ds.d
),
ma7 as (
select
date('{{ ds }}') as date,
sku,
location,
-- 평균 → 정수 저장. round 대신 ceil/greatest로 바꾸면 0 문제를 줄일 수 있음
round(avg(out_qty))::int as forecast_qty
from filled
group by 1,2,3
)
insert into shipment_forecast_daily(date, sku, location, forecast_qty, method)
select date, sku, location, forecast_qty, 'ma7' as method
from ma7
on conflict (date, sku, location, method) do update
set forecast_qty = excluded.forecast_qty,
computed_at = now();
"""
with DAG(
dag_id="shipment_forecast_daily_ma7",
start_date=pendulum.datetime(2026, 1, 1, tz=KST),
schedule="10 1 * * *", # 스냅샷과 같은 시간에 실행(원하면 분리)
catchup=False,
max_active_runs=1,
default_args={
"owner": "airflow",
"retries": 1,
},
tags=["rfid", "forecast", "ma7", "postgres"],
) as dag:
forecast_ma7 = SQLExecuteQueryOperator(
task_id="forecast_ma7",
conn_id=CONN_ID,
sql=FORECAST_SQL,
)
PY
DAG 로드 확인(Import 에러 체크 포함)
import 에러 확인
docker exec -it --user airflow root-airflow-scheduler-1 bash -lc \
"airflow dags list-import-errors"
DAG 목록에 뜨는지 확인
docker exec -it --user airflow root-airflow-scheduler-1 bash -lc \
"airflow dags list | egrep 'inventory_snapshot_daily|shipment_forecast_daily_ma7' || true"
실행(트리거) & 결과 확인
수동 실행(트리거)
docker exec -it --user airflow root-airflow-scheduler-1 bash -lc \
"airflow dags trigger inventory_snapshot_daily"
docker exec -it --user airflow root-airflow-scheduler-1 bash -lc \
"airflow dags trigger shipment_forecast_daily_ma7"
만약 “manual run에서 logical_date가 이상하게 잡혀 실패”를 겪으면
- -l 옵션으로 논리 날짜를 명시해서 트리거 가능
ex :)
docker exec -it --user airflow root-airflow-scheduler-1 bash -lc \
"airflow dags trigger shipment_forecast_daily_ma7 -l 2026-01-03T01:10:00+09:00 -r manual_fix"
Postgres에서 결과 확인
스냅샷:
cd ~/rfid-pipeline
docker compose exec postgres psql -U app -d warehouse -c \
"select * from inventory_snapshot_daily order by date desc, sku, location limit 20;"
예측:
docker compose exec postgres psql -U app -d warehouse -c \
"select * from shipment_forecast_daily order by computed_at desc limit 20;"
- Airflow 3에서 days_ago는 import 에러 날 수 있음 → pendulum.datetime(..., tz=...)로 start_date 지정
- DAG 파일은 컨테이너에 직접 쓰지 말고(권한 문제) → 호스트 마운트 경로(/root/dags)에 저장
- Airflow 컨테이너 ↔ Postgres 컨테이너가 네트워크가 다르면 → host.docker.internal:5432가 제일 간단
- MA7이 0 나오는 건 버그가 아니라 데이터가 적어서일 수 있음(OUT 이벤트가 거의 없으면 평균이 0.xx)
wsl을 다루면서 이슈가 있었던 내용들을 다뤄봤다.
'데이터 엔지니어링' 카테고리의 다른 글
| [데이터엔지니어링] - ETL VS ELT (1) | 2026.01.23 |
|---|---|
| [Redis] - RFID 기반 실시간 재고 조회 API 구축 (0) | 2026.01.10 |
| [AirFlow+Metabase+트러블슈팅] - RFID 실시간/배치 파이프라인 02(중요) (0) | 2026.01.03 |
| [SQL + PYTHON] - RFID 실시간/배치 파이프라인 01 (0) | 2026.01.03 |
| [DBT] - Data build tool (1) | 2025.12.29 |