"""
Autonomous Traffic Control Inference Script
============================================
OpenEnv-compliant inference script for HF Spaces deployment.

MANDATORY ENVIRONMENT VARIABLES:
- API_BASE_URL: LLM API endpoint (default: https://router.huggingface.co/v1)
- MODEL_NAME: Model identifier (default: Qwen/Qwen2.5-72B-Instruct)
- HF_TOKEN: Hugging Face API key (API_KEY is accepted as a fallback)
- LOCAL_IMAGE_NAME: Docker image name (if using from_docker_image).
  NOTE: this script does not read it; the built-in simulator runs in-process.

OPTIONAL ENVIRONMENT VARIABLES:
TASK_NAME, BENCHMARK, MAX_STEPS, TEMPERATURE, MAX_TOKENS,
SUCCESS_SCORE_THRESHOLD

STDOUT FORMAT:
[START] task=<task> env=<benchmark> model=<model>
[STEP] step=<n> action=<action> reward=<0.00> done=<true|false> error=<msg|null>
[END] success=<true|false> steps=<n> score=<0.000> rewards=<r1,r2,...,rn>

Debug output goes to stderr so that stdout only carries the lines above.

Run:
    python inference.py
"""

import asyncio
import os
import sys
import textwrap
import time
import json
import random
import traceback
from typing import List, Optional, Dict, Any
from dataclasses import dataclass

try:
    from openai import OpenAI
except ImportError:
    print("[ERROR] openai package required. Install with: pip install openai", file=sys.stderr)
    sys.exit(1)

# ─────────────────────────────────────────────────────────────────────
# CONFIGURATION
# ─────────────────────────────────────────────────────────────────────

API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1")
MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")
API_KEY = os.getenv("HF_TOKEN") or os.getenv("API_KEY")

TASK_NAME = os.getenv("TASK_NAME", "intersection-management")
BENCHMARK = os.getenv("BENCHMARK", "autonomous-traffic-control")
MAX_STEPS = int(os.getenv("MAX_STEPS", "12"))
TEMPERATURE = float(os.getenv("TEMPERATURE", "0.7"))
MAX_TOKENS = int(os.getenv("MAX_TOKENS", "200"))
SUCCESS_SCORE_THRESHOLD = float(os.getenv("SUCCESS_SCORE_THRESHOLD", "0.5"))

# Reward thresholds
_MAX_REWARD_PER_STEP = 1.0  # Each step can earn up to 1.0 reward
MAX_TOTAL_REWARD = MAX_STEPS * _MAX_REWARD_PER_STEP

# Signal phases the simulator and the prompt agree on.
_VALID_PHASES = ("GREEN_NS", "GREEN_EW", "ALL_RED")
# Duration bounds (seconds) advertised to the model and enforced on its output.
_MIN_DURATION = 1
_MAX_DURATION = 30
# Duration used when the model omits the field or sends something non-numeric.
_DEFAULT_DURATION = 10

SYSTEM_PROMPT = textwrap.dedent(
    """
    You are a traffic management AI controlling an autonomous intersection.
    Your goals:
    1. Minimize vehicle wait times
    2. Maximize traffic throughput
    3. Prevent gridlock
    4. Respond to emergency vehicles

    Each turn you receive:
    - Current queue lengths for each lane
    - Emergency vehicle status
    - Current signal phase

    You must output a JSON decision with:
    - "phase": GREEN_NS, GREEN_EW, or ALL_RED
    - "duration": time in seconds (1-30)
    - "reasoning": brief explanation

    Optimize for minimal total queue length and maximum moving vehicles.
    """
).strip()


@dataclass
class TrafficState:
    """Traffic intersection state"""

    ns_queue: int = 0  # vehicles waiting on the north-south approach
    ew_queue: int = 0  # vehicles waiting on the east-west approach
    current_phase: str = "ALL_RED"  # last phase applied by the controller
    emergency_vehicle: bool = False
    step: int = 0  # number of actions executed so far
    total_wait_time: float = 0.0  # accumulated queue * phase-duration


@dataclass
class TrafficAction:
    """Traffic control action"""

    phase: str
    duration: int
    reasoning: str


# ─────────────────────────────────────────────────────────────────────
# LOGGING
# ─────────────────────────────────────────────────────────────────────


def log_start(task: str, env: str, model: str) -> None:
    """Log episode start"""
    print(f"[START] task={task} env={env} model={model}", flush=True)


def log_step(
    step: int,
    action: str,
    reward: float,
    done: bool,
    error: Optional[str],
) -> None:
    """Log step execution; a falsy ``error`` is printed as ``null``."""
    error_val = error if error else "null"
    done_val = str(done).lower()
    print(
        f"[STEP] step={step} action={action} reward={reward:.2f} done={done_val} error={error_val}",
        flush=True,
    )


