test_space_sdc_26 / simulation_engine.py
RJuro's picture
Deploy Streamlit app from local src/
ecb62e6 verified
Raw
History Blame Contribute Delete
14.6 kB
"""
Simulation Arena — deterministic parallel-universe replenishment simulator.
===========================================================================
Two identical copies of the world ("universes") run forward from the same
initial state and the SAME demand realisation. The only difference is the
replenishment policy:
  - Universe A (Naive):    continuous-review reorder point with a fixed safety
                           stock and a 7-day moving-average forecast. Reacts
                           slowly; blind to seasonality and the VBP shock.
  - Universe B (Adaptive): periodic review that sizes safety stock from demand
                           volatility, follows short-term trend, and anticipates
                           the Volume-Based Procurement (VBP) demand spillover.
Mechanics borrowed from FoodTruck Bench: an *opaque* daily demand process the
policies never see, compounding consequences (stockouts, overstock, expiry),
and a deterministic seed so the two universes are directly comparable.
The engine is pure numpy/pandas (no Streamlit) so it can be unit-tested and
imported by the dashboard. See test_simulation_engine.py for invariants.
"""
from __future__ import annotations
import os
from dataclasses import dataclass, field
import numpy as np
import pandas as pd
# Directory containing the CSV inputs read by _load_tables (products.csv and
# sku_profiles.csv); resolved relative to this file, not the working directory.
DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data")
# Economic assumptions (illustrative, documented for the report)
RETAIL_MARKUP = 1.40  # selling price = effective unit cost x markup
HOLDING_RATE_ANNUAL = 0.25  # 25% of inventory value per year; run_arena converts it to a daily rate (/365)
CHURN_PENALTY_FRAC = 0.50  # goodwill cost per unit of unmet demand, as a fraction of the unit margin (price - cost)
VBP_PRICE_DROP = 0.51  # ~51% procurement price cut at the shock (case figure)
VBP_VOLUME_UPLIFT = 0.30  # retail demand spillover after the cut (multiplies the demand level by 1.30)
Z_SERVICE = 1.64  # ~95% service level for statistical safety stock (z-score used by AdaptivePolicy)
@dataclass
class SKUParams:
    """Static description of one SKU, as consumed by the simulator.

    Instances are built by ``load_sku`` from the merged product/profile
    tables; every field is a plain scalar so the object is cheap to copy
    and compare.
    """

    # Identity
    sku_id: str
    name: str

    # Demand shape
    demand_class: str      # e.g. "seasonal", "fast", "policy_shocked", "long_tail"
    base_monthly: float    # baseline demand per month (divided by 30 for a daily level)
    cv: float              # coefficient of variation driving the daily noise
    vbp_flag: bool         # True when the SKU is exposed to the VBP shock

    # Supply and cost
    unit_cost: float       # pre-shock procurement cost per unit (CNY in the source data)
    lead_time_days: int    # days between placing an order and receiving it
    shelf_life_days: int   # batch age at which stock is written off
def _load_tables():
    """Load the product master and the SKU profiles and join them on ``sku_id``.

    Both CSVs are read from ``DATA_DIR`` on every call (no caching), so the
    caller always gets a fresh DataFrame it is free to mutate. Columns that
    exist in both files keep their name for the product side and get a
    ``_p`` suffix for the profile side.
    """
    products_path = os.path.join(DATA_DIR, "products.csv")
    profiles_path = os.path.join(DATA_DIR, "sku_profiles.csv")
    product_table = pd.read_csv(products_path)
    profile_table = pd.read_csv(profiles_path)
    return product_table.merge(profile_table, on="sku_id", suffixes=("", "_p"))
