# Source file: ChargeBackOps/runners/baseline_runner.py
# Hosting-page caption: "mitudrudutta's picture"
# Commit message: "feat: Implement wait_for_updates action for handling delayed cases and evidence"
# Commit: 2dedffd
"""Baseline runner for ChargebackOps."""
from __future__ import annotations
import json
import os
import time
from dataclasses import dataclass
from typing import Any
from openai import OpenAI
from pydantic import BaseModel, Field
try:
from ..evaluation.grading import grade_episode
from ..core.models import BaselineRunResult, BaselineTaskResult, ChargebackOpsAction
from ..server.chargeback_ops_environment import ChargebackOpsEnvironment
from ..scenarios.simulation import list_tasks
except ImportError: # pragma: no cover
from evaluation.grading import grade_episode
from core.models import BaselineRunResult, BaselineTaskResult, ChargebackOpsAction
from server.chargeback_ops_environment import ChargebackOpsEnvironment
from scenarios.simulation import list_tasks
try: # pragma: no cover
from dotenv import load_dotenv
except ImportError: # pragma: no cover
load_dotenv = None
if load_dotenv is not None: # pragma: no cover
load_dotenv()
# Provider name used as the default; presumably applied when the caller does
# not pick one (the consuming code is outside this view).
DEFAULT_PROVIDER = "openrouter"
# NOTE(review): looks like a cap on how many candidate actions are shown to
# the LLM — confirm against the prompt-building code.
MAX_LLM_CANDIDATES = 4
# NOTE(review): presumably the max-token budget for a provider response — confirm.
MAX_PROVIDER_RESPONSE_TOKENS = 200
# Default model identifier for each supported provider name.
DEFAULT_MODELS = {
    "openrouter": "openai/gpt-oss-120b",
    "groq": "llama-3.3-70b-versatile",
    "openai": "gpt-4.1-mini",
    "anthropic": "claude-sonnet-4-20250514",
    "google": "gemini-2.5-flash",
}
# Ordered fallback: try each until one succeeds.
# Each entry is a (provider, model_name) pair.
_FALLBACK_CHAIN: list[tuple[str, str]] = [
    ("openrouter", "openai/gpt-oss-120b"),
    ("google", "gemini-2.5-flash"),
    ("groq", "llama-3.3-70b-versatile"),
]
def _provider_timeout_seconds() -> float:
    """Return the provider request timeout, in seconds.

    The value is read from ``BASELINE_REQUEST_TIMEOUT_SECONDS`` (``"15"`` when
    the variable is unset) and is never lower than one second. A value that
    cannot be parsed as a float yields ``4.0``.
    """
    configured = os.environ.get("BASELINE_REQUEST_TIMEOUT_SECONDS", "15")
    try:
        seconds = float(configured)
    except ValueError:
        # Unparseable setting: use the short hard-coded timeout.
        return 4.0
    return max(1.0, seconds)
def _provider_retry_attempts() -> int:
    """Return how many retries are allowed after the first provider call.

    Read from ``PROVIDER_RATE_LIMIT_RETRIES`` (``"2"`` when unset). Negative
    values are clamped to zero, and a value that is not an integer literal
    yields zero.
    """
    configured = os.environ.get("PROVIDER_RATE_LIMIT_RETRIES", "2")
    try:
        retries = int(configured)
    except ValueError:
        return 0
    return max(0, retries)
def _provider_retry_backoff_seconds() -> float:
    """Return the base delay, in seconds, between provider retries.

    Read from ``PROVIDER_RETRY_BACKOFF_SECONDS`` (``"1.0"`` when unset) and
    never lower than 0.1 seconds. An unparseable value yields ``0.5``.
    """
    configured = os.environ.get("PROVIDER_RETRY_BACKOFF_SECONDS", "1.0")
    try:
        delay = float(configured)
    except ValueError:
        return 0.5
    return max(0.1, delay)
def _strict_llm_mode() -> bool:
    """Report whether ``STRICT_LLM_MODE`` is set to a truthy flag value.

    The comparison ignores surrounding whitespace and letter case; the
    accepted spellings are ``1``, ``true``, ``yes`` and ``on``.
    """
    flag = os.environ.get("STRICT_LLM_MODE", "")
    normalized = flag.strip().lower()
    return normalized in ("1", "true", "yes", "on")
def _should_retry_provider_error(exc: Exception) -> bool:
    """Tell whether *exc* is a provider failure worth retrying.

    The decision is made on the exception's class name only, so no provider
    SDK exception types need to be imported here.
    """
    retryable_names = (
        "RateLimitError",
        "APITimeoutError",
        "APIConnectionError",
        "InternalServerError",
    )
    error_name = type(exc).__name__
    return error_name in retryable_names
def _chat_completion_with_retry(client: OpenAI, **kwargs):
    """Call ``client.chat.completions.create`` with linear-backoff retries.

    All keyword arguments are forwarded unchanged. A failure is retried only
    when ``_should_retry_provider_error`` accepts it and attempts remain;
    otherwise the exception propagates to the caller. The pause before retry
    number ``k`` (1-based) is ``k`` times the configured backoff.
    """
    final_attempt = _provider_retry_attempts()  # zero-based index of the last try
    base_delay = _provider_retry_backoff_seconds()
    failure: Exception | None = None
    attempt = 0
    while attempt <= final_attempt:
        try:
            return client.chat.completions.create(**kwargs)
        except Exception as exc:
            failure = exc
            out_of_attempts = attempt >= final_attempt
            if out_of_attempts or not _should_retry_provider_error(exc):
                raise
        # Only reached after a retryable failure with attempts left.
        time.sleep(base_delay * (attempt + 1))
        attempt += 1
    # Defensive tail: the loop always returns or raises on its last pass.
    if failure is not None:
        raise failure
    raise RuntimeError("Provider completion failed without raising an exception.")
class CandidateChoice(BaseModel):
    """Structured choice returned by an LLM provider."""

    # Selected candidate position; the field constraint rejects negative values.
    # NOTE(review): presumably indexes the candidate list offered to the model — confirm.
    candidate_index: int = Field(ge=0)
    # Free-text explanation accompanying the choice.
    rationale: str
@dataclass
class CandidateAction:
    """One valid candidate action for the baseline policy."""

    # The environment action to execute if this candidate is picked.
    action: ChargebackOpsAction
    # Human-readable one-line description of the action.
    summary: str
@dataclass(frozen=True)
class ProviderConfig:
    """Resolved provider configuration."""

    # Provider name; presumably one of the DEFAULT_MODELS keys — verify against the resolver.
    provider: str
    # Model identifier to request from that provider.
    model_name: str
def _best_open_case(queue: list[dict[str, Any]]) -> dict[str, Any] | None:
    """Pick the most pressing open case from *queue*.

    Cases are compared by fewest ``steps_until_deadline`` first and, among
    equals, by largest ``amount``. Full ties keep queue order. Returns
    ``None`` when no case has status ``"open"``.
    """

    def urgency(case: dict[str, Any]) -> tuple[Any, Any]:
        return case["steps_until_deadline"], -case["amount"]

    still_open = [entry for entry in queue if entry["status"] == "open"]
    if still_open:
        # ``min`` returns the first minimal element, matching a stable sort.
        return min(still_open, key=urgency)
    return None
