데이터 엔지니어링

[Redis] - RFID 기반 실시간 재고 조회 API 구축

jyu_seo_ 2026. 1. 10. 21:38

 

재고 시스템에서 가장 어려운 문제는 단순히 “재고를 저장하는 것”이 아니라,
변화하는 재고를 얼마나 정확하고 빠르게 조회할 수 있느냐다.

이번 글에서는 RFID 이벤트를 Kafka로 수집하고,
Postgres에 정합성 있게 누적한 뒤,
FastAPI + Redis를 이용해 실시간에 가까운 재고 조회 API를 구축한 과정을 정리한다.

 

현재 까지 작업단계

 

1.Kafka 토픽으로 RFID 이벤트를 받는다.

2.Python Consumer가 이벤트를 읽고 Postgres에 원장으로 저장한다.

3.동시에 현재 재고 테이블을 실시간으로 업데이트한다.

4.Airflow가 매일 스냅샷과 예측 결과를 생성한다.

5.Metabase로 실시간/배치 결과를 한눈에 모니터링한다.

실시간 RFID 재고 시스템 구축기

Kafka → Postgres → FastAPI → Redis → 조회 API

전체 아키텍처 개요

목표

  • RFID 이벤트를 실시간으로 수집
  • 현재 재고를 DB에 누적 관리
  • 빠른 재고 조회 API 제공
  • 조회 트래픽은 Redis 캐시로 흡수

전체 흐름

RFID Reader
   ↓
Kafka Topic (rfid_events)
   ↓
Python Consumer
   ↓
Postgres
   ├─ rfid_events (원장)
   └─ inventory_current (현재 재고)
   ↓
FastAPI (조회 API)
   ↓
Redis (캐시)

 


 

01 Kafka → Postgres Consumer 구현

왜 이벤트 원장이 필요한가

 

RFID 이벤트는 단순 증가/감소가 아니라,
중복 이벤트, 재처리, 정정(ADJUST) 가능성이 항상 존재한다.

그래서 이벤트를 바로 재고에 반영하지 않고,
항상 이벤트 원장(rfid_events)에 먼저 기록한다.

 

현재 Consumer의 핵심 로직은 다음과 같다.

  • Kafka 메시지 수신
  • 이벤트 중복 방지 (event_id 기준)
  • 이벤트는 원장에 먼저 저장
  • 성공한 이벤트만 현재 재고에 반영
  • 모든 작업을 단일 트랜잭션으로 처리
INSERT INTO rfid_events (...) 
ON CONFLICT (event_id) DO NOTHING;

INSERT INTO inventory_current (sku, location, qty_on_hand)
VALUES (...)
ON CONFLICT (sku, location)
DO UPDATE SET
  qty_on_hand = inventory_current.qty_on_hand + excluded.qty_on_hand,
  updated_at = now();

 


02 Docker Compose로 인프라 구성

사용 서비스

  • Postgres 16
  • Kafka + Zookeeper
  • Redis
  • Metabase

핵심 포인트

  • Postgres / Redis를 호스트 포트로 publish
  • FastAPI(WSL)에서 localhost로 접근 가능
postgres:
  ports:
    - "5432:5432"

redis:
  ports:
    - "6379:6379"

03 FastAPI “현재 재고 조회 API” 구현

왜 FastAPI를 쓰는가

  • 가볍고 빠름
  • Swagger 자동 제공
  • 조회 API에 최적

※ Swagger : FastAPI 프레임워크가 자동으로 생성해주는 대화형 API 문서로, Swagger UI 또는 ReDoc을 통해 API의 모든 엔드포인트(URL, HTTP 메서드)와 데이터 모델을 시각적으로 보여주고, 실제 요청을 보내 테스트할 수 있게 해주는 도구다. 별도의 설정 없이 FastAPI 코드만으로 OpenAPI 표준 기반의 문서를 만들고, 프론트엔드 개발자와의 협업 및 API 테스트를 쉽게 해준다.

 

APP.PY

import json # Redis에 저장할때 파이썬 dict을 문자열로 바꿀려고 한다
import os #환경 변수로 DB/Redis 주소를 받기위해 필요
from typing import Optional, Any

import psycopg2 # Postgres 접속/쿼리 실행
import redis # Redis 접속
from fastapi import FastAPI, HTTPException, Query # 웹 API 서버

PG_DSN = os.getenv("PG_DSN", "dbname=warehouse user=app password=app_pw host=localhost port=5432") # DB 접속 문자열. 기본값 : docker - compose에 맞춘값
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0") # Redis 주소 - docker-compose에서 6379 localhost로 접속
CACHE_TTL_SECONDS = int(os.getenv("CACHE_TTL_SECONDS", "5")) # 캐시를 몇초 보관할지

rds = redis.Redis.from_url(REDIS_URL, decode_responses=True) # rds = Redis랑 통신할 객체

app = FastAPI(title="Current Inventory API") # Fast API 서버의 본채

def cache_get(key: str) -> Optional[Any]: # cache_get = rds.get(key)로 Redis에서 값을 꺼냄 없으면 None 있으면 JSON 문자열이므로 json.loads로 dict/list로 복원해서 반환
    v = rds.get(key)
    return None if v is None else json.loads(v)

