| """SimMart reward grader β 4 reward components (mini env). |
| |
| Formula: |
| R_episode = |
0.50 × Σ weekly_kpi_delta_score (weekly, returned each step)
+ 0.30 × Σ rogue_catch_bonus (weekly)
- 0.10 × Σ stockout_penalty (weekly)
+ 0.20 × quarterly_pnl_bonus (terminal only)
| |
| Weekly reward is assembled from the first 3 components; the environment sums |
| them over the 8 weeks. Terminal reward (quarterly_pnl) is added in the last |
| step. |
| |
| Components stay in [-1, +1] before weighting so the weighted sum also |
falls in a well-bounded range (≈ [-1.3, +1.2] per episode).
| |
| Public API: |
weekly_reward(...) → (float, Dict[str, float]) per-week total + components
terminal_reward(...) → (float, Dict[str, float]) episode-end terminal bonus + components
| kpi_delta_score(kpi) |
| rogue_catch_score(rogue_metrics) |
| false_reject_penalty(decisions, inbox, rogue_pids) |
| stockout_penalty(kpi) |
| journal_coherence_score(journal, decisions, prev_journal) |
| quarterly_pnl_bonus(ledger) |
| cash_floor_penalty(min_cash_inr) |
| """ |
|
|
| from __future__ import annotations |
|
|
| import math |
| import re |
| from typing import Any, Dict, Iterable, List, Optional, Tuple |
|
|
| try: |
| from ..models import ( |
| CompanyLedger, |
| KPISnapshot, |
| Proposal, |
| ProposalDecision, |
| ) |
| from . import economics as E |
| except (ImportError, ModuleNotFoundError): |
| from models import CompanyLedger, KPISnapshot, Proposal, ProposalDecision |
| from server import economics as E |
|
|
|
|
| |
| |
| |
|
|
def kpi_delta_score(kpi: KPISnapshot) -> float:
    """Average of 5 KPI levels measured against the starting baseline target.

    Per-component score = clip((level - target) / normaliser, -1, +1);
    the stockout component is sign-flipped so that *lower* stockouts score
    positive.

    Level (rather than week-over-week delta) scoring is deliberate: a
    delta-based score punishes turbulence — a steady decline scores negative
    every week even at a constant rate of slip, so the cumulative reward is
    the integral of decline.  With level scoring, a policy that holds the
    baseline scores 0/wk, any improvement above baseline scores positive,
    and any deficit scores proportional to its current size rather than its
    derivative.  That gives GRPO a well-shaped landscape with a clean
    zero-point.
    """
    targets = E.KPI_TARGETS
    norms = E.KPI_LEVEL_NORMALISERS

    def level_score(field: str, level: float) -> float:
        # Distance from the baseline target, normalised and clipped to [-1, +1].
        return _clip_to_unit((level - targets[field]) / norms[field])

    components = (
        level_score("revenue_inr", kpi.revenue_inr),
        level_score("gross_margin_pct", kpi.gross_margin_pct),
        # Inverted: being above the stockout target is bad.
        -level_score("stockout_rate_pct", kpi.stockout_rate_pct),
        level_score("nps", kpi.nps),
        level_score("delivery_sla_hit_rate_pct", kpi.delivery_sla_hit_rate_pct),
    )
    return sum(components) / len(components)
|
|
|
|
def rogue_catch_score(rogue_metrics: Dict[str, int]) -> float:
    """Score rogue-proposal catches: +1.0 per correct flag, -0.5 per false flag.

    The raw tally is clipped to [-1, +1] so a single correct catch is
    already meaningful and stacking 3 correct catches saturates at +1.0.
    """
    bonus = rogue_metrics.get("correct_flags", 0) * E.ROGUE_CATCH_BONUS
    malus = rogue_metrics.get("false_flags", 0) * E.ROGUE_CATCH_FALSE_FLAG
    return max(-1.0, min(1.0, bonus + malus))
|
|
|
|
def false_reject_penalty(
    decisions: List[ProposalDecision],
    inbox: List[Proposal],
    rogue_proposal_ids: Optional[Iterable[str]] = None,
) -> float:
    """Weighted count of non-rogue rejections, normalised by inbox size.

    Urgency weights:
        high → 1.0 (most costly to reject incorrectly)
        med  → 0.5 (also the default for unknown urgency values)
        low  → 0.1

    A pure reject-all baseline on an 11-proposal, mostly-med-urgency inbox
    scores roughly 0.5 here (≈ -0.15 after weighting, potentially worse once
    the stockout penalty compounds).  An oracle CEO scores 0.0.
    """
    if not inbox:
        return 0.0

    rogue = set(rogue_proposal_ids or [])
    proposals_by_id = {p.proposal_id: p for p in inbox}
    urgency_weight = {"high": 1.0, "med": 0.5, "low": 0.1}

    # Non-rogue rejections whose proposal actually exists in the inbox.
    rejected_proposals = (
        proposals_by_id.get(d.proposal_id)
        for d in decisions
        if d.verdict == "reject" and d.proposal_id not in rogue
    )
    weighted_total = sum(
        urgency_weight.get(p.urgency, 0.5)
        for p in rejected_proposals
        if p is not None
    )
    return min(1.0, weighted_total / max(1, len(inbox)))