def list_demo_skus(preferred_classes=("policy_shocked", "seasonal", "fast")) -> pd.DataFrame:
    """Return a tidy table of SKUs suitable for the Arena, best demo first.

    Rows are ordered by the position of their ``demand_class`` in
    *preferred_classes* (classes not listed sort last), then by descending
    ``base_demand``. The helper ranking column is dropped before returning
    and the index is reset to 0..n-1.
    """
    table = _load_tables()
    priority = {cls: pos for pos, cls in enumerate(preferred_classes)}
    # 99 pushes any class outside the preferred list to the bottom.
    table["_rank"] = table["demand_class"].map(lambda cls: priority.get(cls, 99))
    wanted = [
        "sku_id", "product_name", "therapy_area", "demand_class",
        "vbp_flag", "base_demand", "demand_cv", "_rank",
    ]
    ranked = table[wanted].sort_values(["_rank", "base_demand"], ascending=[True, False])
    return ranked.drop(columns="_rank").reset_index(drop=True)
def load_sku(sku_id: str | None = None) -> SKUParams:
    """Build the ``SKUParams`` for one SKU from the merged data tables.

    Args:
        sku_id: Identifier to look up. When ``None``, the first VBP-exposed
            row of ``list_demo_skus()`` is used (falling back to the first
            row overall if no SKU is VBP-exposed).

    Returns:
        The populated ``SKUParams``; ``shelf_life_days`` is derived from the
        ``shelf_life_months`` column using 30-day months.

    Raises:
        IndexError: if the tables are empty or *sku_id* matches no row. The
            exception type is the one ``.iloc[0]`` raised before, but the
            message now names the offending id.
    """
    table = _load_tables()
    if sku_id is None:
        # Default to the highest-volume VBP-exposed SKU: the shock is where the
        # naive policy's blindness shows, so it is the clearest showcase.
        demo = list_demo_skus()
        if demo.empty:
            raise IndexError("no SKUs available in the product/profile tables")
        # astype(bool) keeps a 0/1 integer column from being interpreted as
        # column labels instead of a row mask.
        vbp = demo[demo["vbp_flag"].astype(bool)]
        sku_id = (vbp if not vbp.empty else demo).iloc[0]["sku_id"]

    matches = table[table["sku_id"] == sku_id]
    if matches.empty:
        raise IndexError(f"unknown sku_id {sku_id!r}: no matching row in the product/profile tables")
    row = matches.iloc[0]

    return SKUParams(
        sku_id=row["sku_id"],
        name=row["product_name"],
        demand_class=row["demand_class"],
        base_monthly=float(row["base_demand"]),
        cv=float(row["demand_cv"]),
        vbp_flag=bool(row["vbp_flag"]),
        unit_cost=float(row["unit_cost_cny"]),
        lead_time_days=int(row["lead_time_days"]),
        shelf_life_days=int(row["shelf_life_months"]) * 30,
    )
# ------------------------------------------------------------------
# Opaque daily demand process (hidden from the policies)
# ------------------------------------------------------------------
def simulate_demand(sku: SKUParams, days: int, seed: int,
                    inject_vbp: bool, vbp_shock_day: int) -> np.ndarray:
    """Daily demand realisation. Deterministic given seed.

    Day 0 is treated as the start of the year (mid-winter), which is the
    convention the ILI window below already relies on.

    Args:
        sku: SKU whose ``base_monthly``, ``cv``, ``demand_class`` and
            ``vbp_flag`` shape the process.
        days: Number of days to generate.
        seed: Seed for ``numpy.random.default_rng``.
        inject_vbp: Whether the VBP structural break is active at all.
        vbp_shock_day: First day of the VBP uplift (only used for VBP SKUs).

    Returns:
        Integer array of length *days* with non-negative unit demand.
    """
    rng = np.random.default_rng(seed)
    base_daily = sku.base_monthly / 30.0
    t = np.arange(days)

    # Annual seasonality; strong for seasonal SKUs, mild otherwise.
    amp = {"seasonal": 0.45, "fast": 0.06, "policy_shocked": 0.10, "long_tail": 0.10}.get(sku.demand_class, 0.10)
    # Cosine phase: peak at the year boundary (winter), trough mid-year.
    # The previous sin(... - pi/2) form was the mirror image (trough at t=0),
    # which fought the winter ILI bump below.
    season = 1.0 + amp * np.cos(2 * np.pi * t / 365.0)

    # Winter ILI bump for seasonal SKUs (days near year boundaries)
    doy = t % 365
    ili = np.where((sku.demand_class == "seasonal") & ((doy < 60) | (doy > 330)), 1.30, 1.0)
    level = base_daily * season * ili

    vbp_active = inject_vbp and sku.vbp_flag

    # VBP structural break: demand spillover after the shock for VBP SKUs.
    if vbp_active:
        level = level * np.where(t >= vbp_shock_day, 1 + VBP_VOLUME_UPLIFT, 1.0)

    # Heteroscedastic noise (wider for the 21 days just after the shock).
    sigma = np.full(days, sku.cv)
    if vbp_active:
        sigma = np.where((t >= vbp_shock_day) & (t < vbp_shock_day + 21), sku.cv * 1.6, sigma)

    noise = rng.normal(0, sigma)
    # Multiplicative noise can push the level below zero; clamp before rounding.
    demand = np.maximum(0, level * (1 + noise))
    return np.round(demand).astype(int)
