Spaces:
Sleeping
Sleeping
| """OpenEnv Rubric subclasses that power ChargebackOps grading. | |
| Every scoring dimension is a standalone :class:`openenv.core.rubrics.Rubric` | |
| so the whole grader can be introspected via ``named_rubrics``, captured via | |
| ``state_dict``, and swapped piecewise (e.g. replace :class:`NoteQualityRubric` | |
| with an ``LLMJudge``). The per-case composite uses :class:`WeightedSum` with | |
| weights that must sum to 1.0. | |
| The rubrics take their inputs via a :class:`GradingContext` dataclass passed | |
| as the ``action`` argument of :meth:`Rubric.forward`. The ``observation`` | |
| argument is ignored — ChargebackOps grading operates over deterministic | |
| episode progress, not on the last observation payload. This keeps the rubrics | |
| pure and unit-testable without an environment instance. | |
| Set ``USE_LLM_NOTE_JUDGE=1`` to swap the deterministic | |
| :class:`NoteQualityRubric` for the LLM-backed | |
| :class:`evaluation.llm_note_judge.LLMNoteJudgeRubric` when constructing | |
| :class:`CaseRubric`. The LLM rubric falls back to the deterministic scorer | |
| on any failure, so offline benchmarks remain reproducible without API keys. | |
| """ | |
| from __future__ import annotations | |
| import os | |
| from dataclasses import dataclass | |
| from typing import Any | |
| from openenv.core.rubrics import Gate, Rubric, WeightedSum | |
| try: | |
| from ..scenarios.arbitration import ( | |
| ARB_FEE_PER_SIDE, | |
| ARB_ISSUER_WIN_THRESHOLD, | |
| ARB_MERCHANT_WIN_THRESHOLD, | |
| ) | |
| from ..scenarios.issuer_model import evidence_strength_score | |
| from ..scenarios.simulation import CaseProgress, InternalCase, TaskScenario | |
| except ImportError: # pragma: no cover | |
| from scenarios.arbitration import ( | |
| ARB_FEE_PER_SIDE, | |
| ARB_ISSUER_WIN_THRESHOLD, | |
| ARB_MERCHANT_WIN_THRESHOLD, | |
| ) | |
| from scenarios.issuer_model import evidence_strength_score | |
| from scenarios.simulation import CaseProgress, InternalCase, TaskScenario | |
@dataclass
class GradingContext:
    """Inputs one per-case rubric evaluation needs.

    Passed as the ``action`` argument of the per-case rubrics' ``forward``
    methods. Declared as a dataclass so it can be built with keyword
    arguments (``GradingContext(case=..., progress=..., step_count=...)``).
    """

    # Scenario definition of the case being graded.
    case: InternalCase
    # Progress recorded for that case during the episode.
    progress: CaseProgress
    # Episode step count at grading time.
    step_count: int
@dataclass
class EpisodeGradingContext:
    """Inputs the episode-level rubric needs.

    Declared as a dataclass so the generated ``__init__`` accepts the three
    fields below as keyword arguments.
    """

    # Task scenario; the episode rubric iterates ``task.cases``.
    task: TaskScenario
    # Per-case progress, looked up by ``case.case_id``.
    progress_by_case: dict[str, CaseProgress]
    # Episode step count at grading time.
    step_count: int
def _ratio(numerator: int, denominator: int) -> float:
    """Return ``numerator / denominator`` clamped to the range [0.0, 1.0].

    A non-positive denominator yields 1.0, i.e. full credit when there is
    nothing to measure against.
    """
    if denominator > 0:
        capped = min(1.0, numerator / denominator)
        return capped if capped > 0.0 else 0.0
    return 1.0
def _final_resolution(progress: CaseProgress) -> str:
    """Return the recorded final resolution, or ``"unresolved"`` if it is falsy."""
    resolution = progress.final_resolution
    return resolution if resolution else "unresolved"
