"""Inference runner for the support-ticket benchmark.

For every task id returned by ``list_task_ids()``, this script builds a
``SupportTicketEnv``, resets it, and then repeatedly asks an OpenAI-compatible
chat model for a single JSON action until the episode reports ``done``.

When no API key is configured, or when the model call or response parsing
fails, the scripted ``fallback_action`` policy is used for that step instead.

Progress is printed to stdout as ``[START]``, ``[STEP]`` and ``[END]`` lines.
Debug messages go to stderr.

Environment variables read:
    HF_TOKEN          API key. When unset, only the scripted policy is used.
    API_BASE_URL      Chat-completions endpoint (default: HF router).
    MODEL_NAME        Model id passed to the endpoint.
    ENV_BASE_URL      If set, connect to a remote environment at this URL.
    LOCAL_IMAGE_NAME  If set, start the environment from this Docker image.
"""

from __future__ import annotations

import json
import os
import sys
from typing import Any

from openai import OpenAI

from support_ticket_env import (
    BENCHMARK_NAME,
    DEFAULT_SUCCESS_THRESHOLD,
    SupportTicketEnv,
    fallback_action,
    list_task_ids,
    parse_action,
)

API_KEY = os.getenv("HF_TOKEN")
API_BASE_URL = os.getenv("API_BASE_URL") or "https://router.huggingface.co/v1"
MODEL_NAME = os.getenv("MODEL_NAME") or "Qwen/Qwen2.5-72B-Instruct"
ENV_BASE_URL = os.getenv("ENV_BASE_URL")
LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME")

# Temperature 0 asks the endpoint for greedy decoding, keeping runs as repeatable as it allows.
TEMPERATURE = 0.0
MAX_TOKENS = 220
# An episode counts as a success when its clamped final score is >= this value.
SUCCESS_THRESHOLD = DEFAULT_SUCCESS_THRESHOLD

SYSTEM_PROMPT = """You are operating a deterministic customer-support environment.
Choose exactly one tool action at each step and respond with exactly one JSON object.

Valid actions:
- {"action_type": "search_kb", "query": "..."}
- {"action_type": "lookup_account", "customer_id": "..."}
- {"action_type": "send_reply", "message": "..."}
- {"action_type": "issue_refund", "amount_cents": 4900, "reason_code": "duplicate_charge"}
- {"action_type": "resolve_ticket", "resolution_code": "password_reset_guidance"}
- {"action_type": "resolve_ticket", "resolution_code": "billing_refund_processed"}
- {"action_type": "escalate_ticket", "queue": "support_lead", "priority": "P2", "summary": "..."}
- {"action_type": "escalate_ticket", "queue": "legal_data_incident", "priority": "P0", "summary": "..."}

Do not include markdown, code fences, or explanations."""

# Shared decoder used to parse only the first JSON value out of a model response.
_JSON_DECODER = json.JSONDecoder()


def log_start(task: str, env: str, model: str) -> None:
    """Print the ``[START]`` line that opens an episode's log."""
    print(f"[START] task={task} env={env} model={model}", flush=True)


def log_step(step: int, action: str, reward: float, done: bool, error: str | None) -> None:
    """Print one ``[STEP]`` line. A falsy ``error`` is rendered as ``null``."""
    # Each record must stay on one physical line, so fold CR/LF in the error text to spaces.
    error_value = "null" if not error else error.replace("\r", " ").replace("\n", " ")
    print(
        f"[STEP] step={step} action={action} reward={reward:.2f} "
        f"done={str(done).lower()} error={error_value}",
        flush=True,
    )


def log_end(success: bool, steps: int, score: float, rewards: list[float]) -> None:
    """Print the ``[END]`` line summarising an episode.

    The per-step rewards are printed comma-separated with two decimals.
    """
    rewards_str = ",".join(f"{reward:.2f}" for reward in rewards)
    print(
        f"[END] success={str(success).lower()} steps={steps} "
        f"score={score:.2f} rewards={rewards_str}",
        flush=True,
    )


def _strip_code_fences(text: str) -> str:
    """Remove a leading and/or trailing ``` fence line from ``text``, if present."""
    cleaned = text.strip()
    if cleaned.startswith("```"):
        lines = cleaned.splitlines()
        if lines and lines[0].startswith("```"):
            lines = lines[1:]
        if lines and lines[-1].startswith("```"):
            lines = lines[:-1]
        cleaned = "\n".join(lines).strip()
    return cleaned


