Spaces:
Sleeping
Sleeping
| """Core environment implementation for ChargebackOps.""" | |
| from __future__ import annotations | |
| import hashlib | |
| from datetime import datetime, timedelta, timezone | |
| from uuid import uuid4 | |
| from openenv.core.env_server.interfaces import Environment | |
| try: | |
| from ..core.episode_store import record_report | |
| from ..evaluation.grading import grade_episode | |
| from ..evaluation.rubrics import ChargebackOpsEpisodeRubric | |
| from ..core.models import ( | |
| ActionTraceItem, | |
| CaseQueueItem, | |
| CaseResolutionState, | |
| ChargebackOpsAction, | |
| ChargebackOpsObservation, | |
| ChargebackOpsState, | |
| EvidenceCard, | |
| PolicyView, | |
| VisibleCase, | |
| ) | |
| from ..scenarios.arbitration import ( | |
| ARB_FEE_PER_SIDE, | |
| ArbitrationOutcome, | |
| ArbitrationRuling, | |
| arbitration_ruling, | |
| ) | |
| from ..scenarios.issuer_model import IssuerAgent, IssuerDecision, IssuerReview | |
| from ..scenarios.simulation import ( | |
| ActionRecord, | |
| CaseProgress, | |
| InternalCase, | |
| get_task, | |
| ) | |
| except ImportError: # pragma: no cover | |
| from core.episode_store import record_report | |
| from evaluation.grading import grade_episode | |
| from evaluation.rubrics import ChargebackOpsEpisodeRubric | |
| from core.models import ( | |
| ActionTraceItem, | |
| CaseQueueItem, | |
| CaseResolutionState, | |
| ChargebackOpsAction, | |
| ChargebackOpsObservation, | |
| ChargebackOpsState, | |
| EvidenceCard, | |
| PolicyView, | |
| VisibleCase, | |
| ) | |
| from scenarios.arbitration import ( | |
| ARB_FEE_PER_SIDE, | |
| ArbitrationOutcome, | |
| ArbitrationRuling, | |
| arbitration_ruling, | |
| ) | |
| from scenarios.issuer_model import IssuerAgent, IssuerDecision, IssuerReview | |
| from scenarios.simulation import ActionRecord, CaseProgress, InternalCase, get_task | |
| class ChargebackOpsEnvironment( | |
| Environment[ChargebackOpsAction, ChargebackOpsObservation, ChargebackOpsState] | |
| ): | |
| """Synthetic merchant chargeback representment environment.""" | |
| SUPPORTS_CONCURRENT_SESSIONS: bool = True | |
def __init__(self):
    """Create an environment preloaded with the default task.

    Registers the episode rubric with the base class, loads the
    ``goods_not_received_easy`` task, builds a fresh state object with a
    random episode id, and finally initialises the per-case bookkeeping
    through ``_reset_task_state``.
    """
    super().__init__(rubric=ChargebackOpsEpisodeRubric())
    self._task = get_task("goods_not_received_easy")
    self._selected_case_id: str | None = None
    self._last_action_result = "Environment initialized."
    self._action_history: list[ActionRecord] = []
    self._progress_by_case: dict[str, CaseProgress] = {}
    self._issuer_agent = IssuerAgent()
    self._state = ChargebackOpsState(
        episode_id=str(uuid4()),
        step_count=0,
        task_id=self._task.task_id,
        task_title=self._task.title,
        difficulty=self._task.difficulty,
        objective=self._task.objective,
    )
    self._done = False
    self._latest_report = None
    # Replaces the placeholders above (selection, history, progress map,
    # last result message, done flag, report) with per-task values.
    self._reset_task_state()
# Maps a dispute reason code to a (merchant_name, merchant_mcc) pair.
# ``_case_display_metadata`` looks cases up here and falls back to a
# generic profile for reason codes that are not listed.
_MERCHANT_PROFILES = {
    "goods_not_received": ("Northwind Home", "5712"),
    "fraud_cnp": ("TechForge Direct", "5732"),
    "credit_not_processed": ("StreamClub Media", "4899"),
    "duplicate_processing": ("Meridian Retail", "5311"),
    "product_not_as_described": ("Aurora Commerce", "5942"),
    "service_not_provided": ("Metro Service Group", "7299"),
}
def _reset_task_state(self) -> None:
    """Reinitialise all per-episode bookkeeping for the current task.

    Every case of the task gets a fresh ``CaseProgress``; the selection,
    action history, cached report and done flag are cleared, and the
    last-result message announces that the task is ready.
    """
    fresh_progress = {}
    for task_case in self._task.cases:
        fresh_progress[task_case.case_id] = CaseProgress()
    self._progress_by_case = fresh_progress
    self._action_history = []
    self._selected_case_id = None
    self._latest_report = None
    self._done = False
    self._last_action_result = f"{self._task.title} ready."
def reset(
    self,
    seed: int | None = None,
    episode_id: str | None = None,
    **kwargs,
) -> ChargebackOpsObservation:
    """Start a new episode and return its initial observation.

    Recognised keyword arguments:
        task_id: explicit task to load; wins over ``difficulty``.
        difficulty: one of easy/medium/hard/nightmare; when no ``task_id``
            is given, a ``generated_<difficulty>_s<seed>`` task id is built.
        generated_seed: fallback seed (default 42) used when ``seed`` is None.

    Raises:
        ValueError: if ``difficulty`` is given but is not a known level.
    """
    allowed_levels = frozenset({"easy", "medium", "hard", "nightmare"})
    requested_task = kwargs.get("task_id")
    level = kwargs.get("difficulty")

    if level is not None and level not in allowed_levels:
        choices = ", ".join(sorted(allowed_levels))
        raise ValueError(
            f"Invalid difficulty {level!r}. " f"Must be one of: {choices}"
        )

    if requested_task is None and level in allowed_levels:
        if seed is not None:
            effective_seed = seed
        else:
            effective_seed = int(kwargs.get("generated_seed", 42))
        requested_task = f"generated_{level}_s{effective_seed}"

    if requested_task is None:
        # Neither a task id nor a difficulty was supplied.
        requested_task = "goods_not_received_easy"

    self._task = get_task(requested_task)
    task = self._task
    self._state = ChargebackOpsState(
        episode_id=episode_id or str(uuid4()),
        step_count=0,
        task_id=task.task_id,
        task_title=task.title,
        difficulty=task.difficulty,
        objective=task.objective,
    )
    self._reset_task_state()
    self._reset_rubric()
    return self._build_observation(reward=0.0, done=False)
