# rl-bus-optimizer / grader.py
# NOTE(review): the next three lines are web-scrape artifacts (avatar caption,
# commit message, commit hash) — commented out so the module is importable;
# original text preserved verbatim:
# voldemort6996's picture
# Final compliance check for MetaXScalar: Remove underscores from task IDs, update inference logging, and ensure score clipping
# 010c635
"""
Deterministic per-task graders for the OpenEnv bus routing environment.
Each ``grade_taskX_Y`` function:
1. Creates the task environment from ``tasks.py``.
2. Runs the agent over multiple episodes.
3. Compares against heuristic baselines.
4. Returns a normalised **score in [0.0, 1.0]**.
Now expanded to include 30 tasks total.
"""
from __future__ import annotations
import argparse
import os
from typing import Callable, Dict, List, Tuple
import numpy as np
try:
from scipy import stats
SCIPY_AVAILABLE = True
except ImportError:
SCIPY_AVAILABLE = False
from environment import BusRoutingEnv
from tasks import TASKS, TaskConfig
# Explicitly export grader functions for OpenEnv detection
__all__ = [
"grade_all_tasks",
]
# ---------------------------------------------------------------------------
# Heuristic baselines
# ---------------------------------------------------------------------------
def random_policy(_obs: np.ndarray, num_actions: int = 3) -> int:
    """Uniform-random baseline: ignores the observation entirely and samples
    one of ``num_actions`` discrete actions via NumPy's global RNG."""
    sampled = np.random.randint(0, num_actions)
    return int(sampled)
def greedy_baseline_policy(obs: np.ndarray) -> int:
    """
    Simple greedy heuristic baseline.

    obs layout: [pos, fuel, onboard, q0, q1, q2, time]
    Action encoding: 0 = move+pickup, 1 = move+skip, 2 = wait.

    Decision order:
    - long queue here (>= 8)            -> wait & pick up (2)
    - next queue at least as large      -> move + pickup (0)
    - otherwise                         -> move + skip (1)
    """
    queue_here = obs[3]
    queue_next = obs[4]
    if queue_here >= 8:
        return 2  # wait
    return 0 if queue_next >= queue_here else 1
def highest_queue_first_policy(obs: np.ndarray) -> int:
    """
    Stronger heuristic — serve the largest nearby queue.

    Waits (2) while the current stop's queue is at least as large as both
    neighbouring queues, otherwise moves with pickup (0).
    """
    current, ahead, after = (float(obs[i]) for i in (3, 4, 5))
    dominates = current >= max(ahead, after)
    return 2 if dominates else 0
def or_tools_greedy_policy(obs: np.ndarray) -> int:
    """
    OR-Tools-like greedy routing heuristic (distance + demand flavoured).

    Priority order:
    1. current queue > 5                 -> wait (2)
    2. fuel below 20                     -> move + skip (1), conserve fuel
    3. next queue >= the one after it    -> move + pickup (0)
    4. otherwise                         -> move + skip (1)
    """
    current_q = float(obs[3])
    next_q = float(obs[4])
    after_q = float(obs[5])
    tank = float(obs[1])

    if current_q > 5:
        return 2
    if tank < 20:
        return 1
    return 0 if next_q >= after_q else 1
def mpc_baseline_policy(obs: np.ndarray) -> int:
    """
    Model-Predictive-Control-flavoured baseline with fuel awareness.

    Regimes (by remaining fuel):
    - fuel < 20 (low):  conserve — wait (2) only for a very long local
      queue (> 8), otherwise move + skip (1).
    - fuel > 50 (high): aggressive — wait (2) when the local queue is at
      least as large as both neighbours, otherwise move + pickup (0).
    - in between:       wait (2) if the local queue is > 6, pick up
      ahead (0) if the next queue beats the local one, else skip (1).
    """
    local, ahead, after = (float(obs[i]) for i in (3, 4, 5))
    fuel_left = float(obs[1])

    if fuel_left < 20:
        return 2 if local > 8 else 1
    if fuel_left > 50:
        return 2 if local >= max(ahead, after) else 0
    if local > 6:
        return 2
    return 0 if ahead > local else 1
