Spaces:
Sleeping
Sleeping
| """ | |
| Simulation Arena — deterministic parallel-universe replenishment simulator. | |
| =========================================================================== | |
| Two identical copies of the world ("universes") run forward from the same | |
| initial state and the SAME demand realisation. The only difference is the | |
| replenishment policy: | |
| - Universe A (Naive): continuous-review reorder point with a fixed safety | |
| stock and a 7-day moving-average forecast. Reacts | |
| slowly; blind to seasonality and the VBP shock. | |
| - Universe B (Adaptive): periodic review that sizes safety stock from demand | |
| volatility, follows short-term trend, and anticipates | |
| the Volume-Based Procurement (VBP) demand spillover. | |
| Mechanics borrowed from FoodTruck Bench: an *opaque* daily demand process the | |
| policies never see, compounding consequences (stockouts, overstock, expiry), | |
| and a deterministic seed so the two universes are directly comparable. | |
| The engine is pure numpy/pandas (no Streamlit) so it can be unit-tested and | |
| imported by the dashboard. See test_simulation_engine.py for invariants. | |
| """ | |
| from __future__ import annotations | |
| import os | |
| from dataclasses import dataclass, field | |
| import numpy as np | |
| import pandas as pd | |
# CSV inputs live in a "data" directory next to this module.
DATA_DIR = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "data",
)

# Economic assumptions (illustrative, documented for the report).
# Selling price = effective unit cost x markup.
RETAIL_MARKUP = 1.40
# Holding cost: 25% of inventory value per year.
HOLDING_RATE_ANNUAL = 0.25
# Goodwill cost per unit of unmet demand, as a fraction of the unit margin.
CHURN_PENALTY_FRAC = 0.50
# ~51% procurement price cut at the VBP shock (case figure).
VBP_PRICE_DROP = 0.51
# Retail demand spillover after the price cut.
VBP_VOLUME_UPLIFT = 0.30
# z-score for ~95% service level, used for statistical safety stock.
Z_SERVICE = 1.64
@dataclass
class SKUParams:
    """Static parameters of one SKU, as consumed by the simulator.

    Declared as a dataclass so that it can be built with keyword arguments
    (which is how ``load_sku`` constructs it) and compared by value.
    """

    # Identifier used to look the SKU up in the product tables.
    sku_id: str
    # Human-readable product name.
    name: str
    # Demand archetype; the demand generator keys its seasonality amplitude
    # on "seasonal", "fast", "policy_shocked" and "long_tail".
    demand_class: str
    # Baseline demand per month; the simulator divides it by 30 for a daily rate.
    base_monthly: float
    # Scale of the multiplicative demand noise (standard deviation of the
    # relative deviation from the daily level).
    cv: float
    # Whether the SKU is exposed to the Volume-Based Procurement shock.
    vbp_flag: bool
    # Procurement cost per unit before any VBP price cut.
    unit_cost: float
    # Days between placing an order and its arrival.
    lead_time_days: int
    # Age in days at which a batch is written off.
    shelf_life_days: int
def _load_tables():
    """Read ``products.csv`` and ``sku_profiles.csv`` and join them on ``sku_id``.

    Columns that exist in both files keep their plain name for the products
    copy; the profiles copy is suffixed with ``_p``.
    """
    products_path = os.path.join(DATA_DIR, "products.csv")
    profiles_path = os.path.join(DATA_DIR, "sku_profiles.csv")
    product_table = pd.read_csv(products_path)
    profile_table = pd.read_csv(profiles_path)
    joined = product_table.merge(profile_table, on="sku_id", suffixes=("", "_p"))
    return joined