def effective_unit_cost(sku: SKUParams, day: int, inject_vbp: bool, vbp_shock_day: int) -> float:
    """Return the procurement cost per unit that applies on *day*.

    The list cost ``sku.unit_cost`` is used unless the VBP shock is enabled,
    the SKU is VBP-exposed and *day* is on or after *vbp_shock_day*; from
    then on the cost is reduced by ``VBP_PRICE_DROP``.
    """
    shock_applies = inject_vbp and sku.vbp_flag and day >= vbp_shock_day
    if not shock_applies:
        return sku.unit_cost
    return sku.unit_cost * (1 - VBP_PRICE_DROP)
# ------------------------------------------------------------------
# Inventory with FEFO fulfilment and shelf-life expiry
# ------------------------------------------------------------------
class Inventory:
    """Batch-tracked stock with First-Expired-First-Out picking.

    Stock is held as a list of ``[remaining_qty, age_days]`` pairs. Batches
    gain one day of age per ``age_and_expire`` call and are written off once
    their age reaches the shelf life.
    """

    def __init__(self, shelf_life_days: int):
        self.shelf_life = shelf_life_days
        self.batches: list[list[int]] = []  # [remaining_qty, age_days]

    @property
    def on_hand(self) -> int:
        """Total units across all live batches."""
        total = 0
        for remaining, _age in self.batches:
            total += remaining
        return total

    def receive(self, qty: int):
        """Add a fresh (age 0) batch; non-positive quantities are ignored."""
        if qty > 0:
            self.batches.append([int(qty), 0])

    def age_and_expire(self) -> int:
        """Age every batch by one day and drop those at or past shelf life.

        Returns the number of units written off.
        """
        aged = [[remaining, age + 1] for remaining, age in self.batches]
        written_off = sum(remaining for remaining, age in aged if age >= self.shelf_life)
        self.batches = [batch for batch in aged if batch[1] < self.shelf_life]
        return written_off

    def fulfil(self, demand: int) -> tuple[int, int]:
        """Serve *demand* from the oldest batches first.

        Returns ``(fulfilled, unmet)``; emptied batches are removed.
        """
        # Descending age == nearest to expiry first; the sort is stable, so
        # batches of equal age keep their arrival order.
        self.batches.sort(key=lambda batch: batch[1], reverse=True)
        outstanding = demand
        for batch in self.batches:
            if outstanding <= 0:
                break
            picked = min(batch[0], outstanding)
            batch[0] -= picked
            outstanding -= picked
        self.batches = [batch for batch in self.batches if batch[0] > 0]
        return demand - outstanding, outstanding
