"""SimMart reward grader — 4 reward components (mini env).

Formula:

    R_episode = 0.50 × Σ weekly_kpi_delta_score   (weekly, returned each step)
              + 0.30 × Σ rogue_catch_bonus        (weekly)
              - 0.10 × Σ stockout_penalty         (weekly)
              + 0.20 × quarterly_pnl_bonus        (terminal only)

The actual weights are read at runtime from ``economics.REWARD_WEIGHTS``; the
numbers above are the documented values.

Weekly reward is assembled from the first 3 components; the environment sums
them over the 8 weeks. Terminal reward (quarterly_pnl) is added in the last
step.

Components stay in [-1, +1] before weighting so the weighted sum also falls
in a well-bounded range (≈ [-1.3, +1.2] per episode).

Public API:
    weekly_reward(...)    → (float, Dict[str, float])  per-week total + components
    terminal_reward(...)  → (float, Dict[str, float])  episode-end terminal bonus + components
    kpi_delta_score(kpi)
    rogue_catch_score(rogue_metrics)
    false_reject_penalty(decisions, inbox, rogue_pids)
    stockout_penalty(kpi)
    journal_coherence_score(journal, decisions, prev_journal)
    quarterly_pnl_bonus(ledger)
    cash_floor_penalty(min_cash_inr)

Note: ``false_reject_penalty``, ``journal_coherence_score`` and
``cash_floor_penalty`` are exposed here but are not folded into
``weekly_reward`` / ``terminal_reward`` by this module.
"""

from __future__ import annotations

import math
import re
from typing import Any, Dict, Iterable, List, Optional, Tuple

try:
    from ..models import (
        CompanyLedger,
        KPISnapshot,
        Proposal,
        ProposalDecision,
    )
    from . import economics as E
except ImportError:  # also covers ModuleNotFoundError (a subclass)
    # Fallback for running outside the package (flat layout on sys.path).
    from models import CompanyLedger, KPISnapshot, Proposal, ProposalDecision
    from server import economics as E


# Urgency → cost of wrongly rejecting a legitimate proposal.
_FALSE_REJECT_WEIGHTS: Dict[str, float] = {"high": 1.0, "med": 0.5, "low": 0.1}

# Substrings looked for (case-insensitively) when scoring journal structure.
_JOURNAL_KEYWORDS: Tuple[str, ...] = (
    "week", "risk", "decision", "next", "approv", "reject",
    "flag", "stock", "cash", "margin", "priority", "action",
)
_JOURNAL_DEPT_KEYWORDS: Tuple[str, ...] = (
    "supply", "store", "finance", "growth", "expansion", "ops", "marketing",
)

# Verdicts whose proposal IDs the journal is expected to cite.
_CITABLE_VERDICTS: Tuple[str, ...] = ("approve", "modify", "flag_suspicious")

_NOUN_TOKEN_RE = re.compile(r"[a-zA-Z]{5,}")

# Tokens ignored by the continuity check. Entries shorter than 5 letters can
# never be produced by _NOUN_TOKEN_RE; they are kept for completeness.
_CONTINUITY_STOPWORDS = frozenset({
    "week", "this", "that", "with", "been", "have", "from", "were",
    "they", "their", "about", "there", "which", "these", "those",
    "should", "could", "would", "while",
})


# ---------------------------------------------------------------------------
# Individual components (each returns a normalised signal in [-1, +1] or [0, 1])
# ---------------------------------------------------------------------------