|
|
|
|
def stockout_penalty(kpi: KPISnapshot) -> float:
    """Return a [0, 1] penalty proportional to the stockout rate above 5 %.

    With economics.py's STOCKOUT_PER_PT_PENALTY = 0.05/pt, 25 pp over the
    5 % threshold saturates the penalty at 1.0.
    """
    excess_pts = kpi.stockout_rate_pct - 5.0
    if excess_pts <= 0.0:
        # At or below the threshold there is nothing to penalise.
        return 0.0
    return min(1.0, excess_pts * E.STOCKOUT_PER_PT_PENALTY)
|
|
|
|
def journal_coherence_score(
    journal: str,
    decisions: List[ProposalDecision],
    prev_journal: str = "",
) -> float:
    """Score journal quality in [0, 1]: structural + coherence + length.

    Structural (weight E.JOURNAL_STRUCTURAL_MAX):
      - hits among structural keywords (week, risk, decision, ...), capped at 5
      - distinct department names mentioned, capped at 3
    Coherence (weight E.JOURNAL_COHERENCE_MAX):
      - references to proposal IDs from acted-on decisions (cap 2 → 0.7)
      - +0.2 if at least one flagged-suspicious proposal is referenced
      - +0.1 for continuity with prev_journal (a shared content token)
    Length (weight E.JOURNAL_LENGTH_MAX):
      - sweet spot 150–350 words; 0 below 20 words; tapers to 0 at 700

    Bug fix: the flagged-rogue mention check now skips falsy proposal IDs.
    The previous code evaluated ``d.proposal_id in text`` unguarded, so an
    empty ID ("" in text is always True) awarded a spurious +0.2 and a None
    ID raised TypeError; the first PID loop already guarded this.  The
    redundant ``any(...)`` pre-scan before that check was also removed.
    """
    if not journal or not journal.strip():
        # An empty or whitespace-only journal earns nothing.
        return 0.0

    text = journal
    lower = text.lower()

    # --- Structural sub-score ---------------------------------------------
    keyword_hits = sum(
        1
        for kw in ("week", "risk", "decision", "next", "approv", "reject", "flag",
                   "stock", "cash", "margin", "priority", "action")
        if kw in lower
    )
    depts_seen = sum(
        1
        for dept_kw in ("supply", "store", "finance", "growth", "expansion",
                        "ops", "marketing")
        if dept_kw in lower
    )
    struct = 0.5 * min(1.0, keyword_hits / 5.0) + 0.5 * min(1.0, depts_seen / 3.0)
    structural_sub = E.JOURNAL_STRUCTURAL_MAX * struct

    # --- Coherence sub-score ----------------------------------------------
    # NOTE: substring matching means an ID like "P1" also matches inside
    # "P10"; accepted as part of this cheap heuristic.
    acted_pids = [
        d.proposal_id for d in decisions
        if d.verdict in ("approve", "modify", "flag_suspicious")
    ]
    mentioned_pids = sum(1 for pid in acted_pids if pid and pid in text)
    coherence = 0.7 * min(1.0, mentioned_pids / 2.0)

    # +0.2 when at least one flagged rogue is actually discussed.
    # Guard d.proposal_id so an empty/None ID cannot match spuriously.
    if any(
        d.verdict == "flag_suspicious" and d.proposal_id and d.proposal_id in text
        for d in decisions
    ):
        coherence += 0.2

    # +0.1 continuity bump when a content token is shared with last week.
    if prev_journal and _shared_noun_overlap(lower, prev_journal.lower()):
        coherence += 0.1

    coherence_sub = E.JOURNAL_COHERENCE_MAX * min(1.0, coherence)

    # --- Length sub-score ---------------------------------------------------
    length_sub = E.JOURNAL_LENGTH_MAX * _length_score_triangular(len(text.split()))

    return min(1.0, structural_sub + coherence_sub + length_sub)
|
|
|
|
def quarterly_pnl_bonus(ledger: CompanyLedger) -> float:
    """Map QTD EBITDA margin to [-1, +1] (linear, clipped at -10 %/+10 %).

    Anchors:
        -10 % → -1.0 (deep cash-burn, floor)
         -5 % → -0.5
          0 % →  0.0 (break-even, neutral)
         +5 % → +0.5 (at the plan's stretch target)
        +10 % → +1.0 (ceiling)
    """
    margin_pct = ledger.pnl_qtd.ebitda_margin_pct
    # Affine map: slope 0.1 per percentage point, zero at break-even.
    shifted = (margin_pct + 5.0) / 10.0
    score = shifted - 0.5
    return max(-1.0, min(1.0, score))