def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Log episode end with the comma-separated per-step rewards."""
    rewards_str = ",".join(f"{r:.2f}" for r in rewards)
    print(
        f"[END] success={str(success).lower()} steps={steps} score={score:.3f} rewards={rewards_str}",
        flush=True,
    )


def log_debug(msg: str) -> None:
    """Log debug message to stderr"""
    print(f"[DEBUG] {msg}", file=sys.stderr, flush=True)


# ─────────────────────────────────────────────────────────────────────
# TRAFFIC SIMULATOR
# ─────────────────────────────────────────────────────────────────────


class SimpleTrafficAssistant:
    """Simple traffic management environment without Docker dependency"""

    # Hard cap on the number of vehicles a single approach can hold.
    MAX_QUEUE_PER_LANE = 50
    # Discharge rate of a green approach, in vehicles per second.
    VEHICLES_PER_SECOND = 2

    def __init__(self):
        self.state = TrafficState()
        self.step_count = 0
        self.closed = False

    async def reset(self) -> "TrafficState":
        """Reset environment to its fixed initial queues and return the state."""
        self.state = TrafficState(
            ns_queue=15,
            ew_queue=10,
            current_phase="ALL_RED",
            emergency_vehicle=False,
            step=0,
            total_wait_time=0.0,
        )
        self.step_count = 0
        return self.state

    async def step(self, action: TrafficAction) -> tuple[TrafficState, float, bool]:
        """Execute action and return (state, reward, done).

        The returned state is the environment's own mutable ``TrafficState``
        instance. ``reward`` is clamped to [0.0, 1.0]. ``done`` becomes true
        once ``MAX_STEPS`` actions have been executed.
        """
        self.step_count += 1
        done = self.step_count >= MAX_STEPS
        cap = self.MAX_QUEUE_PER_LANE

        # Simulate queue changes based on action
        phase_duration = min(max(action.duration, 1), 30)
        discharged = self.VEHICLES_PER_SECOND * phase_duration

        # Deterministic arrivals: a trickle on green approaches every third
        # step, a larger inflow on red approaches every second step.
        green_arrivals = 1 if self.step_count % 3 == 0 else 0
        red_arrivals = 2 if self.step_count % 2 == 0 else 0

        if "NS" in action.phase:
            self.state.ns_queue = max(0, self.state.ns_queue - discharged + green_arrivals)
        else:
            self.state.ns_queue = min(cap, self.state.ns_queue + red_arrivals)

        if "EW" in action.phase:
            self.state.ew_queue = max(0, self.state.ew_queue - discharged + green_arrivals)
        else:
            self.state.ew_queue = min(cap, self.state.ew_queue + red_arrivals)

        # Add some randomness
        if random.random() < 0.1:
            self.state.ew_queue += random.randint(0, 5)
        if random.random() < 0.1:
            self.state.ns_queue += random.randint(0, 5)

        # Random bursts must respect the per-lane cap too; otherwise the
        # reward normalisation below (2 * cap) no longer bounds the total.
        self.state.ns_queue = min(cap, self.state.ns_queue)
        self.state.ew_queue = min(cap, self.state.ew_queue)

        # Calculate reward: lower queue total = higher reward
        total_queue = self.state.ns_queue + self.state.ew_queue
        max_queue = 2 * cap
        queue_reward = max(0, 1.0 - (total_queue / max_queue))

        # Bonus for handling emergency vehicles
        emergency_bonus = 0.2 if self.state.emergency_vehicle and "NS" in action.phase else 0

        reward = min(max(queue_reward + emergency_bonus, 0.0), 1.0)

        self.state.current_phase = action.phase
        self.state.step = self.step_count
        self.state.total_wait_time += total_queue * phase_duration

        return self.state, reward, done

    async def close(self) -> None:
        """Close environment"""
        self.closed = True


# ─────────────────────────────────────────────────────────────────────
# LLM INTERFACE
# ─────────────────────────────────────────────────────────────────────


def build_user_prompt(state: TrafficState, history: List[str]) -> str:
    """Build LLM prompt from traffic state and the last three history entries.

    The prompt is assembled from explicit lines rather than a dedented
    f-string: multi-line history would otherwise remove the common margin
    and leave the rest of the prompt indented.
    """
    history_block = "\n".join(history[-3:]) if history else "None"
    emergency = "YES" if state.emergency_vehicle else "NO"
    lines = [
        f"CURRENT TRAFFIC STATE (Step {state.step}):",
        f"├─ NS Queue: {state.ns_queue} vehicles",
        f"├─ EW Queue: {state.ew_queue} vehicles",
        f"├─ Current Phase: {state.current_phase}",
        f"├─ Emergency Vehicle: {emergency}",
        f"└─ Total Wait Time: {state.total_wait_time:.1f}s",
        "",
        "RECENT DECISIONS:",
        history_block,
        "",
        "DECISION REQUIRED:",
        'Output valid JSON with "phase", "duration", and "reasoning".',
        "Phase must be one of: GREEN_NS, GREEN_EW, ALL_RED",
        "Duration must be 1-30 seconds.",
    ]
    return "\n".join(lines)


def _extract_json_object(text: str) -> Optional[Dict[str, Any]]:
    """Return the first decodable JSON object embedded in ``text``, else None."""
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            # raw_decode tolerates trailing text after the object, which
            # json.loads rejects ("Extra data").
            obj, _ = decoder.raw_decode(text[start:])
        except (ValueError, RecursionError):
            pass
        else:
            if isinstance(obj, dict):
                return obj
        start = text.find("{", start + 1)
    return None


def _coerce_duration(raw: Any) -> int:
    """Convert a model-supplied duration to an int clamped to the valid range."""
    try:
        seconds = int(float(raw))
    except (TypeError, ValueError, OverflowError):
        seconds = _DEFAULT_DURATION
    return min(max(seconds, _MIN_DURATION), _MAX_DURATION)


def parse_model_response(response_text: str) -> Optional[TrafficAction]:
    """Parse JSON response from model.

    Accepts a bare JSON object as well as one wrapped in prose or a code
    fence. Returns None when no JSON object can be found. Unknown phases
    fall back to ``ALL_RED``; missing or non-numeric durations fall back to
    the default; durations are clamped to 1-30 seconds.
    """
    if not response_text:
        return None

    data = _extract_json_object(response_text.strip())
    if data is None:
        log_debug("Failed to parse response: no JSON object found")
        return None

    raw_phase = data.get("phase", "ALL_RED")
    phase = raw_phase.strip().upper() if isinstance(raw_phase, str) else "ALL_RED"
    if phase not in _VALID_PHASES:
        phase = "ALL_RED"

    return TrafficAction(
        phase=phase,
        duration=_coerce_duration(data.get("duration", _DEFAULT_DURATION)),
        reasoning=str(data.get("reasoning", "Automatic decision")),
    )


def get_model_decision(
    client: OpenAI,
    state: TrafficState,
    history: List[str],
) -> TrafficAction:
    """Get decision from LLM.

    Never raises: on an API failure or an unparseable reply a safe
    ``ALL_RED`` action of 5 seconds is returned and the problem is logged
    to stderr.
    """
    user_prompt = build_user_prompt(state, history)

    try:
        completion = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": user_prompt},
            ],
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
            stream=False,
        )
        text = (completion.choices[0].message.content or "").strip()
    except Exception as exc:  # boundary: any client/network failure degrades to a safe action
        log_debug(f"Model request failed: {exc}")
        return TrafficAction(phase="ALL_RED", duration=5, reasoning="API error")

    action = parse_model_response(text)
    if action:
        return action

    log_debug(f"Failed to parse: {text[:100]}")
    return TrafficAction(phase="ALL_RED", duration=5, reasoning="Parse failed")


# ─────────────────────────────────────────────────────────────────────
# MAIN
# ─────────────────────────────────────────────────────────────────────


async def main() -> None:
    """Main inference loop.

    Exits with status 1 (before any [START] line) when no API key is set or
    the client cannot be constructed. Otherwise always emits [START] and
    [END], even if the episode aborts with an exception.
    """
    # Validate API key
    if not API_KEY:
        print("[ERROR] HF_TOKEN or API_KEY environment variable not set", file=sys.stderr)
        sys.exit(1)

    try:
        client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY)
    except Exception as e:
        print(f"[ERROR] Failed to initialize OpenAI client: {e}", file=sys.stderr)
        sys.exit(1)

    # Initialize environment
    env = SimpleTrafficAssistant()

    history: List[str] = []
    rewards: List[float] = []
    steps_taken = 0
    score = 0.0
    success = False
    error_msg = None

    log_start(task=TASK_NAME, env=BENCHMARK, model=MODEL_NAME)

    try:
        # Reset environment
        state = await env.reset()

        for step in range(1, MAX_STEPS + 1):
            # Get LLM decision
            action = get_model_decision(client, state, history)

            # Execute action
            state, reward, done = await env.step(action)

            rewards.append(reward)
            steps_taken = step

            # Format action for logging
            action_str = f"{action.phase}(duration={action.duration}s)"

            log_step(
                step=step,
                action=action_str,
                reward=reward,
                done=done,
                error=error_msg,
            )

            # Update history
            history.append(
                f"Step {step}: {action.phase} for {action.duration}s → "
                f"Reward {reward:.2f}, NS={state.ns_queue}, EW={state.ew_queue}"
            )

            if done:
                break

        # Calculate final score, normalised by the best achievable total.
        if rewards and MAX_TOTAL_REWARD > 0:
            score = sum(rewards) / MAX_TOTAL_REWARD
            score = min(max(score, 0.0), 1.0)
        success = score >= SUCCESS_SCORE_THRESHOLD

    except Exception as e:
        error_msg = str(e)
        log_debug(f"Exception during inference: {e}")
        traceback.print_exc()

    finally:
        try:
            await env.close()
        except Exception as e:
            log_debug(f"Error closing environment: {e}")

        log_end(
            success=success,
            steps=steps_taken,
            score=score,
            rewards=rewards,
        )


if __name__ == "__main__":
    asyncio.run(main())