def step(
    self,
    action: ChargebackOpsAction,
    timeout_s: float | None = None,
    **kwargs,
) -> ChargebackOpsObservation:
    """Apply one agent action and return the resulting observation.

    The step reward is the sum of the action's own reward, rewards from
    pending events that resolve on this step, deadline penalties, and,
    when the episode ends on this step, half of the normalized grade.

    Args:
        action: the action to apply.
        timeout_s: accepted for interface compatibility; ignored.
        **kwargs: accepted for interface compatibility; ignored.

    Returns:
        The observation built after the action, with the reward rounded
        to four decimals.
    """
    del timeout_s, kwargs
    if self._done:
        # Acting on a finished episode does not advance the step counter.
        return self._build_observation(
            reward=-0.1,
            done=True,
            result="Episode already completed. Reset to start another task.",
        )
    self._state.step_count += 1
    reward = 0.0
    result = ""
    try:
        reward, result = self._apply_action(action)
    except ValueError as exc:
        # Invalid actions are not fatal: they cost a fixed penalty and the
        # error text becomes the result message.
        reward = -0.12
        result = str(exc)
        case_id = action.case_id or self._selected_case_id
        if case_id and case_id in self._progress_by_case:
            self._progress_by_case[case_id].invalid_actions += 1
    # Delayed evidence and delayed issuer reviews that fall due on this step.
    event_reward, event_messages = self._advance_pending_events()
    reward += event_reward
    if event_messages:
        suffix = " ".join(event_messages)
        result = f"{result} Updates: {suffix}" if result else suffix
    reward += self._apply_deadline_penalties()
    done = self._check_done()
    if done:
        # Episode finished: grade it, publish the report, add the bonus.
        report = grade_episode(
            self._task,
            self._progress_by_case,
            self._state.step_count,
            self._state.episode_id or "",
            completed=self._all_cases_resolved(),
        )
        self._latest_report = report
        self._state.latest_grade = report.normalized_score
        self._state.grader_report = report
        self._state.completed = True
        record_report(report)
        reward += 0.5 * report.normalized_score
    else:
        self._state.latest_grade = self._estimated_progress_score()
        self._state.completed = False
    self._last_action_result = result
    self._action_history.append(
        ActionRecord(
            step_index=self._state.step_count,
            action_type=action.action_type,
            case_id=action.case_id or self._selected_case_id,
            outcome=result,
            reward=round(reward, 4),
        )
    )
    return self._build_observation(
        reward=round(reward, 4),
        done=done,
        result=result,
    )