def _extract_json_object(text: str) -> dict[str, Any]:
    """Return the first JSON object embedded in a model response.

    Code fences are stripped first. Parsing starts at the first ``{``.

    Raises:
        ValueError: if no ``{`` is present.
        json.JSONDecodeError: a ValueError subclass, raised when the text starting
            at the first ``{`` is not a valid JSON object.
    """
    cleaned = _strip_code_fences(text)
    start = cleaned.find("{")
    if start == -1:
        raise ValueError("No JSON object found in model response")
    # raw_decode stops after the first complete JSON value. Trailing commentary,
    # even commentary containing braces, is therefore ignored instead of being
    # swept into the slice (as slicing to the last "}" would do).
    payload, _ = _JSON_DECODER.raw_decode(cleaned[start:])
    return payload


def build_user_prompt(observation: dict[str, Any]) -> str:
    """Build the user message: fixed instructions plus the observation as indented JSON."""
    return (
        "Choose the next best action for this support ticket. "
        "Keep it valid and deterministic. Observation JSON:\n"
        f"{json.dumps(observation, indent=2)}"
    )


def choose_action(client: OpenAI | None, observation: Any) -> Any:
    """Pick the next action for ``observation``.

    The scripted ``fallback_action(observation)`` is returned in two cases:
    ``client`` is None, or any exception occurs while querying the model or
    parsing its reply. Otherwise the model's JSON reply is converted with
    ``parse_action``.

    ``observation`` must support ``model_dump(mode="json")``. That presumably
    means a pydantic model; confirm against support_ticket_env.
    """
    fallback = fallback_action(observation)
    if client is None:
        return fallback

    try:
        completion = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {
                    "role": "user",
                    "content": build_user_prompt(observation.model_dump(mode="json")),
                },
            ],
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
        )
        content = (completion.choices[0].message.content or "").strip()
        return parse_action(_extract_json_object(content))
    except Exception as exc:  # pragma: no cover - depends on external endpoint
        # Deliberately broad: any endpoint/parse failure degrades to the scripted policy.
        print(f"[DEBUG] Falling back to scripted policy: {exc}", file=sys.stderr, flush=True)
        return fallback


def build_env(task_id: str) -> SupportTicketEnv:
    """Create the environment for ``task_id``.

    Precedence is: ``LOCAL_IMAGE_NAME`` (Docker image), then ``ENV_BASE_URL``
    (remote environment), then a directly constructed in-process instance.
    """
    if LOCAL_IMAGE_NAME:
        return SupportTicketEnv.from_docker_image(image_name=LOCAL_IMAGE_NAME, task_id=task_id)
    if ENV_BASE_URL:
        return SupportTicketEnv.from_env(
            repo_id=BENCHMARK_NAME, base_url=ENV_BASE_URL, task_id=task_id
        )
    return SupportTicketEnv(task_id=task_id)


def clamp_score(score: float) -> float:
    """Clamp ``score`` into the closed interval [0.0, 1.0]."""
    return min(max(score, 0.0), 1.0)


def run_episode(task_id: str, client: OpenAI | None) -> None:
    """Run one episode of ``task_id`` and log it.

    Exactly one ``[START]`` and one ``[END]`` line are always printed, even if
    building, stepping or closing the environment raises. Any such exception
    still propagates to the caller after ``[END]`` has been written.
    """
    rewards: list[float] = []
    steps_taken = 0
    final_score = 0.0
    success = False
    env: SupportTicketEnv | None = None

    # Log before building the env so a failing build still yields a START/END pair.
    log_start(task=task_id, env=BENCHMARK_NAME, model=MODEL_NAME)
    try:
        env = build_env(task_id)
        result = env.reset(task_id)
        while not result.done:
            action = choose_action(client, result.observation)
            result = env.step(action)
            steps_taken += 1
            # Defensive: treat a missing (None) reward as 0.0 so formatting cannot
            # crash. TODO confirm whether SupportTicketEnv can ever return None here.
            reward = float(result.reward or 0.0)
            rewards.append(reward)
            action_str = json.dumps(action.model_dump(mode="json"), separators=(",", ":"))
            log_step(
                step=steps_taken,
                action=action_str,
                reward=reward,
                done=result.done,
                error=result.observation.last_action_error,
            )

        final_score = clamp_score(float(result.info.get("score", 0.0)))
        success = final_score >= SUCCESS_THRESHOLD
    finally:
        # Nested finally: a failure in env.close() must not suppress the [END] record.
        try:
            if env is not None:
                env.close()
        finally:
            log_end(success=success, steps=steps_taken, score=final_score, rewards=rewards)


def main() -> None:
    """Run every benchmark task in order.

    The model client is used only when ``HF_TOKEN`` is set.
    """
    client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY) if API_KEY else None
    for task_id in list_task_ids():
        run_episode(task_id, client)


if __name__ == "__main__":
    main()