# Base representment-note text keyed by dispute reason code.
# _build_representment_note falls back to a generic sentence for codes that
# are not listed here.
_NOTE_TEMPLATES: dict[str, str] = {
    "goods_not_received": (
        "Order confirmation and carrier delivery confirmation establish fulfillment. "
        "The shipment was delivered to the customer address on file."
    ),
    "fraud_cnp": (
        "Prior good order linkage and customer account confirmation tie the cardholder "
        "to the transaction. Risk analysis and support records confirm legitimacy."
    ),
    "product_not_as_described": (
        "Product listing verification confirms the item matches the description. "
        "Return policy documentation shows the customer bypassed the return process."
    ),
    "service_not_provided": (
        "Service completion record and customer acknowledgment confirm the service "
        "was delivered as agreed. Booking confirmation and delivery records attached."
    ),
    "credit_not_processed": (
        "Refund record and payment confirmation document the credit processing timeline. "
        "Transaction records confirm the refund was issued per policy."
    ),
    "duplicate_processing": (
        "Payment records confirm duplicate charge identification. "
        "Refund documentation attached to support resolution."
    ),
}
def _build_representment_note(visible_case: dict[str, Any]) -> str:
    """Generate a representment note summarizing the dispute contest rationale.

    The note is assembled from up to three parts:

    1. the template for the case's ``reason_code`` (or a generic sentence for
       reason codes without a template);
    2. the policy ``requirements`` list, when a policy with requirements is
       present on the case;
    3. the IDs of attached evidence that ``_is_harmful_evidence`` does not flag.

    Args:
        visible_case: The selected case as exposed in the observation.

    Returns:
        The note text, truncated to at most 500 characters.
    """
    reason = visible_case.get("reason_code", "")
    note = _NOTE_TEMPLATES.get(
        reason, f"Contesting {reason.replace('_', ' ')} dispute with attached evidence."
    )

    # Inject policy requirement keywords directly for claims coverage scoring.
    # The former pass over the guidance words never modified the note (its
    # body was a bare ``pass``), so it has been removed as dead code.
    policy = visible_case.get("policy")
    if policy:
        requirements = policy.get("requirements", [])
        if requirements:
            note += " Evidence covers: " + ", ".join(requirements) + "."

    # Reference evidence IDs directly for coherence scoring.
    attached = visible_case.get("attached_evidence", [])
    if attached:
        safe_ids = [
            entry["evidence_id"]
            for entry in attached
            if not _is_harmful_evidence(entry)
        ]
        if safe_ids:
            note += " Supporting evidence: " + ", ".join(safe_ids) + "."

    return note[:500]
def _visible_case_deadline(queue: list[dict[str, Any]], case_id: str) -> int:
    """Return ``steps_until_deadline`` of the first queue entry matching *case_id*.

    When no entry carries that ID the sentinel ``999`` is returned.
    """
    matching_deadlines = (
        entry["steps_until_deadline"]
        for entry in queue
        if entry["case_id"] == case_id
    )
    return next(matching_deadlines, 999)
# Lower-case substrings that mark an evidence item as harmful when found in
# its title or summary (see _is_harmful_evidence and _rank_attachable).
_NEGATIVE_SIGNAL_KEYWORDS = {
    "mismatch",
    "failed",
    "declined",
    "suspicious",
    "flagged",
    "fraud risk",
    "unauthorized",
    "rejected",
    "invalid",
    "expired",
    "violation",
    "non-compliant",
    "discrepancy",
    "inconsistent",
    "unverified",
}
def _is_harmful_evidence(item: dict[str, Any]) -> bool:
    """Conservative heuristic: flag evidence with negative-signal language.

    The item's ``title`` and ``summary`` (each defaulting to an empty string)
    are joined, lower-cased and scanned for any negative-signal keyword.
    """
    haystack = " ".join((item.get("title", ""), item.get("summary", ""))).lower()
    for keyword in _NEGATIVE_SIGNAL_KEYWORDS:
        if keyword in haystack:
            return True
    return False
# Keyword tiers for ranking attachable evidence; lower rank sorts first.
# Tiers are checked in order, so an item gets the rank of the first tier with
# a matching keyword. "cancel" also covers "cancellation".
_ATTACH_RANK_TIERS: tuple[tuple[int, tuple[str, ...]], ...] = (
    (0, ("signature", "completion", "booking", "listing")),
    (1, ("duplicate", "delivery", "prior", "account", "authenticated")),
    (2, ("return policy", "refund", "cancel", "confirmation")),
)


def _rank_attachable(item: dict[str, Any]) -> int:
    """Return a sort rank for attaching *item* as evidence (lower is better).

    Args:
        item: Evidence entry; ``title`` and ``summary`` are read and treated
            as empty strings when absent, matching ``_is_harmful_evidence``.

    Returns:
        ``999`` when the item is flagged by ``_is_harmful_evidence`` (callers
        treat this as "do not attach"), otherwise ``0``-``2`` according to the
        first matching keyword tier, or ``4`` when no keyword matches.
    """
    # Reuse the shared harmful-evidence check rather than duplicating it.
    if _is_harmful_evidence(item):
        return 999
    text = (item.get("title", "") + " " + item.get("summary", "")).lower()
    for rank, keywords in _ATTACH_RANK_TIERS:
        if any(keyword in text for keyword in keywords):
            return rank
    return 4
def _batch_attachable_ids(
    retrieved_items: list[dict[str, Any]], attached_ids: set[str]
) -> list[str]:
    """List evidence IDs worth attaching, best-ranked first.

    Items already present in *attached_ids* and items ranked ``999`` by
    ``_rank_attachable`` are left out. Items with equal rank keep their
    order from *retrieved_items*.
    """
    ranked: list[tuple[int, str]] = []
    for entry in retrieved_items:
        evidence_id = entry["evidence_id"]
        if evidence_id in attached_ids:
            continue
        rank = _rank_attachable(entry)
        if rank < 999:
            ranked.append((rank, evidence_id))
    # Sort on rank alone so the stable sort preserves retrieval order on ties.
    ranked.sort(key=lambda pair: pair[0])
    return [evidence_id for _, evidence_id in ranked]
