# Spaces: Sleeping  (appears to be page-status text captured along with the file; not part of the script)
#!/usr/bin/env python3
"""
Inference Script for Customer Support Environment
=================================================
MANDATORY
- Before submitting, ensure the following variables are defined in your environment configuration:
    API_BASE_URL   The API endpoint for the LLM.
    MODEL_NAME     The model identifier to use for inference.
    HF_TOKEN       Your Hugging Face / API key.
- The inference script must be named `inference.py` and placed in the root directory of the project.
- Participants must use the OpenAI client for all LLM calls, using the variables above.
"""
| import os | |
| import sys | |
| import json | |
| import time | |
| from typing import Dict, List | |
| from openai import OpenAI | |
| # Import environment components | |
| from server.customer_support_env_environment import CustomerSupportEnvironment | |
| from models import CustomerSupportAction, CustomerSupportObservation | |
# ─── Required environment variables ───────────────────────────────────────────
# The endpoint and model fall back to the defaults below when unset.
API_BASE_URL = os.environ.get("API_BASE_URL", "https://router.huggingface.co/v1")
# HF_TOKEN wins; API_KEY is consulted only when HF_TOKEN is unset or empty.
API_KEY = os.environ.get("HF_TOKEN") or os.environ.get("API_KEY")
MODEL_NAME = os.environ.get("MODEL_NAME", "meta-llama/Llama-3.3-70B-Instruct")

# ─── Inference configuration ─────────────────────────────────────────────────
EPISODES_PER_TASK = 10  # episodes evaluated for each task difficulty
TEMPERATURE = 0.3  # sampling temperature sent with every completion request
MAX_TOKENS = 500  # completion length cap sent with every request
OUTPUT_DIR = "outputs"  # directory that receives inference_results.json
def get_openai_client() -> OpenAI:
    """Build the OpenAI-compatible client from the module-level settings.

    Returns:
        A client configured with ``API_KEY`` and ``API_BASE_URL``.

    When no API key is configured, setup instructions are printed and the
    process exits with status 1 instead of returning.
    """
    if API_KEY:
        return OpenAI(api_key=API_KEY, base_url=API_BASE_URL)

    # No credentials available: tell the user how to fix it, then stop.
    for line in (
        "Error: HF_TOKEN or API_KEY environment variable not set.",
        "Set it with: export HF_TOKEN='your-token-here'",
    ):
        print(line)
    sys.exit(1)
def build_prompt(observation: CustomerSupportObservation, task_id: str) -> str:
    """Render the user prompt for one ticket.

    Args:
        observation: The ticket to handle; its ticket, message and customer
            history attributes are interpolated into the prompt.
        task_id: ``"easy"`` or ``"medium"`` select the matching instructions;
            any other value gets the full ("hard") instructions.

    Returns:
        The prompt text, ending with the JSON schema the model must answer in.
    """
    hard_instructions = (
        "Fully handle this ticket: categorize, prioritize, route to the right team, "
        "draft a professional response, and decide whether to escalate."
    )
    instructions_by_task = {
        "easy": (
            "Categorize this support ticket into one of: billing, technical, account, shipping, general."
        ),
        "medium": (
            "Categorize the ticket, assign a priority (low/medium/high/critical), "
            "and route to the appropriate team (tier1/tier2/billing/technical/management)."
        ),
    }
    # Anything that is not easy/medium is treated as the hard task.
    task_instructions = instructions_by_task.get(task_id, hard_instructions)
    premium_label = "Yes" if observation.is_premium else "No"

    return f"""You are a customer support AI assistant. {task_instructions}
TICKET INFORMATION:
- ID: {observation.ticket_id}
- Channel: {observation.channel}
- Timestamp: {observation.timestamp}
CUSTOMER MESSAGE:
{observation.customer_message}
CUSTOMER HISTORY:
- Account Age: {observation.account_age_days} days
- Total Tickets: {observation.total_tickets}
- Resolved Tickets: {observation.resolved_tickets}
- Satisfaction Score: {observation.satisfaction_score}/5.0
- Premium Customer: {premium_label}
- Lifetime Value: ${observation.lifetime_value:.2f}
Based on this information, provide your response in JSON format with these fields:
{{
"category": "billing" | "technical" | "account" | "shipping" | "general",
"priority": "low" | "medium" | "high" | "critical",
"assigned_team": "tier1" | "tier2" | "billing" | "technical" | "management",
"response_draft": "Your professional response to the customer (minimum 20 characters)",
"internal_notes": "Brief internal notes for the team",
"escalate": true | false
}}
Respond with ONLY the JSON, no additional text."""
def parse_llm_response(content: str) -> Dict:
    """Extract the JSON object from an LLM reply.

    Accepts bare JSON, JSON wrapped in a Markdown code fence (with or without
    a ``json`` language tag, in any letter case), and JSON surrounded by
    extra prose such as "Here is the result: {...}".

    Args:
        content: Raw text returned by the model.

    Returns:
        The decoded JSON object.

    Raises:
        json.JSONDecodeError: No parsable JSON could be found in ``content``.
        ValueError: The reply decoded to something other than a JSON object.
            (``json.JSONDecodeError`` is itself a ``ValueError`` subclass, so
            catching ``ValueError`` covers both cases.)
    """
    text = content.strip()
    if text.startswith("```"):
        # Keep only what sits between the opening fence and the next fence.
        text = text.split("```")[1]
        # Drop the optional language tag; models emit "json", "JSON", ...
        if text[:4].lower() == "json":
            text = text[4:]
        text = text.strip()

    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        # The model added text around the JSON: retry on the outermost
        # {...} span, and re-raise the original error if there is none.
        start = text.find("{")
        end = text.rfind("}")
        if start == -1 or end <= start:
            raise
        parsed = json.loads(text[start:end + 1])

    if not isinstance(parsed, dict):
        # Fail here with a clear message instead of letting a list or scalar
        # break later on a .get() call.
        raise ValueError(
            f"Expected a JSON object from the LLM, got {type(parsed).__name__}"
        )
    return parsed