def _contest_is_valid(case: InternalCase) -> bool:
    """Tell whether contesting is the optimal or an acceptable strategy for ``case``."""
    if case.optimal_strategy == "contest":
        return True
    return "contest" in case.acceptable_strategies
class StrategyCorrectnessRubric(Rubric):
    """Grade the final strategy: 1.0 if optimal, 0.35 if acceptable, otherwise 0.0."""

    def forward(self, action: Any, observation: Any) -> float:
        """Score the case carried by ``action``; ``observation`` is not used."""
        ctx: GradingContext = action
        case = ctx.case
        chosen = _final_resolution(ctx.progress)
        if chosen == case.optimal_strategy:
            return 1.0
        return 0.35 if chosen in case.acceptable_strategies else 0.0
class EvidenceQualityRubric(Rubric):
    """Grade the evidence packet attached to the case.

    Contesting a case that was never contestable scores 0.0 regardless of
    what was attached — evidence quality cannot rescue a wrong strategy.
    """

    def forward(self, action: Any, observation: Any) -> float:
        """Score the case carried by ``action``; ``observation`` is not used.

        * Contest: 70% for required-evidence coverage plus 30% for
          helpful-evidence coverage, minus 0.25 per harmful item attached,
          clamped to [0.0, 1.0].
        * Concession (accept / refund): 0.15 when contesting was optimal;
          otherwise 1.0 for a clean packet and 0.7 if helpful or harmful
          evidence was attached.
        * Anything else: 0.0.
        """
        ctx: GradingContext = action
        case, progress = ctx.case, ctx.progress
        final = _final_resolution(progress)
        attached = set(progress.attached_evidence_ids)

        def overlap(evidence_ids: Any) -> int:
            # Number of distinct attached ids that appear in ``evidence_ids``.
            return len(attached.intersection(evidence_ids))

        required_hits = overlap(case.required_evidence_ids)
        helpful_hits = overlap(case.helpful_evidence_ids)
        harmful_hits = overlap(case.harmful_evidence_ids)

        if final == "contest":
            if not _contest_is_valid(case):
                return 0.0
            base = 0.7 * _ratio(required_hits, len(case.required_evidence_ids))
            # max(1, ...) keeps the helpful bonus at zero when the case
            # defines no helpful evidence (no vacuous credit).
            bonus = 0.3 * _ratio(helpful_hits, max(1, len(case.helpful_evidence_ids)))
            penalty = 0.25 * harmful_hits
            return max(0.0, min(1.0, base + bonus - penalty))

        if final in {"accept_chargeback", "issue_refund"}:
            if case.optimal_strategy == "contest":
                return 0.15
            packet_is_clean = helpful_hits == 0 and harmful_hits == 0
            return 1.0 if packet_is_clean else 0.7

        return 0.0
class PacketValidityRubric(Rubric):
    """All-or-nothing packet check: every required item attached, nothing harmful."""

    def forward(self, action: Any, observation: Any) -> float:
        """Score the case carried by ``action``; ``observation`` is not used.

        A contest scores 1.0 only when contesting is valid for the case, all
        required evidence is attached and no harmful evidence is attached.
        A concession scores 1.0 unless contesting was the optimal strategy.
        Every other outcome scores 0.0.
        """
        ctx: GradingContext = action
        case, progress = ctx.case, ctx.progress
        final = _final_resolution(progress)
        attached = set(progress.attached_evidence_ids)
        required_hits = len(attached.intersection(case.required_evidence_ids))
        harmful_hits = len(attached.intersection(case.harmful_evidence_ids))

        if final == "contest":
            if not _contest_is_valid(case):
                return 0.0
            complete = required_hits == len(case.required_evidence_ids)
            return 1.0 if complete and harmful_hits == 0 else 0.0

        if final in {"accept_chargeback", "issue_refund"}:
            return 0.0 if case.optimal_strategy == "contest" else 1.0

        return 0.0