# Reason codes that the queue orderings schedule ahead of all others.
_FAST_REASON_CODES = frozenset(
    {
        "goods_not_received",
        "credit_not_processed",
        "duplicate_processing",
    }
)
# Step cost estimates per reason code (select_case + full workflow).
_STEP_COST_ESTIMATE: dict[str, int] = {
    "goods_not_received": 6,  # select + 2 queries + attach + strategy + submit
    "credit_not_processed": 3,  # select + strategy + resolve
    "duplicate_processing": 3,  # select + strategy + resolve
    "fraud_cnp": 8,  # select + policy + 2-3 queries + attach + strategy + submit
    "product_not_as_described": 8,  # select + policy + 2-3 queries + attach + strategy + submit
    "service_not_provided": 7,  # select + policy + 2 queries + attach + strategy + submit
}
# Cost assumed for reason codes missing from _STEP_COST_ESTIMATE.
_DEFAULT_STEP_COST = 7
# Reason codes with deterministic strategies — no need to retrieve policy.
# Only codes where the optimal strategy NEVER varies across generated/ISO cases.
# fraud_cnp, product_not_as_described, service_not_provided all vary.
_DETERMINISTIC_STRATEGY: dict[str, str] = {
    "goods_not_received": "contest",
    "credit_not_processed": "issue_refund",
    "duplicate_processing": "issue_refund",
}
# Strategies that give up the dispute instead of contesting it.
_CONCEDE_STRATEGIES = frozenset({"accept_chargeback", "issue_refund"})
# Minimum contest: policy(1) + query(1) + attach(1) + strategy(1) + submit(1).
_MIN_CONTEST_STEPS = 5
# Steps still needed once querying is done: attach + strategy + submit.
_POST_QUERY_STEPS = 3
# Arbitration costs $250/side; with P(win) ~ 0.5 the break-even amount is 500.
_ARBITRATION_MIN_AMOUNT = 500.0
# Round-2 query order: support/risk/refunds tend to hold compelling pieces;
# payment is mostly auth records and harmful AVS/CVV mismatches.
_PRE_ARB_SYSTEM_ORDER = ("support", "risk", "refunds", "shipping", "orders", "payment")


@dataclass(frozen=True)
class _CaseContext:
    """Read-only snapshot of the selected case shared by the workflow builders."""

    case_id: str
    visible_case: dict[str, Any]
    inferred_strategy: str | None  # None until a policy is known for a varying code
    current_strategy: str | None
    systems_revealed: set[str]
    attachable_ids: list[str]
    attached_count: int  # number of distinct attached evidence IDs
    max_queries: int  # queries that still fit before the case deadline
    budget_per_case: float  # steps_remaining divided by the open-case count


def _make_candidate(summary: str, action_type: str, **fields: Any) -> CandidateAction:
    """Build a CandidateAction wrapping a ChargebackOpsAction with *fields*."""
    return CandidateAction(
        action=ChargebackOpsAction(action_type=action_type, **fields),
        summary=summary,
    )


def _case_priority_key(item: dict[str, Any]) -> tuple[Any, int, Any]:
    """Order cases by deadline, then fast reason codes, then larger amount."""
    return (
        item["steps_until_deadline"],
        0 if item["reason_code"] in _FAST_REASON_CODES else 1,
        -item["amount"],
    )


def _triage_priority_key(item: dict[str, Any]) -> tuple[int, Any]:
    """Order cases with fast reason codes first, then by larger amount."""
    return (0 if item["reason_code"] in _FAST_REASON_CODES else 1, -item["amount"])


def _total_step_cost(open_cases: list[dict[str, Any]]) -> int:
    """Sum the estimated step cost of working every open case."""
    return sum(
        _STEP_COST_ESTIMATE.get(case["reason_code"], _DEFAULT_STEP_COST)
        for case in open_cases
    )


def _queue_selection_candidates(
    open_cases: list[dict[str, Any]], steps_remaining: int
) -> list[CandidateAction]:
    """Return one select_case candidate per open case, best choice first."""
    # Smart triage: when the budget cannot fit every case, ignore deadlines
    # and take fast cases first, then the highest amounts.
    over_budget = (
        len(open_cases) > 1 and _total_step_cost(open_cases) > steps_remaining
    )
    ordering = _triage_priority_key if over_budget else _case_priority_key
    return [
        _make_candidate(
            f"Select case {case['case_id']} ({case['reason_code']}, amount ${case['amount']}, "
            f"deadline in {case['steps_until_deadline']} steps).",
            "select_case",
            case_id=case["case_id"],
        )
        for case in sorted(open_cases, key=ordering)
    ]


def _pre_arbitration_candidate(
    visible_case: dict[str, Any], available: set[str]
) -> CandidateAction | None:
    """Pick the single round-2 (pre-arbitration) move, or None if none is legal.

    Preference order: respond with up to two compelling evidence items, query
    an unrevealed system, escalate when the amount clears the break-even,
    and finally accept the arbitration loss.
    """
    case_id = visible_case["case_id"]
    retrieved = visible_case.get("retrieved_evidence", [])
    attached_ids = {
        item["evidence_id"] for item in visible_case.get("attached_evidence", [])
    }
    # Rank each ID by the first retrieved item carrying it.
    first_item_by_id: dict[str, dict[str, Any]] = {}
    for item in retrieved:
        first_item_by_id.setdefault(item["evidence_id"], item)
    eligible_ids = [
        item["evidence_id"]
        for item in retrieved
        if item["evidence_id"] not in attached_ids and not _is_harmful_evidence(item)
    ]
    compelling_ids = sorted(
        eligible_ids, key=lambda eid: _rank_attachable(first_item_by_id[eid])
    )[:2]
    if compelling_ids:
        return _make_candidate(
            f"Respond to pre-arbitration with compelling evidence "
            f"{', '.join(compelling_ids)} for case {case_id}.",
            "respond_to_pre_arb",
            case_id=case_id,
            compelling_evidence_ids=compelling_ids,
            note=_build_representment_note(visible_case),
        )

    # No retrieved compelling evidence left. Try querying an unrevealed
    # merchant system before giving up.
    revealed = set(visible_case.get("systems_revealed", []))
    unrevealed = [name for name in _PRE_ARB_SYSTEM_ORDER if name not in revealed]
    if unrevealed and "query_system" in available:
        return _make_candidate(
            f"Query {unrevealed[0]} for compelling evidence "
            f"on case {case_id} before deciding to escalate.",
            "query_system",
            case_id=case_id,
            system_name=unrevealed[0],
        )

    # No compelling evidence anywhere: decide on expected value.
    amount = float(visible_case.get("amount", 0.0))
    if amount >= _ARBITRATION_MIN_AMOUNT and "escalate_to_arbitration" in available:
        return _make_candidate(
            f"Escalate case {case_id} to arbitration "
            f"(amount ${amount:.0f} clears the EV break-even).",
            "escalate_to_arbitration",
            case_id=case_id,
        )
    if "accept_arbitration_loss" in available:
        return _make_candidate(
            f"Accept arbitration loss on case {case_id} — no "
            f"compelling evidence and amount below ROI cutoff.",
            "accept_arbitration_loss",
            case_id=case_id,
        )
    return None


def _strategy_from_guidance(guidance: str) -> str:
    """Map policy guidance text to a strategy using keyword matching."""
    text = guidance.lower()
    if "do not contest" in text or "concede" in text or "not supportable" in text:
        return "accept_chargeback"
    if "refund immediately" in text or ("refund" in text and "contest" not in text):
        return "issue_refund"
    return "contest"


