""" Simulation Arena — deterministic parallel-universe replenishment simulator. =========================================================================== Two identical copies of the world ("universes") run forward from the same initial state and the SAME demand realisation. The only difference is the replenishment policy: - Universe A (Naive): continuous-review reorder point with a fixed safety stock and a 7-day moving-average forecast. Reacts slowly; blind to seasonality and the VBP shock. - Universe B (Adaptive): periodic review that sizes safety stock from demand volatility, follows short-term trend, and anticipates the Volume-Based Procurement (VBP) demand spillover. Mechanics borrowed from FoodTruck Bench: an *opaque* daily demand process the policies never see, compounding consequences (stockouts, overstock, expiry), and a deterministic seed so the two universes are directly comparable. The engine is pure numpy/pandas (no Streamlit) so it can be unit-tested and imported by the dashboard. See test_simulation_engine.py for invariants. """ from __future__ import annotations import os from dataclasses import dataclass, field import numpy as np import pandas as pd DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data") # Economic assumptions (illustrative, documented for the report) RETAIL_MARKUP = 1.40 # selling price = effective unit cost x markup HOLDING_RATE_ANNUAL = 0.25 # 25% of inventory value per year CHURN_PENALTY_FRAC = 0.50 # goodwill cost per unit of unmet demand (x margin) VBP_PRICE_DROP = 0.51 # ~51% procurement price cut at the shock (case figure) VBP_VOLUME_UPLIFT = 0.30 # retail demand spillover after the cut Z_SERVICE = 1.64 # ~95% service level for statistical safety stock @dataclass class SKUParams: sku_id: str name: str demand_class: str base_monthly: float cv: float vbp_flag: bool unit_cost: float lead_time_days: int shelf_life_days: int def _load_tables(): products = pd.read_csv(os.path.join(DATA_DIR, "products.csv")) profiles = pd.read_csv(os.path.join(DATA_DIR, "sku_profiles.csv")) return products.merge(profiles, on="sku_id", suffixes=("", "_p")) def list_demo_skus(preferred_classes=("policy_shocked", "seasonal", "fast")) -> pd.DataFrame: """Return a tidy table of SKUs suitable for the Arena, best demo first.""" df = _load_tables() order = {c: i for i, c in enumerate(preferred_classes)} df["_rank"] = df["demand_class"].map(lambda c: order.get(c, 99)) cols = ["sku_id", "product_name", "therapy_area", "demand_class", "vbp_flag", "base_demand", "demand_cv", "_rank"] return df[cols].sort_values(["_rank", "base_demand"], ascending=[True, False]).drop(columns="_rank").reset_index(drop=True) def load_sku(sku_id: str | None = None) -> SKUParams: df = _load_tables() if sku_id is None: # Default to the highest-volume VBP-exposed SKU: the shock is where the # naive policy's blindness shows, so it is the clearest showcase. demo = list_demo_skus() vbp = demo[demo["vbp_flag"]] sku_id = (vbp if not vbp.empty else demo).iloc[0]["sku_id"] row = df[df["sku_id"] == sku_id].iloc[0] return SKUParams( sku_id=row["sku_id"], name=row["product_name"], demand_class=row["demand_class"], base_monthly=float(row["base_demand"]), cv=float(row["demand_cv"]), vbp_flag=bool(row["vbp_flag"]), unit_cost=float(row["unit_cost_cny"]), lead_time_days=int(row["lead_time_days"]), shelf_life_days=int(row["shelf_life_months"]) * 30, ) # ------------------------------------------------------------------ # Opaque daily demand process (hidden from the policies) # ------------------------------------------------------------------ def simulate_demand(sku: SKUParams, days: int, seed: int, inject_vbp: bool, vbp_shock_day: int) -> np.ndarray: """Daily demand realisation. Deterministic given seed.""" rng = np.random.default_rng(seed) base_daily = sku.base_monthly / 30.0 t = np.arange(days) # Annual seasonality; strong for seasonal SKUs, mild otherwise. amp = {"seasonal": 0.45, "fast": 0.06, "policy_shocked": 0.10, "long_tail": 0.10}.get(sku.demand_class, 0.10) season = 1.0 + amp * np.sin(2 * np.pi * (t / 365.0) - np.pi / 2) # trough mid-year, peak winter # Winter ILI bump for seasonal SKUs (days near year boundaries) doy = (t % 365) ili = np.where((sku.demand_class == "seasonal") & ((doy < 60) | (doy > 330)), 1.30, 1.0) level = base_daily * season * ili # VBP structural break: demand spillover after the shock for VBP SKUs. if inject_vbp and sku.vbp_flag: level = level * np.where(t >= vbp_shock_day, 1 + VBP_VOLUME_UPLIFT, 1.0) # Heteroscedastic noise (wider just after the shock). sigma = np.full(days, sku.cv) if inject_vbp and sku.vbp_flag: sigma = np.where((t >= vbp_shock_day) & (t < vbp_shock_day + 21), sku.cv * 1.6, sigma) noise = rng.normal(0, sigma) demand = np.maximum(0, level * (1 + noise)) return np.round(demand).astype(int) def effective_unit_cost(sku: SKUParams, day: int, inject_vbp: bool, vbp_shock_day: int) -> float: if inject_vbp and sku.vbp_flag and day >= vbp_shock_day: return sku.unit_cost * (1 - VBP_PRICE_DROP) return sku.unit_cost # ------------------------------------------------------------------ # Inventory with FEFO fulfilment and shelf-life expiry # ------------------------------------------------------------------ class Inventory: """First-Expired-First-Out batches; each batch ages and can expire.""" def __init__(self, shelf_life_days: int): self.shelf_life = shelf_life_days self.batches: list[list[int]] = [] # [remaining_qty, age_days] @property def on_hand(self) -> int: return sum(b[0] for b in self.batches) def receive(self, qty: int): if qty > 0: self.batches.append([int(qty), 0]) def age_and_expire(self) -> int: expired = 0 for b in self.batches: b[1] += 1 kept = [] for qty, age in self.batches: if age >= self.shelf_life: expired += qty else: kept.append([qty, age]) self.batches = kept return expired def fulfil(self, demand: int) -> tuple[int, int]: """Consume oldest batches first. Returns (fulfilled, unmet).""" need = demand self.batches.sort(key=lambda b: -b[1]) # oldest (highest age) first for b in self.batches: if need <= 0: break take = min(b[0], need) b[0] -= take need -= take self.batches = [b for b in self.batches if b[0] > 0] fulfilled = demand - need return fulfilled, need # ------------------------------------------------------------------ # Policies (they observe past demand only — never the hidden process) # ------------------------------------------------------------------ @dataclass class Policy: sku: SKUParams vbp_shock_day: int inject_vbp: bool def order(self, day, on_hand, inbound_qty, demand_history) -> int: raise NotImplementedError class NaivePolicy(Policy): """Continuous-review ROP, fixed safety stock, 7-day moving average.""" def order(self, day, on_hand, inbound_qty, demand_history) -> int: lead = self.sku.lead_time_days ma7 = float(np.mean(demand_history[-7:])) if len(demand_history) >= 1 else self.sku.base_monthly / 30 safety = ma7 * 3 # fixed ~3 days, regardless of volatility or shocks rop = ma7 * lead + safety position = on_hand + inbound_qty if position <= rop: order_up_to = ma7 * (lead + 7) + safety return int(max(0, round(order_up_to - position))) return 0 class AdaptivePolicy(Policy): """Periodic review; volatility-sized safety; trend- and VBP-aware.""" review_period: int = 7 def order(self, day, on_hand, inbound_qty, demand_history) -> int: if day % self.review_period != 0: return 0 lead = self.sku.lead_time_days hist = np.asarray(demand_history[-28:]) if len(demand_history) >= 1 else np.array([self.sku.base_monthly / 30]) ma28 = float(hist.mean()) recent = float(hist[-7:].mean()) if len(hist) >= 7 else ma28 trend = recent - ma28 sigma = float(hist.std()) if len(hist) > 1 else ma28 * self.sku.cv forecast = max(0.0, ma28 + 0.5 * trend) # Anticipate the VBP demand spillover one lead time ahead. if self.inject_vbp and self.sku.vbp_flag and day >= self.vbp_shock_day - lead: forecast *= (1 + VBP_VOLUME_UPLIFT) horizon = lead + self.review_period safety = Z_SERVICE * sigma * np.sqrt(horizon) order_up_to = forecast * horizon + safety position = on_hand + inbound_qty return int(max(0, round(order_up_to - position))) # ------------------------------------------------------------------ # Single-universe run # ------------------------------------------------------------------ def _run_universe(sku, demand, policy, inject_vbp, vbp_shock_day, holding_daily): inv = Inventory(sku.shelf_life_days) inv.receive(int(np.mean(demand[:30]) * sku.lead_time_days * 1.5)) # warm start inbound: list[list[int]] = [] # [arrival_day, qty] hist: list[int] = [] series = {k: [] for k in ("on_hand", "sales", "unmet", "orders", "expired", "net_cash")} rev = cogs = hold = exp_cost = churn = order_spend = 0.0 fulfilled_total = demand_total = 0 for day in range(len(demand)): # receive inbound that has arrived arrived = sum(q for a, q in inbound if a == day) inv.receive(arrived) inbound = [[a, q] for a, q in inbound if a > day] cost = effective_unit_cost(sku, day, inject_vbp, vbp_shock_day) price = cost * RETAIL_MARKUP # fulfil today's demand (FEFO) d = int(demand[day]) fulfilled, unmet = inv.fulfil(d) # ageing / expiry expired = inv.age_and_expire() # replenishment decision inbound_qty = sum(q for _, q in inbound) qty = policy.order(day, inv.on_hand, inbound_qty, hist) if qty > 0: inbound.append([day + sku.lead_time_days, qty]) order_spend += qty * cost # economics rev += fulfilled * price cogs += fulfilled * cost hold_today = holding_daily * inv.on_hand * cost hold += hold_today exp_cost += expired * cost churn += unmet * (price - cost) * CHURN_PENALTY_FRAC fulfilled_total += fulfilled demand_total += d net = fulfilled * (price - cost) - hold_today - expired * cost - unmet * (price - cost) * CHURN_PENALTY_FRAC for k, v in (("on_hand", inv.on_hand), ("sales", fulfilled), ("unmet", unmet), ("orders", qty), ("expired", expired), ("net_cash", net)): series[k].append(v) hist.append(d) gross_margin = rev - cogs - hold - exp_cost - churn kpis = { "revenue": rev, "cogs": cogs, "holding_cost": hold, "expiry_cost": exp_cost, "churn_penalty": churn, "gross_margin": gross_margin, "units_sold": fulfilled_total, "unmet_units": demand_total - fulfilled_total, "stockout_days": int(np.sum(np.array(series["unmet"]) > 0)), "expiry_units": int(np.sum(series["expired"])), "service_level": (fulfilled_total / demand_total) if demand_total else 1.0, "avg_on_hand": float(np.mean(series["on_hand"])), "order_spend": order_spend, } series["cum_cash"] = list(np.cumsum(series["net_cash"])) return series, kpis # ------------------------------------------------------------------ # Arena: run both universes on the same demand, build an event log # ------------------------------------------------------------------ def run_arena(sku_id: str | None = None, horizon_days: int = 180, vbp_shock_day: int = 90, seed: int = 42, inject_vbp: bool = True) -> dict: sku = load_sku(sku_id) demand = simulate_demand(sku, horizon_days, seed, inject_vbp, vbp_shock_day) holding_daily = HOLDING_RATE_ANNUAL / 365.0 a_series, a_kpis = _run_universe(sku, demand, NaivePolicy(sku, vbp_shock_day, inject_vbp), inject_vbp, vbp_shock_day, holding_daily) b_series, b_kpis = _run_universe(sku, demand, AdaptivePolicy(sku, vbp_shock_day, inject_vbp), inject_vbp, vbp_shock_day, holding_daily) events = [] if inject_vbp and sku.vbp_flag: events.append((vbp_shock_day, "VBP shock", f"~{int(VBP_PRICE_DROP*100)}% price cut; +{int(VBP_VOLUME_UPLIFT*100)}% demand spillover")) for label, series in (("A", a_series), ("B", b_series)): unmet = np.array(series["unmet"]) for day in np.where(unmet > 0)[0]: events.append((int(day), f"Universe {label} stockout", f"{int(unmet[day])} units unmet")) exp = np.array(series["expired"]) for day in np.where(exp > 0)[0]: events.append((int(day), f"Universe {label} expiry", f"{int(exp[day])} units written off")) events.sort(key=lambda e: e[0]) return { "sku": sku, "demand": demand, "A": {"series": a_series, "kpis": a_kpis}, "B": {"series": b_series, "kpis": b_kpis}, "events": events, "config": {"horizon_days": horizon_days, "vbp_shock_day": vbp_shock_day, "seed": seed, "inject_vbp": inject_vbp}, "delta": {k: b_kpis[k] - a_kpis[k] for k in a_kpis}, } if __name__ == "__main__": res = run_arena() s = res["sku"] print(f"SKU {s.sku_id} ({s.name}, {s.demand_class}, VBP={s.vbp_flag})") for u in ("A", "B"): k = res[u]["kpis"] print(f" Universe {u}: margin ¥{k['gross_margin']:>12,.0f} | " f"service {k['service_level']*100:5.1f}% | stockout-days {k['stockout_days']:3d} | " f"expiry {k['expiry_units']:5d} | avg on-hand {k['avg_on_hand']:.0f}") d = res["delta"] print(f" Delta (B-A): margin ¥{d['gross_margin']:+,.0f} | " f"service {d['service_level']*100:+.1f}pp | stockout-days {d['stockout_days']:+d}")