SimMart / server /grader.py
Viani's picture
HF Space: 4-dept SimMart env + 1.5B SFT+GRPO training (hackathon submission)
5c35138
"""SimMart reward grader β€” 4 reward components (mini env).
Formula:
R_episode =
0.50 Γ— Ξ£ weekly_kpi_delta_score (weekly, returned each step)
+ 0.30 Γ— Ξ£ rogue_catch_bonus (weekly)
- 0.10 Γ— Ξ£ stockout_penalty (weekly)
+ 0.20 Γ— quarterly_pnl_bonus (terminal only)
Weekly reward is assembled from the first 3 components; the environment sums
them over the 8 weeks. Terminal reward (quarterly_pnl) is added in the last
step.
Components stay in [-1, +1] before weighting so the weighted sum also
falls in a well-bounded range (β‰ˆ [-1.3, +1.2] per episode).
Public API:
weekly_reward(...) β†’ (float, Dict[str, float]) per-week total + components
terminal_reward(...) β†’ (float, Dict[str, float]) episode-end terminal bonus + components
kpi_delta_score(kpi)
rogue_catch_score(rogue_metrics)
false_reject_penalty(decisions, inbox, rogue_pids)
stockout_penalty(kpi)
journal_coherence_score(journal, decisions, prev_journal)
quarterly_pnl_bonus(ledger)
cash_floor_penalty(min_cash_inr)
"""
from __future__ import annotations
import math
import re
from typing import Any, Dict, Iterable, List, Optional, Tuple
try:
from ..models import (
CompanyLedger,
KPISnapshot,
Proposal,
ProposalDecision,
)
from . import economics as E
except (ImportError, ModuleNotFoundError):
from models import CompanyLedger, KPISnapshot, Proposal, ProposalDecision
from server import economics as E
# ---------------------------------------------------------------------------
# Individual components (each returns a normalised signal in [-1, +1] or [0, 1])
# ---------------------------------------------------------------------------
def kpi_delta_score(kpi: KPISnapshot) -> float:
"""Average of 5 KPI levels measured against the starting baseline target.
Per-component score = clip((level - target) / normaliser, -1, +1).
Stockout is INVERTED so that *lower* stockouts score positive.
Why level (not week-over-week delta)? A delta-based score punishes
turbulence: a steady decline scores negative every week even at a
constant rate of slip, so the cumulative reward is the integral of
decline. With level scoring, a do-nothing-perfectly policy that
holds at baseline scores 0/wk; any improvement above baseline scores
positive; any deficit scores proportional to its current size, not
its derivative. This gives GRPO a well-shaped landscape with a
clean zero-point.
"""
t = E.KPI_TARGETS
n = E.KPI_LEVEL_NORMALISERS
scores = [
_clip_to_unit((kpi.revenue_inr - t["revenue_inr"]) / n["revenue_inr"]),
_clip_to_unit((kpi.gross_margin_pct - t["gross_margin_pct"]) / n["gross_margin_pct"]),
-_clip_to_unit((kpi.stockout_rate_pct - t["stockout_rate_pct"]) / n["stockout_rate_pct"]),
_clip_to_unit((kpi.nps - t["nps"]) / n["nps"]),
_clip_to_unit((kpi.delivery_sla_hit_rate_pct - t["delivery_sla_hit_rate_pct"]) / n["delivery_sla_hit_rate_pct"]),
]
return sum(scores) / len(scores)
def rogue_catch_score(rogue_metrics: Dict[str, int]) -> float:
"""Correct flags at +1.0 each; false flags at -0.5 each.
Returned value is clipped to [-1, +1] so a single correct catch is
meaningful; stacking 3 correct catches caps at +1.0.
"""
correct = rogue_metrics.get("correct_flags", 0)
false = rogue_metrics.get("false_flags", 0)
raw = correct * E.ROGUE_CATCH_BONUS + false * E.ROGUE_CATCH_FALSE_FLAG
return max(-1.0, min(1.0, raw))
def false_reject_penalty(
decisions: List[ProposalDecision],
inbox: List[Proposal],
rogue_proposal_ids: Optional[Iterable[str]] = None,
) -> float:
"""Weighted count of non-rogue rejections, normalised by inbox size.
Weights:
high urgency β†’ 1.0 (most costly to reject incorrectly)
med urgency β†’ 0.5
low urgency β†’ 0.1
A pure `reject-all` baseline on an 11-proposal inbox averaging med-urgency
scores roughly 0.5 (then βˆ’0.3 weight β†’ -0.15 component, potentially
worse with the stockout penalty that follows). Oracle CEO: 0.0.
"""
if not inbox:
return 0.0
rogue_ids = set(rogue_proposal_ids or [])
inbox_by_id = {p.proposal_id: p for p in inbox}
weights = {"high": 1.0, "med": 0.5, "low": 0.1}
total = 0.0
for d in decisions:
if d.verdict != "reject":
continue
if d.proposal_id in rogue_ids:
continue # rejecting a rogue is fine (rogue=flag_suspicious is preferred but reject not punished)
prop = inbox_by_id.get(d.proposal_id)
if prop is None:
continue
total += weights.get(prop.urgency, 0.5)
return min(1.0, total / max(1, len(inbox)))
def stockout_penalty(kpi: KPISnapshot) -> float:
"""Return a [0, 1] penalty proportional to stockout rate over 5 %.
Per economics.py STOCKOUT_PER_PT_PENALTY = 0.05/pt, so 25 pp over the
threshold maxes the penalty at 1.0.
"""
excess_pts = max(0.0, kpi.stockout_rate_pct - 5.0)
return min(1.0, excess_pts * E.STOCKOUT_PER_PT_PENALTY)
def journal_coherence_score(
journal: str,
decisions: List[ProposalDecision],
prev_journal: str = "",
) -> float:
"""Score journal quality ∈ [0, 1]: structural + coherence + length.
Structural (0.4):
- mentions of at least 2 structural keywords (week, risk, decision, next)
- mentions of at least 2 distinct dept names
Coherence (0.4):
- references at least one proposal ID from decisions (approved or flagged)
- optional: references at least one flagged rogue
- continuity with prev_journal (a shared noun phrase bumps +0.1)
Length (0.2):
- sweet spot 150–350 words; 0 below 20 words; plateau in band; taper
"""
if not journal or not journal.strip():
return 0.0
text = journal
lower = text.lower()
# --- Structural (0.4 max) ---
struct = 0.0
keyword_hits = sum(
1
for kw in ("week", "risk", "decision", "next", "approv", "reject", "flag",
"stock", "cash", "margin", "priority", "action")
if kw in lower
)
struct += 0.5 * min(1.0, keyword_hits / 5.0)
depts_seen = sum(
1
for dept_kw in ("supply", "store", "finance", "growth", "expansion",
"ops", "marketing")
if dept_kw in lower
)
struct += 0.5 * min(1.0, depts_seen / 3.0)
structural_sub = E.JOURNAL_STRUCTURAL_MAX * struct # up to 0.4
# --- Coherence (0.4 max) ---
coherence = 0.0
mentioned_pids = 0
approved_or_flagged = [
d.proposal_id for d in decisions
if d.verdict in ("approve", "modify", "flag_suspicious")
]
for pid in approved_or_flagged:
if pid and pid in text:
mentioned_pids += 1
coherence += 0.7 * min(1.0, mentioned_pids / 2.0)
if any(d.verdict == "flag_suspicious" for d in decisions):
for d in decisions:
if d.verdict == "flag_suspicious" and d.proposal_id in text:
coherence += 0.2
break
if prev_journal:
shared = _shared_noun_overlap(lower, prev_journal.lower())
if shared:
coherence += 0.1
coherence = min(1.0, coherence)
coherence_sub = E.JOURNAL_COHERENCE_MAX * coherence # up to 0.4
# --- Length (0.2 max) ---
n_words = len(text.split())
length_sub = E.JOURNAL_LENGTH_MAX * _length_score_triangular(n_words)
return min(1.0, structural_sub + coherence_sub + length_sub)
def quarterly_pnl_bonus(ledger: CompanyLedger) -> float:
"""Map QTD EBITDA margin to [-1, +1] (linear, clipped at -10%/+10%).
Anchors:
-10% β†’ -1.0 (deep cash-burn, floor)
-5 % β†’ -0.5
0 % β†’ 0.0 (break-even, neutral)
+5 % β†’ +0.5 (at the plan's stretch target)
+10% β†’ +1.0 (ceiling)
"""
margin_pct = ledger.pnl_qtd.ebitda_margin_pct
mapped = (margin_pct + 5.0) / 10.0 - 0.5
return max(-1.0, min(1.0, mapped))
def cash_floor_penalty(min_cash_inr: float) -> float:
"""+1.0 if cash dropped below 0 at any point (applied with negative weight)."""
return 1.0 if min_cash_inr < 0 else 0.0
# ---------------------------------------------------------------------------
# Weekly + Terminal roll-up
# ---------------------------------------------------------------------------
def weekly_reward(
kpi_snapshot: KPISnapshot,
decisions: List[ProposalDecision],
inbox: List[Proposal],
rogue_metrics: Dict[str, int],
journal_entry: str,
prev_journal_entry: str = "",
) -> Tuple[float, Dict[str, float]]:
"""Compute this week's reward and per-component breakdown.
Returns:
(weekly_reward_float, components_dict)
components_dict has raw (pre-weight) + weighted keys.
"""
raw: Dict[str, float] = {
"kpi_delta": kpi_delta_score(kpi_snapshot),
"rogue_catch": rogue_catch_score(rogue_metrics),
"stockout": stockout_penalty(kpi_snapshot),
}
w = E.REWARD_WEIGHTS
weighted: Dict[str, float] = {
"kpi_delta": w["weekly_kpi_delta"] * raw["kpi_delta"],
"rogue_catch": w["rogue_catch"] * raw["rogue_catch"],
"stockout": w["stockout"] * raw["stockout"], # neg weight
}
total = sum(weighted.values())
out = {f"raw.{k}": v for k, v in raw.items()}
out.update({f"weighted.{k}": v for k, v in weighted.items()})
out["total"] = total
return total, out
def terminal_reward(
ledger: CompanyLedger,
min_cash_inr: float,
) -> Tuple[float, Dict[str, float]]:
"""Terminal reward applied at the final step of the episode."""
raw = {
"quarterly_pnl": quarterly_pnl_bonus(ledger),
}
w = E.REWARD_WEIGHTS
weighted = {
"quarterly_pnl": w["quarterly_pnl"] * raw["quarterly_pnl"],
}
total = sum(weighted.values())
out = {f"raw.{k}": v for k, v in raw.items()}
out.update({f"weighted.{k}": v for k, v in weighted.items()})
out["total"] = total
return total, out
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _clip_to_unit(x: float) -> float:
return max(-1.0, min(1.0, x))
def _length_score_triangular(n_words: int) -> float:
"""1.0 in [150, 350] band, tapering down outside; 0 below 20."""
if n_words < 20:
return 0.0
if n_words < 150:
return (n_words - 20) / (150 - 20)
if n_words <= 350:
return 1.0
if n_words < 700:
return 1.0 - (n_words - 350) / (700 - 350)
return 0.0
_NOUN_TOKEN_RE = re.compile(r"[a-zA-Z]{5,}")
def _shared_noun_overlap(a: str, b: str) -> bool:
"""True if the two lowercased journals share at least one reasonable token.
Cheap proxy for narrative continuity (no LLM).
"""
if not a or not b:
return False
a_tokens = set(_NOUN_TOKEN_RE.findall(a))
b_tokens = set(_NOUN_TOKEN_RE.findall(b))
stops = {"week", "this", "that", "with", "been", "have", "from", "were",
"they", "their", "about", "there", "which", "these", "those",
"should", "could", "would", "while"}
shared = (a_tokens & b_tokens) - stops
return bool(shared)