def _should_fast_concede(
    case_id: str,
    open_cases: list[dict[str, Any]],
    steps_remaining: int,
    current_deadline: int,
) -> bool:
    """Decide whether an untouched case should be conceded to save steps."""
    if steps_remaining < _MIN_CONTEST_STEPS or current_deadline < _MIN_CONTEST_STEPS:
        # Not enough steps or deadline to even minimally contest.
        return True
    if len(open_cases) <= 1 or _total_step_cost(open_cases) <= steps_remaining:
        return False
    # Multi-case triage: concede only the lowest-value case. A default avoids
    # leaking StopIteration when the case is absent from the open queue.
    this_amount = next(
        (case["amount"] for case in open_cases if case["case_id"] == case_id), None
    )
    if this_amount is None:
        return False
    return this_amount <= min(case["amount"] for case in open_cases)


def _systems_for_budget(
    ctx: _CaseContext,
    preferred: list[str],
    reduced: list[str],
    force_reduced: bool = False,
) -> list[str]:
    """Return *preferred*, or *reduced* when its pending queries will not fit."""
    pending = [name for name in preferred if name not in ctx.systems_revealed]
    if force_reduced or len(pending) > ctx.max_queries:
        return reduced
    return preferred


def _query_candidates(
    ctx: _CaseContext, systems: list[str], purpose: str
) -> list[CandidateAction]:
    """Build a query_system candidate for each system not yet revealed."""
    return [
        _make_candidate(
            f"Query the {system_name} system for {purpose} on case {ctx.case_id}.",
            "query_system",
            case_id=ctx.case_id,
            system_name=system_name,
        )
        for system_name in systems
        if system_name not in ctx.systems_revealed
    ]


def _contest_candidates(
    ctx: _CaseContext,
    systems: list[str],
    purpose: str,
    attach_summary: str,
    min_attached: int = 2,
) -> list[CandidateAction]:
    """Build the contest steps in order: query, attach, set strategy, submit."""
    steps = _query_candidates(ctx, systems, purpose)
    if ctx.attachable_ids:
        steps.append(
            _make_candidate(
                attach_summary,
                "add_evidence",
                case_id=ctx.case_id,
                evidence_ids=ctx.attachable_ids,
            )
        )
    if ctx.current_strategy != "contest":
        steps.append(
            _make_candidate(
                "Set the strategy to contest the dispute.",
                "set_strategy",
                case_id=ctx.case_id,
                strategy="contest",
            )
        )
    if ctx.attached_count >= min_attached:
        steps.append(
            _make_candidate(
                "Submit the current representment package.",
                "submit_representment",
                case_id=ctx.case_id,
                note=_build_representment_note(ctx.visible_case),
            )
        )
    return steps


def _concession_candidates(
    ctx: _CaseContext, target: str, set_summary: str, resolve_summary: str
) -> list[CandidateAction]:
    """Build set_strategy (if not already set) followed by resolve_case."""
    steps: list[CandidateAction] = []
    if ctx.current_strategy != target:
        steps.append(
            _make_candidate(
                set_summary, "set_strategy", case_id=ctx.case_id, strategy=target
            )
        )
    steps.append(
        _make_candidate(
            resolve_summary, "resolve_case", case_id=ctx.case_id, strategy=target
        )
    )
    return steps


def _workflow_candidates(ctx: _CaseContext, reason_code: str) -> list[CandidateAction]:
    """Build the reason-code specific workflow candidates for the selected case."""
    case_id = ctx.case_id
    concede_target = (
        ctx.inferred_strategy if ctx.inferred_strategy in _CONCEDE_STRATEGIES else None
    )

    if reason_code == "goods_not_received":
        return _contest_candidates(
            ctx,
            ["orders", "shipping"],
            "evidence",
            f"Attach the strongest delivery evidence for case {case_id}.",
        )

    if reason_code == "fraud_cnp":
        steps: list[CandidateAction] = []
        if ctx.inferred_strategy == "contest":
            # Under tight budgets or deadline pressure, skip optional 'orders' query.
            systems = _systems_for_budget(
                ctx,
                ["risk", "support", "orders"],
                ["risk", "support"],
                force_reduced=ctx.budget_per_case < 7,
            )
            steps.extend(
                _contest_candidates(
                    ctx,
                    systems,
                    "evidence",
                    f"Attach the strongest account-linkage evidence for case {case_id}.",
                )
            )
        # Conceding is always offered for fraud cases, after any contest steps.
        steps.extend(
            _concession_candidates(
                ctx,
                "accept_chargeback",
                "Set the strategy to accept the chargeback.",
                "Concede the dispute and accept the chargeback.",
            )
        )
        return steps

    if reason_code in {"credit_not_processed", "duplicate_processing"}:
        # Fast-path: set strategy and resolve immediately — don't waste steps querying.
        steps = _concession_candidates(
            ctx,
            "issue_refund",
            "Set the strategy to issue a refund immediately.",
            "Resolve the case by issuing a refund.",
        )
        steps.append(
            _make_candidate(
                "Accept the chargeback as a fallback resolution.",
                "resolve_case",
                case_id=case_id,
                strategy="accept_chargeback",
            )
        )
        return steps

    if reason_code == "product_not_as_described":
        if concede_target is not None:
            # Guidance says concede — fast-path.
            return _concession_candidates(
                ctx,
                concede_target,
                f"Set strategy to {concede_target} — listing defense not supportable.",
                f"Resolve with {concede_target} — conceding per policy guidance.",
            )
        # Under deadline pressure, skip shipping (least critical for this reason code).
        systems = _systems_for_budget(
            ctx, ["orders", "support", "shipping"], ["orders", "support"]
        )
        steps = _contest_candidates(
            ctx,
            systems,
            "listing and return-process evidence",
            f"Attach listing accuracy and return-policy evidence for case {case_id}.",
        )
        # NOTE(review): source indentation was lost; the fallback refund is
        # assumed to belong to the contest path only — confirm.
        steps.append(
            _make_candidate(
                "Issue a refund as a fallback if the listing defense is not supportable.",
                "resolve_case",
                case_id=case_id,
                strategy="issue_refund",
            )
        )
        return steps

    if reason_code == "service_not_provided":
        if concede_target is not None:
            return _concession_candidates(
                ctx,
                concede_target,
                f"Set strategy to {concede_target} — service defense not supportable.",
                f"Resolve with {concede_target} — conceding per policy guidance.",
            )
        # Support is most critical for service disputes.
        systems = _systems_for_budget(ctx, ["orders", "support"], ["support"])
        steps = _contest_candidates(
            ctx,
            systems,
            "booking and completion evidence",
            f"Attach booking and completion evidence for case {case_id}.",
        )
        # NOTE(review): same nesting assumption as the listing fallback above.
        steps.append(
            _make_candidate(
                "Issue a refund as a fallback if the service-delivery defense is weak.",
                "resolve_case",
                case_id=case_id,
                strategy="issue_refund",
            )
        )
        return steps

    # Reason codes without a dedicated workflow.
    if concede_target is not None:
        steps = _query_candidates(
            ctx, ["support", "refunds", "payment"], "concession evidence"
        )
        steps.extend(
            _concession_candidates(
                ctx,
                concede_target,
                f"Set the strategy to {concede_target}.",
                f"Resolve the case with strategy {concede_target}.",
            )
        )
        return steps
    return _contest_candidates(
        ctx,
        ["orders", "support", "shipping", "risk"],
        "additional evidence",
        f"Attach the strongest currently available evidence for case {case_id}.",
        min_attached=1,
    )