def get_action(
    client: OpenAI, observation: CustomerSupportObservation, task_id: str
) -> CustomerSupportAction:
    """Ask the LLM how to handle a ticket and turn its reply into an action.

    Args:
        client: OpenAI-compatible client used for the chat completion call.
        observation: The ticket the agent must act on.
        task_id: Task difficulty, forwarded to ``build_prompt``.

    Returns:
        The action built from the model's JSON reply. Fields the model left
        out, set to ``null`` or left empty take their defaults. If the request,
        the parsing or the action construction fails, a warning is printed
        and a generic fallback action is returned instead; this function
        does not raise for those failures.
    """
    prompt = build_prompt(observation, task_id)
    try:
        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {
                    "role": "system",
                    "content": "You are a customer support expert. Always respond with valid JSON only.",
                },
                {"role": "user", "content": prompt},
            ],
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
        )
        # The API may return no text at all (content is None); normalise to ""
        # so the failure is reported as a parse error, not an AttributeError.
        content = (response.choices[0].message.content or "").strip()
        action_dict = parse_llm_response(content)

        # dict.get(key, default) only applies the default when the key is
        # absent. A JSON null arrives as None, so use `or` to make one empty
        # field fall back to its default instead of discarding the whole reply.
        escalate = action_dict.get("escalate")
        return CustomerSupportAction(
            category=action_dict.get("category") or "general",
            priority=action_dict.get("priority") or "medium",
            assigned_team=action_dict.get("assigned_team") or "tier1",
            response_draft=action_dict.get("response_draft")
            or "Thank you for contacting support. We will review your request.",
            internal_notes=action_dict.get("internal_notes"),
            # Only a missing/null flag is defaulted; any value the model
            # actually supplied is passed through unchanged.
            escalate=False if escalate is None else escalate,
        )
    except Exception as e:
        # Deliberate best-effort: one bad reply must not abort the whole run.
        print(f" [WARN] LLM call failed: {e}")
        # Return a reasonable fallback action
        return CustomerSupportAction(
            category="general",
            priority="medium",
            assigned_team="tier1",
            response_draft="Thank you for contacting support. We will review your request and get back to you shortly.",
            escalate=False,
        )