class DeadlineComplianceRubric(Rubric):
    """Score 1.0 when the case was resolved by its deadline step, else 0.0."""

    def forward(self, action: Any, observation: Any) -> float:
        """Score the case carried by ``action``; ``observation`` is not used."""
        ctx: GradingContext = action
        progress = ctx.progress
        final = _final_resolution(progress)
        if final == "unresolved":
            return 0.0

        submitted_at = progress.merchant_submitted_at_step
        if final == "contest" and submitted_at is not None:
            # Issuer decisions can lag by several steps on long-horizon
            # tasks, so a contest is timed by when the merchant submitted,
            # not by when the issuer eventually responded.
            effective_step = submitted_at
        elif progress.resolved_at_step is not None:
            effective_step = progress.resolved_at_step
        else:
            # No resolution step recorded: fall back to the current step count.
            effective_step = ctx.step_count

        return 1.0 if effective_step <= ctx.case.deadline_step else 0.0
class CaseAbandonedRubric(Rubric):
    """Hard-constraint rubric: 0.0 if the case was abandoned past its deadline.

    This is the inner rubric of the :class:`Gate` that :class:`CaseRubric`
    applies on top of its weighted sum. It is deliberately distinct from
    :class:`DeadlineComplianceRubric`:

    * :class:`DeadlineComplianceRubric` is the *dimension* — a late
      resolution scores 0 on that one weighted term, and the case is still
      graded on evidence, strategy, and packet quality.
    * :class:`CaseAbandonedRubric` is the *gate* — it zeroes the entire case
      only when the case is still unresolved after the deadline expired.
      In real chargeback operations this is the "no contest, case closed"
      outcome: the merchant forfeited.
    """

    def forward(self, action: Any, observation: Any) -> float:
        """Return 0.0 only for an unresolved case whose deadline has passed.

        A case still awaiting issuer review counts as attempted when the
        merchant submitted on or before the deadline step.
        """
        ctx: GradingContext = action
        case, progress = ctx.case, ctx.progress

        if _final_resolution(progress) != "unresolved":
            return 1.0

        submitted_at = progress.merchant_submitted_at_step
        awaiting_issuer_in_time = (
            progress.resolution_status == "pending_issuer_review"
            and submitted_at is not None
            and submitted_at <= case.deadline_step
        )
        if awaiting_issuer_in_time:
            return 1.0

        return 0.0 if ctx.step_count > case.deadline_step else 1.0
class EfficiencyRubric(Rubric):
    """Penalise wasted or redundant actions and reward early correct concessions."""

    def forward(self, action: Any, observation: Any) -> float:
        """Score the case carried by ``action``; ``observation`` is not used.

        Starts from 1.0, subtracts 0.1 per duplicate query or invalid action
        and 0.05 per submit attempt (that deduction is capped at 0.9), then
        applies the concession-specific adjustments below. The result is
        clamped to [0.0, 1.0].
        """
        ctx: GradingContext = action
        case, progress = ctx.case, ctx.progress
        final = _final_resolution(progress)
        concessions = {"accept_chargeback", "issue_refund"}
        conceded = final in concessions

        wasted = progress.duplicate_queries + progress.invalid_actions
        deduction = min(0.9, wasted * 0.1 + progress.submit_attempts * 0.05)
        score = max(0.0, 1.0 - deduction)

        # Over-querying a concedable case is wasted exploration: each
        # revealed system beyond the second costs 0.15.
        if conceded and case.optimal_strategy != "contest":
            extra_systems = len(progress.revealed_systems) - 2
            if extra_systems > 0:
                score -= 0.15 * extra_systems

        concession_is_optimal = conceded and case.optimal_strategy in concessions
        resolved_at = progress.resolved_at_step

        # Policy retrieval on a resolved, correctly conceded case is treated
        # as wasted effort.
        if progress.policy_retrieved and resolved_at is not None and concession_is_optimal:
            score -= 0.08

        # Bonus for a correct concession made within the first three steps.
        if concession_is_optimal and resolved_at is not None and resolved_at <= 3:
            score = min(1.0, score + 0.1)

        return max(0.0, min(1.0, score))