def candidate_actions(observation: dict[str, Any]) -> list[CandidateAction]:
    """Build a prioritized candidate set from the current observation.

    The first entry is the heuristic's preferred action. Stages, in order:

    1. No open case and waiting allowed: the only candidate is wait_for_updates.
    2. No case selected: select_case candidates for every open case.
    3. Selected case not open: select_case candidates to switch away.
    4. Pre-arbitration (``respond_to_pre_arb`` available): one round-2 move.
    5. Otherwise, for the selected open case: an urgent switch and/or
       retrieve_policy, then a single forced action when harmful evidence is
       attached, the deadline is imminent, or the step budget is too tight;
       failing that, the reason-code workflow, an optional inspect_case, and
       switches to the other open cases.

    Args:
        observation: Environment observation. ``queue`` is required;
            ``visible_case``, ``available_actions`` and ``steps_remaining``
            (default 999) are optional.

    Returns:
        Candidates ordered from most to least preferred; may be empty.
    """
    queue = observation["queue"]
    visible_case = observation.get("visible_case")
    available_actions = observation.get("available_actions", [])
    steps_remaining = observation.get("steps_remaining", 999)
    open_cases = [case for case in queue if case["status"] == "open"]

    if not open_cases and "wait_for_updates" in available_actions:
        return [
            _make_candidate(
                "Wait for delayed issuer reviews, delayed evidence, or future case arrivals.",
                "wait_for_updates",
            )
        ]

    if visible_case is None:
        return _queue_selection_candidates(open_cases, steps_remaining)

    case_id = visible_case["case_id"]
    if visible_case["status"] != "open":
        # The selected case is blocked or closed. The "nothing open, so wait"
        # situation was already handled by the first guard above.
        return [
            _make_candidate(
                f"Switch to open case {case['case_id']} (deadline in {case['steps_until_deadline']} steps, "
                f"amount ${case['amount']}).",
                "select_case",
                case_id=case["case_id"],
            )
            for case in sorted(open_cases, key=_case_priority_key)
        ]

    # Round 2 (pre-arbitration). Issuer rejected the round-1 packet and is
    # asking for compelling evidence.
    available = set(available_actions)
    if "respond_to_pre_arb" in available:
        round_two = _pre_arbitration_candidate(visible_case, available)
        if round_two is not None:
            return [round_two]

    candidates: list[CandidateAction] = []
    current_deadline = _visible_case_deadline(queue, case_id)
    current_strategy = visible_case.get("current_strategy")
    attached_evidence = visible_case.get("attached_evidence", [])
    best_other = _best_open_case(
        [case for case in open_cases if case["case_id"] != case_id]
    )

    # Only switch to an urgent other case if the current case isn't close to completion.
    # "Close" means: strategy is set and evidence attached (1 step to submit),
    # OR evidence is attached and strategy just needs to be set (2 steps to finish).
    has_attached = len(attached_evidence) >= 1
    near_completion = has_attached and (
        current_strategy == "contest"
        or (current_strategy is None and current_deadline >= 2)
    )
    if (
        best_other is not None
        and best_other["steps_until_deadline"] <= 1
        and current_deadline > 1
        and not near_completion
    ):
        candidates.append(
            _make_candidate(
                f"Switch to case {best_other['case_id']} immediately because its deadline is in "
                f"{best_other['steps_until_deadline']} steps.",
                "select_case",
                case_id=best_other["case_id"],
            )
        )

    reason_code = visible_case["reason_code"]
    policy = visible_case.get("policy")
    if policy is None:
        inferred_strategy = _DETERMINISTIC_STRATEGY.get(reason_code)
        if inferred_strategy is None:
            candidates.append(
                _make_candidate(
                    "Retrieve the chargeback policy for the selected reason code.",
                    "retrieve_policy",
                    case_id=case_id,
                )
            )
    else:
        inferred_strategy = _strategy_from_guidance(policy.get("guidance", ""))

    # Queries that fit before the deadline: reserve the post-query steps and,
    # when the policy is not retrieved yet, one more for retrieve_policy.
    policy_cost = 0 if policy is not None else 1
    max_queries = max(0, current_deadline - _POST_QUERY_STEPS - policy_cost)

    systems_revealed = set(visible_case.get("systems_revealed", []))
    retrieved_items = visible_case.get("retrieved_evidence", [])
    attached_ids = {item["evidence_id"] for item in attached_evidence}
    attachable_ids = _batch_attachable_ids(retrieved_items, attached_ids)

    # ── HARMFUL CLEANUP: if harmful evidence is attached, remove it immediately ──
    harmful_attached_ids = [
        item["evidence_id"] for item in attached_evidence if _is_harmful_evidence(item)
    ]
    if harmful_attached_ids:
        candidates.insert(
            0,
            _make_candidate(
                f"Remove harmful evidence {', '.join(harmful_attached_ids)} before submission.",
                "remove_evidence",
                case_id=case_id,
                evidence_ids=harmful_attached_ids,
            ),
        )
        return candidates

    # ── DEADLINE URGENCY: if near deadline and we have evidence, submit/resolve NOW ──
    if current_deadline <= 1:
        if current_strategy == "contest" and attached_ids:
            candidates.insert(
                0,
                _make_candidate(
                    f"URGENT: Submit representment for {case_id} — deadline imminent.",
                    "submit_representment",
                    case_id=case_id,
                    note=_build_representment_note(visible_case),
                ),
            )
            return candidates
        if current_strategy in _CONCEDE_STRATEGIES:
            candidates.insert(
                0,
                _make_candidate(
                    f"URGENT: Resolve {case_id} with {current_strategy} — deadline imminent.",
                    "resolve_case",
                    case_id=case_id,
                    strategy=current_strategy,
                ),
            )
            return candidates

    # ── TIGHT BUDGET: fast-concede if not enough steps to contest this case ──
    # Only considered while the case is untouched (no policy, strategy or query).
    untouched = (
        policy is None
        and reason_code not in _DETERMINISTIC_STRATEGY
        and current_strategy is None
        and not systems_revealed
    )
    if untouched and _should_fast_concede(
        case_id, open_cases, steps_remaining, current_deadline
    ):
        fallback = "issue_refund"
        candidates.insert(
            0,
            _make_candidate(
                f"Budget too tight to contest — fast-resolve {case_id} with {fallback}.",
                "resolve_case",
                case_id=case_id,
                strategy=fallback,
            ),
        )
        return candidates

    # ── BUDGET PRESSURE: if more open cases than steps, fast-resolve concedable ──
    if (
        steps_remaining <= len(open_cases) * 2
        and inferred_strategy in _CONCEDE_STRATEGIES
    ):
        if current_strategy != inferred_strategy:
            forced = _make_candidate(
                f"Fast-set strategy to {inferred_strategy} under budget pressure.",
                "set_strategy",
                case_id=case_id,
                strategy=inferred_strategy,
            )
        else:
            forced = _make_candidate(
                f"Fast-resolve {case_id} with {inferred_strategy} under budget pressure.",
                "resolve_case",
                case_id=case_id,
                strategy=inferred_strategy,
            )
        candidates.insert(0, forced)
        return candidates

    context = _CaseContext(
        case_id=case_id,
        visible_case=visible_case,
        inferred_strategy=inferred_strategy,
        current_strategy=current_strategy,
        systems_revealed=systems_revealed,
        attachable_ids=attachable_ids,
        attached_count=len(attached_ids),
        max_queries=max_queries,
        budget_per_case=steps_remaining / max(len(open_cases), 1),
    )
    candidates.extend(_workflow_candidates(context, reason_code))

    # Uses the defaulted steps_remaining, like every other budget check here.
    if visible_case.get("inspection_notes") is None and steps_remaining > 3:
        candidates.append(
            _make_candidate(
                "Inspect the selected case to reveal merchant notes.",
                "inspect_case",
                case_id=case_id,
            )
        )

    for case in sorted(
        open_cases, key=lambda item: (item["steps_until_deadline"], -item["amount"])
    ):
        if case["case_id"] != case_id:
            candidates.append(
                _make_candidate(
                    f"Switch to case {case['case_id']} (deadline in {case['steps_until_deadline']} steps, "
                    f"amount ${case['amount']}).",
                    "select_case",
                    case_id=case["case_id"],
                )
            )
    return candidates
