데이터 엔지니어링

[WSL + Airflow] RFID 실시간/배치 파이프라인 03 - Dag 코드(스냅샷/예측)

jyu_seo_ 2026. 1. 3. 22:36

Airflow DAG 코드(스냅샷/예측)  환경(Airflow 3, days_ago 이슈 반영)기준으로 정리해볼려한다.

 

이번글은 wsl에서 너무 헤매고 했어서 기록용으로 남겨둘려고한다.

 

Airflow Connection 생성(컨테이너 안에서 1회)

 

Postgres가 docker compose로 0.0.0.0:5432->5432 열려 있으니,

Airflow 컨테이너에서는 host.docker.internal:5432로 붙는 게 가장 간단함.

docker exec -it --user airflow root-airflow-scheduler-1 bash -lc \
"airflow connections add rfid_postgres \
 --conn-type postgres \
 --conn-host host.docker.internal \
 --conn-port 5432 \
 --conn-schema warehouse \
 --conn-login app \
 --conn-password app_pw"

확인 코드

docker exec -it --user airflow root-airflow-scheduler-1 bash -lc \
"airflow connections get rfid_postgres"

DAG 파일을 어디에 넣어야 하는가?

 

  • 호스트(WSL): /root/dags
  • 컨테이너: /opt/airflow/dags

확인용

docker inspect root-airflow-dag-processor-1 \
  --format '{{range .Mounts}}{{println .Source " -> " .Destination}}{{end}}' | grep dags

 

 

DAG 파일은 WSL에서 /root/dags에 생성하면 된다.

sudo mkdir -p /root/dags

 

DAG #1: 일일 재고 스냅샷 inventory_snapshot_daily.py

 

  • 이벤트 원장 rfid_events를 기반으로
  • 해당 날짜(=Airflow logical date, ds) 기준 재고를 계산해서
  • inventory_snapshot_daily에 upsert(있으면 갱신)한다.

 

sudo tee /root/dags/inventory_snapshot_daily.py > /dev/null <<'PY'
import pendulum
from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# DAG 타임존을 KST로 두면 ds가 한국 날짜로 떨어짐
KST = pendulum.timezone("Asia/Seoul")

# Airflow Connection ID (미리 만들어둔 rfid_postgres)
CONN_ID = "rfid_postgres"

SNAPSHOT_SQL = """
/*
inventory_snapshot_daily:
- ds(논리 실행일) 기준으로, 그날까지의 이벤트를 누적해 재고를 계산
- IN : +qty
- OUT: -qty
- ADJUST: qty 그대로 반영(음수면 감소)
*/
insert into inventory_snapshot_daily(date, sku, location, qty_on_hand)
select
  date('{{ ds }}') as date,
  sku,
  location,
  coalesce(sum(
    case
      when event_type='IN' then qty
      when event_type='OUT' then -qty
      else qty
    end
  ), 0)::int as qty_on_hand
from rfid_events
where date(event_ts) <= date('{{ ds }}')
group by 1,2,3
on conflict (date, sku, location) do update
set qty_on_hand = excluded.qty_on_hand,
    computed_at = now();
"""

with DAG(
    dag_id="inventory_snapshot_daily",
    start_date=pendulum.datetime(2026, 1, 1, tz=KST),
    schedule="10 1 * * *",  # 매일 01:10 (원하면 변경)
    catchup=False,          # 과거 날짜 쌓아 올리는 backfill은 일단 끔
    max_active_runs=1,
    default_args={
        "owner": "airflow",
        "retries": 1,
    },
    tags=["rfid", "snapshot", "postgres"],
) as dag:

    snapshot_daily = SQLExecuteQueryOperator(
        task_id="snapshot_daily",
        conn_id=CONN_ID,
        sql=SNAPSHOT_SQL,
    )
PY

 

DAG #2: 출고 예측(MA7) shipment_forecast_daily_ma7.py

  • rfid_events에서 event_type='OUT'만 가져와 일별 출고량을 만들고
  • 최근 7일 평균(MA7)을 계산해서
  • shipment_forecast_daily에 upsert 한다.

예측은 ds(오늘) 기준으로 “이전 7일(ds-7 ~ ds-1)”을 보고, ds일 예측값을 만든다.
(데이터가 적으면 0이 나오는 게 정상일 수 있음)

 

sudo tee /root/dags/shipment_forecast_daily_ma7.py > /dev/null <<'PY'
import pendulum
from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

KST = pendulum.timezone("Asia/Seoul")
CONN_ID = "rfid_postgres"

