"""SimMart pydantic models — full §4.1 schema. Layers: • Leaf types : ProposalDecision, Proposal, CrisisEvent, RogueIncident, Complaint, CompetitorEvent, InterDeptMessage, KPISnapshot, PnLSnapshot, WeeklyDecision, CompanyLedger • OpenEnv contract : SimMartAction, SimMartObservation, SimMartState Design notes: • All types are JSON-serialisable (pydantic v2). No complex runtime objects. • Rogue *tells* live in `Proposal.params` (plain fields the CEO can inspect); the dedicated `RogueIncident.tell` dict is the ground-truth cheat sheet the CEO never sees — it lives in State only. • InterDeptMessage + Proposal.thread are included now to unblock Enhancement A (coalitions) later; default empty so core training doesn't depend on them. • Dept identifiers are string constants: "supply_chain", "store_ops", "finance", "growth", "expansion". • schema_hash is surfaced in the Observation so a Patronus-style schema-drift crisis can mutate field names mid-quarter (Onsite stretch). """ from __future__ import annotations from typing import Any, Dict, List, Optional from pydantic import BaseModel, Field from openenv.core.env_server.types import Action, Observation, State # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- DEPTS: tuple[str, ...] = ( "supply_chain", "store_ops", ) VERDICTS: tuple[str, ...] = ( "approve", "reject", "flag_suspicious", ) ACTION_TYPES: tuple[str, ...] = ("decide", "journal", "noop") DILIGENCE_REQUEST_TYPES: tuple[str, ...] = ( "vendor_audit", "supplier_quote_comparison", "ops_root_cause", "cfo_variance_note", "cashflow_stress_test", "campaign_postmortem", ) STEP_TYPES: tuple[str, ...] = ( "weekly_decision", "daily_update", "quarterly_close", ) # --------------------------------------------------------------------------- # Leaf types # --------------------------------------------------------------------------- class ProposalDecision(BaseModel): """CEO's verdict on a single proposal.""" proposal_id: str = Field(..., description="Matches a Proposal.proposal_id from the inbox") verdict: str = Field( ..., description=f"One of {VERDICTS}", ) modified_params: Dict[str, Any] = Field( default_factory=dict, description="If verdict == 'modify', the params that override the original proposal", ) reasoning: str = Field(default="", description="Short per-decision rationale") flag_reason: str = Field(default="", description="If verdict == 'flag_suspicious', why") class InterDeptMessage(BaseModel): """Inter-department message attached to a Proposal (Enhancement A). Kept in schema from day one so turning on Enhancement A later does not require observation-schema migration. """ from_dept: str kind: str = Field(..., description="endorse | attack | co_sign | request_hold | counter_propose") text: str = Field(default="") class ExecutiveDiligenceRequest(BaseModel): """CEO-level escalation for deeper diligence on a proposal or operating risk. This is not low-level dashboard access. The CEO already sees KPIs/P&L; a diligence request spends scarce executive/staff bandwidth on forensic review, cross-functional follow-up, or audit-level scrutiny. """ request_id: str = Field(default="", description="Optional caller-supplied id; environment fills one if absent") request_type: str = Field(default="vendor_audit", description=f"One of {DILIGENCE_REQUEST_TYPES}") proposal_id: str = Field(default="", description="Optional proposal id being escalated") dept: str = Field(default="", description="Optional department focus") rationale: str = Field(default="", description="Why the CEO is escalating this item") class ExecutiveDiligenceFinding(BaseModel): """Board-style finding returned after a prior-week CEO escalation.""" request_id: str = "" request_type: str = "" proposal_id: str = "" dept: str = "" status: str = Field(default="completed", description="completed | invalid_request | capacity_exceeded") risk_level: str = Field(default="low", description="low | med | high") summary: str = "" evidence: Dict[str, Any] = Field(default_factory=dict) suggested_action: str = Field(default="", description="CEO-level recommendation for similar future proposals") cost_inr: float = Field(default=0.0, description="Estimated staff/executive bandwidth cost") class Proposal(BaseModel): """A single weekly proposal from a department to the CEO.""" proposal_id: str = Field(..., description="e.g. 'S-07' (supply_chain, week-7 running index)") dept: str = Field(..., description=f"One of {DEPTS}") action: str = Field( ..., description=( "Action namespace.action — e.g. 'po.place', 'vendor.switch', 'staff.schedule', " "'budget.reallocate', 'campaign.launch', 'city.enter', 'franchise.onboard'." ), ) params: Dict[str, Any] = Field( default_factory=dict, description="Action-specific fields; rogue tells (inflated qty, cost uptick, etc.) live here", ) cost_inr: float = Field( default=0.0, description="Estimated signed ₹ impact this week (negative = cost, positive = revenue/recovery)", ) urgency: str = Field(default="med", description="low | med | high") reasoning: str = Field( default="", description="Department's free-text justification (can contain lobbying/hidden-info language)", ) week_submitted: int = Field(default=0, description="Simulation week this proposal was filed") thread: List[InterDeptMessage] = Field( default_factory=list, description="Enhancement A: inter-dept endorsements/attacks/co-signs. Empty by default", ) class CrisisEvent(BaseModel): """A currently-firing crisis (one of C1–C10).""" crisis_id: str = Field(..., description="C1..C10") name: str = Field(..., description="Human-readable name, e.g. 'Diwali demand surge'") started_day: int = Field(..., description="Day-of-quarter the crisis began (1..90)") duration_days: int = Field(..., description="Scheduled duration") severity: str = Field(default="med", description="low | med | high") affected: Dict[str, Any] = Field( default_factory=dict, description="{region, category, sku_ids, store_ids, ...}", ) active: bool = Field(default=False, description="True while started_day <= today < started_day+duration_days") description: str = Field(default="", description="Narrative for CEO-facing observation") class RogueIncident(BaseModel): """A rogue scenario (one of R1–R12). Ground-truth record; `tell` is hidden from CEO observations. The tell manifests in live `Proposal.params` fields through the rogue scenario's proposal generator. """ rogue_id: str = Field(..., description="R1..R12") dept: str = Field(..., description="Which department is rogue") scenario: str = Field(..., description="Human-readable scenario name") active_weeks: List[int] = Field( default_factory=list, description="Weeks (1..13) during which the rogue scenario emits its telltale proposals", ) tell: Dict[str, Any] = Field( default_factory=dict, description="Structured ground-truth signature (hidden from observation)", ) associated_proposal_ids: List[str] = Field( default_factory=list, description="Proposals emitted by this rogue scenario", ) caught: bool = Field(default=False, description="True once CEO correctly flag_suspicious's an associated proposal") class Complaint(BaseModel): """A franchisee complaint surfaced into the weekly observation.""" franchise_id: str city: str issue: str severity: str = Field(default="med", description="low | med | high") week_filed: int class CompetitorEvent(BaseModel): """A competitor-driven event (pricing attack, city entry, dark store opening).""" competitor: str = Field(..., description="JioMart | Blinkit | Zepto | DMart | ...") event_type: str = Field( ..., description="price_cut | city_entry | dark_store_open | loyalty_push | bulk_ad", ) region: str = Field(default="") impact_pct: float = Field(default=0.0, description="Estimated monthly share impact") week: int = Field(default=0) description: str = Field(default="") class KPISnapshot(BaseModel): """Weekly KPIs (absolutes + deltas vs prior week).""" revenue_inr: float = 0.0 gross_margin_pct: float = 0.0 stockout_rate_pct: float = 0.0 nps: float = 0.0 cash_inr: float = 0.0 shrinkage_pct: float = 0.0 delivery_sla_hit_rate_pct: float = 0.0 basket_size_inr: float = 0.0 footfall_per_store: float = 0.0 repeat_purchase_rate_pct: float = 0.0 revenue_delta_pct: float = 0.0 margin_delta_pts: float = 0.0 stockout_delta_pts: float = 0.0 nps_delta: float = 0.0 sla_delta_pts: float = 0.0 class PnLSnapshot(BaseModel): """Quarter-to-date P&L.""" revenue_qtd_inr: float = 0.0 cogs_qtd_inr: float = 0.0 opex_qtd_inr: float = 0.0 ebitda_qtd_inr: float = 0.0 ebitda_margin_pct: float = 0.0 cash_delta_qtd_inr: float = 0.0 class CompanyLedger(BaseModel): """Company-wide mutable state (cash, inventory, stores, franchisees, SKUs, rolling P&L).""" # Cash cash_inr: float = Field(default=0.0) line_of_credit_limit: float = Field(default=0.0) line_of_credit_drawn: float = Field(default=0.0) # Inventory: sku_id -> {qty:int, value_inr:float, avg_age_days:float} inventory: Dict[str, Dict[str, float]] = Field(default_factory=dict) # Stores: list of {store_id, city, sqft, status} stores: List[Dict[str, Any]] = Field(default_factory=list) cities: List[str] = Field(default_factory=list) # Franchisees: list of {franchise_id, store_id, city, health_score, complaints_open} franchisees: List[Dict[str, Any]] = Field(default_factory=list) # SKU catalogue (static after reset): sku_id -> {name, category, cost_inr, price_inr, margin_pct, perishable} sku_catalogue: Dict[str, Dict[str, Any]] = Field(default_factory=dict) # Rolling KPI history (last ~8 weeks, for delta calc) kpi_history: List[KPISnapshot] = Field(default_factory=list) # Rolling P&L pnl_qtd: PnLSnapshot = Field(default_factory=PnLSnapshot) # Streaks (for Enhancement B drift dynamics; populated even when flag off) dept_approval_streak: Dict[str, int] = Field(default_factory=dict) dept_rejection_streak: Dict[str, int] = Field(default_factory=dict) class WeeklyDecision(BaseModel): """An audit record of the CEO's action + the environment's response that week.""" week: int decisions: List[ProposalDecision] = Field(default_factory=list) budget_allocations: Dict[str, float] = Field(default_factory=dict) diligence_requests: List[ExecutiveDiligenceRequest] = Field(default_factory=list) diligence_findings: List[ExecutiveDiligenceFinding] = Field(default_factory=list) journal_entry: str = "" weekly_reward: float = 0.0 reward_components: Dict[str, float] = Field( default_factory=dict, description="Per-component breakdown (kpi_delta, rogue_catch, false_reject, stockout, journal_coherence, ...)", ) kpi_snapshot: Optional[KPISnapshot] = None rogues_active: List[str] = Field(default_factory=list) rogues_caught: List[str] = Field(default_factory=list) # --------------------------------------------------------------------------- # OpenEnv contract types # --------------------------------------------------------------------------- class SimMartAction(Action): """CEO's weekly action: decisions + budget + journal.""" action_type: str = Field( default="decide", description=f"One of {ACTION_TYPES}", ) decisions: List[ProposalDecision] = Field( default_factory=list, description="Per-proposal verdicts for this week's inbox", ) budget_allocations: Dict[str, float] = Field( default_factory=dict, description="dept -> rupees allocated this week", ) diligence_requests: List[ExecutiveDiligenceRequest] = Field( default_factory=list, description="CEO-level escalations for deeper diligence; limited by weekly executive bandwidth", ) journal_entry: str = Field( default="", description="Founder's Journal text for this week (Mercor token-scaling reward target)", ) class SimMartObservation(Observation): """Weekly observation presented to the CEO.""" step_type: str = Field(default="weekly_decision", description=f"One of {STEP_TYPES}") day_of_quarter: int = Field(default=0, description="1..90") week_of_quarter: int = Field(default=0, description="1..13") kpi_snapshot: KPISnapshot = Field(default_factory=KPISnapshot) pnl_snapshot: PnLSnapshot = Field(default_factory=PnLSnapshot) inbox: List[Proposal] = Field(default_factory=list, description="This week's proposals (4–6 typical)") active_crises: List[CrisisEvent] = Field(default_factory=list) franchise_complaints: List[Complaint] = Field(default_factory=list) competitor_events: List[CompetitorEvent] = Field(default_factory=list) executive_diligence_findings: List[ExecutiveDiligenceFinding] = Field( default_factory=list, description="Findings returned from prior-week CEO diligence escalations", ) diligence_budget_remaining: int = Field( default=0, description="How many CEO-level diligence escalations can be requested this week", ) schema_hash: str = Field( default="", description="sha1[:8] of the current Proposal schema; mutates when schema drift fires (Patronus)", ) last_journal: str = Field(default="", description="CEO's prior-week journal entry, for continuity") task_description: str = Field(default="", description="Episode narrative / current chapter") message: str = Field(default="") output: str = Field( default="", description="Freeform human-readable payload (fallback/echo; may be unused once prompt rendering is done client-side)", ) class SimMartState(State): """Internal SimMart state.""" episode_id: str = Field(default="") day: int = Field(default=0, description="0..90") week: int = Field(default=0, description="0..13") rng_seed: int = Field(default=0) company: CompanyLedger = Field(default_factory=CompanyLedger) dept_drifts: Dict[str, float] = Field( default_factory=dict, description="Per-dept alignment parameter in [0,1]; higher = more self-serving. Hidden from observation.", ) crisis_queue: List[CrisisEvent] = Field( default_factory=list, description="All crises scheduled for this episode (active + future)", ) rogue_incidents: List[RogueIncident] = Field( default_factory=list, description="Rogue scenarios for this episode (hidden from observation)", ) history: List[WeeklyDecision] = Field(default_factory=list, description="Auditable per-week record") pending_diligence_findings: List[ExecutiveDiligenceFinding] = Field( default_factory=list, description="Findings generated by the prior step and surfaced in the next observation", )