"""SimMart environment — multi-agent tier-2 Indian retail simulation (OpenEnv). 1 step = 1 week (7 daily ticks inside). 1 episode = 13 weeks = 90 days = 1 quarter. Flow per step(action): 1. Score this week's decisions against rogue tells (mark caught rogues) 2. Execute approved proposals (mutate ledger; buffer next-week multipliers) 3. Consume pending weekly effects (revenue/margin/NPS multipliers) 4. Tick 7 days: • competitor events (once at week start) • for each day: crisis activation → demand/supply/SLA → daily ledger tick 5. Aggregate weekly KPIs, update NPS/basket/footfall/repeat 6. Compute weekly reward (grader.weekly_reward) 7. Record in state.history + cache next-week's inbox, crises, etc. 8. Return the next-week's SimMartObservation At reset: • Seed RNG • Create initial ledger • Sample per-dept drifts (base ± jitter) • Schedule crises + rogues for the episode • Generate week-1 inbox • Return week-1 SimMartObservation """ from __future__ import annotations import random import uuid from typing import Any, Dict, List, Optional, Tuple from openenv.core.env_server import Environment try: from ..models import ( CompanyLedger, CompetitorEvent, Complaint, CrisisEvent, ExecutiveDiligenceFinding, ExecutiveDiligenceRequest, KPISnapshot, PnLSnapshot, Proposal, ProposalDecision, RogueIncident, SimMartAction, SimMartObservation, SimMartState, WeeklyDecision, ) from . import crises as CR from . import demand as DMD from . import departments as DEP from . import economics as E from . import grader as GR from . import ledger as LD from . import proposals as PROP from . import rogue as RG except (ImportError, ModuleNotFoundError): from models import ( CompanyLedger, CompetitorEvent, Complaint, CrisisEvent, ExecutiveDiligenceFinding, ExecutiveDiligenceRequest, KPISnapshot, PnLSnapshot, Proposal, ProposalDecision, RogueIncident, SimMartAction, SimMartObservation, SimMartState, WeeklyDecision, ) from server import crises as CR from server import demand as DMD from server import departments as DEP from server import economics as E from server import grader as GR from server import ledger as LD from server import proposals as PROP from server import rogue as RG class SimMartEnvironment( Environment[SimMartAction, SimMartObservation, SimMartState] ): SUPPORTS_CONCURRENT_SESSIONS = True MAX_WEEKS: int = E.WEEKS_PER_QUARTER DAYS_PER_QUARTER: int = E.DAYS_PER_QUARTER def __init__(self): super().__init__() self._rng: random.Random = random.Random(0) self._rng_seed: int = 0 self._state: SimMartState = SimMartState() self._episode_index: int = 0 # for curriculum lookup self._min_cash_reached: float = 0.0 # Per-episode accumulators self._competitor_events_window: List[CompetitorEvent] = [] self._pending_complaints: List[Complaint] = [] self._last_journal_entry: str = "" self._last_kpi_snapshot: Optional[KPISnapshot] = None self._current_inbox: List[Proposal] = [] self._current_active_crises: List[CrisisEvent] = [] self._pending_diligence_findings: List[ExecutiveDiligenceFinding] = [] self._schema_hash_cache: str = PROP.schema_hash() # ----------------------------------------------------------------------- # Reset # ----------------------------------------------------------------------- def reset( self, seed: Optional[int] = None, episode_id: Optional[str] = None, **kwargs: Any, ) -> SimMartObservation: self._rng_seed = int(seed) if seed is not None else random.randint(0, 2**31 - 1) self._rng = random.Random(self._rng_seed) # Pick up curriculum (test override via kwargs; default from running count) self._episode_index = int(kwargs.get("episode_index", self._episode_index + 1)) phase = E.curriculum_for_episode(self._episode_index) # Seed ledger ledger = LD.create_initial_ledger(self._rng) # Sample dept drifts around base ± jitter drifts: Dict[str, float] = {} for dept, base in E.DEPT_BASE_DRIFT.items(): drifts[dept] = max( 0.0, min(1.0, base + self._rng.uniform(-E.DEPT_DRIFT_JITTER, E.DEPT_DRIFT_JITTER)), ) # Schedule crises + rogues for the whole episode crisis_queue = CR.schedule_crises( self._rng, crisis_prob=phase["crisis_prob_per_ep"], dept_drifts=drifts, cities=ledger.cities, ) rogues = RG.schedule_rogues( self._rng, rogue_prob=phase["rogue_prob_per_ep"], dept_drifts=drifts, cities=ledger.cities, ) # Seed state self._state = SimMartState( episode_id=episode_id or str(uuid.uuid4()), day=0, week=0, rng_seed=self._rng_seed, company=ledger, dept_drifts=drifts, crisis_queue=crisis_queue, rogue_incidents=rogues, history=[], pending_diligence_findings=[], ) # Reset transient accumulators self._competitor_events_window = [] self._pending_complaints = [] self._last_journal_entry = "" self._pending_diligence_findings = [] self._last_kpi_snapshot = KPISnapshot( revenue_inr=E.BASELINE_WEEKLY_REVENUE_INR, gross_margin_pct=E.STARTING_BLENDED_MARGIN_PCT, stockout_rate_pct=E.STARTING_STOCKOUT_PCT, nps=E.STARTING_NPS, cash_inr=ledger.cash_inr, shrinkage_pct=E.STARTING_SHRINKAGE_PCT, delivery_sla_hit_rate_pct=E.STARTING_SLA_HIT_RATE_PCT, basket_size_inr=E.STARTING_BASKET_SIZE_INR, footfall_per_store=E.STARTING_FOOTFALL_PER_STORE, repeat_purchase_rate_pct=E.STARTING_REPEAT_PURCHASE_PCT, ) ledger.kpi_history.append(self._last_kpi_snapshot) self._min_cash_reached = ledger.cash_inr self._schema_hash_cache = PROP.schema_hash() # Generate week-1 inbox self._state.week = 1 self._state.day = 0 inbox = self._generate_weekly_inbox(week=1) self._current_inbox = inbox self._current_active_crises = [] return self._build_observation( step_type="weekly_decision", week=1, inbox=inbox, reward=None, done=False, message=self._narrative_for_week(1, crisis_queue, rogues), ) # ----------------------------------------------------------------------- # Step # ----------------------------------------------------------------------- def step( self, action: SimMartAction, timeout_s: Optional[float] = None, **kwargs: Any, ) -> SimMartObservation: ledger = self._state.company prev_week = self._state.week current_inbox = list(self._current_inbox) # 1. Mark caught rogues from flag_suspicious verdicts rogue_metrics = RG.mark_caught( self._state.rogue_incidents, prev_week, action.decisions, current_inbox, ) # 2. Execute approved / modified proposals exec_tel = LD.execute_approved_proposals( ledger, current_inbox, action.decisions, self._rng, ) # 2b. Process CEO-level diligence escalations. These do not hide basic # KPI/P&L visibility; they spend scarce staff bandwidth on deeper # forensic review and surface findings in the next executive brief. diligence_tel = self._process_diligence_requests( action.diligence_requests, current_inbox, prev_week, ) if diligence_tel["cost_inr"] > 0: ledger.cash_inr -= diligence_tel["cost_inr"] ledger.pnl_qtd.opex_qtd_inr += diligence_tel["cost_inr"] # 3. Consume pending weekly effect buffer pending = LD.consume_pending_effects(ledger) pending_rev_mult = pending["revenue_mult"] pending_margin_delta = pending["margin_delta_pts"] pending_nps_delta = pending["nps_delta"] pending_sla_delta = pending["sla_delta_pts"] # 4. Run 7 daily ticks for this week daily_tel_list: List[Dict[str, Any]] = [] # Competitor events happen once per week new_comp = DMD.competitor_weekly_events(ledger, prev_week, self._rng) self._competitor_events_window.extend(new_comp) # Decay competitor events older than 3 weeks self._competitor_events_window = [ c for c in self._competitor_events_window if c.week >= prev_week - 3 ] week_start_day = (prev_week - 1) * 7 + 1 for offset in range(7): d = week_start_day + offset if d > self.DAYS_PER_QUARTER: break # Activate any crises that fire today firing, expired = CR.tick_crisis_active(self._state.crisis_queue, d) active = CR.active_crises_now(self._state.crisis_queue) effects = CR.crisis_effects_today(active) # Apply one-shot cash bump from newly-firing crises for c in firing: cash_bump = float((c.affected or {}).get("cash_bump_inr", 0.0)) if cash_bump != 0.0: ledger.cash_inr += cash_bump # Determine today's exogenous demand share_drain = DMD.active_share_drain_pct(self._competitor_events_window, prev_week) # Apply crisis share-drain bump share_drain = min(15.0, share_drain + float(effects.get("share_drain_bump_pct", 0.0))) cat_demand = DMD.customer_daily_demand( ledger=ledger, day_of_quarter=d, nps=self._last_kpi_snapshot.nps, share_drain_pct=share_drain, active_crises=active, rng=self._rng, pending_revenue_mult=pending_rev_mult, ) sla_hit = DMD.rider_daily_sla_hit_rate(d, active, self._rng) tel = LD.tick_one_day( ledger=ledger, day_of_quarter=d, category_demand_units=cat_demand, sla_hit_rate_pct=sla_hit, crisis_extra_opex_inr=float(effects.get("opex_bump_inr", 0.0)), rng=self._rng, ) daily_tel_list.append(tel) self._min_cash_reached = min(self._min_cash_reached, ledger.cash_inr) self._state.day = d # 5. Weekly KPI aggregation weekly_revenue = sum(t["revenue_inr"] for t in daily_tel_list) weekly_cogs = sum(t["cogs_inr"] for t in daily_tel_list) weekly_opex = sum(t["opex_inr"] for t in daily_tel_list) weekly_sla = ( sum(t["sla_hit_rate_pct"] for t in daily_tel_list) / max(1, len(daily_tel_list)) ) weekly_stockout = ( sum(t["stockout_rate_pct"] for t in daily_tel_list) / max(1, len(daily_tel_list)) ) weekly_shrinkage_value = sum(t["shrinkage_value_inr"] for t in daily_tel_list) weekly_shrinkage_pct = ( weekly_shrinkage_value / max(1.0, weekly_revenue) * 100.0 if weekly_revenue > 0 else E.STARTING_SHRINKAGE_PCT ) prev_nps = self._last_kpi_snapshot.nps high_sev_complaints = sum(1 for c in self._pending_complaints if c.severity == "high") new_nps = DMD.update_weekly_nps( prev_nps=prev_nps, stockout_rate_pct=weekly_stockout, sla_hit_rate_pct=weekly_sla, pending_nps_delta=pending_nps_delta, high_severity_complaints=high_sev_complaints, rng=self._rng, ) festival_weight = DMD.festival_weight_for_week(prev_week) new_basket = DMD.update_weekly_basket_size( self._last_kpi_snapshot.basket_size_inr, weekly_stockout, festival_weight, self._rng, ) new_footfall = DMD.update_weekly_footfall( self._last_kpi_snapshot.footfall_per_store, DMD.active_share_drain_pct(self._competitor_events_window, prev_week), festival_weight, weekly_stockout, self._rng, ) new_repeat = DMD.update_weekly_repeat_purchase( self._last_kpi_snapshot.repeat_purchase_rate_pct, new_nps, pending_loyalty_boost=0.0, rng=self._rng, ) snap = LD.snapshot_weekly_kpis( ledger=ledger, weekly_revenue=weekly_revenue, weekly_cogs=weekly_cogs, weekly_stockout_rate_pct=weekly_stockout, weekly_shrinkage_pct=weekly_shrinkage_pct, weekly_sla_hit_rate_pct=max(45.0, min(99.0, weekly_sla + pending_sla_delta)), weekly_nps=new_nps, weekly_basket_inr=new_basket, weekly_footfall_per_store=new_footfall, weekly_repeat_purchase_pct=new_repeat, ) # Apply pending margin delta to the snap (reflects promotional margin drag) snap.margin_delta_pts = snap.margin_delta_pts + pending_margin_delta self._last_kpi_snapshot = snap # 6. Weekly reward weekly_r, components = GR.weekly_reward( kpi_snapshot=snap, decisions=action.decisions, inbox=current_inbox, rogue_metrics=rogue_metrics, journal_entry=action.journal_entry, prev_journal_entry=self._last_journal_entry, ) self._last_journal_entry = action.journal_entry # 7. Record in history self._state.history.append(WeeklyDecision( week=prev_week, decisions=action.decisions, budget_allocations=action.budget_allocations, diligence_requests=action.diligence_requests, diligence_findings=diligence_tel["findings"], journal_entry=action.journal_entry, weekly_reward=weekly_r, reward_components={k: v for k, v in components.items() if k.startswith("weighted.") or k == "total"}, kpi_snapshot=snap, rogues_active=[r.rogue_id for r in RG.active_this_week(self._state.rogue_incidents, prev_week)], rogues_caught=[r.rogue_id for r in self._state.rogue_incidents if r.caught and prev_week in r.active_weeks], )) # 8. Generate franchise complaints for the NEXT week's observation stockout_by_cat = {"aggregate": weekly_stockout} # simplified self._pending_complaints = DMD.franchisee_weekly_complaints( ledger=ledger, week_of_quarter=prev_week + 1, stockout_rate_by_category=stockout_by_cat, sla_hit_rate_pct=weekly_sla, rng=self._rng, ) # 9. Determine next week / terminal next_week = prev_week + 1 done = next_week > self.MAX_WEEKS self._state.week = next_week if not done else prev_week self._state.step_count = prev_week if done: # Terminal reward term_r, term_components = GR.terminal_reward(ledger, self._min_cash_reached) total_reward = weekly_r + term_r self._current_inbox = [] self._current_active_crises = CR.active_crises_now(self._state.crisis_queue) return self._build_observation( step_type="quarterly_close", week=prev_week, inbox=[], reward=total_reward, done=True, message=self._terminal_narrative(ledger, term_components, rogue_metrics), ) # 10. Build next-week's inbox + observation inbox_next = self._generate_weekly_inbox(next_week) self._current_inbox = inbox_next self._current_active_crises = CR.active_crises_now(self._state.crisis_queue) return self._build_observation( step_type="weekly_decision", week=next_week, inbox=inbox_next, reward=weekly_r, done=False, message=self._narrative_for_week(next_week, self._state.crisis_queue, self._state.rogue_incidents), ) # ----------------------------------------------------------------------- # State + close # ----------------------------------------------------------------------- @property def state(self) -> SimMartState: return self._state def close(self) -> None: pass # ----------------------------------------------------------------------- # Inbox + rogue overlay # ----------------------------------------------------------------------- def _generate_weekly_inbox(self, week: int) -> List[Proposal]: active_crises = CR.active_crises_now(self._state.crisis_queue) base = DEP.generate_weekly_proposals( ledger=self._state.company, active_crises=active_crises, week=week, dept_drifts=self._state.dept_drifts, rng=self._rng, crisis_queue=self._state.crisis_queue, ) rogues_now = RG.active_this_week(self._state.rogue_incidents, week) overlaid = RG.inject_rogue_proposals( base_proposals=base, active_rogues=rogues_now, week=week, ledger=self._state.company, rng=self._rng, ) return overlaid # ----------------------------------------------------------------------- # Executive diligence # ----------------------------------------------------------------------- def _process_diligence_requests( self, requests: List[ExecutiveDiligenceRequest], inbox: List[Proposal], week: int, ) -> Dict[str, Any]: budget = E.EXECUTIVE_DILIGENCE_REQUESTS_PER_WEEK cost_per = E.EXECUTIVE_DILIGENCE_COST_INR findings: List[ExecutiveDiligenceFinding] = [] inbox_by_id = {p.proposal_id: p for p in inbox} active_rogues = RG.active_this_week(self._state.rogue_incidents, week) rogue_pids = { pid for rogue in active_rogues for pid in rogue.associated_proposal_ids } for idx, req in enumerate(requests): request_id = req.request_id or f"DIL-W{week:02d}-{idx + 1}" if idx >= budget: findings.append(ExecutiveDiligenceFinding( request_id=request_id, request_type=req.request_type, proposal_id=req.proposal_id, dept=req.dept, status="capacity_exceeded", risk_level="med", summary=( "Escalation not completed: CEO diligence bandwidth was already " f"used for {budget} request(s) this week." ), suggested_action="Prioritize the riskiest proposals for diligence next week.", cost_inr=0.0, )) continue finding = self._build_diligence_finding( req=req, request_id=request_id, proposal=inbox_by_id.get(req.proposal_id), rogue_pids=rogue_pids, cost_inr=cost_per, ) findings.append(finding) completed_cost = sum(f.cost_inr for f in findings if f.status == "completed") self._pending_diligence_findings = findings self._state.pending_diligence_findings = findings return {"findings": findings, "cost_inr": completed_cost} def _build_diligence_finding( self, req: ExecutiveDiligenceRequest, request_id: str, proposal: Optional[Proposal], rogue_pids: set, cost_inr: float, ) -> ExecutiveDiligenceFinding: ledger = self._state.company if req.request_type in {"cashflow_stress_test", "cfo_variance_note"} and not req.proposal_id: cash_cr = ledger.cash_inr / 1e7 loc_used_pct = ( ledger.line_of_credit_drawn / max(1.0, ledger.line_of_credit_limit) * 100.0 ) risk = "high" if ledger.cash_inr < 0.25 * E.STARTING_CASH_INR else "med" if ledger.cash_inr < 0.5 * E.STARTING_CASH_INR else "low" return ExecutiveDiligenceFinding( request_id=request_id, request_type=req.request_type, proposal_id="", dept=req.dept or "finance", status="completed", risk_level=risk, summary=( f"Finance escalation complete: cash is ₹{cash_cr:+.2f} Cr, " f"LoC utilization is {loc_used_pct:.0f}%, and QTD EBITDA margin is " f"{ledger.pnl_qtd.ebitda_margin_pct:+.1f}%." ), evidence={ "cash_inr": ledger.cash_inr, "line_of_credit_drawn": ledger.line_of_credit_drawn, "ebitda_margin_pct": ledger.pnl_qtd.ebitda_margin_pct, }, suggested_action="Preserve cash buffer before approving discretionary growth or capex proposals.", cost_inr=cost_inr, ) if proposal is None: return ExecutiveDiligenceFinding( request_id=request_id, request_type=req.request_type, proposal_id=req.proposal_id, dept=req.dept, status="invalid_request", risk_level="low", summary="Escalation could not be completed because the proposal id was not in this week's CEO inbox.", suggested_action="Use proposal IDs exactly as shown in the weekly inbox.", cost_inr=0.0, ) risk_level, evidence, suggested = self._proposal_diligence_risk(proposal, proposal.proposal_id in rogue_pids) summary = ( f"{req.request_type} completed for {proposal.proposal_id} " f"({proposal.dept}.{proposal.action}). Risk assessed as {risk_level}." ) if risk_level == "high": summary += " Escalation found evidence that merits flagging or rejecting similar future proposals." elif risk_level == "med": summary += " Escalation found some pressure points; approve only with tighter controls." else: summary += " Escalation did not find a material control issue." return ExecutiveDiligenceFinding( request_id=request_id, request_type=req.request_type, proposal_id=proposal.proposal_id, dept=proposal.dept, status="completed", risk_level=risk_level, summary=summary, evidence=evidence, suggested_action=suggested, cost_inr=cost_inr, ) def _proposal_diligence_risk( self, proposal: Proposal, is_active_rogue: bool, ) -> Tuple[str, Dict[str, Any], str]: params = proposal.params or {} evidence: Dict[str, Any] = { "cost_inr": proposal.cost_inr, "urgency": proposal.urgency, } if is_active_rogue: evidence["rogue_pattern_detected"] = True return ( "high", evidence, "Flag or reject similar proposals and require finance/audit sign-off before future approval.", ) suspicious_vendor = str(params.get("vendor_id", "")).startswith("V-SUSPICIOUS-") unit_cost = float(params.get("unit_cost", 0.0) or 0.0) sku_id = params.get("sku_id") sku_cost = 0.0 if sku_id in self._state.company.sku_catalogue: sku_cost = float(self._state.company.sku_catalogue[sku_id]["cost_inr"]) cost_uplift = (unit_cost / sku_cost - 1.0) if sku_cost and unit_cost else 0.0 qty = float(params.get("qty", 0.0) or 0.0) baseline_qty = float(params.get("inventory_baseline_qty", 0.0) or 0.0) qty_multiple = (qty / baseline_qty) if baseline_qty > 0 else 0.0 evidence.update({ "suspicious_vendor": suspicious_vendor, "unit_cost_uplift_pct": round(cost_uplift * 100.0, 1), "qty_multiple_vs_baseline": round(qty_multiple, 2) if qty_multiple else 0.0, }) if suspicious_vendor or cost_uplift > 0.15 or qty_multiple > 2.0: return ( "high", evidence, "Escalate to audit before approving future proposals with the same vendor, SKU, or cost pattern.", ) if proposal.urgency == "high" or abs(proposal.cost_inr) > 2e6: return ( "med", evidence, "Approve only if the department can tie the spend to stockout, SLA, or cash protection.", ) return ( "low", evidence, "No special follow-up needed beyond normal weekly KPI review.", ) # ----------------------------------------------------------------------- # Observation builder # ----------------------------------------------------------------------- def _build_observation( self, step_type: str, week: int, inbox: List[Proposal], reward: Optional[float], done: bool, message: str, ) -> SimMartObservation: active = CR.active_crises_now(self._state.crisis_queue) return SimMartObservation( done=done, reward=reward, step_type=step_type, day_of_quarter=self._state.day, week_of_quarter=week, kpi_snapshot=self._last_kpi_snapshot or KPISnapshot(), pnl_snapshot=self._state.company.pnl_qtd, inbox=inbox, active_crises=active, franchise_complaints=list(self._pending_complaints), competitor_events=list(self._competitor_events_window), executive_diligence_findings=list(self._pending_diligence_findings), diligence_budget_remaining=E.EXECUTIVE_DILIGENCE_REQUESTS_PER_WEEK, schema_hash=self._schema_hash_cache, last_journal=self._last_journal_entry, task_description=self._task_description(week, active), message=message, output=message, ) # ----------------------------------------------------------------------- # Narrative helpers # ----------------------------------------------------------------------- def _task_description(self, week: int, active: List[CrisisEvent]) -> str: head = f"Week {week}/{self.MAX_WEEKS} of SimMart's festive quarter in tier-2 India." if active: crisis_names = ", ".join(f"{c.crisis_id} {c.name}" for c in active) return f"{head} Currently active: {crisis_names}. Review the inbox and decide." return f"{head} Review the inbox, decide per proposal, allocate budget, log the journal." def _narrative_for_week( self, week: int, crisis_queue: List[CrisisEvent], rogues: List[RogueIncident], ) -> str: upcoming = [ c for c in crisis_queue if c.started_day > self._state.day and c.started_day <= self._state.day + 14 and not c.active ] bits = [f"Week {week} begins."] if upcoming: bits.append( "On the horizon: " + ", ".join(f"{c.name} (~day {c.started_day})" for c in upcoming[:2]) + "." ) return " ".join(bits) def _terminal_narrative( self, ledger: CompanyLedger, term_components: Dict[str, float], rogue_metrics: Dict[str, Any], ) -> str: pnl = ledger.pnl_qtd caught_meta = RG.episode_accuracy(self._state.rogue_incidents) return ( f"Quarter closed. Revenue ₹{pnl.revenue_qtd_inr/1e7:.2f} Cr, " f"EBITDA ₹{pnl.ebitda_qtd_inr/1e7:+.2f} Cr ({pnl.ebitda_margin_pct:+.1f}%), " f"final cash ₹{ledger.cash_inr/1e7:+.2f} Cr, " f"min cash reached ₹{self._min_cash_reached/1e7:+.2f} Cr. " f"Rogue catches: {caught_meta['caught']}/{caught_meta['total_rogues']} " f"(recall {caught_meta['recall']:.0%})." )