class OutcomeQualityRubric(Rubric):
    """Discrete outcome quality: 1.0 if optimal, 0.4 if acceptable, otherwise 0.0."""

    def forward(self, action: Any, observation: Any) -> float:
        """Score the case carried by ``action``; ``observation`` is not used."""
        ctx: GradingContext = action
        case = ctx.case
        outcome = _final_resolution(ctx.progress)
        if outcome == case.optimal_strategy:
            return 1.0
        return 0.4 if outcome in case.acceptable_strategies else 0.0
def _probability_of_merchant_win(score: float) -> float:
    """Translate an evidence-strength score into an arbitration win probability.

    Follows the deterministic arbitration ruling: a score at or above
    ``ARB_MERCHANT_WIN_THRESHOLD`` always wins (1.0), a score at or below
    ``ARB_ISSUER_WIN_THRESHOLD`` always loses (0.0), and anything in
    between is treated as a 50/50 coin flip (0.5).
    """
    merchant_wins = score >= ARB_MERCHANT_WIN_THRESHOLD
    if merchant_wins:
        return 1.0
    issuer_wins = score <= ARB_ISSUER_WIN_THRESHOLD
    return 0.0 if issuer_wins else 0.5
class EscalationROIRubric(Rubric):
    """Grade the merchant's escalate-vs-concede decision on expected value.

    Escalation is treated as rational iff ``P(win) * dispute_amount`` exceeds
    the per-side arbitration fee: the fee is paid regardless of outcome, so
    it is only worth paying when the expected recovery is larger.

    Cases that never reached pre-arbitration receive full credit, except
    when a contestable case worth more than the fee was conceded outright —
    that counts as a forfeit and scores 0.0.
    """

    def forward(self, action: Any, observation: Any) -> float:
        """Score the case carried by ``action``; ``observation`` is not used."""
        ctx: GradingContext = action
        case, progress = ctx.case, ctx.progress

        never_escalated = (
            progress.round_number < 2 and progress.arbitration_outcome is None
        )
        if never_escalated:
            # ``case.amount`` stands in for the expected recovery, i.e. a
            # certain win with a full evidence packet.
            forfeited = (
                case.optimal_strategy == "contest"
                and case.amount > ARB_FEE_PER_SIDE
                and _final_resolution(progress)
                in {"accept_chargeback", "issue_refund"}
            )
            return 0.0 if forfeited else 1.0

        strength = evidence_strength_score(case, progress)
        expected_recovery = _probability_of_merchant_win(strength) * case.amount
        positive_ev = expected_recovery > ARB_FEE_PER_SIDE

        status = progress.resolution_status
        if status == "won_pre_arb":
            return 1.0
        if status == "conceded_pre_arb":
            # Conceding is only correct when escalating would not pay off.
            return 0.0 if positive_ev else 1.0
        if progress.arbitration_outcome is not None:
            # Went to arbitration: correct only when it had positive EV.
            return 1.0 if positive_ev else 0.0
        return 0.5
class NoteQualityRubric(Rubric):
    """Deterministic, text-based scorer for the representment note (contest only)."""

    def forward(self, action: Any, observation: Any) -> float:
        """Return the note grade, or 0.0 when the case was not contested or has no note."""
        ctx: GradingContext = action
        progress = ctx.progress
        contested = _final_resolution(progress) == "contest"
        if contested and progress.representment_note:
            attached_ids = set(progress.attached_evidence_ids)
            return grade_representment_note(
                progress.representment_note, ctx.case, attached_ids
            )
        return 0.0