# ------------------------------------------------------------------
# Policies (they observe past demand only — never the hidden process)
# ------------------------------------------------------------------
@dataclass
class Policy:
    """Base class for replenishment policies.

    Holds the SKU description and the VBP scenario settings; concrete
    policies override ``order`` to turn the observable state into an order
    quantity.
    """

    sku: SKUParams
    vbp_shock_day: int
    inject_vbp: bool

    def order(self, day, on_hand, inbound_qty, demand_history) -> int:
        """Return the quantity to order today (subclasses must implement).

        Args:
            day: Current simulation day.
            on_hand: Units physically in stock.
            inbound_qty: Units ordered but not yet received.
            demand_history: Past daily demand observed so far.
        """
        raise NotImplementedError()
class NaivePolicy(Policy):
    """Continuous-review reorder point driven by a 7-day moving average.

    Safety stock is always three days of the moving average, whatever the
    demand volatility; the policy ignores the VBP settings entirely.
    """

    def order(self, day, on_hand, inbound_qty, demand_history) -> int:
        """Order up to lead time + 7 days of cover once position hits the ROP."""
        lead_days = self.sku.lead_time_days
        if len(demand_history) >= 1:
            avg_daily = float(np.mean(demand_history[-7:]))
        else:
            # No observations yet: fall back to the SKU's nominal daily rate.
            avg_daily = self.sku.base_monthly / 30
        buffer = avg_daily * 3  # fixed ~3 days, regardless of volatility or shocks
        reorder_point = avg_daily * lead_days + buffer
        pipeline = on_hand + inbound_qty
        target = avg_daily * (lead_days + 7) + buffer
        return int(max(0, round(target - pipeline))) if pipeline <= reorder_point else 0
class AdaptivePolicy(Policy):
    """Periodic-review order-up-to policy.

    Every ``review_period`` days it forecasts from the last 28 days of
    demand (nudged half-way towards the 7-day average), sizes safety stock
    from the observed standard deviation, and scales the forecast up ahead
    of the VBP shock.
    """

    # Plain class attribute: this subclass is not re-decorated with
    # @dataclass, so it is not a constructor argument.
    review_period: int = 7

    def order(self, day, on_hand, inbound_qty, demand_history) -> int:
        """Return the order quantity for *day* (0 on non-review days)."""
        if day % self.review_period:
            return 0

        lead_days = self.sku.lead_time_days
        if len(demand_history) >= 1:
            window = np.asarray(demand_history[-28:])
        else:
            # No observations yet: seed the window with the nominal daily rate.
            window = np.array([self.sku.base_monthly / 30])

        long_avg = float(window.mean())
        short_avg = float(window[-7:].mean()) if len(window) >= 7 else long_avg
        drift = short_avg - long_avg
        # With a single observation std() would be 0, so use cv instead.
        spread = float(window.std()) if len(window) > 1 else long_avg * self.sku.cv
        forecast = max(0.0, long_avg + 0.5 * drift)

        # Anticipate the VBP demand spillover one lead time ahead.
        # NOTE(review): the uplift stays on for every later review, including
        # once the 28-day window is entirely post-shock -- looks like it may
        # double-count the spillover; confirm whether that is intended.
        shock_in_sight = (
            self.inject_vbp
            and self.sku.vbp_flag
            and day >= self.vbp_shock_day - lead_days
        )
        if shock_in_sight:
            forecast *= (1 + VBP_VOLUME_UPLIFT)

        cover_days = lead_days + self.review_period
        buffer = Z_SERVICE * spread * np.sqrt(cover_days)
        target = forecast * cover_days + buffer
        pipeline = on_hand + inbound_qty
        return int(max(0, round(target - pipeline)))