| def _apply_action(self, action: ChargebackOpsAction) -> tuple[float, str]: | |
| if action.action_type == "select_case": | |
| return self._select_case(action.case_id) | |
| if action.action_type == "wait_for_updates": | |
| return self._wait_for_updates() | |
| case = self._require_case(action.case_id) | |
| progress = self._progress_by_case[case.case_id] | |
| if progress.resolution_status == "pending_issuer_review": | |
| raise ValueError( | |
| f"Case {case.case_id} is pending issuer review; select another case or wait for updates." | |
| ) | |
| if action.action_type == "inspect_case": | |
| return self._inspect_case(case) | |
| if action.action_type == "query_system": | |
| return self._query_system(case, action.system_name) | |
| if action.action_type == "retrieve_policy": | |
| return self._retrieve_policy(case) | |
| if action.action_type == "add_evidence": | |
| return self._add_evidence(case, action.evidence_ids) | |
| if action.action_type == "remove_evidence": | |
| return self._remove_evidence(case, action.evidence_ids) | |
| if action.action_type == "set_strategy": | |
| return self._set_strategy(case, action.strategy) | |
| if action.action_type == "submit_representment": | |
| return self._submit_representment(case, note=action.note) | |
| if action.action_type == "resolve_case": | |
| return self._resolve_case(case, action.strategy) | |
| if action.action_type == "respond_to_pre_arb": | |
| return self._respond_to_pre_arb( | |
| case, | |
| compelling_evidence_ids=action.compelling_evidence_ids, | |
| note=action.note, | |
| ) | |
| if action.action_type == "escalate_to_arbitration": | |
| return self._escalate_to_arbitration(case) | |
| if action.action_type == "accept_arbitration_loss": | |
| return self._accept_arbitration_loss(case) | |
| raise ValueError(f"Unsupported action_type '{action.action_type}'.") | |
| def _select_case(self, case_id: str | None) -> tuple[float, str]: | |
| if not case_id: | |
| raise ValueError("select_case requires case_id.") | |
| case = self._lookup_case(case_id) | |
| if not self._case_arrived(case): | |
| raise ValueError(f"Case {case.case_id} has not arrived in the queue yet.") | |
| self._selected_case_id = case.case_id | |
| progress = self._progress_by_case[case.case_id] | |
| if progress.resolution_status != "open": | |
| return -0.02, f"Case {case.case_id} is already resolved." | |
| return 0.02, f"Selected case {case.case_id}." | |
| def _wait_for_updates(self) -> tuple[float, str]: | |
| """Advance the clock when all visible work is blocked. | |
| Long-horizon tasks include pending issuer reviews, delayed evidence | |
| pulls, and future case arrivals. Waiting while work is available is | |
| penalised; waiting when the backlog is genuinely blocked is a small | |
| neutral/positive planning action. | |
| """ | |
| open_visible = [ | |
| case | |
| for case in self._visible_task_cases() | |
| if self._progress_by_case[case.case_id].resolution_status == "open" | |
| ] | |
| if open_visible: | |
| return -0.04, "Waited while open cases were available." | |
| pending = self._pending_work_count() | |
| if pending > 0: | |
| return 0.02, f"Waited for {pending} pending updates." | |
| future = self._future_case_count() | |
| if future > 0: | |
| return 0.01, f"Waited for future case arrivals ({future} not yet visible)." | |
| return -0.03, "No pending updates or future arrivals to wait for." | |
| def _inspect_case(self, case: InternalCase) -> tuple[float, str]: | |
| progress = self._progress_by_case[case.case_id] | |
| if progress.inspected: | |
| return -0.01, f"Case {case.case_id} was already inspected." | |
| progress.inspected = True | |
| return 0.04, f"Inspected case {case.case_id}." | |
| def _query_system( | |
| self, | |
| case: InternalCase, | |
| system_name: str | None, | |
| ) -> tuple[float, str]: | |
| if system_name is None: | |
| raise ValueError("query_system requires system_name.") | |
| progress = self._progress_by_case[case.case_id] | |
| if system_name in progress.revealed_systems: | |
| if system_name in progress.pending_evidence_systems: | |
| return ( | |
| -0.01, | |
| f"System '{system_name}' is already queued for delayed evidence on case {case.case_id}.", | |
| ) | |
| progress.duplicate_queries += 1 | |
| return ( | |
| -0.03, | |
| f"System '{system_name}' was already queried for case {case.case_id}.", | |
| ) | |
| progress.revealed_systems.add(system_name) | |
| if ( | |
| system_name in case.delayed_systems | |
| and case.evidence_response_delay_steps > 0 | |
| ): | |
| progress.pending_evidence_systems[system_name] = ( | |
| self._state.step_count + case.evidence_response_delay_steps | |
| ) | |
| return ( | |
| 0.02, | |
| f"Queued delayed {system_name} evidence for case {case.case_id}; expected in " | |
| f"{case.evidence_response_delay_steps} steps.", | |
| ) | |
| new_evidence = case.evidence_by_system.get(system_name, ()) | |
| progress.retrieved_evidence_ids.update( | |
| item.evidence_id for item in new_evidence | |
| ) | |
| helpful = sum(1 for item in new_evidence if item.helpful) | |
| if helpful > 0: | |
| return 0.06 + 0.01 * helpful, ( | |
| f"Queried {system_name} for case {case.case_id}; found {len(new_evidence)} evidence items, " | |
| f"including {helpful} useful ones." | |
| ) | |
| return -0.01 if len(new_evidence) == 0 else 0.01, ( | |
| f"Queried {system_name} for case {case.case_id}; found {len(new_evidence)} evidence items." | |
| ) | |
| def _retrieve_policy(self, case: InternalCase) -> tuple[float, str]: | |
| progress = self._progress_by_case[case.case_id] | |
| if progress.policy_retrieved: | |
| return -0.01, f"Policy already retrieved for case {case.case_id}." | |
| progress.policy_retrieved = True | |
| return 0.05, f"Retrieved policy guidance for case {case.case_id}." | |
| def _add_evidence( | |
| self, | |
| case: InternalCase, | |
| evidence_ids: list[str], | |
| ) -> tuple[float, str]: | |
| if not evidence_ids: | |
| raise ValueError("add_evidence requires at least one evidence id.") | |
| progress = self._progress_by_case[case.case_id] | |
| all_evidence = self._evidence_map(case) | |
| reward = 0.0 | |
| added = [] | |
| for evidence_id in evidence_ids: | |
| if evidence_id not in progress.retrieved_evidence_ids: | |
| reward -= 0.04 | |
| continue | |
| if evidence_id in progress.attached_evidence_ids: | |
| reward -= 0.02 | |
| continue | |
| progress.attached_evidence_ids.append(evidence_id) | |
| added.append(evidence_id) | |
| evidence = all_evidence[evidence_id] | |
| if evidence.helpful: | |
| reward += 0.08 | |
| elif evidence.harmful: | |
| reward -= 0.08 | |
| else: | |
| reward += 0.01 | |
| return reward if added else -0.05, ( | |
| f"Attached evidence {', '.join(added)} to case {case.case_id}." | |
| if added | |
| else f"No evidence was attached to case {case.case_id}." | |
| ) | |
| def _remove_evidence( | |
| self, | |
| case: InternalCase, | |
| evidence_ids: list[str], | |
| ) -> tuple[float, str]: | |
| if not evidence_ids: | |
| raise ValueError("remove_evidence requires at least one evidence id.") | |
| progress = self._progress_by_case[case.case_id] | |
| evidence_map = self._evidence_map(case) | |
| reward = 0.0 | |
| removed = [] | |
| for evidence_id in evidence_ids: | |
| if evidence_id not in progress.attached_evidence_ids: | |
| reward -= 0.02 | |
| continue | |
| progress.attached_evidence_ids.remove(evidence_id) | |
| removed.append(evidence_id) | |
| if evidence_map[evidence_id].harmful: | |
| reward += 0.05 | |
| elif evidence_map[evidence_id].helpful: | |
| reward -= 0.03 | |
| else: | |
| reward += 0.01 | |
| return reward if removed else -0.04, ( | |
| f"Removed evidence {', '.join(removed)} from case {case.case_id}." | |
| if removed | |
| else f"No evidence was removed from case {case.case_id}." | |
| ) | |
| def _set_strategy( | |
| self, | |
| case: InternalCase, | |
| strategy: str | None, | |
| ) -> tuple[float, str]: | |
| if strategy is None: | |
| raise ValueError("set_strategy requires strategy.") | |
| progress = self._progress_by_case[case.case_id] | |
| progress.current_strategy = strategy | |
| if strategy == case.optimal_strategy: | |
| return ( | |
| 0.1, | |
| f"Set the optimal strategy '{strategy}' for case {case.case_id}.", | |
| ) | |
| if strategy in case.acceptable_strategies: | |
| return ( | |
| 0.03, | |
| f"Set an acceptable fallback strategy '{strategy}' for case {case.case_id}.", | |
| ) | |
| return -0.08, f"Set a weak strategy '{strategy}' for case {case.case_id}." | |
| def _submit_representment( | |
| self, case: InternalCase, *, note: str | None = None | |
| ) -> tuple[float, str]: | |
| progress = self._progress_by_case[case.case_id] | |
| progress.submit_attempts += 1 | |
| progress.merchant_submitted_at_step = self._state.step_count | |
| if note: | |
| progress.representment_note = note | |
| if progress.current_strategy != "contest": | |
| raise ValueError( | |
| "submit_representment requires current strategy to be 'contest'." | |
| ) | |
| if progress.resolution_status != "open": | |
| return -0.05, f"Case {case.case_id} is already resolved." | |
| if self._state.step_count > case.deadline_step: | |
| progress.final_resolution = "contest" | |
| progress.resolution_status = "lost_late" | |
| progress.resolved_at_step = self._state.step_count | |
| return ( | |
| -0.2, | |
| f"Representment for case {case.case_id} was submitted after the deadline.", | |
| ) | |
| if case.issuer_response_delay_steps > 0: | |
| progress.resolution_status = "pending_issuer_review" | |
| progress.pending_issuer_round_number = 1 | |
| progress.pending_issuer_due_step = ( | |
| self._state.step_count + case.issuer_response_delay_steps | |
| ) | |
| return ( | |
| 0.12, | |
| f"Submitted representment for case {case.case_id}; issuer review is pending " | |
| f"for {case.issuer_response_delay_steps} steps.", | |
| ) | |
| # Every on-time packet is handed to the scripted Issuer. Missing | |
| # required evidence and attached harmful evidence are not terminal — | |
| # they push the score down so the Issuer requests more evidence | |
| # (round 2) or escalates to arbitration (round 3), exercising the | |
| # multi-round dispute path the rubric is built for. | |
| review = self._invoke_issuer_review(case, progress, round_number=1) | |
| return self._apply_issuer_review_result(case, progress, review, round_number=1) | |
| def _invoke_issuer_review( | |
| self, | |
| case: InternalCase, | |
| progress: CaseProgress, | |
| *, | |
| round_number: int, | |
| ) -> IssuerReview: | |
| """Shared helper that calls the scripted Issuer and records the decision.""" | |
| review = self._issuer_agent.decide_review(case, progress, round_number=round_number) | |
| progress.issuer_decisions.append(review.decision.value) | |
| progress.issuer_rationales.append(review.rationale) | |
| return review | |
def _apply_issuer_review_result(
    self,
    case: InternalCase,
    progress: CaseProgress,
    review: IssuerReview,
    *,
    round_number: int,
) -> tuple[float, str]:
    """Translate a finished issuer review into case state and a reward.

    Round 1: acceptance wins the case; a request for more evidence
    reopens it in round 2; any other decision is treated as a lost
    contest. Later rounds: acceptance wins at pre-arbitration, anything
    else sends the case to arbitration.
    """
    # The review has landed, so no issuer response is outstanding.
    progress.pending_issuer_due_step = None
    progress.pending_issuer_round_number = None

    issuer_accepted = review.decision == IssuerDecision.ACCEPT

    if round_number != 1:
        if issuer_accepted:
            progress.final_resolution = "contest"
            progress.resolution_status = "won_pre_arb"
            progress.resolved_at_step = self._state.step_count
            progress.final_economic_outcome = case.amount
            message = (
                f"Issuer accepted pre-arbitration packet for case {case.case_id} "
                f"(score {review.evidence_strength_score:.2f}). {review.rationale}"
            )
            return 0.35, message
        ruling = self._apply_arbitration(case, progress)
        arbitration_reward = self._arbitration_reward(ruling)
        message = (
            f"Issuer escalated case {case.case_id} to arbitration "
            f"(score {review.evidence_strength_score:.2f}). {ruling.rationale}"
        )
        return arbitration_reward, message

    if issuer_accepted:
        progress.final_resolution = "contest"
        progress.resolution_status = "won"
        progress.resolved_at_step = self._state.step_count
        progress.final_economic_outcome = case.amount
        message = (
            f"Issuer accepted representment for case {case.case_id} "
            f"(score {review.evidence_strength_score:.2f}). {review.rationale}"
        )
        return 0.45, message

    if review.decision == IssuerDecision.REQUEST_MORE_EVIDENCE:
        progress.round_number = 2
        progress.resolution_status = "open"
        message = (
            f"Issuer requested compelling evidence for case {case.case_id} "
            f"(score {review.evidence_strength_score:.2f}). {review.rationale}"
        )
        return -0.05, message

    # Defensive: Issuer should not escalate from round 1, but handle it.
    progress.final_resolution = "contest"
    progress.resolution_status = "lost_contest"
    progress.resolved_at_step = self._state.step_count
    message = f"Issuer escalated case {case.case_id} unexpectedly. {review.rationale}"
    return -0.12, message
