Spaces:
Sleeping
Sleeping
| """ | |
| Autonomous Traffic Control Inference Script | |
| ============================================ | |
| OpenEnv-compliant inference script for HF Spaces deployment. | |
| MANDATORY ENVIRONMENT VARIABLES: | |
| - API_BASE_URL: LLM API endpoint (default: https://router.huggingface.co/v1) | |
| - MODEL_NAME: Model identifier (default: Qwen/Qwen2.5-72B-Instruct) | |
| - HF_TOKEN: Hugging Face API key | |
| - LOCAL_IMAGE_NAME: Docker image name (if using from_docker_image) | |
| STDOUT FORMAT: | |
| [START] task=<task_name> env=autonomous-traffic-control model=<model_name> | |
| [STEP] step=<n> action=<action_str> reward=<0.00> done=<true|false> error=<msg|null> | |
| [END] success=<true|false> steps=<n> score=<0.000> rewards=<r1,r2,...,rn> | |
| Run: | |
| python inference.py | |
| """ | |
| import asyncio | |
| import os | |
| import sys | |
| import textwrap | |
| import time | |
| import json | |
| from typing import List, Optional, Dict, Any | |
| from dataclasses import dataclass | |
| try: | |
| from openai import OpenAI | |
| except ImportError: | |
| print("[ERROR] openai package required. Install with: pip install openai", file=sys.stderr) | |
| sys.exit(1) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # CONFIGURATION | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1") | |
| MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct") | |
| API_KEY = os.getenv("HF_TOKEN") or os.getenv("API_KEY") | |
| TASK_NAME = os.getenv("TASK_NAME", "intersection-management") | |
| BENCHMARK = os.getenv("BENCHMARK", "autonomous-traffic-control") | |
| MAX_STEPS = int(os.getenv("MAX_STEPS", "12")) | |
| TEMPERATURE = float(os.getenv("TEMPERATURE", "0.7")) | |
| MAX_TOKENS = int(os.getenv("MAX_TOKENS", "200")) | |
| SUCCESS_SCORE_THRESHOLD = float(os.getenv("SUCCESS_SCORE_THRESHOLD", "0.5")) | |
| # Reward thresholds | |
| _MAX_REWARD_PER_STEP = 1.0 # Each step can earn up to 1.0 reward | |
| MAX_TOTAL_REWARD = MAX_STEPS * _MAX_REWARD_PER_STEP | |
| SYSTEM_PROMPT = textwrap.dedent( | |
| """ | |
| You are a traffic management AI controlling an autonomous intersection. | |
| Your goals: | |
| 1. Minimize vehicle wait times | |
| 2. Maximize traffic throughput | |
| 3. Prevent gridlock | |
| 4. Respond to emergency vehicles | |
| Each turn you receive: | |
| - Current queue lengths for each lane | |
| - Emergency vehicle status | |
| - Current signal phase | |
| You must output a JSON decision with: | |
| - "phase": GREEN_NS, GREEN_EW, or ALL_RED | |
| - "duration": time in seconds (1-30) | |
| - "reasoning": brief explanation | |
| Optimize for minimal total queue length and maximum moving vehicles. | |
| """ | |
| ).strip() | |
| class TrafficState: | |
| """Traffic intersection state""" | |
| ns_queue: int = 0 | |
| ew_queue: int = 0 | |
| current_phase: str = "ALL_RED" | |
| emergency_vehicle: bool = False | |
| step: int = 0 | |
| total_wait_time: float = 0.0 | |
| class TrafficAction: | |
| """Traffic control action""" | |
| phase: str | |
| duration: int | |
| reasoning: str | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # LOGGING | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def log_start(task: str, env: str, model: str) -> None: | |
| """Log episode start""" | |
| print(f"[START] task={task} env={env} model={model}", flush=True) | |
| def log_step( | |
| step: int, | |
| action: str, | |
| reward: float, | |
| done: bool, | |
| error: Optional[str], | |
| ) -> None: | |
| """Log step execution""" | |
| error_val = error if error else "null" | |
| done_val = str(done).lower() | |
| print( | |
| f"[STEP] step={step} action={action} reward={reward:.2f} done={done_val} error={error_val}", | |
| flush=True, | |
| ) | |
| def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None: | |
| """Log episode end""" | |
| rewards_str = ",".join(f"{r:.2f}" for r in rewards) | |
| print( | |
| f"[END] success={str(success).lower()} steps={steps} score={score:.3f} rewards={rewards_str}", | |
| flush=True, | |
| ) | |
| def log_debug(msg: str) -> None: | |
| """Log debug message to stderr""" | |
| print(f"[DEBUG] {msg}", file=sys.stderr, flush=True) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # TRAFFIC SIMULATOR | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class SimpleTrafficAssistant: | |
| """Simple traffic management environment without Docker dependency""" | |
| def __init__(self): | |
| self.state = TrafficState() | |
| self.step_count = 0 | |
| self.closed = False | |
| async def reset(self) -> "TrafficState": | |
| """Reset environment""" | |
| self.state = TrafficState( | |
| ns_queue=15, | |
| ew_queue=10, | |
| current_phase="ALL_RED", | |
| emergency_vehicle=False, | |
| step=0, | |
| total_wait_time=0.0, | |
| ) | |
| self.step_count = 0 | |
| return self.state | |
| async def step(self, action: TrafficAction) -> tuple[TrafficState, float, bool]: | |
| """Execute action and return (state, reward, done)""" | |
| self.step_count += 1 | |
| done = self.step_count >= MAX_STEPS | |
| # Simulate queue changes based on action | |
| phase_duration = min(max(action.duration, 1), 30) | |
| # Calculate vehicles that pass through | |
| vehicles_per_second = 2 # vehicles per second per lane | |
| vehicles_passing_ns = vehicles_per_second * phase_duration if "NS" in action.phase else 0 | |
| vehicles_passing_ew = vehicles_per_second * phase_duration if "EW" in action.phase else 0 | |
| # Update queues | |
| if "NS" in action.phase: | |
| self.state.ns_queue = max(0, self.state.ns_queue - vehicles_passing_ns + (1 if self.step_count % 3 == 0 else 0)) | |
| else: | |
| self.state.ns_queue = min(50, self.state.ns_queue + (2 if self.step_count % 2 == 0 else 0)) | |
| if "EW" in action.phase: | |
| self.state.ew_queue = max(0, self.state.ew_queue - vehicles_passing_ew + (1 if self.step_count % 3 == 0 else 0)) | |
| else: | |
| self.state.ew_queue = min(50, self.state.ew_queue + (2 if self.step_count % 2 == 0 else 0)) | |
| # Add some randomness | |
| import random | |
| if random.random() < 0.1: | |
| self.state.ew_queue += random.randint(0, 5) | |
| if random.random() < 0.1: | |
| self.state.ns_queue += random.randint(0, 5) | |
| # Calculate reward | |
| # Lower queue total = higher reward | |
| total_queue = self.state.ns_queue + self.state.ew_queue | |
| max_queue = 100 | |
| queue_reward = max(0, 1.0 - (total_queue / max_queue)) | |
| # Bonus for handling emergency vehicles | |
| emergency_bonus = 0.2 if self.state.emergency_vehicle and "NS" in action.phase else 0 | |
| reward = queue_reward + emergency_bonus | |
| reward = min(max(reward, 0.0), 1.0) | |
| self.state.current_phase = action.phase | |
| self.state.step = self.step_count | |
| self.state.total_wait_time += total_queue * phase_duration | |
| return self.state, reward, done | |
| async def close(self) -> None: | |
| """Close environment""" | |
| self.closed = True | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # LLM INTERFACE | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def build_user_prompt(state: TrafficState, history: List[str]) -> str: | |
| """Build LLM prompt from traffic state""" | |
| history_block = "\n".join(history[-3:]) if history else "None" | |
| return textwrap.dedent( | |
| f""" | |
| CURRENT TRAFFIC STATE (Step {state.step}): | |
| ββ NS Queue: {state.ns_queue} vehicles | |
| ββ EW Queue: {state.ew_queue} vehicles | |
| ββ Current Phase: {state.current_phase} | |
| ββ Emergency Vehicle: {"YES" if state.emergency_vehicle else "NO"} | |
| ββ Total Wait Time: {state.total_wait_time:.1f}s | |
| RECENT DECISIONS: | |
| {history_block} | |
| DECISION REQUIRED: | |
| Output valid JSON with "phase", "duration", and "reasoning". | |
| Phase must be one of: GREEN_NS, GREEN_EW, ALL_RED | |
| Duration must be 1-30 seconds. | |
| """ | |
| ).strip() | |
| def parse_model_response(response_text: str) -> Optional[TrafficAction]: | |
| """Parse JSON response from model""" | |
| try: | |
| # Try to extract JSON from response | |
| import json | |
| # Clean the response | |
| response_text = response_text.strip() | |
| # Try direct JSON parsing | |
| if response_text.startswith("{"): | |
| data = json.loads(response_text) | |
| else: | |
| # Try to find JSON in the text | |
| start = response_text.find("{") | |
| end = response_text.rfind("}") + 1 | |
| if start >= 0 and end > start: | |
| data = json.loads(response_text[start:end]) | |
| else: | |
| return None | |
| phase = data.get("phase", "ALL_RED") | |
| if phase not in ["GREEN_NS", "GREEN_EW", "ALL_RED"]: | |
| phase = "ALL_RED" | |
| duration = int(data.get("duration", 10)) | |
| duration = min(max(duration, 1), 30) | |
| reasoning = data.get("reasoning", "Automatic decision") | |
| return TrafficAction( | |
| phase=phase, | |
| duration=duration, | |
| reasoning=reasoning, | |
| ) | |
| except Exception as e: | |
| log_debug(f"Failed to parse response: {e}") | |
| return None | |
| def get_model_decision( | |
| client: OpenAI, | |
| state: TrafficState, | |
| history: List[str], | |
| ) -> TrafficAction: | |
| """Get decision from LLM""" | |
| user_prompt = build_user_prompt(state, history) | |
| try: | |
| completion = client.chat.completions.create( | |
| model=MODEL_NAME, | |
| messages=[ | |
| {"role": "system", "content": SYSTEM_PROMPT}, | |
| {"role": "user", "content": user_prompt}, | |
| ], | |
| temperature=TEMPERATURE, | |
| max_tokens=MAX_TOKENS, | |
| stream=False, | |
| ) | |
| text = (completion.choices[0].message.content or "").strip() | |
| action = parse_model_response(text) | |
| if action: | |
| return action | |
| else: | |
| log_debug(f"Failed to parse: {text[:100]}") | |
| return TrafficAction(phase="ALL_RED", duration=5, reasoning="Parse failed") | |
| except Exception as exc: | |
| log_debug(f"Model request failed: {exc}") | |
| return TrafficAction(phase="ALL_RED", duration=5, reasoning="API error") | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # MAIN | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def main() -> None: | |
| """Main inference loop""" | |
| # Validate API key | |
| if not API_KEY: | |
| print("[ERROR] HF_TOKEN or API_KEY environment variable not set", file=sys.stderr) | |
| sys.exit(1) | |
| try: | |
| client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY) | |
| except Exception as e: | |
| print(f"[ERROR] Failed to initialize OpenAI client: {e}", file=sys.stderr) | |
| sys.exit(1) | |
| # Initialize environment | |
| env = SimpleTrafficAssistant() | |
| history: List[str] = [] | |
| rewards: List[float] = [] | |
| steps_taken = 0 | |
| score = 0.0 | |
| success = False | |
| error_msg = None | |
| log_start(task=TASK_NAME, env=BENCHMARK, model=MODEL_NAME) | |
| try: | |
| # Reset environment | |
| state = await env.reset() | |
| for step in range(1, MAX_STEPS + 1): | |
| # Get LLM decision | |
| action = get_model_decision(client, state, history) | |
| # Execute action | |
| state, reward, done = await env.step(action) | |
| rewards.append(reward) | |
| steps_taken = step | |
| # Format action for logging | |
| action_str = f"{action.phase}(duration={action.duration}s)" | |
| error_val = error_msg | |
| log_step( | |
| step=step, | |
| action=action_str, | |
| reward=reward, | |
| done=done, | |
| error=error_val, | |
| ) | |
| # Update history | |
| history.append( | |
| f"Step {step}: {action.phase} for {action.duration}s β " | |
| f"Reward {reward:.2f}, NS={state.ns_queue}, EW={state.ew_queue}" | |
| ) | |
| if done: | |
| break | |
| # Calculate final score | |
| if rewards: | |
| score = sum(rewards) / MAX_TOTAL_REWARD | |
| score = min(max(score, 0.0), 1.0) | |
| success = score >= SUCCESS_SCORE_THRESHOLD | |
| except Exception as e: | |
| error_msg = str(e) | |
| log_debug(f"Exception during inference: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| finally: | |
| try: | |
| await env.close() | |
| except Exception as e: | |
| log_debug(f"Error closing environment: {e}") | |
| log_end( | |
| success=success, | |
| steps=steps_taken, | |
| score=score, | |
| rewards=rewards, | |
| ) | |
| if __name__ == "__main__": | |
| asyncio.run(main()) | |