# ------------------------------------------------------------------
# Single-universe run
# ------------------------------------------------------------------
def _run_universe(sku, demand, policy, inject_vbp, vbp_shock_day, holding_daily):
    """Simulate one universe day by day and return ``(series, kpis)``.

    Daily sequence: receive due orders, fulfil demand (FEFO), age/expire
    stock, ask *policy* for an order, then book the day's economics. The
    policy only ever sees demand from previous days.

    Args:
        sku: SKU parameters (lead time, shelf life, cost, VBP flag).
        demand: Sequence of daily unit demand; its length is the horizon.
        policy: Object exposing ``order(day, on_hand, inbound_qty, history)``.
        inject_vbp: Whether the VBP price cut applies (forwarded to
            ``effective_unit_cost``).
        vbp_shock_day: Day the VBP price cut starts.
        holding_daily: Daily holding-cost rate applied to inventory value.

    Returns:
        series: dict of per-day lists (``on_hand``, ``sales``, ``unmet``,
            ``orders``, ``expired``, ``net_cash``) plus ``cum_cash``.
        kpis: dict of aggregate figures; ``gross_margin`` is revenue minus
            COGS, holding, expiry and churn costs.
    """
    inv = Inventory(sku.shelf_life_days)
    # Warm start: 1.5 lead times of average early demand. Skipped for an
    # empty horizon, where the mean would be NaN and int() would raise.
    warm_window = demand[:30]
    if len(warm_window) > 0:
        inv.receive(int(np.mean(warm_window) * sku.lead_time_days * 1.5))

    inbound: list[list[int]] = []  # [arrival_day, qty]
    hist: list[int] = []
    series = {k: [] for k in ("on_hand", "sales", "unmet", "orders", "expired", "net_cash")}
    rev = cogs = hold = exp_cost = churn = order_spend = 0.0
    fulfilled_total = demand_total = 0

    for day in range(len(demand)):
        # Receive inbound that has arrived. `<=` rather than `==`: an order
        # with zero lead time is booked for a day whose receiving step has
        # already run, so it must be picked up the next morning instead of
        # being filtered out below and lost.
        arrived = sum(q for a, q in inbound if a <= day)
        inv.receive(arrived)
        inbound = [[a, q] for a, q in inbound if a > day]

        cost = effective_unit_cost(sku, day, inject_vbp, vbp_shock_day)
        price = cost * RETAIL_MARKUP
        unit_margin = price - cost

        # fulfil today's demand (FEFO)
        d = int(demand[day])
        fulfilled, unmet = inv.fulfil(d)

        # ageing / expiry
        expired = inv.age_and_expire()

        # replenishment decision (history excludes today's demand)
        inbound_qty = sum(q for _, q in inbound)
        qty = policy.order(day, inv.on_hand, inbound_qty, hist)
        if qty > 0:
            inbound.append([day + sku.lead_time_days, qty])
            order_spend += qty * cost

        # economics
        hold_today = holding_daily * inv.on_hand * cost
        expiry_today = expired * cost
        churn_today = unmet * unit_margin * CHURN_PENALTY_FRAC
        rev += fulfilled * price
        cogs += fulfilled * cost
        hold += hold_today
        exp_cost += expiry_today
        churn += churn_today
        fulfilled_total += fulfilled
        demand_total += d
        net = fulfilled * unit_margin - hold_today - expiry_today - churn_today

        for k, v in (("on_hand", inv.on_hand), ("sales", fulfilled), ("unmet", unmet),
                     ("orders", qty), ("expired", expired), ("net_cash", net)):
            series[k].append(v)
        hist.append(d)

    gross_margin = rev - cogs - hold - exp_cost - churn
    kpis = {
        "revenue": rev,
        "cogs": cogs,
        "holding_cost": hold,
        "expiry_cost": exp_cost,
        "churn_penalty": churn,
        "gross_margin": gross_margin,
        "units_sold": fulfilled_total,
        "unmet_units": demand_total - fulfilled_total,
        "stockout_days": int(np.sum(np.array(series["unmet"]) > 0)),
        "expiry_units": int(np.sum(series["expired"])),
        "service_level": (fulfilled_total / demand_total) if demand_total else 1.0,
        # Mean of an empty list is NaN; report 0.0 for a zero-day horizon.
        "avg_on_hand": float(np.mean(series["on_hand"])) if series["on_hand"] else 0.0,
        "order_spend": order_spend,
    }
    series["cum_cash"] = list(np.cumsum(series["net_cash"]))
    return series, kpis