def run_episode(
    env: CustomerSupportEnvironment,
    client: OpenAI,
    task_id: str,
    episode_num: int,
) -> Dict:
    """Play one reset/act/step cycle and collect its outcome.

    Args:
        env: Environment to reset and step.
        client: OpenAI-compatible client used to choose the action.
        task_id: Task difficulty, forwarded to ``get_action``.
        episode_num: Label stored under ``"episode"`` in the result.

    Returns:
        A dict with the episode number, the step reward, and the
        ``grader_score``, ``ground_truth`` and ``agent_action`` entries taken
        from the step observation's metadata.
    """
    ticket = env.reset()
    outcome = env.step(get_action(client, ticket, task_id))

    result: Dict = {"episode": episode_num, "reward": outcome.reward}
    details = outcome.metadata
    for key in ("grader_score", "ground_truth", "agent_action"):
        result[key] = details[key]
    return result
def evaluate_task(task_id: str, client: OpenAI, num_episodes: int) -> Dict:
    """Run ``num_episodes`` episodes of one task and summarise the scores.

    A fresh environment seeded with 42 is created for the task, progress is
    printed per episode, and the success threshold is read from the
    environment's ``task_configs`` entry for ``task_id``.

    Args:
        task_id: Task difficulty to evaluate.
        client: OpenAI-compatible client used to choose actions.
        num_episodes: Number of episodes to run.

    Returns:
        A summary dict with average/min/max score, average reward, success
        rate, the threshold used, and the per-episode results.
    """
    divider = "=" * 60
    print(f"\n{divider}")
    print(f" Task: {task_id.upper()} | Episodes: {num_episodes}")
    print(divider)

    env = CustomerSupportEnvironment(task_id=task_id, seed=42)
    episodes: List[Dict] = []
    for episode_num in range(1, num_episodes + 1):
        outcome = run_episode(env, client, task_id, episode_num)
        episodes.append(outcome)
        print(
            f" Episode {episode_num}/{num_episodes} "
            f"score={outcome['grader_score']:.3f} reward={outcome['reward']:.3f}"
        )

    scores = [ep["grader_score"] for ep in episodes]
    rewards = [ep["reward"] for ep in episodes]
    threshold = env.task_configs[task_id]["success_threshold"]
    episode_count = len(scores)
    # An episode counts as a success when its score reaches the threshold.
    passed = len([score for score in scores if score >= threshold])

    summary = {
        "task_id": task_id,
        "num_episodes": num_episodes,
        "avg_score": sum(scores) / episode_count,
        "avg_reward": sum(rewards) / episode_count,
        "min_score": min(scores),
        "max_score": max(scores),
        "success_rate": passed / episode_count,
        "success_threshold": threshold,
        "episodes": episodes,
    }
    print(f"\n Avg Score: {summary['avg_score']:.3f}")
    print(f" Success Rate: {summary['success_rate']:.1%} (threshold {threshold})")
    return summary
def main():
    """Evaluate the easy, medium and hard tasks and save the results.

    Prints the configuration banner, creates the output directory, builds the
    client, evaluates each task for ``EPISODES_PER_TASK`` episodes, prints a
    summary with the total elapsed time, and writes all results to
    ``inference_results.json`` inside ``OUTPUT_DIR``.
    """
    rule = "=" * 60
    print(rule)
    print(" Customer Support Env — Inference Script")
    print(f" API_BASE_URL: {API_BASE_URL}")
    print(f" MODEL_NAME: {MODEL_NAME}")
    print(rule)

    # The output directory is created before the client is built.
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    client = get_openai_client()

    # Tasks run in order of increasing difficulty; dicts keep that order.
    started = time.time()
    all_results = {
        difficulty: evaluate_task(difficulty, client, EPISODES_PER_TASK)
        for difficulty in ("easy", "medium", "hard")
    }
    elapsed = time.time() - started

    print(f"\n{rule}")
    print(" SUMMARY")
    print(rule)
    for difficulty, summary in all_results.items():
        print(
            f" {difficulty.upper():8s} | Score: {summary['avg_score']:.3f} | "
            f"Success: {summary['success_rate']:.1%}"
        )
    print(f" Total time: {elapsed:.1f}s")
    print(rule)

    output_path = os.path.join(OUTPUT_DIR, "inference_results.json")
    with open(output_path, "w") as results_file:
        # default=str stringifies any value json cannot encode natively.
        json.dump(all_results, results_file, indent=2, default=str)
    print(f"\nResults saved to: {output_path}")
# Run the evaluation only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()