def _heuristic_pick(candidates: list[CandidateAction]) -> CandidateAction:
    """Return the first candidate in *candidates*.

    Raises:
        IndexError: If *candidates* is empty.
    """
    return candidates[0]
def _obvious_next_action(
    observation: dict[str, Any],
    candidates: list[CandidateAction],
) -> CandidateAction | None:
    """Return the top candidate when no provider call is needed to pick it.

    Deterministic housekeeping steps are taken directly so that provider
    calls are reserved for states that genuinely branch, which keeps
    baseline/inference runtime inside hackathon-friendly bounds.

    Args:
        observation: Observation payload; must contain ``"queue"`` and may
            contain ``"visible_case"``.
        candidates: Candidate actions ordered best-first.

    Returns:
        The first candidate when the choice is obvious, otherwise ``None``
        (meaning the caller should decide by other means).
    """
    if not candidates:
        return None
    top = candidates[0]
    # With a single candidate there is nothing to decide.
    if len(candidates) == 1:
        return top

    focused = observation.get("visible_case")
    queue = observation["queue"]

    if focused is None:
        # No case is selected yet: only obvious when exactly one case is open,
        # or when the top candidate selects the sole case about to expire.
        still_open = [entry for entry in queue if entry["status"] == "open"]
        if len(still_open) == 1:
            return top
        due_now = [
            entry for entry in still_open if entry["steps_until_deadline"] <= 1
        ]
        targets_sole_urgent = (
            len(due_now) == 1
            and top.action.action_type == "select_case"
            and top.action.case_id == due_now[0]["case_id"]
        )
        return top if targets_sole_urgent else None

    if focused["status"] != "open":
        # The selected case is finished; moving on is the only sensible step.
        return top if top.action.action_type == "select_case" else None

    kind = top.action.action_type

    # "set_strategy" is included on purpose: the heuristic already derives the
    # optimal strategy from policy + retrieved evidence, and the LLM has no
    # additional signal for this call. Invoking it here has only caused
    # regressions on fraud_signal_ambiguity and generated_medium_s99, where the
    # model picks a concede-style strategy over the correct contest.
    always_obvious = {
        "set_strategy",
        "retrieve_policy",
        "add_evidence",
        "remove_evidence",
        "submit_representment",
        "resolve_case",
    }
    if kind in always_obvious:
        return top

    if kind == "query_system":
        strategy = focused.get("current_strategy")
        if focused.get("policy") is None or strategy in {None, "contest"}:
            return top
        return None

    if kind == "select_case":

        def _deadline_for(wanted_id: Any) -> Any:
            # First matching queue entry wins; 999 stands in for "not queued".
            for entry in queue:
                if entry["case_id"] == wanted_id:
                    return entry["steps_until_deadline"]
            return 999

        current_deadline = _deadline_for(focused["case_id"])
        target_deadline = _deadline_for(top.action.case_id)
        # Switching is obvious only when the target expires strictly sooner.
        return top if target_deadline < current_deadline else None

    return None
def _safe_json_loads(text: str) -> CandidateChoice | None:
    """Parse a provider reply into a ``CandidateChoice``, tolerating wrapping.

    The whole text is validated first. If that fails, the span from the first
    ``{`` to the last ``}`` is validated instead, which recovers replies that
    surround the JSON object with extra prose.

    Returns:
        The parsed choice, or ``None`` when neither attempt validates.
    """
    try:
        return CandidateChoice.model_validate_json(text)
    except Exception:
        # Deliberately broad: any failure just triggers the salvage attempt.
        pass

    open_at = text.find("{")
    close_at = text.rfind("}")
    # No opening brace, or no closing brace after it: nothing to salvage.
    if open_at < 0 or close_at <= open_at:
        return None

    embedded = text[open_at : close_at + 1]
    try:
        return CandidateChoice.model_validate_json(embedded)
    except Exception:
        return None
def _compact_queue_item(case: dict[str, Any]) -> dict[str, Any]:
    """Project a queue entry down to the fields sent to the provider.

    Raises ``KeyError`` if any of the kept fields is missing from ``case``.
    """
    kept_fields = (
        "case_id",
        "reason_code",
        "amount",
        "status",
        "steps_until_deadline",
    )
    return {field: case[field] for field in kept_fields}
def _compact_visible_case(visible_case: dict[str, Any] | None) -> dict[str, Any] | None:
    """Summarise the selected case for the provider prompt.

    Evidence is reduced to titles (at most 4 attached, 6 retrieved) and the
    policy to its ``guidance`` and ``required_evidence`` entries. Returns
    ``None`` when no case is selected.
    """
    if visible_case is None:
        return None

    def _titles(field: str, limit: int) -> list[Any]:
        # Keep only the leading ``limit`` evidence items, by title.
        return [item["title"] for item in visible_case.get(field, [])[:limit]]

    compact: dict[str, Any] = {
        "case_id": visible_case["case_id"],
        "reason_code": visible_case["reason_code"],
    }
    compact["current_strategy"] = visible_case.get("current_strategy")
    compact["systems_revealed"] = visible_case.get("systems_revealed", [])
    compact["attached_evidence"] = _titles("attached_evidence", 4)
    compact["retrieved_evidence"] = _titles("retrieved_evidence", 6)

    policy = visible_case.get("policy")
    if policy:
        compact["policy"] = {
            "guidance": policy["guidance"],
            "required_evidence": policy["required_evidence"],
        }
    else:
        compact["policy"] = None

    compact["submission_status"] = visible_case.get("submission_status")
    return compact