| def _advance_pending_events(self) -> tuple[float, list[str]]: | |
| """Resolve delayed evidence pulls and delayed Issuer reviews.""" | |
| reward = 0.0 | |
| messages: list[str] = [] | |
| for case in self._task.cases: | |
| progress = self._progress_by_case[case.case_id] | |
| due_systems = [ | |
| system_name | |
| for system_name, due_step in progress.pending_evidence_systems.items() | |
| if self._state.step_count >= due_step | |
| ] | |
| for system_name in due_systems: | |
| new_evidence = case.evidence_by_system.get(system_name, ()) | |
| progress.retrieved_evidence_ids.update( | |
| item.evidence_id for item in new_evidence | |
| ) | |
| del progress.pending_evidence_systems[system_name] | |
| useful = sum(1 for item in new_evidence if item.helpful) | |
| reward += 0.03 if useful else 0.0 | |
| messages.append( | |
| f"Delayed {system_name} evidence arrived for {case.case_id} " | |
| f"({len(new_evidence)} items)." | |
| ) | |
| if ( | |
| progress.resolution_status == "pending_issuer_review" | |
| and progress.pending_issuer_due_step is not None | |
| and self._state.step_count >= progress.pending_issuer_due_step | |
| ): | |
| round_number = progress.pending_issuer_round_number or progress.round_number | |
| review = self._invoke_issuer_review( | |
| case, progress, round_number=round_number | |
| ) | |
| review_reward, review_result = self._apply_issuer_review_result( | |
| case, progress, review, round_number=round_number | |
| ) | |
| reward += review_reward | |
| messages.append(review_result) | |
| return reward, messages | |
| def _respond_to_pre_arb( | |
| self, | |
| case: InternalCase, | |
| *, | |
| compelling_evidence_ids: list[str], | |
| note: str | None = None, | |
| ) -> tuple[float, str]: | |
| """handler: attach compelling evidence and re-invoke the Issuer.""" | |
| progress = self._progress_by_case[case.case_id] | |
| if progress.round_number != 2: | |
| raise ValueError( | |
| "respond_to_pre_arb is only valid after the Issuer requests more evidence." | |
| ) | |
| if progress.resolution_status != "open": | |
| return -0.05, f"Case {case.case_id} is already resolved." | |
| if not compelling_evidence_ids: | |
| raise ValueError( | |
| "respond_to_pre_arb requires at least one compelling_evidence_id." | |
| ) | |
| if note: | |
| progress.representment_note = note | |
| progress.merchant_submitted_at_step = self._state.step_count | |
| all_evidence = self._evidence_map(case) | |
| added: list[str] = [] | |
| reward = 0.0 | |
| for evidence_id in compelling_evidence_ids: | |
| if evidence_id not in all_evidence: | |
| reward -= 0.04 | |
| continue | |
| if evidence_id in progress.attached_evidence_ids: | |
| reward -= 0.02 | |
| continue | |
| # Retrieve it lazily if the agent points at a known system-sourced id. | |
| progress.retrieved_evidence_ids.add(evidence_id) | |
| progress.attached_evidence_ids.append(evidence_id) | |
| progress.pre_arb_evidence_added.append(evidence_id) | |
| added.append(evidence_id) | |
| evidence = all_evidence[evidence_id] | |
| if evidence.helpful: | |
| reward += 0.06 | |
| elif evidence.harmful: | |
| reward -= 0.1 | |
| else: | |
| reward += 0.01 | |
| if case.issuer_response_delay_steps > 0: | |
| progress.resolution_status = "pending_issuer_review" | |
| progress.pending_issuer_round_number = 2 | |
| progress.pending_issuer_due_step = ( | |
| self._state.step_count + case.issuer_response_delay_steps | |
| ) | |
| return ( | |
| reward + 0.08, | |
| f"Submitted pre-arbitration response for case {case.case_id} " | |
| f"with {', '.join(added) or 'no new'}; issuer review is pending.", | |
| ) | |
| review = self._invoke_issuer_review(case, progress, round_number=2) | |
| review_reward, review_result = self._apply_issuer_review_result( | |
| case, progress, review, round_number=2 | |
| ) | |
| return reward + review_reward, review_result | |
| def _escalate_to_arbitration(self, case: InternalCase) -> tuple[float, str]: | |
| """handler: merchant voluntarily files for arbitration.""" | |
| progress = self._progress_by_case[case.case_id] | |
| if progress.round_number != 2: | |
| raise ValueError( | |
| "escalate_to_arbitration is only valid after a pre-arbitration round." | |
| ) | |
| if progress.resolution_status != "open": | |
| return -0.05, f"Case {case.case_id} is already resolved." | |
| ruling = self._apply_arbitration(case, progress) | |
| return ( | |
| self._arbitration_reward(ruling), | |
| f"Merchant escalated case {case.case_id} to network arbitration. " | |
| f"{ruling.rationale}", | |
| ) | |
| def _accept_arbitration_loss(self, case: InternalCase) -> tuple[float, str]: | |
| """handler: merchant concedes rather than pay the arbitration fee.""" | |
| progress = self._progress_by_case[case.case_id] | |
| if progress.round_number != 2: | |
| raise ValueError( | |
| "accept_arbitration_loss is only valid after a pre-arbitration round." | |
| ) | |
| if progress.resolution_status != "open": | |
| return -0.05, f"Case {case.case_id} is already resolved." | |
| progress.final_resolution = "accept_arbitration_loss" | |
| progress.resolution_status = "conceded_pre_arb" | |
| progress.resolved_at_step = self._state.step_count | |
| progress.arbitration_outcome = None | |
| progress.arb_fees_paid = 0.0 | |
| progress.final_economic_outcome = -case.amount | |
| return ( | |
| -0.1, | |
| f"Merchant accepted arbitration loss on case {case.case_id}; " | |
| f"no arb fee paid but the $" | |
| f"{case.amount:.2f} dispute amount is forfeited.", | |
| ) | |
def _apply_arbitration(
    self, case: InternalCase, progress: CaseProgress
) -> ArbitrationRuling:
    """Obtain the arbitration ruling and write its outcome onto ``progress``.

    The case moves to round 3 and is closed as a contest that was either
    won or lost at arbitration; the ruling's fee and net P&L are copied
    onto the progress record. The ruling is returned to the caller.
    """
    ruling = arbitration_ruling(case, progress)
    merchant_won = ruling.outcome == ArbitrationOutcome.MERCHANT_WINS

    progress.round_number = 3
    progress.final_resolution = "contest"
    progress.resolved_at_step = self._state.step_count
    progress.resolution_status = (
        "won_arbitration" if merchant_won else "lost_arbitration"
    )

    # Economic side of the ruling.
    progress.arbitration_outcome = ruling.outcome.value
    progress.arb_fees_paid = ruling.arb_fee_per_side
    progress.final_economic_outcome = ruling.merchant_net_pnl
    return ruling