def kpi_delta_score(kpi: KPISnapshot) -> float:
    """Average of 5 KPI levels measured against the starting baseline target.

    Per-component score = clip((level - target) / normaliser, -1, +1), with
    targets from ``E.KPI_TARGETS`` and normalisers from
    ``E.KPI_LEVEL_NORMALISERS``. Stockout is INVERTED so that *lower*
    stockouts score positive.

    Why level (not week-over-week delta)?
    A delta-based score punishes turbulence: a steady decline scores negative
    every week even at a constant rate of slip, so the cumulative reward is
    the integral of decline. With level scoring, a do-nothing-perfectly policy
    that holds at baseline scores 0/wk; any improvement above baseline scores
    positive; any deficit scores proportional to its current size, not its
    derivative. This gives GRPO a well-shaped landscape with a clean
    zero-point.

    Returns:
        Mean of the five clipped component scores, in [-1, +1].
    """
    targets = E.KPI_TARGETS
    normalisers = E.KPI_LEVEL_NORMALISERS

    def level(field: str) -> float:
        # Signed, clipped distance of one KPI from its baseline target.
        return _clip_to_unit(
            (getattr(kpi, field) - targets[field]) / normalisers[field]
        )

    scores = [
        level("revenue_inr"),
        level("gross_margin_pct"),
        -level("stockout_rate_pct"),  # inverted: fewer stockouts is better
        level("nps"),
        level("delivery_sla_hit_rate_pct"),
    ]
    return sum(scores) / len(scores)


def rogue_catch_score(rogue_metrics: Dict[str, int]) -> float:
    """Score this week's rogue-proposal flagging, clipped to [-1, +1].

    Each correct flag contributes ``E.ROGUE_CATCH_BONUS`` and each false flag
    contributes ``E.ROGUE_CATCH_FALSE_FLAG`` (presumably a positive bonus and
    a negative penalty respectively — confirm in economics.py). Clipping
    keeps a single catch meaningful while capping stacked catches at +1.0.

    Args:
        rogue_metrics: Counts keyed by ``"correct_flags"`` and
            ``"false_flags"``; missing keys count as 0.
    """
    correct = rogue_metrics.get("correct_flags", 0)
    false = rogue_metrics.get("false_flags", 0)
    raw = correct * E.ROGUE_CATCH_BONUS + false * E.ROGUE_CATCH_FALSE_FLAG
    return _clip_to_unit(raw)


def false_reject_penalty(
    decisions: List[ProposalDecision],
    inbox: List[Proposal],
    rogue_proposal_ids: Optional[Iterable[str]] = None,
) -> float:
    """Weighted count of non-rogue rejections, normalised by inbox size.

    Weights:
        high urgency → 1.0  (most costly to reject incorrectly)
        med urgency  → 0.5
        low urgency  → 0.1
        any other urgency value → 0.5

    A pure `reject-all` baseline on an 11-proposal inbox averaging
    med-urgency scores roughly 0.5 (then −0.3 weight → -0.15 component,
    potentially worse with the stockout penalty that follows).
    Oracle CEO: 0.0.

    Args:
        decisions: The agent's verdicts; only ``verdict == "reject"`` counts.
        inbox: Proposals offered this week; decisions whose ID is not in the
            inbox are ignored.
        rogue_proposal_ids: IDs of rogue proposals. Rejecting one of these is
            not penalised.

    Returns:
        Penalty in [0, 1]; 0.0 for an empty inbox.
    """
    if not inbox:
        return 0.0

    rogue_ids = set(rogue_proposal_ids or [])
    inbox_by_id = {p.proposal_id: p for p in inbox}

    total = 0.0
    for decision in decisions:
        if decision.verdict != "reject":
            continue
        if decision.proposal_id in rogue_ids:
            # Rejecting a rogue is fine (flag_suspicious is preferred, but a
            # plain reject is not punished).
            continue
        proposal = inbox_by_id.get(decision.proposal_id)
        if proposal is None:
            continue
        total += _FALSE_REJECT_WEIGHTS.get(proposal.urgency, 0.5)

    # inbox is non-empty here, so the division is safe.
    return min(1.0, total / len(inbox))


def stockout_penalty(kpi: KPISnapshot) -> float:
    """Return a [0, 1] penalty proportional to stockout rate over 5 %.

    The slope is ``E.STOCKOUT_PER_PT_PENALTY``. If that constant is 0.05/pt
    (as economics.py is said to define it), the penalty saturates at 1.0 once
    the rate is 20 pp over the threshold, i.e. at a 25 % stockout rate.
    """
    excess_pts = max(0.0, kpi.stockout_rate_pct - 5.0)
    return min(1.0, excess_pts * E.STOCKOUT_PER_PT_PENALTY)