def grade_representment_note(
    note: str | None,
    case: InternalCase,
    attached_ids: set[str],
) -> float:
    """Score a representment note from 0.0 to 1.0.

    The score is the sum of four components, clamped to [0.0, 1.0]:

    * substance — 0.2 for five or more words, 0.1 for two to four;
    * policy coverage — up to 0.5, proportional to the share of
      ``case.policy_requirements`` that have a keyword longer than three
      characters in the note (a flat 0.3 if the case lists none);
    * evidence coherence — 0.15 if any attached evidence id, or a part of
      one longer than three characters, appears in the note;
    * harmful mentions — minus 0.12 per harmful term found, at most three.

    All matching is case-insensitive substring matching. A missing or
    blank note scores 0.0.
    """
    if not note or not note.strip():
        return 0.0

    lowered = note.lower()
    total = 0.0

    # Substance: minimum length for a coherent note.
    n_words = len(lowered.split())
    if n_words >= 5:
        total += 0.2
    elif n_words >= 2:
        total += 0.1

    # Policy coverage: a requirement counts once any keyword of it matches.
    if case.policy_requirements:
        covered = sum(
            1
            for requirement in case.policy_requirements
            if any(
                len(token) > 3 and token in lowered
                for token in requirement.lower().split()
            )
        )
        total += 0.5 * _ratio(covered, len(case.policy_requirements))
    else:
        total += 0.3

    # Evidence coherence: the note names an attached id or one of its parts.
    def _is_referenced(evidence_id: str) -> bool:
        eid = evidence_id.lower()
        if eid in lowered:
            return True
        return any(
            len(piece) > 3 and piece in lowered
            for piece in eid.replace("-", " ").split()
        )

    referenced = [eid for eid in attached_ids if _is_referenced(eid)]
    if referenced:
        total += 0.15

    # Harmful vocabulary: words from the title and summary of every evidence
    # item flagged harmful, minus a small stop-word list.
    harmful_vocab: set[str] = set()
    for items in case.evidence_by_system.values():
        for item in items:
            if not item.harmful:
                continue
            blob = (item.title + " " + item.summary).lower()
            stripped_words = (word.strip(".,;:()") for word in blob.split())
            harmful_vocab.update(w for w in stripped_words if len(w) > 3)
    harmful_vocab.difference_update(
        {
            "was",
            "the",
            "and",
            "for",
            "that",
            "with",
            "from",
            "time",
            "detail",
        }
    )

    hits = sum(term in lowered for term in harmful_vocab)
    if hits > 0:
        total -= 0.12 * min(hits, 3)

    return max(0.0, min(1.0, total))
# Per-dimension weights for the case-level WeightedSum. They are positional:
# entry i applies to the i-th rubric handed to WeightedSum, and the eight
# values must sum to 1.0.
CASE_DIMENSION_WEIGHTS: tuple[float, ...] = (
    0.20,  # strategy correctness
    0.15,  # evidence quality
    0.10,  # packet validity
    0.10,  # deadline compliance
    0.10,  # efficiency
    0.10,  # outcome quality
    0.05,  # note quality
    0.20,  # escalation ROI
)
def _resolve_default_note_rubric() -> Rubric:
    """Pick the note rubric: LLM-backed when opted in, deterministic otherwise.

    ``USE_LLM_NOTE_JUDGE`` is read at call time, and the LLM judge is only
    imported when the flag is set, so importing this module never triggers
    a provider import. The flag is on for the case-insensitive values
    ``1``, ``true`` and ``yes``.
    """
    opted_in = os.getenv("USE_LLM_NOTE_JUDGE", "").lower() in {"1", "true", "yes"}
    if not opted_in:
        return NoteQualityRubric()

    try:  # pragma: no cover - import-time guard
        from .llm_note_judge import LLMNoteJudgeRubric
    except ImportError:
        # Fall back to the absolute import path when the relative one fails.
        from evaluation.llm_note_judge import LLMNoteJudgeRubric
    return LLMNoteJudgeRubric()