# ------------------------------------------------------------------
# Arena: run both universes on the same demand, build an event log
# ------------------------------------------------------------------
def run_arena(sku_id: str | None = None, horizon_days: int = 180,
              vbp_shock_day: int = 90, seed: int = 42, inject_vbp: bool = True) -> dict:
    """Run the naive (A) and adaptive (B) universes on one demand realisation.

    Args:
        sku_id: SKU to simulate; ``None`` lets ``load_sku`` pick its default.
        horizon_days: Number of simulated days.
        vbp_shock_day: Day of the VBP price cut / demand uplift.
        seed: Seed for the demand generator.
        inject_vbp: Whether the VBP shock is applied at all.

    Returns:
        dict with keys ``sku``, ``demand``, ``A``, ``B`` (each holding
        ``series`` and ``kpis``), ``events`` (day-sorted tuples of
        ``(day, title, detail)``), ``config`` and ``delta`` (B minus A for
        every KPI).
    """
    sku = load_sku(sku_id)
    demand = simulate_demand(sku, horizon_days, seed, inject_vbp, vbp_shock_day)
    holding_daily = HOLDING_RATE_ANNUAL / 365.0

    # Same demand, same settings -- only the policy class differs.
    universes = {}
    for label, policy_cls in (("A", NaivePolicy), ("B", AdaptivePolicy)):
        policy = policy_cls(sku, vbp_shock_day, inject_vbp)
        run_series, run_kpis = _run_universe(sku, demand, policy,
                                             inject_vbp, vbp_shock_day, holding_daily)
        universes[label] = {"series": run_series, "kpis": run_kpis}

    events = []
    if inject_vbp and sku.vbp_flag:
        events.append((vbp_shock_day, "VBP shock", f"~{int(VBP_PRICE_DROP*100)}% price cut; +{int(VBP_VOLUME_UPLIFT*100)}% demand spillover"))
    for label in ("A", "B"):
        series = universes[label]["series"]
        unmet = np.array(series["unmet"])
        events.extend(
            (int(day), f"Universe {label} stockout", f"{int(unmet[day])} units unmet")
            for day in np.flatnonzero(unmet > 0)
        )
        exp = np.array(series["expired"])
        events.extend(
            (int(day), f"Universe {label} expiry", f"{int(exp[day])} units written off")
            for day in np.flatnonzero(exp > 0)
        )
    # Stable sort: same-day events keep their insertion order.
    events = sorted(events, key=lambda event: event[0])

    kpis_a = universes["A"]["kpis"]
    kpis_b = universes["B"]["kpis"]
    return {
        "sku": sku,
        "demand": demand,
        "A": universes["A"],
        "B": universes["B"],
        "events": events,
        "config": {"horizon_days": horizon_days, "vbp_shock_day": vbp_shock_day,
                   "seed": seed, "inject_vbp": inject_vbp},
        "delta": {name: kpis_b[name] - value_a for name, value_a in kpis_a.items()},
    }
if __name__ == "__main__":
    # Smoke run with the default SKU and settings: print a one-line summary
    # per universe, then the B-minus-A deltas.
    res = run_arena()
    s = res["sku"]
    print(f"SKU {s.sku_id} ({s.name}, {s.demand_class}, VBP={s.vbp_flag})")
    for u, k in ((name, res[name]["kpis"]) for name in ("A", "B")):
        print(f" Universe {u}: margin ¥{k['gross_margin']:>12,.0f} | "
              f"service {k['service_level']*100:5.1f}% | stockout-days {k['stockout_days']:3d} | "
              f"expiry {k['expiry_units']:5d} | avg on-hand {k['avg_on_hand']:.0f}")
    d = res["delta"]
    print(f" Delta (B-A): margin ¥{d['gross_margin']:+,.0f} | "
          f"service {d['service_level']*100:+.1f}pp | stockout-days {d['stockout_days']:+d}")