def list_demo_skus(preferred_classes=("policy_shocked", "seasonal", "fast")) -> pd.DataFrame:
    """Return a tidy table of SKUs suitable for the Arena, best demo first.

    Rows are ordered by the position of their demand class in
    ``preferred_classes`` (classes not listed sort last), then by descending
    ``base_demand``. The helper rank column is removed and the index is reset
    before returning.
    """
    table = _load_tables()
    class_rank = {name: position for position, name in enumerate(preferred_classes)}
    # 99 pushes every class that is not explicitly preferred to the bottom.
    table["_rank"] = [class_rank.get(cls, 99) for cls in table["demand_class"]]
    wanted = [
        "sku_id", "product_name", "therapy_area", "demand_class",
        "vbp_flag", "base_demand", "demand_cv", "_rank",
    ]
    ranked = table[wanted].sort_values(
        by=["_rank", "base_demand"],
        ascending=[True, False],
    )
    return ranked.drop(columns="_rank").reset_index(drop=True)
def load_sku(sku_id: str | None = None) -> SKUParams:
    """Load one SKU's parameters from the product tables.

    Args:
        sku_id: Identifier to look up. When ``None``, the first VBP-flagged
            row of ``list_demo_skus()`` is used, falling back to the first
            row overall if no SKU is VBP-flagged.

    Returns:
        The SKU's parameters. ``shelf_life_days`` is derived from the
        ``shelf_life_months`` column using 30-day months.

    Raises:
        IndexError: If ``sku_id`` does not match any row of the joined tables.
    """
    df = _load_tables()
    if sku_id is None:
        # Default to the highest-volume VBP-exposed SKU: the shock is where the
        # naive policy's blindness shows, so it is the clearest showcase.
        demo = list_demo_skus()
        vbp = demo[demo["vbp_flag"]]
        sku_id = (vbp if not vbp.empty else demo).iloc[0]["sku_id"]
    matches = df[df["sku_id"] == sku_id]
    if matches.empty:
        # Same exception type a bare ``.iloc[0]`` raises on an empty frame, but
        # with a message that names the offending identifier.
        raise IndexError(f"sku_id {sku_id!r} not found in the product/profile tables")
    row = matches.iloc[0]
    return SKUParams(
        sku_id=row["sku_id"],
        name=row["product_name"],
        demand_class=row["demand_class"],
        base_monthly=float(row["base_demand"]),
        cv=float(row["demand_cv"]),
        vbp_flag=bool(row["vbp_flag"]),
        unit_cost=float(row["unit_cost_cny"]),
        lead_time_days=int(row["lead_time_days"]),
        shelf_life_days=int(row["shelf_life_months"]) * 30,
    )
| # ------------------------------------------------------------------ | |
| # Opaque daily demand process (hidden from the policies) | |
| # ------------------------------------------------------------------ | |
def simulate_demand(sku: SKUParams, days: int, seed: int,
                    inject_vbp: bool, vbp_shock_day: int) -> np.ndarray:
    """Daily demand realisation. Deterministic given seed.

    The daily level is ``base_monthly / 30`` scaled by an annual sinusoid
    (peak at the year boundary, trough mid-year), a winter bump for seasonal
    SKUs, and -- for VBP-flagged SKUs when ``inject_vbp`` is set -- a permanent
    uplift from ``vbp_shock_day`` onwards. Multiplicative Gaussian noise with
    relative scale ``sku.cv`` is applied, widened for the 21 days following the
    shock. The result is clipped at zero and rounded.

    Args:
        sku: SKU parameters; reads ``base_monthly``, ``demand_class``, ``cv``
            and ``vbp_flag``.
        days: Number of days to generate; day index 0 is treated as the start
            of a year.
        seed: Seed for ``numpy.random.default_rng``.
        inject_vbp: Whether the VBP shock is active in this scenario.
        vbp_shock_day: Day index at which the shock starts.

    Returns:
        Integer array of length ``days`` with non-negative daily demand.
    """
    rng = np.random.default_rng(seed)
    base_daily = sku.base_monthly / 30.0
    t = np.arange(days)

    # Annual seasonality; strong for seasonal SKUs, mild otherwise.
    amp = {"seasonal": 0.45, "fast": 0.06, "policy_shocked": 0.10, "long_tail": 0.10}.get(sku.demand_class, 0.10)
    # Phase +pi/2 puts the peak at t = 0 (winter, year boundary) and the trough
    # mid-year. The previous -pi/2 phase had them inverted, which contradicted
    # the winter ILI window below.
    season = 1.0 + amp * np.sin(2 * np.pi * (t / 365.0) + np.pi / 2)

    # Winter ILI bump for seasonal SKUs (days near year boundaries).
    doy = t % 365
    in_ili_window = (doy < 60) | (doy > 330)
    ili = np.where((sku.demand_class == "seasonal") & in_ili_window, 1.30, 1.0)

    level = base_daily * season * ili
    shock_active = inject_vbp and sku.vbp_flag

    # VBP structural break: demand spillover after the shock for VBP SKUs.
    if shock_active:
        level = level * np.where(t >= vbp_shock_day, 1 + VBP_VOLUME_UPLIFT, 1.0)

    # Heteroscedastic noise (wider just after the shock).
    sigma = np.full(days, sku.cv)
    if shock_active:
        just_after_shock = (t >= vbp_shock_day) & (t < vbp_shock_day + 21)
        sigma = np.where(just_after_shock, sku.cv * 1.6, sigma)

    noise = rng.normal(0, sigma)
    demand = np.maximum(0, level * (1 + noise))
    return np.round(demand).astype(int)