# Names of the per-case scoring dimensions, in the same positional order as
# the rubrics aggregated by CaseRubric; used as the keys of the per-dimension
# score breakdown.
CASE_DIMENSION_NAMES: tuple[str, ...] = (
    "strategy_correctness",
    "evidence_quality",
    "packet_validity",
    "deadline_compliance",
    "efficiency",
    "outcome_quality",
    "note_quality",
    "escalation_roi",
)
class CaseRubric(Rubric):
    """Per-case composite — weighted sum of the eight scoring dimensions,
    hard-gated on case abandonment.

    The weighted sum lives in :class:`WeightedSum` and uses
    ``CASE_DIMENSION_WEIGHTS``. On top of that, a :class:`Gate` wrapping
    :class:`CaseAbandonedRubric` at ``threshold=1.0`` hard-zeros the whole
    case when it was left unresolved past its deadline.

    A late-but-completed resolution is *not* gated: it scores 0 on the
    :class:`DeadlineComplianceRubric` dimension and is still graded on the
    remaining dimensions.
    """

    def __init__(self, *, note_rubric: Rubric | None = None) -> None:
        """Build the weighted aggregator and the abandonment gate.

        Args:
            note_rubric: Rubric used for the note-quality dimension. When
                ``None``, the result of ``_resolve_default_note_rubric()``
                is used.
        """
        super().__init__()
        if note_rubric is None:
            note_rubric = _resolve_default_note_rubric()
        # Order matters: it must line up with CASE_DIMENSION_WEIGHTS and
        # CASE_DIMENSION_NAMES.
        self.aggregator = WeightedSum(
            rubrics=[
                StrategyCorrectnessRubric(),
                EvidenceQualityRubric(),
                PacketValidityRubric(),
                DeadlineComplianceRubric(),
                EfficiencyRubric(),
                OutcomeQualityRubric(),
                note_rubric,
                EscalationROIRubric(),
            ],
            weights=list(CASE_DIMENSION_WEIGHTS),
        )
        # Hard constraint: a case never even *attempted* before the deadline
        # expires collapses the entire case score. This wraps
        # :class:`CaseAbandonedRubric` (not :class:`DeadlineComplianceRubric`)
        # so late-but-completed representments still get dimension-level
        # credit while truly abandoned cases are zeroed.
        self.deadline_gate = Gate(CaseAbandonedRubric(), threshold=1.0)

    def forward(self, action: Any, observation: Any) -> float:
        """Return the weighted case score, or 0.0 when the abandonment gate fails."""
        # Always run the aggregator first so per-dimension ``last_score``
        # values are fresh for the grader breakdown, even when the gate
        # hard-fails the case.
        weighted = self.aggregator(action, observation)
        if self.deadline_gate(action, observation) < 1.0:
            return 0.0
        return weighted

    def dimension_scores(self) -> dict[str, float]:
        """Return per-dimension scores from the most recent forward pass.

        A dimension whose ``last_score`` is ``None`` is reported as 0.0.
        """
        return {
            name: float(child.last_score) if child.last_score is not None else 0.0
            for name, child in zip(CASE_DIMENSION_NAMES, self.aggregator._rubric_list)
        }
class ChargebackOpsEpisodeRubric(Rubric):
    """Episode-level rubric: the ``case.weight``-weighted mean of per-case scores."""

    def __init__(self) -> None:
        """Create the single :class:`CaseRubric` reused for every case."""
        super().__init__()
        self.case_rubric = CaseRubric()

    def forward(self, action: Any, observation: Any) -> float:
        """Grade every case of the task and return the weighted mean, capped at 1.0.

        ``action`` carries an :class:`EpisodeGradingContext`. Returns 0.0
        when the case weights sum to zero (including a task with no cases).
        """
        ctx: EpisodeGradingContext = action
        weighted_scores = 0.0
        weight_sum = 0.0
        for case in ctx.task.cases:
            per_case_ctx = GradingContext(
                case=case,
                progress=ctx.progress_by_case[case.case_id],
                step_count=ctx.step_count,
            )
            weighted_scores += self.case_rubric(per_case_ctx, observation) * case.weight
            weight_sum += case.weight
        return min(1.0, weighted_scores / weight_sum) if weight_sum != 0 else 0.0