def cache_set(key: str, obj: Any, ttl: int = CACHE_TTL_SECONDS) -> None: # json.dumps로 dict/list를 JSON 문자열로 바꿈
    rds.setex(key, ttl, json.dumps(obj, ensure_ascii=False, default=str)) # setex(key, ttl, value)로 “ttl초 뒤 자동 삭제되는 캐시” 저장

def fetch_one(sku: str, location: str) -> dict:
    sql = """
    select sku, location, qty_on_hand, updated_at
    from inventory_current
    where sku = %s and location = %s -- here sku=%s and location=%s 딱 1건만 찾음
    """
    conn = psycopg2.connect(PG_DSN)
    try:
        with conn.cursor() as cur:
            cur.execute(sql, (sku, location))
            row = cur.fetchone()
            if row is None: #  row is non이면 DB에 해당 재고가 없으므로 API 는 404로 반환 있으면 dict 형태로 만들어서 반환
                raise HTTPException(status_code=404, detail="not found")
            return {
                "sku": row[0],
                "location": row[1],
                "qty_on_hand": int(row[2]),
                "updated_at": row[3],
            }
    finally:
        conn.close()

@app.get("/inventory/current") # redis key 만들기
def get_current_inventory(
    sku: str = Query(..., min_length=1),
    location: str = Query(..., min_length=1),
):
    cache_key = f"inv:sku:{sku}:loc:{location}"
    cached = cache_get(cache_key)
    if cached is not None:
        return {"cached": True, "data": cached} # Redis에서 먼저찾기 있으면 DB안가고 바로 반환 없으면 DB조회 조회결과를 Redis에 TTL로 저장 반환 : cached: False

    data = fetch_one(sku, location)
    cache_set(cache_key, data)
    return {"cached": False, "data": data}

@app.get("/inventory/current/{sku}")
def get_current_inventory_by_sku(sku: str):
    cache_key = f"inv:sku:{sku}:all_locs"
    cached = cache_get(cache_key)
    if cached is not None:
        return {"cached": True, "data": cached}

    sql = """
    select sku, location, qty_on_hand, updated_at
    from inventory_current
    where sku = %s
    order by location
    """
    conn = psycopg2.connect(PG_DSN)
    try:
        with conn.cursor() as cur:
            cur.execute(sql, (sku,))
            rows = cur.fetchall()
            data = [
                {
                    "sku": r[0],
                    "location": r[1],
                    "qty_on_hand": int(r[2]),
                    "updated_at": r[3],
                }
                for r in rows
            ]
            cache_set(cache_key, data)
            return {"cached": False, "data": data}
    finally:
        conn.close()

 

 

API 설계

01. 단건 조회 (SKU + LOCATION)

GET /inventory/current?sku=SKU001&location=A-01

02. SKU 전체 조회

GET /inventory/current/SKU001

 

location은 / 같은 특수문자가 가능하다 (query param 사용)


04 Redis 캐시 적용(Cache-Aside 패턴)

위내용 캐시 데이터 패턴에 대해서도 나중에 블로그에 기술해 놓겠다.

간단히 설명하자면

 

서버에서 데이터 조회 및 저장 성능을 최적화하기 위해 캐시를 활용하는 것은 필수적이다. 하지만 단순히 캐시를 사용하는 것만으로는 충분하지 않으며, 데이터의 변경 빈도와 일관성 유지 방식에 따라 적절한 캐시 패턴(Cache Pattern)을 선택해야 한다. 그러기 위해서 Cache-Aside 패턴을 사용하였다.

 

캐시 전략은 다음과 같다.

  • 조회 시
    1. Redis에서 먼저 조회
    2. 없으면 DB 조회
    3. Redis에 TTL로 저장
  • TTL: 5초 

캐시 키 설계

inv:sku:{sku}:loc:{location}
inv:sku:{sku}:all_locs

 

FastAPI 코드 핵심

cached = cache_get(cache_key)
if cached:
    return {"cached": True, "data": cached}

data = fetch_from_db()
cache_set(cache_key, data)
return {"cached": False, "data": data}

 

Redis에서 먼저찾기 있으면 DB안가고 바로 반환 없으면 DB조회

조회결과를 Redis에 TTL로 저장

반환 : cached: False


기술적 장점

1.읽기/쓰기가 분리됨

2.DB부하 감소

3.조회응답속도 ms단위

4.kafka + DB 정합성 유지

실무적 장점

1. Metabase / 대시보드 연동이 쉬워졌다.

2. 실시간 모니터링 가능

3. 트래픽 증가에도 안정적


마무리

이번 작업은 단순히 API 하나를 만든 것이 아니라,

 

이벤트 기반 재고 시스템에서
정합성과 성능을 동시에 만족하는 구조를 구현한 경험이었다.

 

Kafka, Postgres, FastAPI, Redis 각각은 익숙한 기술이지만,
어디에 어떤 역할로 배치하느냐에 따라 시스템의 성격은 완전히 달라진다.

 

Kafka 기반 RFID 이벤트를 정합성 있게 누적하고,
FastAPI + Redis 캐시로 실시간에 가까운 재고 조회 API를 구축했다.