def _provider_payload(
    observation: dict[str, Any],
    candidates: list[CandidateAction],
) -> tuple[list[CandidateAction], str]:
    """Build the shortlist and the compact JSON prompt sent to a provider.

    Returns:
        ``(shortlist, payload)`` where ``shortlist`` holds at most
        ``MAX_LLM_CANDIDATES`` leading candidates and ``payload`` is a
        whitespace-free JSON string describing the observation and the
        shortlist (each entry as its index plus summary).
    """
    # Slicing already stops at the end of the list, so no explicit bound on
    # the list length is needed.
    shortlist = candidates[:MAX_LLM_CANDIDATES]
    body = {
        "task_id": observation["task_id"],
        "steps_remaining": observation["steps_remaining"],
        "selected_case_id": observation.get("selected_case_id"),
        "queue": [_compact_queue_item(entry) for entry in observation["queue"]],
        "visible_case": _compact_visible_case(observation.get("visible_case")),
        "candidates": [
            {"index": position, "summary": option.summary}
            for position, option in enumerate(shortlist)
        ],
    }
    payload = json.dumps(body, separators=(",", ":"))
    return shortlist, payload
def _resolve_provider(
    provider: str | None,
    model_name: str | None,
) -> ProviderConfig:
    """Determine which provider and model the baseline should use.

    Precedence for each value is: explicit argument, then environment
    variable (``BASELINE_PROVIDER`` / ``BASELINE_MODEL``), then the built-in
    default. The provider name is lower-cased before the default model is
    looked up.
    """
    raw_provider = provider or os.getenv("BASELINE_PROVIDER") or DEFAULT_PROVIDER
    chosen_provider = raw_provider.lower()

    chosen_model = model_name or os.getenv("BASELINE_MODEL")
    if not chosen_model:
        # Unknown providers fall back to the same model as the default one.
        chosen_model = DEFAULT_MODELS.get(chosen_provider, "openai/gpt-oss-120b")

    return ProviderConfig(provider=chosen_provider, model_name=chosen_model)
def _openai_compatible_client(config: ProviderConfig) -> OpenAI | None:
    """Create an ``OpenAI`` client pointed at the configured provider.

    Supported providers are ``openai``, ``openrouter``, ``groq`` and
    ``google``; each reads its own ``*_API_KEY`` environment variable.
    Clients are created with the configured request timeout and with the
    SDK's built-in retries disabled.

    Returns:
        A client, or ``None`` when the provider is not one of the supported
        names or its API key is unset or empty.
    """
    shared = {"timeout": _provider_timeout_seconds(), "max_retries": 0}
    name = config.provider

    if name == "openai":
        key = os.getenv("OPENAI_API_KEY")
        if not key:
            return None
        return OpenAI(api_key=key, **shared)

    if name == "openrouter":
        key = os.getenv("OPENROUTER_API_KEY")
        if not key:
            return None
        headers = {}
        referer = os.getenv("OPENROUTER_HTTP_REFERER")
        if referer:
            headers["HTTP-Referer"] = referer
        app_title = os.getenv("OPENROUTER_APP_TITLE")
        if app_title:
            headers["X-OpenRouter-Title"] = app_title
            # Keep the legacy header for compatibility with older OpenRouter examples.
            headers["X-Title"] = app_title
        return OpenAI(
            api_key=key,
            base_url="https://openrouter.ai/api/v1",
            default_headers=headers or None,
            **shared,
        )

    # Remaining providers differ only in key variable and endpoint.
    endpoints = {
        "groq": ("GROQ_API_KEY", "https://api.groq.com/openai/v1"),
        "google": (
            "GOOGLE_API_KEY",
            "https://generativelanguage.googleapis.com/v1beta/openai/",
        ),
    }
    if name not in endpoints:
        return None
    key_variable, base_url = endpoints[name]
    key = os.getenv(key_variable)
    if not key:
        return None
    return OpenAI(api_key=key, base_url=base_url, **shared)
def _provider_pick(
    config: ProviderConfig,
    observation: dict[str, Any],
    candidates: list[CandidateAction],
) -> tuple[CandidateAction, bool, bool, str | None]:
    """Ask one provider to choose among the shortlisted candidates.

    Returns:
        ``(candidate, attempted, succeeded, error_label)``.

        * ``attempted`` is ``False`` when no request was made (unsupported
          provider, missing API key, or the ``anthropic`` package is not
          importable).
        * ``succeeded`` is ``True`` only when the reply parsed into a choice;
          the returned index is clamped into the shortlist range.
        * On any failure the first shortlisted candidate is returned and
          ``error_label`` is ``"InvalidJSONResponse"`` or the exception class
          name (``None`` when nothing was attempted).
    """
    shortlist, payload = _provider_payload(observation, candidates)

    def _fallback(
        attempted: bool, error_label: str | None = None
    ) -> tuple[CandidateAction, bool, bool, str | None]:
        # Indexes the shortlist lazily, at the point the result is produced.
        return shortlist[0], attempted, False, error_label

    def _decode(raw_text: str) -> tuple[CandidateAction, bool, bool, str | None]:
        # Turn a raw provider reply into the result tuple.
        choice = _safe_json_loads(raw_text)
        if choice is None:
            return _fallback(True, "InvalidJSONResponse")
        index = min(max(choice.candidate_index, 0), len(shortlist) - 1)
        return shortlist[index], True, True, None

    if config.provider == "anthropic":
        api_key = os.getenv("ANTHROPIC_API_KEY")
        if not api_key:
            return _fallback(False)
        try:  # pragma: no cover
            from anthropic import Anthropic
        except ImportError:  # pragma: no cover
            return _fallback(False)
        try:  # pragma: no cover
            anthropic_client = Anthropic(
                api_key=api_key,
                timeout=_provider_timeout_seconds(),
                max_retries=0,
            )
            reply = anthropic_client.messages.create(
                model=config.model_name,
                max_tokens=200,
                temperature=0,
                system=(
                    "You are a merchant chargeback analyst. Pick the single best next action. "
                    "Return only JSON with candidate_index and rationale."
                ),
                messages=[{"role": "user", "content": payload}],
            )
            # Only text blocks carry the JSON answer.
            text_parts = [
                part.text
                for part in reply.content
                if getattr(part, "type", "") == "text"
            ]
            return _decode("".join(text_parts))
        except Exception as exc:
            return _fallback(True, exc.__class__.__name__)

    if config.provider not in {"openai", "openrouter", "groq", "google"}:
        return _fallback(False)

    client = _openai_compatible_client(config)
    if client is None:
        return _fallback(False)

    system_prompt = (
        "You are a merchant chargeback dispute analyst. Pick the single best next action from the ordered candidate list. "
        "The candidates are pre-sorted by a deterministic heuristic — candidate 0 is usually correct. Deviate only when you spot a concrete reason. "
        "\n"
        "Reason-code → optimal strategy (follow unless evidence clearly contradicts):\n"
        " goods_not_received → contest (with order + delivery proof)\n"
        " fraud_cnp → contest when account linkage exists, otherwise concede\n"
        " product_not_as_described → contest (with listing + return policy proof)\n"
        " service_not_provided → contest (with completion log)\n"
        " credit_not_processed → issue_refund immediately\n"
        " duplicate_processing → issue_refund immediately\n"
        "\n"
        "Priorities: (1) resolve cases whose deadline is 1 step away before anything else, "
        "(2) prefer the highest-$ open case when budget is tight, "
        "(3) never attach harmful evidence (AVS/CVV mismatch on fraud_cnp, GPS anomalies on goods_not_received), "
        "(4) when multiple candidates look equivalent, take candidate 0.\n"
        'Return only JSON: {"candidate_index": N, "rationale": "brief reason"}'
    )
    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": payload},
    ]
    try:
        response = _chat_completion_with_retry(
            client,
            model=config.model_name,
            temperature=0,
            max_tokens=MAX_PROVIDER_RESPONSE_TOKENS,
            response_format={"type": "json_object"},
            messages=conversation,
        )
        # An empty reply becomes "{}" so it goes through the same parsing path.
        content = response.choices[0].message.content or "{}"
        return _decode(content)
    except Exception as exc:
        return _fallback(True, exc.__class__.__name__)