def effective_unit_cost(sku: SKUParams, day: int, inject_vbp: bool, vbp_shock_day: int) -> float:
    """Return the unit cost in force on ``day``.

    The VBP price cut applies only when the scenario injects the shock, the
    SKU is VBP-flagged, and ``day`` is on or after ``vbp_shock_day``; in every
    other case the SKU's base ``unit_cost`` is returned unchanged.
    """
    price_cut_in_force = inject_vbp and sku.vbp_flag and day >= vbp_shock_day
    if not price_cut_in_force:
        return sku.unit_cost
    return sku.unit_cost * (1 - VBP_PRICE_DROP)
| # ------------------------------------------------------------------ | |
| # Inventory with FEFO fulfilment and shelf-life expiry | |
| # ------------------------------------------------------------------ | |
class Inventory:
    """First-Expired-First-Out batches; each batch ages and can expire.

    Stock is held as a list of ``[remaining_qty, age_days]`` batches. Every
    batch shares the same shelf life, so consuming the oldest batch first is
    equivalent to consuming the one that expires first.
    """

    def __init__(self, shelf_life_days: int):
        self.shelf_life = shelf_life_days
        self.batches: list[list[int]] = []  # [remaining_qty, age_days]

    @property
    def on_hand(self) -> int:
        """Total units currently in stock across all batches.

        Exposed as a property because the simulation loop reads
        ``inv.on_hand`` as a plain value inside arithmetic expressions.
        """
        return sum(b[0] for b in self.batches)

    def receive(self, qty: int):
        """Add a fresh batch (age 0); non-positive quantities are ignored."""
        if qty > 0:
            self.batches.append([int(qty), 0])

    def age_and_expire(self) -> int:
        """Age every batch by one day and drop those at or past shelf life.

        Returns:
            The number of units written off.
        """
        for b in self.batches:
            b[1] += 1
        expired = 0
        kept = []
        for qty, age in self.batches:
            if age >= self.shelf_life:
                expired += qty
            else:
                kept.append([qty, age])
        self.batches = kept
        return expired

    def fulfil(self, demand: int) -> tuple[int, int]:
        """Consume oldest batches first. Returns (fulfilled, unmet)."""
        need = demand
        self.batches.sort(key=lambda b: -b[1])  # oldest (highest age) first
        for b in self.batches:
            if need <= 0:
                break
            take = min(b[0], need)
            b[0] -= take
            need -= take
        # Drop batches that were emptied by this fulfilment.
        self.batches = [b for b in self.batches if b[0] > 0]
        fulfilled = demand - need
        return fulfilled, need