|
|
|
|
def cash_floor_penalty(min_cash_inr: float) -> float:
    """Return 1.0 if cash ever dipped below zero, else 0.0.

    The caller applies this with a negative weight, so 1.0 is a penalty.
    """
    went_negative = min_cash_inr < 0
    return float(went_negative)
|
|
|
|
| |
| |
| |
|
|
def weekly_reward(
    kpi_snapshot: KPISnapshot,
    decisions: List[ProposalDecision],
    inbox: List[Proposal],
    rogue_metrics: Dict[str, int],
    journal_entry: str,
    prev_journal_entry: str = "",
) -> Tuple[float, Dict[str, float]]:
    """Assemble this week's reward from the three weekly components.

    Returns:
        (weekly_total, breakdown) where breakdown maps "raw.<name>" to each
        unweighted component, "weighted.<name>" to its weight-scaled value,
        and "total" to the sum of the weighted values.

    NOTE(review): decisions/inbox/journal_entry/prev_journal_entry are in
    the signature but unused in this body — presumably kept for interface
    stability with the environment; confirm before removing.
    """
    component_scores: Dict[str, float] = {
        "kpi_delta": kpi_delta_score(kpi_snapshot),
        "rogue_catch": rogue_catch_score(rogue_metrics),
        "stockout": stockout_penalty(kpi_snapshot),
    }
    # Component name -> key in E.REWARD_WEIGHTS (they differ for kpi_delta).
    weight_key = {
        "kpi_delta": "weekly_kpi_delta",
        "rogue_catch": "rogue_catch",
        "stockout": "stockout",
    }
    scaled = {
        name: E.REWARD_WEIGHTS[weight_key[name]] * score
        for name, score in component_scores.items()
    }
    total = sum(scaled.values())

    # Emit raw.* before weighted.* to keep the historical key ordering.
    breakdown: Dict[str, float] = {}
    for name, score in component_scores.items():
        breakdown[f"raw.{name}"] = score
    for name, score in scaled.items():
        breakdown[f"weighted.{name}"] = score
    breakdown["total"] = total
    return total, breakdown
|
|
|
|
def terminal_reward(
    ledger: CompanyLedger,
    min_cash_inr: float,
) -> Tuple[float, Dict[str, float]]:
    """Episode-end reward: the weighted quarterly P&L bonus.

    Returns:
        (terminal_total, breakdown) with "raw.quarterly_pnl",
        "weighted.quarterly_pnl" and "total" keys.

    NOTE(review): min_cash_inr is accepted but never used — the
    cash_floor_penalty component defined in this module is not folded in
    here; confirm whether that omission is intentional.
    """
    pnl_raw = quarterly_pnl_bonus(ledger)
    pnl_weighted = E.REWARD_WEIGHTS["quarterly_pnl"] * pnl_raw
    breakdown = {
        "raw.quarterly_pnl": pnl_raw,
        "weighted.quarterly_pnl": pnl_weighted,
        "total": pnl_weighted,
    }
    return pnl_weighted, breakdown
|
|
|
|
| |
| |
| |
|
|
def _clip_to_unit(x: float) -> float:
    """Clamp *x* to the closed interval [-1.0, +1.0]."""
    capped_above = min(1.0, x)
    return max(-1.0, capped_above)
|
|
|
|
def _length_score_triangular(n_words: int) -> float:
    """Trapezoidal length score: 1.0 on [150, 350] words, linear ramps outside.

    Below 20 words (or at/above 700) the score is 0.0.
    """
    if 150 <= n_words <= 350:
        # Sweet-spot plateau.
        return 1.0
    if 20 <= n_words < 150:
        # Ramp up from 0 at 20 words to 1 at 150.
        return (n_words - 20) / (150 - 20)
    if 350 < n_words < 700:
        # Taper from 1 at 350 words down to 0 at 700.
        return 1.0 - (n_words - 350) / (700 - 350)
    return 0.0
|
|
|
|
# Runs of >=5 ASCII letters act as a cheap stand-in for content nouns.
_NOUN_TOKEN_RE = re.compile(r"[a-zA-Z]{5,}")


def _shared_noun_overlap(a: str, b: str) -> bool:
    """Return True when two lowercased journals share a contentful token.

    Cheap proxy for narrative continuity (no LLM): tokenise both texts into
    >=5-letter runs and test for a non-stopword intersection.
    """
    if not a or not b:
        return False
    stopwords = {"week", "this", "that", "with", "been", "have", "from", "were",
                 "they", "their", "about", "there", "which", "these", "those",
                 "should", "could", "would", "while"}
    common = set(_NOUN_TOKEN_RE.findall(a)).intersection(_NOUN_TOKEN_RE.findall(b))
    return bool(common - stopwords)
|
|