@staticmethod
def _arbitration_reward(ruling: ArbitrationRuling) -> float:
    """Map an arbitration ruling to a step reward.

    Winning arbitration is worth more than winning round 1 because it
    clears a harder bar; losing stings more because both the fee and the
    disputed amount are gone.

    Declared as a static method because it takes no ``self`` yet is
    invoked as ``self._arbitration_reward(ruling)``; without the
    decorator that call would pass the instance as ``ruling`` and fail
    with a TypeError.

    Args:
        ruling: the arbitration ruling to score.

    Returns:
        0.55 when the merchant wins, otherwise -0.35.
    """
    if ruling.outcome == ArbitrationOutcome.MERCHANT_WINS:
        return 0.55
    return -0.35
| def _resolve_case( | |
| self, | |
| case: InternalCase, | |
| strategy: str | None, | |
| ) -> tuple[float, str]: | |
| progress = self._progress_by_case[case.case_id] | |
| resolution = strategy or progress.current_strategy | |
| if resolution not in {"accept_chargeback", "issue_refund"}: | |
| raise ValueError( | |
| "resolve_case requires strategy accept_chargeback or issue_refund." | |
| ) | |
| if progress.resolution_status != "open": | |
| return -0.04, f"Case {case.case_id} is already resolved." | |
| progress.final_resolution = resolution | |
| progress.current_strategy = resolution | |
| progress.resolved_at_step = self._state.step_count | |
| progress.resolution_status = ( | |
| "refunded" if resolution == "issue_refund" else "accepted_chargeback" | |
| ) | |
| if self._state.step_count > case.deadline_step: | |
| return -0.15, f"Resolved case {case.case_id} after the response deadline." | |
| if resolution == case.optimal_strategy: | |
| return ( | |
| 0.16, | |
| f"Resolved case {case.case_id} with the optimal non-contest strategy.", | |
| ) | |
| if resolution in case.acceptable_strategies: | |
| return ( | |
| 0.06, | |
| f"Resolved case {case.case_id} with an acceptable fallback strategy.", | |
| ) | |
| return -0.12, f"Resolved case {case.case_id} with the wrong strategy." | |
| def _apply_deadline_penalties(self) -> float: | |
| penalty = 0.0 | |
| for case in self._task.cases: | |
| if not self._case_arrived(case): | |
| continue | |
| progress = self._progress_by_case[case.case_id] | |
| if ( | |
| progress.resolution_status == "open" | |
| and self._state.step_count > case.deadline_step | |
| ): | |
| if not progress.deadline_penalized: | |
| progress.deadline_penalized = True | |
| penalty -= 0.15 | |
| return penalty | |
| def _check_done(self) -> bool: | |
| if self._all_cases_resolved(): | |
| self._done = True | |
| elif self._state.step_count >= self._task.max_steps: | |
| self._done = True | |
| return self._done | |
| def _all_cases_resolved(self) -> bool: | |
| return all( | |
| progress.resolution_status != "open" | |
| and progress.resolution_status != "pending_issuer_review" | |
| for progress in self._progress_by_case.values() | |
| ) | |
| def _case_arrived(self, case: InternalCase) -> bool: | |
| return self._state.step_count >= case.arrival_step | |
| def _visible_task_cases(self) -> list[InternalCase]: | |
| return [case for case in self._task.cases if self._case_arrived(case)] | |
| def _future_case_count(self) -> int: | |
| return sum(1 for case in self._task.cases if not self._case_arrived(case)) | |
| def _pending_work_count(self) -> int: | |
| pending = 0 | |
| for progress in self._progress_by_case.values(): | |
| if progress.resolution_status == "pending_issuer_review": | |
| pending += 1 | |
| pending += len(progress.pending_evidence_systems) | |
| return pending | |
| def _lookup_case(self, case_id: str) -> InternalCase: | |
| for case in self._task.cases: | |
| if case.case_id == case_id: | |
| return case | |
| raise ValueError(f"Unknown case_id '{case_id}'.") | |
| def _require_case(self, case_id: str | None) -> InternalCase: | |
| target_case_id = case_id or self._selected_case_id | |
| if target_case_id is None: | |
| raise ValueError("Select a case before taking this action.") | |
| case = self._lookup_case(target_case_id) | |
| if not self._case_arrived(case): | |
| raise ValueError(f"Case {case.case_id} has not arrived in the queue yet.") | |
| return case | |
| def _evidence_map(self, case: InternalCase): | |
| return { | |
| item.evidence_id: item | |
| for items in case.evidence_by_system.values() | |
| for item in items | |
| } | |
| def _case_fingerprint(self, case: InternalCase) -> str: | |
| return hashlib.sha1( | |
| f"{case.case_id}|{case.order_id}|{case.customer_id}|{case.reason_code}".encode( | |
| "utf-8" | |
| ) | |
| ).hexdigest() | |
| def _case_display_metadata(self, case: InternalCase) -> dict[str, str]: | |
| fingerprint = self._case_fingerprint(case) | |
| merchant_name, merchant_mcc = self._MERCHANT_PROFILES.get( | |
| case.reason_code, | |
| ("Atlas Commerce", "5999"), | |
| ) | |
| last4 = str(int(fingerprint[:8], 16))[-4:].zfill(4) | |
| base_time = datetime(2025, 1, 1, 9, 0, tzinfo=timezone.utc) | |
| txn_minutes = int(fingerprint[8:14], 16) % (180 * 24 * 60) | |
| dispute_lag_hours = 24 + (int(fingerprint[14:18], 16) % 96) | |
| transaction_dt = base_time + timedelta(minutes=txn_minutes) | |
| dispute_dt = transaction_dt + timedelta(hours=dispute_lag_hours) | |
| return { | |
| "transaction_id": f"txn_{fingerprint[:14]}", | |
| "transaction_timestamp": transaction_dt.strftime("%Y-%m-%dT%H:%M:%SZ"), | |
| "dispute_opened_at": dispute_dt.strftime("%Y-%m-%dT%H:%M:%SZ"), | |
| "merchant_name": merchant_name, | |
| "merchant_mcc": merchant_mcc, | |
| "masked_card": f"**** **** **** {last4}", | |
| } | |
| def _episode_metrics(self) -> dict[str, float]: | |
| """User-observable episode metrics. Never exposes grader-internal | |
| labels such as required/helpful/harmful evidence IDs or coverage | |
| against the hidden answer key.""" | |
| open_cases = 0 | |
| urgent_cases = 0 | |
| resolved_cases = 0 | |
| total_attached = 0 | |
| total_retrieved = 0 | |
| pending_issuer = 0 | |
| pending_evidence = 0 | |
| for case in self._task.cases: | |
| progress = self._progress_by_case[case.case_id] | |
| total_attached += len(progress.attached_evidence_ids) | |
| total_retrieved += len(progress.retrieved_evidence_ids) | |
| pending_evidence += len(progress.pending_evidence_systems) | |
| if progress.resolution_status == "pending_issuer_review": | |
| pending_issuer += 1 | |
| if not self._case_arrived(case): | |
| continue | |
| steps_until_deadline = case.deadline_step - self._state.step_count | |
| if progress.resolution_status == "open": | |
| open_cases += 1 | |
| if steps_until_deadline <= 2: | |
| urgent_cases += 1 | |
| else: | |
| resolved_cases += 1 | |
| deadline_pressure = ( | |
| 0.0 if len(self._task.cases) == 0 else urgent_cases / len(self._task.cases) | |
| ) | |
| triage_efficiency = resolved_cases / max(1, self._state.step_count) | |
| return { | |
| "open_case_count": float(open_cases), | |
| "resolved_case_count": float(resolved_cases), | |
| "deadline_pressure_index": round(deadline_pressure, 4), | |
| "triage_efficiency": round(triage_efficiency, 4), | |
| "total_evidence_attached": float(total_attached), | |
| "total_evidence_retrieved": float(total_retrieved), | |
| "pending_issuer_reviews": float(pending_issuer), | |
| "pending_evidence_requests": float(pending_evidence), | |
| "future_case_count": float(self._future_case_count()), | |
| } | |
| def _selected_case_info(self) -> dict[str, object]: | |
| """Per-case diagnostic info visible to agents. Only exposes | |
| signals an analyst could observe (deadline proximity, which | |
| systems haven't been queried, counts). Does NOT expose which | |
| evidence IDs are required, helpful, or harmful.""" | |
| if self._selected_case_id is None: | |
| return { | |
| "deadline_warning": False, | |
| "unqueried_systems": [], | |
| } | |
| case = self._lookup_case(self._selected_case_id) | |
| progress = self._progress_by_case[case.case_id] | |
| all_systems = {"orders", "payment", "shipping", "support", "refunds", "risk"} | |
| return { | |
| "deadline_warning": (case.deadline_step - self._state.step_count) <= 2, | |
| "unqueried_systems": sorted( | |
| all_systems.difference(progress.revealed_systems) | |
| ), | |
| "attached_evidence_count": len(progress.attached_evidence_ids), | |
| "retrieved_evidence_count": len(progress.retrieved_evidence_ids), | |
| "pending_evidence_systems": sorted(progress.pending_evidence_systems), | |
| "pending_issuer_review": progress.resolution_status | |
| == "pending_issuer_review", | |
| "steps_until_deadline": case.deadline_step - self._state.step_count, | |
| } | |
def _build_queue(self) -> list[CaseQueueItem]:
    """Build one queue row for each case from ``self._visible_task_cases()``.

    Every row combines static case fields, the fingerprint-derived display
    metadata, the case's current resolution status and the number of steps
    left before its deadline.
    """

    def queue_row(case):
        # One row: live status from progress, cosmetic fields from display.
        progress = self._progress_by_case[case.case_id]
        display = self._case_display_metadata(case)
        return CaseQueueItem(
            case_id=case.case_id,
            transaction_id=display["transaction_id"],
            transaction_timestamp=display["transaction_timestamp"],
            dispute_opened_at=display["dispute_opened_at"],
            merchant_name=display["merchant_name"],
            merchant_mcc=display["merchant_mcc"],
            masked_card=display["masked_card"],
            card_network=case.card_network,
            network_reason_code=case.network_reason_code,
            response_window_days=case.response_window_days,
            amount=case.amount,
            currency=case.currency,
            reason_code=case.reason_code,
            status=progress.resolution_status,
            summary=case.summary,
            deadline_step=case.deadline_step,
            steps_until_deadline=case.deadline_step - self._state.step_count,
        )

    return [queue_row(case) for case in self._visible_task_cases()]
