"""Core environment implementation for ChargebackOps.""" from __future__ import annotations import hashlib from datetime import datetime, timedelta, timezone from uuid import uuid4 from openenv.core.env_server.interfaces import Environment try: from ..core.episode_store import record_report from ..evaluation.grading import grade_episode from ..evaluation.rubrics import ChargebackOpsEpisodeRubric from ..core.models import ( ActionTraceItem, CaseQueueItem, CaseResolutionState, ChargebackOpsAction, ChargebackOpsObservation, ChargebackOpsState, EvidenceCard, PolicyView, VisibleCase, ) from ..scenarios.arbitration import ( ARB_FEE_PER_SIDE, ArbitrationOutcome, ArbitrationRuling, arbitration_ruling, ) from ..scenarios.issuer_model import IssuerAgent, IssuerDecision, IssuerReview from ..scenarios.simulation import ( ActionRecord, CaseProgress, InternalCase, get_task, ) except ImportError: # pragma: no cover from core.episode_store import record_report from evaluation.grading import grade_episode from evaluation.rubrics import ChargebackOpsEpisodeRubric from core.models import ( ActionTraceItem, CaseQueueItem, CaseResolutionState, ChargebackOpsAction, ChargebackOpsObservation, ChargebackOpsState, EvidenceCard, PolicyView, VisibleCase, ) from scenarios.arbitration import ( ARB_FEE_PER_SIDE, ArbitrationOutcome, ArbitrationRuling, arbitration_ruling, ) from scenarios.issuer_model import IssuerAgent, IssuerDecision, IssuerReview from scenarios.simulation import ActionRecord, CaseProgress, InternalCase, get_task class ChargebackOpsEnvironment( Environment[ChargebackOpsAction, ChargebackOpsObservation, ChargebackOpsState] ): """Synthetic merchant chargeback representment environment.""" SUPPORTS_CONCURRENT_SESSIONS: bool = True def __init__(self): super().__init__(rubric=ChargebackOpsEpisodeRubric()) self._task = get_task("goods_not_received_easy") self._selected_case_id: str | None = None self._last_action_result = "Environment initialized." 
self._action_history: list[ActionRecord] = [] self._progress_by_case: dict[str, CaseProgress] = {} self._issuer_agent = IssuerAgent() self._state = ChargebackOpsState( episode_id=str(uuid4()), step_count=0, task_id=self._task.task_id, task_title=self._task.title, difficulty=self._task.difficulty, objective=self._task.objective, ) self._done = False self._latest_report = None self._reset_task_state() _MERCHANT_PROFILES = { "goods_not_received": ("Northwind Home", "5712"), "fraud_cnp": ("TechForge Direct", "5732"), "credit_not_processed": ("StreamClub Media", "4899"), "duplicate_processing": ("Meridian Retail", "5311"), "product_not_as_described": ("Aurora Commerce", "5942"), "service_not_provided": ("Metro Service Group", "7299"), } def _reset_task_state(self) -> None: self._progress_by_case = { case.case_id: CaseProgress() for case in self._task.cases } self._selected_case_id = None self._action_history = [] self._last_action_result = f"{self._task.title} ready." self._done = False self._latest_report = None def reset( self, seed: int | None = None, episode_id: str | None = None, **kwargs, ) -> ChargebackOpsObservation: task_id = kwargs.get("task_id") difficulty = kwargs.get("difficulty") _VALID_DIFFICULTIES = {"easy", "medium", "hard", "nightmare"} if difficulty is not None and difficulty not in _VALID_DIFFICULTIES: raise ValueError( f"Invalid difficulty {difficulty!r}. 
" f"Must be one of: {', '.join(sorted(_VALID_DIFFICULTIES))}" ) if task_id is None and difficulty in _VALID_DIFFICULTIES: resolved_seed = ( seed if seed is not None else int(kwargs.get("generated_seed", 42)) ) task_id = f"generated_{difficulty}_s{resolved_seed}" if task_id is None: task_id = "goods_not_received_easy" self._task = get_task(task_id) self._state = ChargebackOpsState( episode_id=episode_id or str(uuid4()), step_count=0, task_id=self._task.task_id, task_title=self._task.title, difficulty=self._task.difficulty, objective=self._task.objective, ) self._reset_task_state() self._reset_rubric() return self._build_observation(reward=0.0, done=False) def step( self, action: ChargebackOpsAction, timeout_s: float | None = None, **kwargs, ) -> ChargebackOpsObservation: del timeout_s, kwargs if self._done: return self._build_observation( reward=-0.1, done=True, result="Episode already completed. Reset to start another task.", ) self._state.step_count += 1 reward = 0.0 result = "" try: reward, result = self._apply_action(action) except ValueError as exc: reward = -0.12 result = str(exc) case_id = action.case_id or self._selected_case_id if case_id and case_id in self._progress_by_case: self._progress_by_case[case_id].invalid_actions += 1 event_reward, event_messages = self._advance_pending_events() reward += event_reward if event_messages: suffix = " ".join(event_messages) result = f"{result} Updates: {suffix}" if result else suffix reward += self._apply_deadline_penalties() done = self._check_done() if done: report = grade_episode( self._task, self._progress_by_case, self._state.step_count, self._state.episode_id or "", completed=self._all_cases_resolved(), ) self._latest_report = report self._state.latest_grade = report.normalized_score self._state.grader_report = report self._state.completed = True record_report(report) reward += 0.5 * report.normalized_score else: self._state.latest_grade = self._estimated_progress_score() self._state.completed = False 
self._last_action_result = result self._action_history.append( ActionRecord( step_index=self._state.step_count, action_type=action.action_type, case_id=action.case_id or self._selected_case_id, outcome=result, reward=round(reward, 4), ) ) return self._build_observation( reward=round(reward, 4), done=done, result=result, ) def _apply_action(self, action: ChargebackOpsAction) -> tuple[float, str]: if action.action_type == "select_case": return self._select_case(action.case_id) if action.action_type == "wait_for_updates": return self._wait_for_updates() case = self._require_case(action.case_id) progress = self._progress_by_case[case.case_id] if progress.resolution_status == "pending_issuer_review": raise ValueError( f"Case {case.case_id} is pending issuer review; select another case or wait for updates." ) if action.action_type == "inspect_case": return self._inspect_case(case) if action.action_type == "query_system": return self._query_system(case, action.system_name) if action.action_type == "retrieve_policy": return self._retrieve_policy(case) if action.action_type == "add_evidence": return self._add_evidence(case, action.evidence_ids) if action.action_type == "remove_evidence": return self._remove_evidence(case, action.evidence_ids) if action.action_type == "set_strategy": return self._set_strategy(case, action.strategy) if action.action_type == "submit_representment": return self._submit_representment(case, note=action.note) if action.action_type == "resolve_case": return self._resolve_case(case, action.strategy) if action.action_type == "respond_to_pre_arb": return self._respond_to_pre_arb( case, compelling_evidence_ids=action.compelling_evidence_ids, note=action.note, ) if action.action_type == "escalate_to_arbitration": return self._escalate_to_arbitration(case) if action.action_type == "accept_arbitration_loss": return self._accept_arbitration_loss(case) raise ValueError(f"Unsupported action_type '{action.action_type}'.") def _select_case(self, case_id: str | 
None) -> tuple[float, str]: if not case_id: raise ValueError("select_case requires case_id.") case = self._lookup_case(case_id) if not self._case_arrived(case): raise ValueError(f"Case {case.case_id} has not arrived in the queue yet.") self._selected_case_id = case.case_id progress = self._progress_by_case[case.case_id] if progress.resolution_status != "open": return -0.02, f"Case {case.case_id} is already resolved." return 0.02, f"Selected case {case.case_id}." def _wait_for_updates(self) -> tuple[float, str]: """Advance the clock when all visible work is blocked. Long-horizon tasks include pending issuer reviews, delayed evidence pulls, and future case arrivals. Waiting while work is available is penalised; waiting when the backlog is genuinely blocked is a small neutral/positive planning action. """ open_visible = [ case for case in self._visible_task_cases() if self._progress_by_case[case.case_id].resolution_status == "open" ] if open_visible: return -0.04, "Waited while open cases were available." pending = self._pending_work_count() if pending > 0: return 0.02, f"Waited for {pending} pending updates." future = self._future_case_count() if future > 0: return 0.01, f"Waited for future case arrivals ({future} not yet visible)." return -0.03, "No pending updates or future arrivals to wait for." def _inspect_case(self, case: InternalCase) -> tuple[float, str]: progress = self._progress_by_case[case.case_id] if progress.inspected: return -0.01, f"Case {case.case_id} was already inspected." progress.inspected = True return 0.04, f"Inspected case {case.case_id}." 
def _query_system( self, case: InternalCase, system_name: str | None, ) -> tuple[float, str]: if system_name is None: raise ValueError("query_system requires system_name.") progress = self._progress_by_case[case.case_id] if system_name in progress.revealed_systems: if system_name in progress.pending_evidence_systems: return ( -0.01, f"System '{system_name}' is already queued for delayed evidence on case {case.case_id}.", ) progress.duplicate_queries += 1 return ( -0.03, f"System '{system_name}' was already queried for case {case.case_id}.", ) progress.revealed_systems.add(system_name) if ( system_name in case.delayed_systems and case.evidence_response_delay_steps > 0 ): progress.pending_evidence_systems[system_name] = ( self._state.step_count + case.evidence_response_delay_steps ) return ( 0.02, f"Queued delayed {system_name} evidence for case {case.case_id}; expected in " f"{case.evidence_response_delay_steps} steps.", ) new_evidence = case.evidence_by_system.get(system_name, ()) progress.retrieved_evidence_ids.update( item.evidence_id for item in new_evidence ) helpful = sum(1 for item in new_evidence if item.helpful) if helpful > 0: return 0.06 + 0.01 * helpful, ( f"Queried {system_name} for case {case.case_id}; found {len(new_evidence)} evidence items, " f"including {helpful} useful ones." ) return -0.01 if len(new_evidence) == 0 else 0.01, ( f"Queried {system_name} for case {case.case_id}; found {len(new_evidence)} evidence items." ) def _retrieve_policy(self, case: InternalCase) -> tuple[float, str]: progress = self._progress_by_case[case.case_id] if progress.policy_retrieved: return -0.01, f"Policy already retrieved for case {case.case_id}." progress.policy_retrieved = True return 0.05, f"Retrieved policy guidance for case {case.case_id}." 
def _add_evidence( self, case: InternalCase, evidence_ids: list[str], ) -> tuple[float, str]: if not evidence_ids: raise ValueError("add_evidence requires at least one evidence id.") progress = self._progress_by_case[case.case_id] all_evidence = self._evidence_map(case) reward = 0.0 added = [] for evidence_id in evidence_ids: if evidence_id not in progress.retrieved_evidence_ids: reward -= 0.04 continue if evidence_id in progress.attached_evidence_ids: reward -= 0.02 continue progress.attached_evidence_ids.append(evidence_id) added.append(evidence_id) evidence = all_evidence[evidence_id] if evidence.helpful: reward += 0.08 elif evidence.harmful: reward -= 0.08 else: reward += 0.01 return reward if added else -0.05, ( f"Attached evidence {', '.join(added)} to case {case.case_id}." if added else f"No evidence was attached to case {case.case_id}." ) def _remove_evidence( self, case: InternalCase, evidence_ids: list[str], ) -> tuple[float, str]: if not evidence_ids: raise ValueError("remove_evidence requires at least one evidence id.") progress = self._progress_by_case[case.case_id] evidence_map = self._evidence_map(case) reward = 0.0 removed = [] for evidence_id in evidence_ids: if evidence_id not in progress.attached_evidence_ids: reward -= 0.02 continue progress.attached_evidence_ids.remove(evidence_id) removed.append(evidence_id) if evidence_map[evidence_id].harmful: reward += 0.05 elif evidence_map[evidence_id].helpful: reward -= 0.03 else: reward += 0.01 return reward if removed else -0.04, ( f"Removed evidence {', '.join(removed)} from case {case.case_id}." if removed else f"No evidence was removed from case {case.case_id}." 
) def _set_strategy( self, case: InternalCase, strategy: str | None, ) -> tuple[float, str]: if strategy is None: raise ValueError("set_strategy requires strategy.") progress = self._progress_by_case[case.case_id] progress.current_strategy = strategy if strategy == case.optimal_strategy: return ( 0.1, f"Set the optimal strategy '{strategy}' for case {case.case_id}.", ) if strategy in case.acceptable_strategies: return ( 0.03, f"Set an acceptable fallback strategy '{strategy}' for case {case.case_id}.", ) return -0.08, f"Set a weak strategy '{strategy}' for case {case.case_id}." def _submit_representment( self, case: InternalCase, *, note: str | None = None ) -> tuple[float, str]: progress = self._progress_by_case[case.case_id] progress.submit_attempts += 1 progress.merchant_submitted_at_step = self._state.step_count if note: progress.representment_note = note if progress.current_strategy != "contest": raise ValueError( "submit_representment requires current strategy to be 'contest'." ) if progress.resolution_status != "open": return -0.05, f"Case {case.case_id} is already resolved." if self._state.step_count > case.deadline_step: progress.final_resolution = "contest" progress.resolution_status = "lost_late" progress.resolved_at_step = self._state.step_count return ( -0.2, f"Representment for case {case.case_id} was submitted after the deadline.", ) if case.issuer_response_delay_steps > 0: progress.resolution_status = "pending_issuer_review" progress.pending_issuer_round_number = 1 progress.pending_issuer_due_step = ( self._state.step_count + case.issuer_response_delay_steps ) return ( 0.12, f"Submitted representment for case {case.case_id}; issuer review is pending " f"for {case.issuer_response_delay_steps} steps.", ) # Every on-time packet is handed to the scripted Issuer. 
Missing # required evidence and attached harmful evidence are not terminal — # they push the score down so the Issuer requests more evidence # (round 2) or escalates to arbitration (round 3), exercising the # multi-round dispute path the rubric is built for. review = self._invoke_issuer_review(case, progress, round_number=1) return self._apply_issuer_review_result(case, progress, review, round_number=1) def _invoke_issuer_review( self, case: InternalCase, progress: CaseProgress, *, round_number: int, ) -> IssuerReview: """Shared helper that calls the scripted Issuer and records the decision.""" review = self._issuer_agent.decide_review(case, progress, round_number=round_number) progress.issuer_decisions.append(review.decision.value) progress.issuer_rationales.append(review.rationale) return review def _apply_issuer_review_result( self, case: InternalCase, progress: CaseProgress, review: IssuerReview, *, round_number: int, ) -> tuple[float, str]: """Apply a completed Issuer review to case state.""" progress.pending_issuer_round_number = None progress.pending_issuer_due_step = None if round_number == 1: if review.decision == IssuerDecision.ACCEPT: progress.final_resolution = "contest" progress.resolution_status = "won" progress.resolved_at_step = self._state.step_count progress.final_economic_outcome = case.amount return ( 0.45, f"Issuer accepted representment for case {case.case_id} " f"(score {review.evidence_strength_score:.2f}). {review.rationale}", ) if review.decision == IssuerDecision.REQUEST_MORE_EVIDENCE: progress.round_number = 2 progress.resolution_status = "open" return ( -0.05, f"Issuer requested compelling evidence for case {case.case_id} " f"(score {review.evidence_strength_score:.2f}). {review.rationale}", ) # Defensive: Issuer should not escalate from round 1, but handle it. 
progress.final_resolution = "contest" progress.resolution_status = "lost_contest" progress.resolved_at_step = self._state.step_count return ( -0.12, f"Issuer escalated case {case.case_id} unexpectedly. {review.rationale}", ) if review.decision == IssuerDecision.ACCEPT: progress.final_resolution = "contest" progress.resolution_status = "won_pre_arb" progress.resolved_at_step = self._state.step_count progress.final_economic_outcome = case.amount return ( 0.35, f"Issuer accepted pre-arbitration packet for case {case.case_id} " f"(score {review.evidence_strength_score:.2f}). {review.rationale}", ) ruling = self._apply_arbitration(case, progress) return ( self._arbitration_reward(ruling), f"Issuer escalated case {case.case_id} to arbitration " f"(score {review.evidence_strength_score:.2f}). {ruling.rationale}", ) def _advance_pending_events(self) -> tuple[float, list[str]]: """Resolve delayed evidence pulls and delayed Issuer reviews.""" reward = 0.0 messages: list[str] = [] for case in self._task.cases: progress = self._progress_by_case[case.case_id] due_systems = [ system_name for system_name, due_step in progress.pending_evidence_systems.items() if self._state.step_count >= due_step ] for system_name in due_systems: new_evidence = case.evidence_by_system.get(system_name, ()) progress.retrieved_evidence_ids.update( item.evidence_id for item in new_evidence ) del progress.pending_evidence_systems[system_name] useful = sum(1 for item in new_evidence if item.helpful) reward += 0.03 if useful else 0.0 messages.append( f"Delayed {system_name} evidence arrived for {case.case_id} " f"({len(new_evidence)} items)." 
) if ( progress.resolution_status == "pending_issuer_review" and progress.pending_issuer_due_step is not None and self._state.step_count >= progress.pending_issuer_due_step ): round_number = progress.pending_issuer_round_number or progress.round_number review = self._invoke_issuer_review( case, progress, round_number=round_number ) review_reward, review_result = self._apply_issuer_review_result( case, progress, review, round_number=round_number ) reward += review_reward messages.append(review_result) return reward, messages def _respond_to_pre_arb( self, case: InternalCase, *, compelling_evidence_ids: list[str], note: str | None = None, ) -> tuple[float, str]: """handler: attach compelling evidence and re-invoke the Issuer.""" progress = self._progress_by_case[case.case_id] if progress.round_number != 2: raise ValueError( "respond_to_pre_arb is only valid after the Issuer requests more evidence." ) if progress.resolution_status != "open": return -0.05, f"Case {case.case_id} is already resolved." if not compelling_evidence_ids: raise ValueError( "respond_to_pre_arb requires at least one compelling_evidence_id." ) if note: progress.representment_note = note progress.merchant_submitted_at_step = self._state.step_count all_evidence = self._evidence_map(case) added: list[str] = [] reward = 0.0 for evidence_id in compelling_evidence_ids: if evidence_id not in all_evidence: reward -= 0.04 continue if evidence_id in progress.attached_evidence_ids: reward -= 0.02 continue # Retrieve it lazily if the agent points at a known system-sourced id. 
progress.retrieved_evidence_ids.add(evidence_id) progress.attached_evidence_ids.append(evidence_id) progress.pre_arb_evidence_added.append(evidence_id) added.append(evidence_id) evidence = all_evidence[evidence_id] if evidence.helpful: reward += 0.06 elif evidence.harmful: reward -= 0.1 else: reward += 0.01 if case.issuer_response_delay_steps > 0: progress.resolution_status = "pending_issuer_review" progress.pending_issuer_round_number = 2 progress.pending_issuer_due_step = ( self._state.step_count + case.issuer_response_delay_steps ) return ( reward + 0.08, f"Submitted pre-arbitration response for case {case.case_id} " f"with {', '.join(added) or 'no new'}; issuer review is pending.", ) review = self._invoke_issuer_review(case, progress, round_number=2) review_reward, review_result = self._apply_issuer_review_result( case, progress, review, round_number=2 ) return reward + review_reward, review_result def _escalate_to_arbitration(self, case: InternalCase) -> tuple[float, str]: """handler: merchant voluntarily files for arbitration.""" progress = self._progress_by_case[case.case_id] if progress.round_number != 2: raise ValueError( "escalate_to_arbitration is only valid after a pre-arbitration round." ) if progress.resolution_status != "open": return -0.05, f"Case {case.case_id} is already resolved." ruling = self._apply_arbitration(case, progress) return ( self._arbitration_reward(ruling), f"Merchant escalated case {case.case_id} to network arbitration. " f"{ruling.rationale}", ) def _accept_arbitration_loss(self, case: InternalCase) -> tuple[float, str]: """handler: merchant concedes rather than pay the arbitration fee.""" progress = self._progress_by_case[case.case_id] if progress.round_number != 2: raise ValueError( "accept_arbitration_loss is only valid after a pre-arbitration round." ) if progress.resolution_status != "open": return -0.05, f"Case {case.case_id} is already resolved." 
progress.final_resolution = "accept_arbitration_loss" progress.resolution_status = "conceded_pre_arb" progress.resolved_at_step = self._state.step_count progress.arbitration_outcome = None progress.arb_fees_paid = 0.0 progress.final_economic_outcome = -case.amount return ( -0.1, f"Merchant accepted arbitration loss on case {case.case_id}; " f"no arb fee paid but the $" f"{case.amount:.2f} dispute amount is forfeited.", ) def _apply_arbitration( self, case: InternalCase, progress: CaseProgress ) -> ArbitrationRuling: """Run the deterministic arbitration rule and record its economic impact.""" ruling = arbitration_ruling(case, progress) progress.round_number = 3 progress.arbitration_outcome = ruling.outcome.value progress.arb_fees_paid = ruling.arb_fee_per_side progress.final_economic_outcome = ruling.merchant_net_pnl progress.final_resolution = "contest" progress.resolved_at_step = self._state.step_count if ruling.outcome == ArbitrationOutcome.MERCHANT_WINS: progress.resolution_status = "won_arbitration" else: progress.resolution_status = "lost_arbitration" return ruling @staticmethod def _arbitration_reward(ruling: ArbitrationRuling) -> float: """Map an arbitration ruling to a step reward. Winning arbitration is worth more than winning round 1 because it clears a harder bar; losing stings more because both the fee and the disputed amount are gone. """ if ruling.outcome == ArbitrationOutcome.MERCHANT_WINS: return 0.55 return -0.35 def _resolve_case( self, case: InternalCase, strategy: str | None, ) -> tuple[float, str]: progress = self._progress_by_case[case.case_id] resolution = strategy or progress.current_strategy if resolution not in {"accept_chargeback", "issue_refund"}: raise ValueError( "resolve_case requires strategy accept_chargeback or issue_refund." ) if progress.resolution_status != "open": return -0.04, f"Case {case.case_id} is already resolved." 
progress.final_resolution = resolution progress.current_strategy = resolution progress.resolved_at_step = self._state.step_count progress.resolution_status = ( "refunded" if resolution == "issue_refund" else "accepted_chargeback" ) if self._state.step_count > case.deadline_step: return -0.15, f"Resolved case {case.case_id} after the response deadline." if resolution == case.optimal_strategy: return ( 0.16, f"Resolved case {case.case_id} with the optimal non-contest strategy.", ) if resolution in case.acceptable_strategies: return ( 0.06, f"Resolved case {case.case_id} with an acceptable fallback strategy.", ) return -0.12, f"Resolved case {case.case_id} with the wrong strategy." def _apply_deadline_penalties(self) -> float: penalty = 0.0 for case in self._task.cases: if not self._case_arrived(case): continue progress = self._progress_by_case[case.case_id] if ( progress.resolution_status == "open" and self._state.step_count > case.deadline_step ): if not progress.deadline_penalized: progress.deadline_penalized = True penalty -= 0.15 return penalty def _check_done(self) -> bool: if self._all_cases_resolved(): self._done = True elif self._state.step_count >= self._task.max_steps: self._done = True return self._done def _all_cases_resolved(self) -> bool: return all( progress.resolution_status != "open" and progress.resolution_status != "pending_issuer_review" for progress in self._progress_by_case.values() ) def _case_arrived(self, case: InternalCase) -> bool: return self._state.step_count >= case.arrival_step def _visible_task_cases(self) -> list[InternalCase]: return [case for case in self._task.cases if self._case_arrived(case)] def _future_case_count(self) -> int: return sum(1 for case in self._task.cases if not self._case_arrived(case)) def _pending_work_count(self) -> int: pending = 0 for progress in self._progress_by_case.values(): if progress.resolution_status == "pending_issuer_review": pending += 1 pending += len(progress.pending_evidence_systems) return 
pending def _lookup_case(self, case_id: str) -> InternalCase: for case in self._task.cases: if case.case_id == case_id: return case raise ValueError(f"Unknown case_id '{case_id}'.") def _require_case(self, case_id: str | None) -> InternalCase: target_case_id = case_id or self._selected_case_id if target_case_id is None: raise ValueError("Select a case before taking this action.") case = self._lookup_case(target_case_id) if not self._case_arrived(case): raise ValueError(f"Case {case.case_id} has not arrived in the queue yet.") return case def _evidence_map(self, case: InternalCase): return { item.evidence_id: item for items in case.evidence_by_system.values() for item in items } def _case_fingerprint(self, case: InternalCase) -> str: return hashlib.sha1( f"{case.case_id}|{case.order_id}|{case.customer_id}|{case.reason_code}".encode( "utf-8" ) ).hexdigest() def _case_display_metadata(self, case: InternalCase) -> dict[str, str]: fingerprint = self._case_fingerprint(case) merchant_name, merchant_mcc = self._MERCHANT_PROFILES.get( case.reason_code, ("Atlas Commerce", "5999"), ) last4 = str(int(fingerprint[:8], 16))[-4:].zfill(4) base_time = datetime(2025, 1, 1, 9, 0, tzinfo=timezone.utc) txn_minutes = int(fingerprint[8:14], 16) % (180 * 24 * 60) dispute_lag_hours = 24 + (int(fingerprint[14:18], 16) % 96) transaction_dt = base_time + timedelta(minutes=txn_minutes) dispute_dt = transaction_dt + timedelta(hours=dispute_lag_hours) return { "transaction_id": f"txn_{fingerprint[:14]}", "transaction_timestamp": transaction_dt.strftime("%Y-%m-%dT%H:%M:%SZ"), "dispute_opened_at": dispute_dt.strftime("%Y-%m-%dT%H:%M:%SZ"), "merchant_name": merchant_name, "merchant_mcc": merchant_mcc, "masked_card": f"**** **** **** {last4}", } def _episode_metrics(self) -> dict[str, float]: """User-observable episode metrics. 
Never exposes grader-internal labels such as required/helpful/harmful evidence IDs or coverage against the hidden answer key.""" open_cases = 0 urgent_cases = 0 resolved_cases = 0 total_attached = 0 total_retrieved = 0 pending_issuer = 0 pending_evidence = 0 for case in self._task.cases: progress = self._progress_by_case[case.case_id] total_attached += len(progress.attached_evidence_ids) total_retrieved += len(progress.retrieved_evidence_ids) pending_evidence += len(progress.pending_evidence_systems) if progress.resolution_status == "pending_issuer_review": pending_issuer += 1 if not self._case_arrived(case): continue steps_until_deadline = case.deadline_step - self._state.step_count if progress.resolution_status == "open": open_cases += 1 if steps_until_deadline <= 2: urgent_cases += 1 else: resolved_cases += 1 deadline_pressure = ( 0.0 if len(self._task.cases) == 0 else urgent_cases / len(self._task.cases) ) triage_efficiency = resolved_cases / max(1, self._state.step_count) return { "open_case_count": float(open_cases), "resolved_case_count": float(resolved_cases), "deadline_pressure_index": round(deadline_pressure, 4), "triage_efficiency": round(triage_efficiency, 4), "total_evidence_attached": float(total_attached), "total_evidence_retrieved": float(total_retrieved), "pending_issuer_reviews": float(pending_issuer), "pending_evidence_requests": float(pending_evidence), "future_case_count": float(self._future_case_count()), } def _selected_case_info(self) -> dict[str, object]: """Per-case diagnostic info visible to agents. Only exposes signals an analyst could observe (deadline proximity, which systems haven't been queried, counts). 
Does NOT expose which evidence IDs are required, helpful, or harmful.""" if self._selected_case_id is None: return { "deadline_warning": False, "unqueried_systems": [], } case = self._lookup_case(self._selected_case_id) progress = self._progress_by_case[case.case_id] all_systems = {"orders", "payment", "shipping", "support", "refunds", "risk"} return { "deadline_warning": (case.deadline_step - self._state.step_count) <= 2, "unqueried_systems": sorted( all_systems.difference(progress.revealed_systems) ), "attached_evidence_count": len(progress.attached_evidence_ids), "retrieved_evidence_count": len(progress.retrieved_evidence_ids), "pending_evidence_systems": sorted(progress.pending_evidence_systems), "pending_issuer_review": progress.resolution_status == "pending_issuer_review", "steps_until_deadline": case.deadline_step - self._state.step_count, } def _build_queue(self) -> list[CaseQueueItem]: queue = [] for case in self._visible_task_cases(): progress = self._progress_by_case[case.case_id] display = self._case_display_metadata(case) queue.append( CaseQueueItem( case_id=case.case_id, transaction_id=display["transaction_id"], transaction_timestamp=display["transaction_timestamp"], dispute_opened_at=display["dispute_opened_at"], merchant_name=display["merchant_name"], merchant_mcc=display["merchant_mcc"], masked_card=display["masked_card"], card_network=case.card_network, network_reason_code=case.network_reason_code, response_window_days=case.response_window_days, amount=case.amount, currency=case.currency, reason_code=case.reason_code, status=progress.resolution_status, summary=case.summary, deadline_step=case.deadline_step, steps_until_deadline=case.deadline_step - self._state.step_count, ) ) return queue def _build_visible_case(self) -> VisibleCase | None: if self._selected_case_id is None: return None case = self._lookup_case(self._selected_case_id) progress = self._progress_by_case[case.case_id] display = self._case_display_metadata(case) evidence_map = 
self._evidence_map(case) retrieved = [ EvidenceCard( evidence_id=evidence_id, source_system=evidence_map[evidence_id].source_system, title=evidence_map[evidence_id].title, summary=evidence_map[evidence_id].summary, attached=evidence_id in progress.attached_evidence_ids, ) for evidence_id in sorted(progress.retrieved_evidence_ids) ] attached = [ EvidenceCard( evidence_id=evidence_id, source_system=evidence_map[evidence_id].source_system, title=evidence_map[evidence_id].title, summary=evidence_map[evidence_id].summary, attached=True, ) for evidence_id in progress.attached_evidence_ids ] policy = None if progress.policy_retrieved: policy = PolicyView( reason_code=case.reason_code, guidance=case.policy_guidance, required_evidence=list(case.policy_requirements), ) return VisibleCase( case_id=case.case_id, transaction_id=display["transaction_id"], transaction_timestamp=display["transaction_timestamp"], dispute_opened_at=display["dispute_opened_at"], order_id=case.order_id, customer_id=case.customer_id, merchant_name=display["merchant_name"], merchant_mcc=display["merchant_mcc"], masked_card=display["masked_card"], card_network=case.card_network, network_reason_code=case.network_reason_code, response_window_days=case.response_window_days, compelling_evidence_category=case.compelling_evidence_category, amount=case.amount, currency=case.currency, reason_code=case.reason_code, status=progress.resolution_status, current_strategy=progress.current_strategy, summary=case.summary, inspection_notes=case.inspection_notes if progress.inspected else None, systems_revealed=sorted(progress.revealed_systems), retrieved_evidence=retrieved, attached_evidence=attached, policy=policy, submission_status=progress.resolution_status if progress.resolution_status != "open" else None, round_number=progress.round_number, last_issuer_decision=( progress.issuer_decisions[-1] if progress.issuer_decisions else None ), last_issuer_rationale=( progress.issuer_rationales[-1] if progress.issuer_rationales 
def _build_available_actions(self) -> list[str]:
    """Return the action names that are valid in the current state.

    * Episode finished: no actions.
    * No case selected, or the selected case is no longer ``"open"``:
      ``select_case`` when at least one visible case is still open,
      otherwise only ``wait_for_updates``.
    * Selected case is open and in round 2: the pre-arbitration set.
    * Otherwise: the full round-1 set, plus ``wait_for_updates`` when
      pending or future work exists and no visible case is open.
    """
    if self._done:
        return []

    select_only = ["select_case"]

    # Visible cases whose resolution status is still "open".
    still_open = []
    for visible in self._visible_task_cases():
        if self._progress_by_case[visible.case_id].resolution_status == "open":
            still_open.append(visible)

    more_work_expected = (
        self._pending_work_count() > 0 or self._future_case_count() > 0
    )

    # Shared answer for "nothing actionable is currently selected".
    nothing_selected = select_only if still_open else ["wait_for_updates"]

    selected_id = self._selected_case_id
    if selected_id is None:
        return nothing_selected

    selected_progress = self._progress_by_case[selected_id]
    if selected_progress.resolution_status != "open":
        return nothing_selected

    if selected_progress.round_number == 2:
        # Round 2 is pre-arbitration: evidence gathering stays available,
        # but the round-1 submission actions are replaced by the three
        # terminal pre-arbitration actions.
        return [
            *select_only,
            "query_system",
            "retrieve_policy",
            "add_evidence",
            "remove_evidence",
            "respond_to_pre_arb",
            "escalate_to_arbitration",
            "accept_arbitration_loss",
        ]

    permitted = [
        *select_only,
        "inspect_case",
        "query_system",
        "retrieve_policy",
        "add_evidence",
        "remove_evidence",
        "set_strategy",
        "submit_representment",
        "resolve_case",
    ]
    if more_work_expected and not still_open:
        permitted.append("wait_for_updates")
    return permitted

def _estimated_progress_score(self) -> float:
    """Grade the episode as not yet completed and return its normalized score."""
    return grade_episode(
        self._task,
        self._progress_by_case,
        self._state.step_count,
        self._state.episode_id or "",
        completed=False,
    ).normalized_score

def _build_observation(
    self,
    reward: float,
    done: bool,
    result: str | None = None,
) -> ChargebackOpsObservation:
    """Refresh the tracked state and assemble the observation for this step.

    Args:
        reward: Step reward, reported on the observation and in
            ``metadata["reward_components"]``.
        done: Episode-finished flag copied onto the observation.
        result: Message for ``last_action_result``; when falsy, the stored
            ``self._last_action_result`` is used instead.

    Returns:
        The populated ``ChargebackOpsObservation``.

    Side effects:
        Overwrites ``queue_state``, ``action_history``, ``selected_case_id``
        and ``metrics`` on ``self._state``.
    """
    # Prefer the score of an existing report; otherwise grade on the fly.
    if self._latest_report is not None:
        raw_score = self._latest_report.normalized_score
    else:
        raw_score = self._estimated_progress_score()

    queue_state = []
    for case in self._task.cases:
        case_progress = self._progress_by_case[case.case_id]
        queue_state.append(
            CaseResolutionState(
                case_id=case.case_id,
                status=case_progress.resolution_status,
                current_strategy=case_progress.current_strategy,
                resolved=case_progress.resolution_status != "open",
                steps_until_deadline=case.deadline_step - self._state.step_count,
            )
        )
    self._state.queue_state = queue_state

    trace = []
    for entry in self._action_history:
        trace.append(
            ActionTraceItem(
                step_index=entry.step_index,
                action_type=entry.action_type,
                case_id=entry.case_id,
                outcome=entry.outcome,
                reward=entry.reward,
            )
        )
    self._state.action_history = trace

    self._state.selected_case_id = self._selected_case_id
    self._state.metrics = self._episode_metrics()

    # Selected-case details first, then the two episode-level keys.
    observation_info = dict(self._selected_case_info())
    observation_info["episode_metrics"] = self._state.metrics
    observation_info["current_task_max_steps"] = self._task.max_steps

    # Helper calls stay in the order the observation fields are declared.
    queue = self._build_queue()
    visible_case = self._build_visible_case()
    message = result or self._last_action_result
    actions = self._build_available_actions()
    steps_left = max(0, self._task.max_steps - self._state.step_count)
    rounded_score = round(raw_score, 4)

    return ChargebackOpsObservation(
        task_id=self._task.task_id,
        task_title=self._task.title,
        difficulty=self._task.difficulty,
        objective=self._task.objective,
        selected_case_id=self._selected_case_id,
        queue=queue,
        visible_case=visible_case,
        last_action_result=message,
        available_actions=actions,
        steps_remaining=steps_left,
        progress_score=rounded_score,
        info=observation_info,
        grader_report=self._latest_report,
        done=done,
        reward=reward,
        metadata={
            "reward_components": {
                "step_reward": reward,
                "progress_score": rounded_score,
            },
            "info": observation_info,
        },
    )

@property
def state(self) -> ChargebackOpsState:
    """The environment's current ``ChargebackOpsState`` object."""
    return self._state