| |
| import asyncio |
| import os |
| import textwrap |
| import json |
| from typing import List, Optional |
|
|
| from openai import OpenAI |
|
|
| from client import CustomerSupportEnv |
| from models import CustomerSupportAction |
|
|
| LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME") |
| HF_TOKEN = os.getenv("HF_TOKEN") |
| API_KEY = HF_TOKEN or os.getenv("OPENAI_API_KEY") |
| API_BASE_URL = os.getenv("API_BASE_URL", "https://api.openai.com/v1") |
| MODEL_NAME = os.getenv("MODEL_NAME", "gpt-4o-mini") |
| BENCHMARK = "customer_support" |
| MAX_STEPS = 10 |
| TEMPERATURE = 0.5 |
| MAX_TOKENS = 150 |
| SUCCESS_SCORE_THRESHOLD = 0.5 |
|
|
| SYSTEM_PROMPT = textwrap.dedent( |
| """ |
| You are an AI customer support agent. You must act on the currently active ticket. |
| Your available actions are: |
| 1. assign: requires 'department' (e.g. TechSupport, Billing, Sales, Retention) and optionally 'priority' (Low, Medium, High, Urgent). |
| 2. ask_user: requires 'reply_text' to ask the user for more info. |
| 3. escalate: escalates a critical/churn ticket. |
| |
| You must reply ONLY with a valid JSON object matching the action schema. DO NOT wrap the json in backticks or markdown, just return raw JSON. |
| Example: |
| {"action_type": "assign", "department": "TechSupport", "priority": "High"} |
| {"action_type": "ask_user", "reply_text": "What is your OS?"} |
| {"action_type": "escalate"} |
| """ |
| ).strip() |
|
|
| def log_start(task: str, env: str, model: str) -> None: |
| print(f"[START] task={task} env={env} model={model}", flush=True) |
|
|
| def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None: |
| error_val = error if error else "null" |
| done_val = str(done).lower() |
| print(f"[STEP] step={step} action={action} reward={reward:.2f} done={done_val} error={error_val}", flush=True) |
|
|
| def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None: |
| rewards_str = ",".join(f"{r:.2f}" for r in rewards) |
| print(f"[END] success={str(success).lower()} steps={steps} score={score:.3f} rewards={rewards_str}", flush=True) |
|
|
| def build_user_prompt(step: int, obs: dict, history: List[str]) -> str: |
| history_block = "\n".join(history[-3:]) if history else "None" |
| return textwrap.dedent( |
| f""" |
| Step: {step} |
| Active Ticket: |
| Content: {obs.get("ticket_content")} |
| Metadata: {obs.get("ticket_metadata")} |
| |
| Available Departments: {obs.get("available_departments")} |
| Available Priorities: {obs.get("available_priorities")} |
| |
| Previous actions: |
| {history_block} |
| |
| Provide the next action as JSON. |
| """ |
| ).strip() |
|
|
| def get_model_action(client: OpenAI, step: int, obs: dict, history: List[str]) -> tuple: |
| user_prompt = build_user_prompt(step, obs, history) |
| try: |
| completion = client.chat.completions.create( |
| model=MODEL_NAME, |
| messages=[ |
| {"role": "system", "content": SYSTEM_PROMPT}, |
| {"role": "user", "content": user_prompt}, |
| ], |
| temperature=TEMPERATURE, |
| max_tokens=MAX_TOKENS, |
| stream=False, |
| ) |
| text = (completion.choices[0].message.content or "").strip() |
| if text.startswith("```json"): text = text[7:] |
| if text.endswith("```"): text = text[:-3] |
| text = text.strip() |
| data = json.loads(text) |
| return CustomerSupportAction(**data), text |
| except Exception as exc: |
| print(f"[DEBUG] Model request failed: {exc}", flush=True) |
| return CustomerSupportAction(action_type="assign", department="TechSupport", priority="Low"), "{}" |
|
|
| async def run_task(task_name: str): |
| |
| os.environ["TASK_NAME"] = task_name |
| |
| client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY) |
| |
| if LOCAL_IMAGE_NAME: |
| env = await CustomerSupportEnv.from_docker_image(LOCAL_IMAGE_NAME, env_vars={"PORT": "8000", "TASK_NAME": task_name}) |
| else: |
| env = CustomerSupportEnv(base_url="http://localhost:8000") |
| |
| history: List[str] = [] |
| rewards: List[float] = [] |
| steps_taken = 0 |
| success = False |
|
|
| log_start(task=task_name, env=BENCHMARK, model=MODEL_NAME) |
| score = 0.0 |
|
|
| try: |
| |
| result = await env.reset(task_name=task_name) |
| obs = result.observation |
| |
| for step in range(1, MAX_STEPS + 1): |
| if result.done: |
| break |
| |
| |
| obs_dict = { |
| "ticket_content": obs.ticket_content, |
| "ticket_metadata": obs.ticket_metadata, |
| "available_departments": obs.available_departments, |
| "available_priorities": obs.available_priorities, |
| } |
| |
| action_obj, raw_text = get_model_action(client, step, obs_dict, history) |
| |
| result = await env.step(action_obj) |
| obs = result.observation |
| |
| reward = result.reward or 0.0 |
| done = result.done |
| |
| rewards.append(reward) |
| steps_taken = step |
| |
| safe_action_text = raw_text.replace('\n', ' ').replace('\r', '') |
| log_step(step=step, action=safe_action_text, reward=reward, done=done, error=None) |
| |
| history.append(f"Step {step} action: {safe_action_text} -> reward {reward}") |
| |
| if done: |
| break |
| |
| MAX_TOTAL_REWARD = max(float(len(obs.tickets_summary)), 1.0) |
| score = sum(rewards) / MAX_TOTAL_REWARD |
| score = min(max(score, 0.01), 0.99) |
| success = score >= SUCCESS_SCORE_THRESHOLD |
|
|
| except Exception as e: |
| print(f"[DEBUG] Error during run: {e}") |
| score = 0.01 |
| success = False |
| finally: |
| try: |
| await env.close() |
| except: |
| pass |
| log_end(success=success, steps=steps_taken, score=score, rewards=rewards) |
|
|
| async def main(): |
| tasks = ["task1", "task2", "task3"] |
| from threading import Thread |
| import uvicorn |
| import time |
| from server.app import app |
| |
| server_thread = None |
| if not LOCAL_IMAGE_NAME: |
| print("[DEBUG] Starting local server for testing...") |
| server_thread = Thread(target=uvicorn.run, args=(app,), kwargs={"host":"0.0.0.0", "port":8000, "log_level":"error"}, daemon=True) |
| server_thread.start() |
| time.sleep(2) |
| |
| for t in tasks: |
| |
| |
| |
| |
| |
| await run_task(t) |
|
|
| if __name__ == "__main__": |
| asyncio.run(main()) |
|
|