FORECAST_SQL = """
/*
shipment_forecast_daily (MA7):
- ds(오늘) 예측값 = 이전 7일(ds-7 ~ ds-1)의 일별 OUT 합계 평균
- OUT 이벤트가 없는 날은 0으로 취급되도록 generate_series로 날짜 골격 생성
*/
with daily_out as (
  select
    date(event_ts) as d,
    sku,
    location,
    sum(qty)::int as out_qty
  from rfid_events
  where event_type = 'OUT'
    and date(event_ts) >= date('{{ macros.ds_add(ds, -7) }}')
    and date(event_ts) <= date('{{ macros.ds_add(ds, -1) }}')
  group by 1,2,3
),
date_spine as (
  select generate_series(
    date('{{ macros.ds_add(ds, -7) }}'),
    date('{{ macros.ds_add(ds, -1) }}'),
    interval '1 day'
  )::date as d
),
sku_loc as (
  -- 최근 7일간 OUT가 한 번이라도 있었던 sku/location 조합만 예측 대상으로 삼음(원하면 확장 가능)
  select distinct sku, location from daily_out
),
filled as (
  select
    s.sku,
    s.location,
    ds.d,
    coalesce(o.out_qty, 0)::int as out_qty
  from sku_loc s
  cross join date_spine ds
  left join daily_out o
    on o.sku = s.sku and o.location = s.location and o.d = ds.d
),
ma7 as (
  select
    date('{{ ds }}') as date,
    sku,
    location,
    -- 평균 → 정수 저장. round 대신 ceil/greatest로 바꾸면 0 문제를 줄일 수 있음
    round(avg(out_qty))::int as forecast_qty
  from filled
  group by 1,2,3
)
insert into shipment_forecast_daily(date, sku, location, forecast_qty, method)
select date, sku, location, forecast_qty, 'ma7' as method
from ma7
on conflict (date, sku, location, method) do update
set forecast_qty = excluded.forecast_qty,
    computed_at = now();
"""

with DAG(
    dag_id="shipment_forecast_daily_ma7",
    start_date=pendulum.datetime(2026, 1, 1, tz=KST),
    schedule="10 1 * * *",  # 스냅샷과 같은 시간에 실행(원하면 분리)
    catchup=False,
    max_active_runs=1,
    default_args={
        "owner": "airflow",
        "retries": 1,
    },
    tags=["rfid", "forecast", "ma7", "postgres"],
) as dag:

    forecast_ma7 = SQLExecuteQueryOperator(
        task_id="forecast_ma7",
        conn_id=CONN_ID,
        sql=FORECAST_SQL,
    )
PY

 

DAG 로드 확인(Import 에러 체크 포함)

import 에러 확인

docker exec -it --user airflow root-airflow-scheduler-1 bash -lc \
"airflow dags list-import-errors"

 

DAG 목록에 뜨는지 확인

docker exec -it --user airflow root-airflow-scheduler-1 bash -lc \
"airflow dags list | egrep 'inventory_snapshot_daily|shipment_forecast_daily_ma7' || true"

 

실행(트리거) & 결과 확인

수동 실행(트리거)

docker exec -it --user airflow root-airflow-scheduler-1 bash -lc \
"airflow dags trigger inventory_snapshot_daily"

docker exec -it --user airflow root-airflow-scheduler-1 bash -lc \
"airflow dags trigger shipment_forecast_daily_ma7"

만약 “manual run에서 logical_date가 이상하게 잡혀 실패”를 겪으면

  • -l 옵션으로 논리 날짜를 명시해서 트리거 가능

ex :) 

docker exec -it --user airflow root-airflow-scheduler-1 bash -lc \
"airflow dags trigger shipment_forecast_daily_ma7 -l 2026-01-03T01:10:00+09:00 -r manual_fix"

 

Postgres에서 결과 확인

스냅샷:

cd ~/rfid-pipeline
docker compose exec postgres psql -U app -d warehouse -c \
"select * from inventory_snapshot_daily order by date desc, sku, location limit 20;"

 

예측:

docker compose exec postgres psql -U app -d warehouse -c \
"select * from shipment_forecast_daily order by computed_at desc limit 20;"

 

  • Airflow 3에서 days_ago는 import 에러 날 수 있음 → pendulum.datetime(..., tz=...)로 start_date 지정
  • DAG 파일은 컨테이너에 직접 쓰지 말고(권한 문제) → 호스트 마운트 경로(/root/dags)에 저장
  • Airflow 컨테이너 ↔ Postgres 컨테이너가 네트워크가 다르면 → host.docker.internal:5432가 제일 간단
  • MA7이 0 나오는 건 버그가 아니라 데이터가 적어서일 수 있음(OUT 이벤트가 거의 없으면 평균이 0.xx)

wsl을 다루면서 이슈가 있었던 내용들을 다뤄봤다.