def _provider_pick_with_fallback(
    config: ProviderConfig,
    observation: dict[str, Any],
    candidates: list[CandidateAction],
) -> tuple[CandidateAction, bool, bool, str | None]:
    """Try the primary provider, then walk the fallback chain on failure.

    Fallback entries that match the primary provider, or for which no client
    can be built (no API key configured), are skipped.

    Returns:
        ``(candidate, attempted, succeeded, error_label)``.

        * On the first success the chosen candidate is returned with
          ``attempted=True, succeeded=True`` and no error label.
        * When every provider fails, ``attempted`` is ``True`` if the primary
          *or any fallback* actually issued a request, so callers count the
          failed calls and can enforce strict mode. ``error_label`` is the
          first label reported (primary first, then fallbacks in chain
          order), or ``"AllProvidersFailed"`` when none was reported.
    """
    candidate, attempted, succeeded, error = _provider_pick(
        config, observation, candidates
    )
    if succeeded:
        return candidate, attempted, succeeded, error

    for fb_provider, fb_model in _FALLBACK_CHAIN:
        if fb_provider == config.provider:
            continue
        fb_config = ProviderConfig(provider=fb_provider, model_name=fb_model)
        # Availability probe only: skip fallbacks that have no API key.
        if _openai_compatible_client(fb_config) is None:
            continue
        candidate, fb_attempted, fb_succeeded, fb_error = _provider_pick(
            fb_config,
            observation,
            candidates,
        )
        if fb_succeeded:
            return candidate, True, True, None
        # Keep failed fallback attempts visible to the caller; previously
        # these flags were dropped, so a run whose primary was never attempted
        # reported no attempt and no error at all.
        attempted = attempted or fb_attempted
        error = error or fb_error

    return candidate, attempted, False, error or "AllProvidersFailed"
def run_baseline(
    provider: str | None = None,
    model_name: str | None = None,
) -> BaselineRunResult:
    """Run the baseline across all built-in tasks.

    For every task a fresh environment is stepped until the observation
    reports ``done`` or no candidate actions remain. At each step the action
    is chosen, in order of preference, as: the only candidate, an obvious
    housekeeping action, a provider decision (only when the resolved
    provider's API key is set), or the heuristic first candidate.

    Args:
        provider: Provider name; resolved via ``_resolve_provider`` when
            omitted.
        model_name: Model identifier; resolved via ``_resolve_provider`` when
            omitted.

    Returns:
        A ``BaselineRunResult`` with per-task results, provider call
        statistics, the run ``mode`` and the average normalized score rounded
        to 4 decimals (``0.0`` when there are no tasks).

    Raises:
        RuntimeError: If strict LLM mode is enabled and a provider decision
            was attempted but failed.
    """
    config = _resolve_provider(provider, model_name)

    key_env_by_provider = {
        "openai": "OPENAI_API_KEY",
        "openrouter": "OPENROUTER_API_KEY",
        "groq": "GROQ_API_KEY",
        "anthropic": "ANTHROPIC_API_KEY",
        "google": "GOOGLE_API_KEY",
    }
    key_env = key_env_by_provider.get(config.provider)
    has_provider_key = bool(key_env and os.getenv(key_env))

    provider_calls_attempted = 0
    provider_calls_succeeded = 0
    provider_errors: dict[str, int] = {}
    task_results: list[BaselineTaskResult] = []

    for task in list_tasks():
        env = ChargebackOpsEnvironment()
        observation = env.reset(task_id=task.task_id)
        while not observation.done:
            observation_payload = observation.model_dump()
            candidates = candidate_actions(observation_payload)
            if not candidates:
                break

            # Cheap deterministic choices first; None means "needs a decision".
            if len(candidates) == 1:
                candidate = candidates[0]
            else:
                candidate = _obvious_next_action(observation_payload, candidates)

            if candidate is None:
                if has_provider_key:
                    candidate, attempted, succeeded, error_label = (
                        _provider_pick_with_fallback(
                            config,
                            observation_payload,
                            candidates,
                        )
                    )
                    provider_calls_attempted += int(attempted)
                    provider_calls_succeeded += int(succeeded)
                    if attempted and not succeeded and error_label is not None:
                        provider_errors[error_label] = (
                            provider_errors.get(error_label, 0) + 1
                        )
                    if _strict_llm_mode() and attempted and not succeeded:
                        raise RuntimeError(
                            "STRICT_LLM_MODE is enabled and the provider decision failed, "
                            "so heuristic fallback is not allowed."
                        )
                else:
                    candidate = _heuristic_pick(candidates)

            observation = env.step(candidate.action)

        # Prefer the report stored on the environment state; grade the episode
        # here only when none is available.
        report = env.state.grader_report or grade_episode(
            task,
            env._progress_by_case,  # type: ignore[attr-defined]
            env.state.step_count,
            env.state.episode_id or "",
            completed=env.state.completed,
        )
        task_results.append(
            BaselineTaskResult(
                task_id=task.task_id,
                title=task.title,
                score=report.normalized_score,
                steps_used=env.state.step_count,
                final_status=report.summary,
            )
        )

    # Guard the mean: an empty task list would otherwise divide by zero.
    if task_results:
        average_score = round(
            sum(task_result.score for task_result in task_results)
            / len(task_results),
            4,
        )
    else:
        average_score = 0.0

    if provider_calls_attempted == 0 or provider_calls_succeeded == 0:
        # No provider decision was ever used.
        mode = "heuristic_fallback"
    elif provider_calls_succeeded < provider_calls_attempted:
        mode = f"{config.provider}_with_fallback"
    else:
        mode = config.provider

    return BaselineRunResult(
        provider=config.provider,
        model_name=config.model_name,
        mode=mode,
        provider_calls_attempted=provider_calls_attempted,
        provider_calls_succeeded=provider_calls_succeeded,
        provider_errors=provider_errors,
        task_results=task_results,
        average_score=average_score,
    )
def main() -> None:
    """CLI entry point: run the baseline and print the result as JSON."""
    result = run_baseline()
    rendered = json.dumps(result.model_dump(), indent=2)
    print(rendered)


if __name__ == "__main__":  # pragma: no cover
    main()