# ---------------------------------------------------------------------------
# Evaluation helpers
# ---------------------------------------------------------------------------
def _run_eval(
env: BusRoutingEnv,
policy: Callable[[np.ndarray], int],
episodes: int = 20,
) -> Dict[str, float]:
rewards: List[float] = []
waits: List[float] = []
fuels: List[float] = []
covers: List[float] = []
entropies: List[float] = []
max_stop_fracs: List[float] = []
picks: List[float] = []
for _ in range(int(episodes)):
m = env.run_episode(policy_fn=policy)
rewards.append(m["total_reward"])
waits.append(m["avg_wait_time"])
fuels.append(m["fuel_used"])
covers.append(m["stop_coverage"])
entropies.append(m.get("route_entropy", 0.0))
max_stop_fracs.append(m.get("max_stop_fraction", 1.0))
picks.append(m["passengers_picked"])
waits_safe = [w if np.isfinite(w) else 50.0 for w in waits]
return {
"avg_wait_time": float(np.mean(waits_safe)),
"total_reward": float(np.mean(rewards)),
"fuel_efficiency": float(np.mean(picks) / (np.mean(fuels) + 1e-6)),
"stop_coverage": float(np.mean(covers)),
"route_entropy": float(np.mean(entropies)),
"max_stop_fraction": float(np.mean(max_stop_fracs)),
"avg_passengers_picked": float(np.mean(picks)),
}
def _add_statistical_tests(
    env: BusRoutingEnv,
    agent_policy: Callable[[np.ndarray], int],
    baseline_policy: Callable[[np.ndarray], int],
    episodes: int = 20,
) -> Dict[str, float]:
    """
    Perform statistical significance testing between agent and baseline.

    Runs ``episodes`` episodes of each policy, then reports:
    - a two-sample t-test on total rewards (``t_statistic``, ``p_value``),
    - the mean percentage reward improvement over the baseline,
    - a 95% confidence interval on the per-episode reward difference,
    - a human-readable significance verdict.

    Returns neutral placeholder values when SciPy is not installed, so the
    grading pipeline keeps working without the optional dependency.
    """
    if not SCIPY_AVAILABLE:
        return {
            "t_statistic": 0.0,
            "p_value": 1.0,
            "mean_improvement": 0.0,
            "confidence_interval": (0.0, 0.0),
            "statistical_significance": "scipy not available"
        }
    agent_rewards = []
    baseline_rewards = []
    for _ in range(episodes):
        m_agent = env.run_episode(policy_fn=agent_policy)
        m_baseline = env.run_episode(policy_fn=baseline_policy)
        agent_rewards.append(m_agent["total_reward"])
        baseline_rewards.append(m_baseline["total_reward"])
    t_statistic, p_value = stats.ttest_ind(agent_rewards, baseline_rewards)
    mean_agent = np.mean(agent_rewards)
    mean_baseline = np.mean(baseline_rewards)
    # BUGFIX: epsilon belongs *outside* abs() — ``abs(x + 1e-6)`` is exactly
    # zero when x == -1e-6, which would divide by zero.  ``abs(x) + 1e-6``
    # can never reach zero and matches the guard used in ``_score_0_1``.
    mean_improvement = ((mean_agent - mean_baseline) / (abs(mean_baseline) + 1e-6)) * 100
    diff = np.array(agent_rewards) - np.array(baseline_rewards)
    # 95% CI on the paired reward difference via Student's t distribution.
    ci_low, ci_high = stats.t.interval(0.95, len(diff) - 1, loc=np.mean(diff), scale=stats.sem(diff))
    significance = "p < 0.05 [PASS]" if p_value < 0.05 else "p >= 0.05"
    return {
        "t_statistic": float(t_statistic),
        "p_value": float(p_value),
        "mean_improvement": float(mean_improvement),
        "confidence_interval": (float(ci_low), float(ci_high)),
        "statistical_significance": significance
    }
def _score_0_1(metrics: Dict[str, float], baseline: Dict[str, float]) -> float:
"""
Weighted score normalised to **[0.0, 1.0]**.
"""
wait_impr = (baseline["avg_wait_time"] - metrics["avg_wait_time"]) / max(
baseline["avg_wait_time"], 1e-6
)
rew_impr = (metrics["total_reward"] - baseline["total_reward"]) / (
abs(baseline["total_reward"]) + 1e-6
)
wait_score = float(np.clip(wait_impr, -1.0, 1.0) * 0.5 + 0.5)
rew_score = float(np.clip(rew_impr, -1.0, 1.0) * 0.5 + 0.5)
fuel_score = float(np.clip(metrics["fuel_efficiency"] / 0.25, 0.0, 1.0))
cov_score = float(np.clip(metrics["stop_coverage"], 0.0, 1.0))
bal_score = float(np.clip(metrics.get("route_entropy", 0.0), 0.0, 1.0))
anti_camp_score = float(
np.clip(1.0 - metrics.get("max_stop_fraction", 1.0), 0.0, 1.0)
)
final = (
0.30 * wait_score
+ 0.35 * rew_score
+ 0.05 * fuel_score
+ 0.15 * cov_score
+ 0.10 * bal_score
+ 0.05 * anti_camp_score
)
if not np.isfinite(final):
return 0.15
# Strict (0, 1) range: ensures score is never 0.0 and never 1.0
return float(np.clip(final, 0.05, 0.95))
# ---------------------------------------------------------------------------
# Per-task grading (deterministic) — core OpenEnv requirement
# ---------------------------------------------------------------------------
def _grade_task(
    task_cfg: TaskConfig,
    agent_policy: Callable[[np.ndarray], int],
    episodes: int = 20,
) -> Dict:
    """
    Generic grader for a single task.

    Builds the task environment, evaluates both the agent and the greedy
    baseline over ``episodes`` episodes, runs significance tests, and
    returns a report dict including the normalised [0, 1] score.
    """
    env = task_cfg.build_env()

    agent_metrics = _run_eval(env, policy=agent_policy, episodes=episodes)
    greedy_metrics = _run_eval(
        env, policy=greedy_baseline_policy, episodes=episodes
    )
    significance = _add_statistical_tests(
        env, agent_policy, greedy_baseline_policy, episodes=episodes
    )

    return {
        "task": task_cfg.name,
        "difficulty": task_cfg.difficulty,
        "score": _score_0_1(agent_metrics, greedy_metrics),
        "rl_agent": agent_metrics,
        "baseline_greedy": greedy_metrics,
        "statistical_tests": significance,
    }