def _build_visible_case(self) -> VisibleCase | None:
    """Assemble the detailed view of the currently selected case.

    Returns ``None`` when no case is selected. Otherwise the view holds the
    case's static fields, its display metadata, evidence cards for the
    retrieved items (sorted by ID) and the attached items (in stored
    order), and the issuer/arbitration state from the case's progress.
    The policy is included only once retrieved, and inspection notes only
    once the case has been inspected.
    """
    selected_id = self._selected_case_id
    if selected_id is None:
        return None

    case = self._lookup_case(selected_id)
    progress = self._progress_by_case[case.case_id]
    display = self._case_display_metadata(case)
    evidence_by_id = self._evidence_map(case)

    def as_card(evidence_id, is_attached):
        # Render one evidence item as the card shown to the agent.
        source = evidence_by_id[evidence_id]
        return EvidenceCard(
            evidence_id=evidence_id,
            source_system=source.source_system,
            title=source.title,
            summary=source.summary,
            attached=is_attached,
        )

    retrieved_cards = [
        as_card(evidence_id, evidence_id in progress.attached_evidence_ids)
        for evidence_id in sorted(progress.retrieved_evidence_ids)
    ]
    attached_cards = [
        as_card(evidence_id, True) for evidence_id in progress.attached_evidence_ids
    ]

    if progress.policy_retrieved:
        policy_view = PolicyView(
            reason_code=case.reason_code,
            guidance=case.policy_guidance,
            required_evidence=list(case.policy_requirements),
        )
    else:
        policy_view = None

    status = progress.resolution_status
    # A submission status is only reported once the case has left "open".
    submission_status = None if status == "open" else status

    decisions = progress.issuer_decisions
    rationales = progress.issuer_rationales
    latest_decision = decisions[-1] if decisions else None
    latest_rationale = rationales[-1] if rationales else None

    return VisibleCase(
        case_id=case.case_id,
        transaction_id=display["transaction_id"],
        transaction_timestamp=display["transaction_timestamp"],
        dispute_opened_at=display["dispute_opened_at"],
        order_id=case.order_id,
        customer_id=case.customer_id,
        merchant_name=display["merchant_name"],
        merchant_mcc=display["merchant_mcc"],
        masked_card=display["masked_card"],
        card_network=case.card_network,
        network_reason_code=case.network_reason_code,
        response_window_days=case.response_window_days,
        compelling_evidence_category=case.compelling_evidence_category,
        amount=case.amount,
        currency=case.currency,
        reason_code=case.reason_code,
        status=status,
        current_strategy=progress.current_strategy,
        summary=case.summary,
        inspection_notes=case.inspection_notes if progress.inspected else None,
        systems_revealed=sorted(progress.revealed_systems),
        retrieved_evidence=retrieved_cards,
        attached_evidence=attached_cards,
        policy=policy_view,
        submission_status=submission_status,
        round_number=progress.round_number,
        last_issuer_decision=latest_decision,
        last_issuer_rationale=latest_rationale,
        pre_arb_evidence_added=list(progress.pre_arb_evidence_added),
        arbitration_outcome=progress.arbitration_outcome,
        arb_fees_paid=progress.arb_fees_paid,
        final_economic_outcome=progress.final_economic_outcome,
    )