def journal_coherence_score(
    journal: str,
    decisions: List[ProposalDecision],
    prev_journal: str = "",
) -> float:
    """Score journal quality ∈ [0, 1]: structural + coherence + length.

    Structural (scaled by ``E.JOURNAL_STRUCTURAL_MAX``, documented as 0.4):
      - half from keyword coverage: distinct keyword substrings present
        (week, risk, decision, next, approv, reject, flag, stock, cash,
        margin, priority, action), saturating at 5 hits
      - half from department coverage: distinct dept substrings present
        (supply, store, finance, growth, expansion, ops, marketing),
        saturating at 3 hits

    Coherence (scaled by ``E.JOURNAL_COHERENCE_MAX``, documented as 0.4):
      - 0.7 × min(1, cited / 2), where ``cited`` counts approve / modify /
        flag_suspicious decisions whose proposal ID appears verbatim
      - +0.2 if at least one flag_suspicious proposal ID appears verbatim
      - +0.1 for continuity with ``prev_journal`` (a shared 5+ letter token)

    Length (scaled by ``E.JOURNAL_LENGTH_MAX``, documented as 0.2):
      - sweet spot 150–350 words; 0 below 20 words; plateau in band; taper

    Keyword and department matching is a case-insensitive substring test;
    proposal-ID matching is case-sensitive against the original text.

    Returns:
        0.0 for an empty / whitespace-only journal, else the capped sum.
    """
    if not journal or not journal.strip():
        return 0.0

    text = journal
    lower = text.lower()

    # --- Structural ---
    keyword_hits = sum(1 for kw in _JOURNAL_KEYWORDS if kw in lower)
    depts_seen = sum(1 for kw in _JOURNAL_DEPT_KEYWORDS if kw in lower)
    struct = (
        0.5 * min(1.0, keyword_hits / 5.0)
        + 0.5 * min(1.0, depts_seen / 3.0)
    )
    structural_sub = E.JOURNAL_STRUCTURAL_MAX * struct

    # --- Coherence ---
    cited = sum(
        1
        for d in decisions
        if d.verdict in _CITABLE_VERDICTS
        and d.proposal_id
        and d.proposal_id in text
    )
    coherence = 0.7 * min(1.0, cited / 2.0)

    # Guard against falsy IDs exactly as above: `"" in text` is always True
    # (a spurious bonus) and `None in text` raises TypeError.
    if any(
        d.verdict == "flag_suspicious"
        and d.proposal_id
        and d.proposal_id in text
        for d in decisions
    ):
        coherence += 0.2

    if prev_journal and _shared_noun_overlap(lower, prev_journal.lower()):
        coherence += 0.1

    coherence_sub = E.JOURNAL_COHERENCE_MAX * min(1.0, coherence)

    # --- Length ---
    n_words = len(text.split())
    length_sub = E.JOURNAL_LENGTH_MAX * _length_score_triangular(n_words)

    return min(1.0, structural_sub + coherence_sub + length_sub)


def quarterly_pnl_bonus(ledger: CompanyLedger) -> float:
    """Map QTD EBITDA margin to [-1, +1] (linear, clipped at -10%/+10%).

    Anchors:
        -10% → -1.0  (deep cash-burn, floor)
        -5 % → -0.5
         0 % →  0.0  (break-even, neutral)
        +5 % → +0.5  (at the plan's stretch target)
        +10% → +1.0  (ceiling)

    Reads ``ledger.pnl_qtd.ebitda_margin_pct`` (in percentage points).
    """
    margin_pct = ledger.pnl_qtd.ebitda_margin_pct
    # One tenth of the margin gives exactly the anchors above; the direct
    # form avoids the float noise of an offset-and-subtract mapping.
    return _clip_to_unit(margin_pct / 10.0)


def cash_floor_penalty(min_cash_inr: float) -> float:
    """+1.0 if cash dropped below 0 at any point (applied with negative weight)."""
    return 1.0 if min_cash_inr < 0 else 0.0


# ---------------------------------------------------------------------------
# Weekly + Terminal roll-up
# ---------------------------------------------------------------------------