# ---------------------------------------------------------------------------
# Per-task grading functions
# ---------------------------------------------------------------------------
# Dynamically generate 30 grading functions (task1 to task30)
# Each generated function is named ``grade_taskN`` (no underscore between
# "task" and the number) and returns only the float score for that task.
for i in range(1, 31):
    task_name = f"task{i}"
    # Factory closure: binds the task name by value so each generated grader
    # keeps its own task id (avoids the late-binding pitfall where every
    # closure would otherwise see the final loop value of ``task_name``).
    def make_grader(t_name):
        def grader(agent_policy: Callable[[np.ndarray], int], episodes: int = 20) -> float:
            # Delegate to the generic grader and surface only the score.
            return float(_grade_task(TASKS[t_name], agent_policy, episodes)["score"])
        return grader
    func_name = f"grade_{task_name}"
    # Register the grader at module level and export it via ``__all__`` —
    # per the comment at the top of the file this is what OpenEnv's
    # grader detection relies on.
    globals()[func_name] = make_grader(task_name)
    __all__.append(func_name)
def grade_all_tasks(
    agent_policy: Callable[[np.ndarray], int],
    episodes: int = 20,
) -> Dict:
    """
    Grade the agent on all 30 tasks and return the combined report.

    The result maps each task id ("task1" .. "task30") to its full report
    dict and adds two extra keys: ``aggregate_score`` (mean of the 30
    per-task scores, clamped to [0.05, 0.95]) and ``task_ids``.
    """
    task_ids = [f"task{i}" for i in range(1, 31)]
    results = {
        task_id: _grade_task(TASKS[task_id], agent_policy, episodes)
        for task_id in task_ids
    }
    mean_score = sum(report["score"] for report in results.values()) / 30.0

    combined = dict(results)
    combined["aggregate_score"] = float(np.clip(mean_score, 0.05, 0.95))
    combined["task_ids"] = list(results.keys())
    return combined
# ---------------------------------------------------------------------------
# CLI entry-point
# ---------------------------------------------------------------------------
def main() -> None:
    """CLI entry point: load the trained DQN agent, grade it on all 30
    tasks, and print a formatted per-task + aggregate score report.

    Flags: ``--model-path`` (checkpoint file) and ``--episodes``
    (defaults to the ``MAX_EVAL_EPISODES`` env var, else 5).
    """
    from agent import DQNAgent

    parser = argparse.ArgumentParser(description="OpenEnv Bus Routing — Programmatic Grader")
    parser.add_argument("--model-path", type=str, default="models/dqn_bus_v6_best.pt")
    parser.add_argument("--episodes", type=int, default=int(os.getenv("MAX_EVAL_EPISODES", 5)))
    args = parser.parse_args()

    agent = DQNAgent.load(args.model_path)

    def policy(obs):
        # Greedy (argmax) action selection for deterministic evaluation.
        return agent.act(obs, greedy=True)

    report = grade_all_tasks(policy, episodes=args.episodes)

    print("=" * 60)
    print(" OpenEnv Programmatic Grade Report (30 Tasks)")
    print("=" * 60)
    for task_key in sorted(report.get("task_ids", [])):
        tr = report[task_key]
        print(f" {tr['task']} ({tr['difficulty']}) - score: {tr['score']:.4f}")
    print(f"\n{'=' * 60}")
    print(f" Aggregate score (0.05 - 0.95): {report['aggregate_score']:.4f}")
    print(f"{'=' * 60}")
if __name__ == "__main__":
    # Allow running the grader directly: ``python grader.py --episodes N``.
    main()