| def _build_available_actions(self) -> list[str]: | |
| if self._done: | |
| return [] | |
| base = ["select_case"] | |
| open_visible = [ | |
| case | |
| for case in self._visible_task_cases() | |
| if self._progress_by_case[case.case_id].resolution_status == "open" | |
| ] | |
| has_pending_or_future = ( | |
| self._pending_work_count() > 0 or self._future_case_count() > 0 | |
| ) | |
| if self._selected_case_id is None: | |
| return base if open_visible else ["wait_for_updates"] | |
| case_progress = self._progress_by_case[self._selected_case_id] | |
| if case_progress.resolution_status != "open": | |
| return base if open_visible else ["wait_for_updates"] | |
| if case_progress.round_number == 2: | |
| # Pre-arbitration: investigation actions still help (e.g. to pull | |
| # compelling evidence from a system) but the round-1 submit path | |
| # is closed off in favour of the three terminal pre-arb actions. | |
| return base + [ | |
| "query_system", | |
| "retrieve_policy", | |
| "add_evidence", | |
| "remove_evidence", | |
| "respond_to_pre_arb", | |
| "escalate_to_arbitration", | |
| "accept_arbitration_loss", | |
| ] | |
| actions = base + [ | |
| "inspect_case", | |
| "query_system", | |
| "retrieve_policy", | |
| "add_evidence", | |
| "remove_evidence", | |
| "set_strategy", | |
| "submit_representment", | |
| "resolve_case", | |
| ] | |
| if has_pending_or_future and not open_visible: | |
| actions.append("wait_for_updates") | |
| return actions | |
def _estimated_progress_score(self) -> float:
    """Return the normalized score of an interim (not completed) grading.

    Calls ``grade_episode`` with ``completed=False`` on the current task,
    per-case progress and step count. A missing episode ID is passed as an
    empty string.
    """
    episode_id = self._state.episode_id or ""
    interim_report = grade_episode(
        self._task,
        self._progress_by_case,
        self._state.step_count,
        episode_id,
        completed=False,
    )
    return interim_report.normalized_score