def weekly_reward(
    kpi_snapshot: KPISnapshot,
    decisions: List[ProposalDecision],
    inbox: List[Proposal],
    rogue_metrics: Dict[str, int],
    journal_entry: str,
    prev_journal_entry: str = "",
) -> Tuple[float, Dict[str, float]]:
    """Compute this week's reward and per-component breakdown.

    Only ``kpi_snapshot`` and ``rogue_metrics`` are read. ``decisions``,
    ``inbox``, ``journal_entry`` and ``prev_journal_entry`` are accepted but
    not used by this implementation (presumably kept for interface
    compatibility with a fuller environment — confirm with callers).

    Returns:
        (weekly_reward_float, components_dict)
        components_dict has ``raw.<name>`` (pre-weight) and
        ``weighted.<name>`` keys for kpi_delta, rogue_catch and stockout,
        plus ``total``.
    """
    raw: Dict[str, float] = {
        "kpi_delta": kpi_delta_score(kpi_snapshot),
        "rogue_catch": rogue_catch_score(rogue_metrics),
        "stockout": stockout_penalty(kpi_snapshot),
    }
    w = E.REWARD_WEIGHTS
    weighted: Dict[str, float] = {
        "kpi_delta": w["weekly_kpi_delta"] * raw["kpi_delta"],
        "rogue_catch": w["rogue_catch"] * raw["rogue_catch"],
        "stockout": w["stockout"] * raw["stockout"],  # neg weight
    }
    return _assemble_breakdown(raw, weighted)


def terminal_reward(
    ledger: CompanyLedger,
    min_cash_inr: float,
) -> Tuple[float, Dict[str, float]]:
    """Terminal reward applied at the final step of the episode.

    Only the quarterly P&L bonus contributes. ``min_cash_inr`` is accepted
    but not used here: ``cash_floor_penalty`` is not applied by this
    implementation.

    Returns:
        (terminal_reward_float, components_dict) with ``raw.quarterly_pnl``,
        ``weighted.quarterly_pnl`` and ``total`` keys.
    """
    raw: Dict[str, float] = {
        "quarterly_pnl": quarterly_pnl_bonus(ledger),
    }
    w = E.REWARD_WEIGHTS
    weighted: Dict[str, float] = {
        "quarterly_pnl": w["quarterly_pnl"] * raw["quarterly_pnl"],
    }
    return _assemble_breakdown(raw, weighted)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _assemble_breakdown(
    raw: Dict[str, float],
    weighted: Dict[str, float],
) -> Tuple[float, Dict[str, float]]:
    """Sum the weighted components and flatten both dicts into one report."""
    total = sum(weighted.values())
    out = {f"raw.{k}": v for k, v in raw.items()}
    out.update({f"weighted.{k}": v for k, v in weighted.items()})
    out["total"] = total
    return total, out


def _clip_to_unit(x: float) -> float:
    """Clamp ``x`` to the closed interval [-1, +1]."""
    return max(-1.0, min(1.0, x))


def _length_score_triangular(n_words: int) -> float:
    """1.0 in [150, 350] band, tapering down outside; 0 below 20.

    Ramps linearly from 0 at 20 words up to 1 at 150, holds at 1 through
    350, then falls linearly back to 0 at 700 words and beyond.
    """
    if n_words < 20:
        return 0.0
    if n_words < 150:
        return (n_words - 20) / (150 - 20)
    if n_words <= 350:
        return 1.0
    if n_words < 700:
        return 1.0 - (n_words - 350) / (700 - 350)
    return 0.0


def _shared_noun_overlap(a: str, b: str) -> bool:
    """True if the two lowercased journals share at least one reasonable token.

    A token is a run of 5 or more ASCII letters that is not in the stop-word
    list. Cheap proxy for narrative continuity (no LLM).
    """
    if not a or not b:
        return False
    a_tokens = set(_NOUN_TOKEN_RE.findall(a))
    b_tokens = set(_NOUN_TOKEN_RE.findall(b))
    shared = (a_tokens & b_tokens) - _CONTINUITY_STOPWORDS
    return bool(shared)