# pranit / inference.py
# RushiMane2003's picture
# Upload 41 files
# 99f938a verified
"""
Autonomous Traffic Control Inference Script
============================================
OpenEnv-compliant inference script for HF Spaces deployment.
MANDATORY ENVIRONMENT VARIABLES:
- API_BASE_URL: LLM API endpoint (default: https://router.huggingface.co/v1)
- MODEL_NAME: Model identifier (default: Qwen/Qwen2.5-72B-Instruct)
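- HF_TOKEN: Hugging Face API key (API_KEY is used as a fallback when HF_TOKEN is unset)
- LOCAL_IMAGE_NAME: Docker image name (only relevant when using from_docker_image; this script's built-in simulator does not read it)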
STDOUT FORMAT:
[START] task=<task_name> env=autonomous-traffic-control model=<model_name>
[STEP] step=<n> action=<action_str> reward=<0.00> done=<true|false> error=<msg|null>
[END] success=<true|false> steps=<n> score=<0.000> rewards=<r1,r2,...,rn>
Run:
python inference.py
"""
import asyncio
import json
import os
import random
import sys
import textwrap
import time
import traceback
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

try:
    from openai import OpenAI
except ImportError:
    print("[ERROR] openai package required. Install with: pip install openai", file=sys.stderr)
    sys.exit(1)
# ─────────────────────────────────────────────────────────────────────
# CONFIGURATION
# ─────────────────────────────────────────────────────────────────────
# Endpoint / model selection. Each value may be overridden by the environment
# variable of the same name; the second argument is the fallback.
API_BASE_URL = os.environ.get("API_BASE_URL", "https://router.huggingface.co/v1")
MODEL_NAME = os.environ.get("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")
# HF_TOKEN wins; API_KEY is consulted only when HF_TOKEN is unset or empty.
API_KEY = os.environ.get("HF_TOKEN") or os.environ.get("API_KEY")
# Labels echoed in the [START] protocol line.
TASK_NAME = os.environ.get("TASK_NAME", "intersection-management")
BENCHMARK = os.environ.get("BENCHMARK", "autonomous-traffic-control")
# Numeric knobs are converted eagerly, so a malformed value raises ValueError
# at import time rather than mid-episode.
MAX_STEPS = int(os.environ.get("MAX_STEPS", "12"))
TEMPERATURE = float(os.environ.get("TEMPERATURE", "0.7"))
MAX_TOKENS = int(os.environ.get("MAX_TOKENS", "200"))
SUCCESS_SCORE_THRESHOLD = float(os.environ.get("SUCCESS_SCORE_THRESHOLD", "0.5"))
# Best achievable episode total (used to normalise the score): every step
# earning the per-step maximum reward of 1.0.
_MAX_REWARD_PER_STEP = 1.0
MAX_TOTAL_REWARD = _MAX_REWARD_PER_STEP * MAX_STEPS
# Fixed system instructions sent with every chat-completion request. Built from
# explicit lines so the exact text the model receives is unambiguous.
SYSTEM_PROMPT = "\n".join(
    (
        "You are a traffic management AI controlling an autonomous intersection.",
        "Your goals:",
        "1. Minimize vehicle wait times",
        "2. Maximize traffic throughput",
        "3. Prevent gridlock",
        "4. Respond to emergency vehicles",
        "Each turn you receive:",
        "- Current queue lengths for each lane",
        "- Emergency vehicle status",
        "- Current signal phase",
        "You must output a JSON decision with:",
        '- "phase": GREEN_NS, GREEN_EW, or ALL_RED',
        '- "duration": time in seconds (1-30)',
        '- "reasoning": brief explanation',
        "Optimize for minimal total queue length and maximum moving vehicles.",
    )
)
@dataclass
class TrafficState:
    """Traffic intersection state.

    Snapshot of the simulated intersection that is rendered into the model
    prompt and mutated in place by the simulator on each step.
    """

    # Vehicles waiting on the north-south approach.
    ns_queue: int = 0
    # Vehicles waiting on the east-west approach.
    ew_queue: int = 0
    # Most recently applied phase: "GREEN_NS", "GREEN_EW" or "ALL_RED".
    current_phase: str = "ALL_RED"
    # Whether an emergency vehicle is present. NOTE(review): nothing in this
    # file currently sets this to True.
    emergency_vehicle: bool = False
    # Number of steps taken in the current episode.
    step: int = 0
    # Running sum of (total queued vehicles x phase duration) over the episode;
    # shown to the model in seconds.
    total_wait_time: float = 0.0
@dataclass
class TrafficAction:
    """Traffic control action.

    One signal decision, produced from the model's JSON reply (or a fallback
    when the request or parsing fails) and applied by the simulator's ``step``.
    """

    # Phase to apply; expected to be "GREEN_NS", "GREEN_EW" or "ALL_RED".
    phase: str
    # How long to hold the phase, in seconds (clamped to 1-30 before use).
    duration: int
    # Free-text explanation from the model, or a fallback label such as
    # "Parse failed" / "API error".
    reasoning: str
# ─────────────────────────────────────────────────────────────────────
# LOGGING
# ─────────────────────────────────────────────────────────────────────
def log_start(task: str, env: str, model: str) -> None:
    """Emit the episode-opening ``[START]`` protocol line on stdout."""
    fields = (f"task={task}", f"env={env}", f"model={model}")
    print("[START] " + " ".join(fields), flush=True)
def log_step(
    step: int,
    action: str,
    reward: float,
    done: bool,
    error: Optional[str],
) -> None:
    """Emit one ``[STEP]`` protocol line on stdout.

    ``reward`` is printed with two decimals, ``done`` as lower-case text and a
    falsy ``error`` (``None`` or empty) as the literal ``null``.
    """
    parts = [
        f"step={step}",
        f"action={action}",
        f"reward={reward:.2f}",
        f"done={str(done).lower()}",
        f"error={error or 'null'}",
    ]
    print("[STEP] " + " ".join(parts), flush=True)
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Emit the episode-closing ``[END]`` protocol line on stdout.

    ``score`` is printed with three decimals and ``rewards`` as a
    comma-separated list of two-decimal values (empty when there are none).
    """
    reward_list = ",".join([format(value, ".2f") for value in rewards])
    flag = str(success).lower()
    line = f"[END] success={flag} steps={steps} score={score:.3f} rewards={reward_list}"
    print(line, flush=True)
def log_debug(msg: str) -> None:
    """Write a ``[DEBUG]`` diagnostic line to stderr, keeping stdout for protocol lines."""
    sys.stderr.write("[DEBUG] " + f"{msg}" + "\n")
    sys.stderr.flush()
# ─────────────────────────────────────────────────────────────────────
# TRAFFIC SIMULATOR
# ─────────────────────────────────────────────────────────────────────
class SimpleTrafficAssistant:
    """Simple traffic management environment without Docker dependency.

    Simulates a two-approach (north-south / east-west) intersection in process
    and exposes async ``reset`` / ``step`` / ``close`` methods so the inference
    loop can drive it like an environment client.
    """

    # Ceiling on each lane's queue; the reward is normalised by two lanes' worth.
    _MAX_LANE_QUEUE = 50
    # Vehicles discharged per second of green on the served approach.
    _VEHICLES_PER_SECOND = 2
    # Per-lane, per-step chance of a random arrival surge, and its maximum size.
    _SURGE_PROBABILITY = 0.1
    _SURGE_MAX_VEHICLES = 5

    def __init__(self, rng: Any = None):
        """Create an environment in its pre-reset state.

        Args:
            rng: Optional random source providing ``random()`` and
                ``randint(a, b)`` (e.g. ``random.Random(seed)``) to make episodes
                reproducible. Defaults to the module-level ``random`` functions,
                matching the previous unseeded behaviour.
        """
        self.state = TrafficState()
        self.step_count = 0
        self.closed = False
        self._rng = random if rng is None else rng

    async def reset(self) -> "TrafficState":
        """Start a new episode (NS queue 15, EW queue 10, ALL_RED) and return its state."""
        self.state = TrafficState(
            ns_queue=15,
            ew_queue=10,
            current_phase="ALL_RED",
            emergency_vehicle=False,
            step=0,
            total_wait_time=0.0,
        )
        self.step_count = 0
        return self.state

    def _advance_queue(self, queue: int, served: bool, phase_duration: int) -> int:
        """Return one lane's queue after a step, before random surges.

        A served (green) lane discharges ``_VEHICLES_PER_SECOND * phase_duration``
        vehicles and gains one arrival every third step. An unserved lane gains
        two arrivals every second step, capped at ``_MAX_LANE_QUEUE``.
        """
        if served:
            arrivals = 1 if self.step_count % 3 == 0 else 0
            discharged = self._VEHICLES_PER_SECOND * phase_duration
            return max(0, queue - discharged + arrivals)
        arrivals = 2 if self.step_count % 2 == 0 else 0
        return min(self._MAX_LANE_QUEUE, queue + arrivals)

    def _apply_surge(self, queue: int) -> int:
        """Occasionally add a random burst of arrivals, never exceeding the lane cap."""
        if self._rng.random() < self._SURGE_PROBABILITY:
            queue += self._rng.randint(0, self._SURGE_MAX_VEHICLES)
        # Previously surges could push a lane past the cap that the reward
        # normaliser (2 * _MAX_LANE_QUEUE) assumes.
        return min(self._MAX_LANE_QUEUE, queue)

    async def step(self, action: TrafficAction) -> tuple[TrafficState, float, bool]:
        """Apply ``action`` for one control interval and return ``(state, reward, done)``.

        ``done`` becomes true once ``MAX_STEPS`` steps have been taken. The
        reward is ``1 - total_queue / (2 * _MAX_LANE_QUEUE)``, plus 0.2 when an
        emergency vehicle is present and the NS approach is served, clamped to
        ``[0.0, 1.0]``. The returned state is this environment's own mutable
        ``self.state`` object.
        """
        self.step_count += 1
        done = self.step_count >= MAX_STEPS

        # Clamp to 1-30 s regardless of what the caller passes.
        phase_duration = min(max(action.duration, 1), 30)
        serves_ns = "NS" in action.phase
        serves_ew = "EW" in action.phase

        self.state.ns_queue = self._advance_queue(self.state.ns_queue, serves_ns, phase_duration)
        self.state.ew_queue = self._advance_queue(self.state.ew_queue, serves_ew, phase_duration)
        # Random surges: EW is drawn before NS, preserving the original draw order.
        self.state.ew_queue = self._apply_surge(self.state.ew_queue)
        self.state.ns_queue = self._apply_surge(self.state.ns_queue)

        # Lower total queue => higher reward.
        total_queue = self.state.ns_queue + self.state.ew_queue
        max_queue = 2 * self._MAX_LANE_QUEUE
        queue_reward = max(0.0, 1.0 - total_queue / max_queue)
        emergency_bonus = 0.2 if self.state.emergency_vehicle and serves_ns else 0.0
        reward = min(max(queue_reward + emergency_bonus, 0.0), 1.0)

        self.state.current_phase = action.phase
        self.state.step = self.step_count
        # Wait time accrues as (vehicles still queued) x (seconds the phase lasted).
        self.state.total_wait_time += total_queue * phase_duration
        return self.state, reward, done

    async def close(self) -> None:
        """Mark the environment closed; there are no external resources to release."""
        self.closed = True
# ─────────────────────────────────────────────────────────────────────
# LLM INTERFACE
# ─────────────────────────────────────────────────────────────────────
def build_user_prompt(state: "TrafficState", history: List[str]) -> str:
    """Render the per-turn user prompt describing the intersection.

    Args:
        state: Current intersection snapshot. Only its ``step``, ``ns_queue``,
            ``ew_queue``, ``current_phase``, ``emergency_vehicle`` and
            ``total_wait_time`` attributes are read.
        history: Previous decision summaries. Only the last three are included;
            ``"None"`` is shown when the list is empty.

    Returns:
        The prompt text, one item per line, with no leading or trailing whitespace.
    """
    history_block = "\n".join(history[-3:]) if history else "None"
    # Assembled line by line rather than via textwrap.dedent on an f-string:
    # interpolating a multi-line, unindented history block into an indented
    # template leaves no common prefix, so dedent silently does nothing.
    lines = [
        f"CURRENT TRAFFIC STATE (Step {state.step}):",
        f"├─ NS Queue: {state.ns_queue} vehicles",
        f"├─ EW Queue: {state.ew_queue} vehicles",
        f"├─ Current Phase: {state.current_phase}",
        f"├─ Emergency Vehicle: {'YES' if state.emergency_vehicle else 'NO'}",
        f"└─ Total Wait Time: {state.total_wait_time:.1f}s",
        "RECENT DECISIONS:",
        history_block,
        "DECISION REQUIRED:",
        'Output valid JSON with "phase", "duration", and "reasoning".',
        "Phase must be one of: GREEN_NS, GREEN_EW, ALL_RED",
        "Duration must be 1-30 seconds.",
    ]
    return "\n".join(lines)
def _extract_json_object(text: str) -> Optional[Dict[str, Any]]:
    """Return the first decodable JSON object embedded in ``text``, or ``None``.

    Each ``{`` is tried in turn with ``JSONDecoder.raw_decode``, so surrounding
    prose, Markdown fences, or stray braces after the object do not break parsing.
    Logs the last decode error when braces were present but nothing decoded.
    """
    decoder = json.JSONDecoder()
    last_error: Optional[ValueError] = None
    start = text.find("{")
    while start != -1:
        try:
            candidate, _end = decoder.raw_decode(text, start)
        except ValueError as exc:  # json.JSONDecodeError subclasses ValueError
            last_error = exc
        else:
            if isinstance(candidate, dict):
                return candidate
        start = text.find("{", start + 1)
    if last_error is not None:
        log_debug(f"Failed to parse response: {last_error}")
    return None


def parse_model_response(response_text: str) -> Optional[TrafficAction]:
    """Parse the model's reply into a ``TrafficAction``.

    The first JSON object found in ``response_text`` is used. Field handling:
    ``phase`` is matched case-insensitively and anything other than
    ``GREEN_NS``/``GREEN_EW``/``ALL_RED`` becomes ``ALL_RED``. A missing or
    non-numeric ``duration`` becomes 10, and the result is clamped to 1-30.
    A missing ``reasoning`` becomes ``"Automatic decision"``.

    Args:
        response_text: Raw completion text from the model.

    Returns:
        The parsed action, or ``None`` when no JSON object can be decoded.
    """
    data = _extract_json_object((response_text or "").strip())
    if data is None:
        return None

    phase = data.get("phase", "ALL_RED")
    # Accept harmless variants such as "green_ns" or " GREEN_EW ".
    if isinstance(phase, str):
        phase = phase.strip().upper()
    if phase not in ("GREEN_NS", "GREEN_EW", "ALL_RED"):
        phase = "ALL_RED"

    # A malformed duration should not discard an otherwise valid phase choice.
    raw_duration = data.get("duration", 10)
    try:
        duration = int(raw_duration)
    except (TypeError, ValueError, OverflowError):
        log_debug(f"Invalid duration {raw_duration!r}; using 10s")
        duration = 10
    duration = min(max(duration, 1), 30)

    reasoning = data.get("reasoning", "Automatic decision")
    return TrafficAction(
        phase=phase,
        duration=duration,
        reasoning=reasoning,
    )
def get_model_decision(
    client: OpenAI,
    state: TrafficState,
    history: List[str],
) -> TrafficAction:
    """Ask the LLM for the next signal decision.

    Any exception raised while requesting or parsing the completion yields
    ``ALL_RED`` for 5 s with reasoning ``"API error"``. A reply that cannot be
    parsed yields ``ALL_RED`` for 5 s with reasoning ``"Parse failed"``. Both
    cases are reported through ``log_debug``. Errors from building the prompt
    itself propagate to the caller.
    """
    prompt = build_user_prompt(state, history)
    try:
        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ],
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
            stream=False,
        )
        reply = response.choices[0].message.content
        text = (reply or "").strip()
        decision = parse_model_response(text)
        if decision is None:
            log_debug(f"Failed to parse: {text[:100]}")
            decision = TrafficAction(phase="ALL_RED", duration=5, reasoning="Parse failed")
        return decision
    except Exception as exc:
        log_debug(f"Model request failed: {exc}")
        return TrafficAction(phase="ALL_RED", duration=5, reasoning="API error")
# ─────────────────────────────────────────────────────────────────────
# MAIN
# ─────────────────────────────────────────────────────────────────────
def _episode_score(rewards: List[float]) -> float:
    """Return summed rewards as a fraction of ``MAX_TOTAL_REWARD``, clamped to [0, 1]."""
    if not rewards or MAX_TOTAL_REWARD <= 0:
        return 0.0
    return min(max(sum(rewards) / MAX_TOTAL_REWARD, 0.0), 1.0)


async def main() -> None:
    """Run one episode and report it with the [START]/[STEP]/[END] stdout protocol.

    Exits with status 1, before any protocol output, when no API key is
    configured or the OpenAI client cannot be created. Once ``[START]`` has
    been printed, ``[END]`` is always printed, even if the episode raises.
    """
    if not API_KEY:
        print("[ERROR] HF_TOKEN or API_KEY environment variable not set", file=sys.stderr)
        sys.exit(1)
    try:
        client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY)
    except Exception as e:
        print(f"[ERROR] Failed to initialize OpenAI client: {e}", file=sys.stderr)
        sys.exit(1)

    env = SimpleTrafficAssistant()
    history: List[str] = []
    rewards: List[float] = []
    steps_taken = 0
    score = 0.0
    success = False

    log_start(task=TASK_NAME, env=BENCHMARK, model=MODEL_NAME)
    try:
        state = await env.reset()
        for step in range(1, MAX_STEPS + 1):
            action = get_model_decision(client, state, history)
            state, reward, done = await env.step(action)
            rewards.append(reward)
            steps_taken = step

            log_step(
                step=step,
                action=f"{action.phase}(duration={action.duration}s)",
                reward=reward,
                done=done,
                # Request/parse failures are absorbed by get_model_decision's
                # fallback action, so no per-step error is reported here.
                error=None,
            )
            # Compact summary fed back to the model in later prompts.
            history.append(
                f"Step {step}: {action.phase} for {action.duration}s → "
                f"Reward {reward:.2f}, NS={state.ns_queue}, EW={state.ew_queue}"
            )
            if done:
                break

        score = _episode_score(rewards)
        success = score >= SUCCESS_SCORE_THRESHOLD
    except Exception as e:
        # Leave score/success at their failure defaults; [END] is still emitted below.
        log_debug(f"Exception during inference: {e}")
        traceback.print_exc()
    finally:
        try:
            await env.close()
        except Exception as e:
            log_debug(f"Error closing environment: {e}")
        log_end(
            success=success,
            steps=steps_taken,
            score=score,
            rewards=rewards,
        )
if __name__ == "__main__":
    # Script entry point: run the async episode loop (main) on a fresh event loop.
    asyncio.run(main())