def _build_observation(
    self,
    reward: float,
    done: bool,
    result: str | None = None,
) -> ChargebackOpsObservation:
    """Refresh ``self._state`` and return the observation for this step.

    Args:
        reward: Step reward to report on the observation.
        done: Whether the episode has ended.
        result: Message for the last action; when falsy,
            ``self._last_action_result`` is used instead.

    The progress score comes from ``self._latest_report`` when one exists,
    otherwise from an interim estimate. As a side effect this rewrites
    ``queue_state``, ``action_history``, ``selected_case_id`` and
    ``metrics`` on ``self._state``.
    """
    if self._latest_report is not None:
        progress_score = self._latest_report.normalized_score
    else:
        progress_score = self._estimated_progress_score()

    # Per-case resolution snapshot covering every case in the task.
    queue_state = []
    for case in self._task.cases:
        case_progress = self._progress_by_case[case.case_id]
        status = case_progress.resolution_status
        queue_state.append(
            CaseResolutionState(
                case_id=case.case_id,
                status=status,
                current_strategy=case_progress.current_strategy,
                resolved=status != "open",
                steps_until_deadline=case.deadline_step - self._state.step_count,
            )
        )
    self._state.queue_state = queue_state

    # Mirror the internal action log into the serialisable trace type.
    trace = []
    for record in self._action_history:
        trace.append(
            ActionTraceItem(
                step_index=record.step_index,
                action_type=record.action_type,
                case_id=record.case_id,
                outcome=record.outcome,
                reward=record.reward,
            )
        )
    self._state.action_history = trace

    self._state.selected_case_id = self._selected_case_id
    self._state.metrics = self._episode_metrics()

    observation_info = dict(self._selected_case_info())
    observation_info["episode_metrics"] = self._state.metrics
    observation_info["current_task_max_steps"] = self._task.max_steps

    task = self._task
    queue = self._build_queue()
    visible_case = self._build_visible_case()
    last_action_result = result or self._last_action_result
    available_actions = self._build_available_actions()
    steps_remaining = max(0, task.max_steps - self._state.step_count)
    rounded_score = round(progress_score, 4)

    return ChargebackOpsObservation(
        task_id=task.task_id,
        task_title=task.title,
        difficulty=task.difficulty,
        objective=task.objective,
        selected_case_id=self._selected_case_id,
        queue=queue,
        visible_case=visible_case,
        last_action_result=last_action_result,
        available_actions=available_actions,
        steps_remaining=steps_remaining,
        progress_score=rounded_score,
        info=observation_info,
        grader_report=self._latest_report,
        done=done,
        reward=reward,
        metadata={
            "reward_components": {
                "step_reward": reward,
                "progress_score": rounded_score,
            },
            "info": observation_info,
        },
    )
def state(self) -> ChargebackOpsState:
    """Return the environment's current ``ChargebackOpsState`` object.

    The same object held in ``self._state`` is returned, not a copy.
    """
    # NOTE(review): no @property decorator is visible on this definition;
    # confirm whether the Environment base class expects `state` to be a
    # property rather than a plain method.
    current_state = self._state
    return current_state