| # ------------------------------------------------------------------ | |
| # Policies (they observe past demand only — never the hidden process) | |
| # ------------------------------------------------------------------ | |
@dataclass
class Policy:
    """Base class for replenishment policies.

    Declared as a dataclass so that subclasses can be constructed positionally
    as ``SomePolicy(sku, vbp_shock_day, inject_vbp)``, which is how the arena
    instantiates them.
    """

    # Parameters of the SKU being replenished.
    sku: SKUParams
    # Day index at which the VBP shock starts in this scenario.
    vbp_shock_day: int
    # Whether the VBP shock is active in this scenario.
    inject_vbp: bool

    def order(self, day, on_hand, inbound_qty, demand_history) -> int:
        """Return the quantity to order on ``day``; subclasses must override.

        Args:
            day: Current day index.
            on_hand: Units currently in stock.
            inbound_qty: Units already ordered but not yet received.
            demand_history: Demand observed on the days before ``day``.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError
class NaivePolicy(Policy):
    """Continuous-review ROP, fixed safety stock, 7-day moving average."""

    def order(self, day, on_hand, inbound_qty, demand_history) -> int:
        """Order up to a fixed-cover target once the position hits the reorder point.

        The daily rate is the mean of the last seven observations, or
        ``base_monthly / 30`` while no history exists. Safety stock is three
        days of that rate. When on-hand plus inbound is at or below
        ``rate * lead_time + safety``, the order tops the position up to
        ``rate * (lead_time + 7) + safety``; otherwise nothing is ordered.
        """
        lead_time = self.sku.lead_time_days
        if len(demand_history) >= 1:
            daily_rate = float(np.mean(demand_history[-7:]))
        else:
            daily_rate = self.sku.base_monthly / 30
        # Fixed ~3 days of cover, regardless of volatility or shocks.
        buffer_units = daily_rate * 3
        reorder_point = daily_rate * lead_time + buffer_units
        stock_position = on_hand + inbound_qty
        if not (stock_position <= reorder_point):
            return 0
        target_level = daily_rate * (lead_time + 7) + buffer_units
        return int(max(0, round(target_level - stock_position)))
class AdaptivePolicy(Policy):
    """Periodic review; volatility-sized safety; trend- and VBP-aware."""

    # Days between reviews; orders are only placed on multiples of this.
    review_period: int = 7

    def order(self, day, on_hand, inbound_qty, demand_history) -> int:
        """Order up to forecast-over-horizon plus statistical safety stock.

        Off review days the order is zero. On review days the forecast is the
        28-day mean nudged by half the gap between the 7-day and 28-day means,
        floored at zero; safety stock is ``Z_SERVICE * std * sqrt(horizon)``
        with ``horizon = lead_time + review_period``.
        """
        if day % self.review_period:
            return 0

        lead = self.sku.lead_time_days
        if len(demand_history) >= 1:
            window = np.asarray(demand_history[-28:])
        else:
            # No observations yet: seed with the SKU's nominal daily rate.
            window = np.array([self.sku.base_monthly / 30])

        mean_28 = float(window.mean())
        mean_7 = float(window[-7:].mean()) if len(window) >= 7 else mean_28
        drift = mean_7 - mean_28
        if len(window) > 1:
            spread = float(window.std())
        else:
            spread = mean_28 * self.sku.cv
        forecast = max(0.0, mean_28 + 0.5 * drift)

        # Anticipate the VBP demand spillover one lead time ahead.
        # NOTE(review): the multiplier stays applied on every review from
        # ``vbp_shock_day - lead`` onwards, including after the history window
        # consists entirely of post-shock demand.
        if self.inject_vbp and self.sku.vbp_flag and day >= self.vbp_shock_day - lead:
            forecast *= (1 + VBP_VOLUME_UPLIFT)

        horizon = lead + self.review_period
        safety = Z_SERVICE * spread * np.sqrt(horizon)
        target_level = forecast * horizon + safety
        stock_position = on_hand + inbound_qty
        return int(max(0, round(target_level - stock_position)))
| # ------------------------------------------------------------------ | |
| # Single-universe run | |
| # ------------------------------------------------------------------ | |
def _run_universe(sku, demand, policy, inject_vbp, vbp_shock_day, holding_daily):
    """Simulate one universe day by day under ``policy``.

    Each day runs in this order: receive inbound orders that are due, price
    the day, fulfil demand oldest-first, age stock and write off expired
    units, ask the policy for an order, then book the day's economics. The
    day's demand is appended to the history only after the policy has
    decided, so the policy sees past demand only.

    Args:
        sku: SKU parameters (reads ``shelf_life_days`` and ``lead_time_days``).
        demand: Daily demand realisation, one entry per simulated day.
        policy: Object whose ``order(day, on_hand, inbound_qty, history)``
            returns the quantity to order.
        inject_vbp: Whether the VBP shock is active (affects unit cost).
        vbp_shock_day: Day index at which the shock starts.
        holding_daily: Daily holding-cost rate applied to inventory value.

    Returns:
        ``(series, kpis)``: per-day lists keyed by ``on_hand``, ``sales``,
        ``unmet``, ``orders``, ``expired``, ``net_cash`` and ``cum_cash``,
        plus a dict of aggregate KPIs.
    """
    inv = Inventory(sku.shelf_life_days)
    # Warm start: 1.5 lead times of the mean demand over the first 30 days.
    # An empty horizon has no mean, so start with nothing instead of failing
    # on int(nan).
    warm_daily = float(np.mean(demand[:30])) if len(demand) else 0.0
    inv.receive(int(warm_daily * sku.lead_time_days * 1.5))

    inbound: list[list[int]] = []  # [arrival_day, qty]
    hist: list[int] = []
    series = {k: [] for k in ("on_hand", "sales", "unmet", "orders", "expired", "net_cash")}
    rev = cogs = hold = exp_cost = churn = order_spend = 0.0
    fulfilled_total = demand_total = 0

    for day in range(len(demand)):
        # Receive inbound that is due. ``<=`` rather than ``==``: an order
        # placed with zero lead time is stamped with today's date after
        # today's receipt step has already run, so it must be picked up
        # tomorrow instead of being silently discarded.
        arrived = sum(q for a, q in inbound if a <= day)
        inv.receive(arrived)
        inbound = [[a, q] for a, q in inbound if a > day]

        cost = effective_unit_cost(sku, day, inject_vbp, vbp_shock_day)
        price = cost * RETAIL_MARKUP

        # Fulfil today's demand (FEFO).
        d = int(demand[day])
        fulfilled, unmet = inv.fulfil(d)

        # Ageing / expiry.
        expired = inv.age_and_expire()

        # Replenishment decision, based on end-of-day stock and open orders.
        inbound_qty = sum(q for _, q in inbound)
        qty = policy.order(day, inv.on_hand, inbound_qty, hist)
        if qty > 0:
            inbound.append([day + sku.lead_time_days, qty])
            order_spend += qty * cost

        # Economics, all valued at today's effective unit cost.
        hold_today = holding_daily * inv.on_hand * cost
        churn_today = unmet * (price - cost) * CHURN_PENALTY_FRAC
        rev += fulfilled * price
        cogs += fulfilled * cost
        hold += hold_today
        exp_cost += expired * cost
        churn += churn_today
        fulfilled_total += fulfilled
        demand_total += d
        net = fulfilled * (price - cost) - hold_today - expired * cost - churn_today

        for k, v in (("on_hand", inv.on_hand), ("sales", fulfilled), ("unmet", unmet),
                     ("orders", qty), ("expired", expired), ("net_cash", net)):
            series[k].append(v)
        hist.append(d)

    gross_margin = rev - cogs - hold - exp_cost - churn
    kpis = {
        "revenue": rev,
        "cogs": cogs,
        "holding_cost": hold,
        "expiry_cost": exp_cost,
        "churn_penalty": churn,
        "gross_margin": gross_margin,
        "units_sold": fulfilled_total,
        "unmet_units": demand_total - fulfilled_total,
        "stockout_days": int(np.sum(np.array(series["unmet"]) > 0)),
        "expiry_units": int(np.sum(series["expired"])),
        "service_level": (fulfilled_total / demand_total) if demand_total else 1.0,
        # Mean of an empty series is undefined; report zero for an empty run.
        "avg_on_hand": float(np.mean(series["on_hand"])) if series["on_hand"] else 0.0,
        "order_spend": order_spend,
    }
    series["cum_cash"] = list(np.cumsum(series["net_cash"]))
    return series, kpis
| # ------------------------------------------------------------------ | |
| # Arena: run both universes on the same demand, build an event log | |
| # ------------------------------------------------------------------ | |
def run_arena(sku_id: str | None = None, horizon_days: int = 180,
              vbp_shock_day: int = 90, seed: int = 42, inject_vbp: bool = True) -> dict:
    """Run both universes on one demand realisation and collect the results.

    Universe A uses ``NaivePolicy`` and universe B uses ``AdaptivePolicy``;
    both see the same demand array. The event log holds the VBP shock (when
    it applies to the SKU) plus every stockout and expiry day of each
    universe, ordered by day.

    Returns:
        Dict with keys ``sku``, ``demand``, ``A``, ``B`` (each holding
        ``series`` and ``kpis``), ``events``, ``config`` and ``delta``
        (B minus A for every KPI).
    """
    sku = load_sku(sku_id)
    demand = simulate_demand(sku, horizon_days, seed, inject_vbp, vbp_shock_day)
    holding_daily = HOLDING_RATE_ANNUAL / 365.0

    naive = NaivePolicy(sku, vbp_shock_day, inject_vbp)
    a_series, a_kpis = _run_universe(sku, demand, naive, inject_vbp, vbp_shock_day, holding_daily)
    adaptive = AdaptivePolicy(sku, vbp_shock_day, inject_vbp)
    b_series, b_kpis = _run_universe(sku, demand, adaptive, inject_vbp, vbp_shock_day, holding_daily)

    events = []
    if inject_vbp and sku.vbp_flag:
        events.append((vbp_shock_day, "VBP shock", f"~{int(VBP_PRICE_DROP*100)}% price cut; +{int(VBP_VOLUME_UPLIFT*100)}% demand spillover"))
    for label, series in (("A", a_series), ("B", b_series)):
        # Stockouts first, then expiries, each in day order; the stable sort
        # below keeps this relative order for events on the same day.
        events.extend(
            (day, f"Universe {label} stockout", f"{int(qty)} units unmet")
            for day, qty in enumerate(series["unmet"]) if qty > 0
        )
        events.extend(
            (day, f"Universe {label} expiry", f"{int(qty)} units written off")
            for day, qty in enumerate(series["expired"]) if qty > 0
        )
    events = sorted(events, key=lambda event: event[0])

    config = {"horizon_days": horizon_days, "vbp_shock_day": vbp_shock_day,
              "seed": seed, "inject_vbp": inject_vbp}
    delta = {name: b_kpis[name] - a_kpis[name] for name in a_kpis}
    return {
        "sku": sku,
        "demand": demand,
        "A": {"series": a_series, "kpis": a_kpis},
        "B": {"series": b_series, "kpis": b_kpis},
        "events": events,
        "config": config,
        "delta": delta,
    }
if __name__ == "__main__":
    # Smoke run with the default scenario: print one KPI line per universe,
    # then the B-minus-A difference.
    result = run_arena()
    sku_info = result["sku"]
    print(f"SKU {sku_info.sku_id} ({sku_info.name}, {sku_info.demand_class}, VBP={sku_info.vbp_flag})")
    for universe in ("A", "B"):
        kpi = result[universe]["kpis"]
        line = (
            f" Universe {universe}: margin ¥{kpi['gross_margin']:>12,.0f} | "
            f"service {kpi['service_level']*100:5.1f}% | stockout-days {kpi['stockout_days']:3d} | "
            f"expiry {kpi['expiry_units']:5d} | avg on-hand {kpi['avg_on_hand']:.0f}"
        )
        print(line)
    diff = result["delta"]
    summary = (
        f" Delta (B-A): margin ¥{diff['gross_margin']:+,.0f} | "
        f"service {diff['service_level']*100:+.1f}pp | stockout-days {diff['stockout_days']:+d}"
    )
    print(summary)