Spaces:
Running
Running
Final compliance check for MetaXScalar: Remove underscores from task IDs, update inference logging, and ensure score clipping
"""
Deterministic per-task graders for the OpenEnv bus routing environment.

Each ``grade_taskN`` function:
1. Creates the task environment from ``tasks.py``.
2. Runs the agent over multiple episodes.
3. Compares against heuristic baselines.
4. Returns a normalised score clipped to **[0.05, 0.95]**.

Now expanded to include 30 tasks total.
"""
from __future__ import annotations

import argparse
import os
from typing import Callable, Dict, List, Tuple

import numpy as np

# scipy is optional: when it is missing, statistical significance testing
# degrades gracefully to a stub result instead of failing at import time.
try:
    from scipy import stats
    SCIPY_AVAILABLE = True
except ImportError:
    SCIPY_AVAILABLE = False

from environment import BusRoutingEnv
from tasks import TASKS, TaskConfig

# Explicitly export grader functions for OpenEnv detection.
# Per-task grader names ("grade_task1" .. "grade_task30") are appended
# dynamically at import time by the generation loop further down.
__all__ = [
    "grade_all_tasks",
]
# ---------------------------------------------------------------------------
# Heuristic baselines
# ---------------------------------------------------------------------------
def random_policy(_obs: np.ndarray, num_actions: int = 3) -> int:
    """Uniform-random baseline policy.

    Ignores the observation entirely and draws an action id uniformly
    from ``[0, num_actions)`` using numpy's global RNG.
    """
    choice = np.random.randint(0, num_actions)
    return int(choice)
def greedy_baseline_policy(obs: np.ndarray) -> int:
    """Greedy queue-size heuristic baseline.

    Decision rule (obs layout: [pos, fuel, onboard, q0, q1, q2, time]):
      * current stop queue is large (>= 8) -> wait & pick up (2)
      * next stop queue at least as large  -> move + pickup  (0)
      * otherwise                          -> move + skip    (1)
    """
    here, ahead = obs[3], obs[4]
    if here >= 8:
        return 2  # wait & pick up at the busy current stop
    return 0 if ahead >= here else 1  # move+pickup vs move+skip
def highest_queue_first_policy(obs: np.ndarray) -> int:
    """Serve-the-largest-queue heuristic.

    Waits (action 2) when the current stop's queue is at least as large
    as both neighbouring queues; otherwise moves with pickup (action 0).
    """
    current = float(obs[3])
    neighbours = (float(obs[4]), float(obs[5]))
    return 2 if current >= max(neighbours) else 0
def or_tools_greedy_policy(obs: np.ndarray) -> int:
    """OR-Tools-like greedy routing heuristic.

    Priority order:
      1. Busy current stop (queue > 5)           -> wait        (2)
      2. Low fuel (< 20)                         -> move + skip (1)
      3. Next queue >= the one after it          -> move+pickup (0)
      4. Otherwise                               -> move + skip (1)
    """
    local_q = float(obs[3])
    next_q, far_q = float(obs[4]), float(obs[5])
    remaining_fuel = float(obs[1])

    if local_q > 5:
        return 2
    if remaining_fuel < 20:
        return 1
    return 0 if next_q >= far_q else 1
def mpc_baseline_policy(obs: np.ndarray) -> int:
    """Model-predictive-control-style baseline with fuel awareness.

    Three regimes, keyed on remaining fuel:
      * fuel < 20 (conserve): wait only for a very long queue (> 8),
        otherwise move without pickup.
      * fuel > 50 (aggressive): wait whenever the current queue dominates
        both neighbours, otherwise move with pickup.
      * mid fuel: wait for queues > 6, pick up when the next queue is
        strictly larger, otherwise move without pickup.
    """
    q_here = float(obs[3])
    q_next = float(obs[4])
    q_far = float(obs[5])
    tank = float(obs[1])

    if tank < 20:  # conserve fuel
        return 2 if q_here > 8 else 1
    if tank > 50:  # plenty of fuel: chase demand aggressively
        return 2 if q_here >= max(q_next, q_far) else 0
    # mid-fuel regime
    if q_here > 6:
        return 2
    return 0 if q_next > q_here else 1
# ---------------------------------------------------------------------------
# Evaluation helpers
# ---------------------------------------------------------------------------
def _run_eval(
    env: BusRoutingEnv,
    policy: Callable[[np.ndarray], int],
    episodes: int = 20,
) -> Dict[str, float]:
    """Roll out *policy* on *env* for *episodes* episodes and average metrics.

    Non-finite per-episode wait times are replaced with a fixed penalty
    value (50.0) before averaging so a single degenerate episode cannot
    poison the mean. ``route_entropy`` and ``max_stop_fraction`` default to
    0.0 and 1.0 when absent from an episode's metrics dict.
    """
    records = [env.run_episode(policy_fn=policy) for _ in range(int(episodes))]

    rewards = [r["total_reward"] for r in records]
    fuels = [r["fuel_used"] for r in records]
    covers = [r["stop_coverage"] for r in records]
    picks = [r["passengers_picked"] for r in records]
    entropies = [r.get("route_entropy", 0.0) for r in records]
    max_fracs = [r.get("max_stop_fraction", 1.0) for r in records]
    waits = [
        w if np.isfinite(w) else 50.0
        for w in (r["avg_wait_time"] for r in records)
    ]

    return {
        "avg_wait_time": float(np.mean(waits)),
        "total_reward": float(np.mean(rewards)),
        "fuel_efficiency": float(np.mean(picks) / (np.mean(fuels) + 1e-6)),
        "stop_coverage": float(np.mean(covers)),
        "route_entropy": float(np.mean(entropies)),
        "max_stop_fraction": float(np.mean(max_fracs)),
        "avg_passengers_picked": float(np.mean(picks)),
    }
def _add_statistical_tests(
    env: BusRoutingEnv,
    agent_policy: Callable[[np.ndarray], int],
    baseline_policy: Callable[[np.ndarray], int],
    episodes: int = 20,
) -> Dict[str, float]:
    """Perform statistical significance testing between agent and baseline.

    Runs *episodes* paired episodes for both policies on *env* and reports:
      * two-sample t-test (``t_statistic``, ``p_value``)
      * mean percentage improvement of the agent over the baseline
      * 95% confidence interval of the per-episode reward difference
      * a human-readable significance verdict

    Returns a stub result (``p_value=1.0``) when scipy is unavailable.
    """
    if not SCIPY_AVAILABLE:
        return {
            "t_statistic": 0.0,
            "p_value": 1.0,
            "mean_improvement": 0.0,
            "confidence_interval": (0.0, 0.0),
            "statistical_significance": "scipy not available"
        }
    agent_rewards = []
    baseline_rewards = []
    for _ in range(episodes):
        m_agent = env.run_episode(policy_fn=agent_policy)
        m_baseline = env.run_episode(policy_fn=baseline_policy)
        agent_rewards.append(m_agent["total_reward"])
        baseline_rewards.append(m_baseline["total_reward"])
    t_statistic, p_value = stats.ttest_ind(agent_rewards, baseline_rewards)
    mean_agent = np.mean(agent_rewards)
    mean_baseline = np.mean(baseline_rewards)
    # BUGFIX: apply abs() *before* adding the epsilon, matching _score_0_1.
    # The previous form abs(mean_baseline + 1e-6) collapses to zero when the
    # baseline mean is exactly -1e-6, yielding an infinite improvement value.
    mean_improvement = ((mean_agent - mean_baseline) / (abs(mean_baseline) + 1e-6)) * 100
    diff = np.array(agent_rewards) - np.array(baseline_rewards)
    # 95% CI for the mean per-episode reward difference.
    ci_low, ci_high = stats.t.interval(0.95, len(diff) - 1, loc=np.mean(diff), scale=stats.sem(diff))
    significance = "p < 0.05 [PASS]" if p_value < 0.05 else "p >= 0.05"
    return {
        "t_statistic": float(t_statistic),
        "p_value": float(p_value),
        "mean_improvement": float(mean_improvement),
        "confidence_interval": (float(ci_low), float(ci_high)),
        "statistical_significance": significance
    }
def _score_0_1(metrics: Dict[str, float], baseline: Dict[str, float]) -> float:
    """Weighted composite score for an agent run versus a baseline run.

    Components (weight): wait-time improvement (0.30), reward improvement
    (0.35), fuel efficiency (0.05), stop coverage (0.15), route entropy
    (0.10), anti-camping (0.05). The result is clipped to [0.05, 0.95] so
    it is never exactly 0.0 or 1.0; a non-finite composite maps to the
    fallback value 0.15.
    """
    eps = 1e-6
    wait_gain = (baseline["avg_wait_time"] - metrics["avg_wait_time"]) / max(
        baseline["avg_wait_time"], eps
    )
    reward_gain = (metrics["total_reward"] - baseline["total_reward"]) / (
        abs(baseline["total_reward"]) + eps
    )

    # Relative gains in [-1, 1] are mapped onto [0, 1]; the rest are clipped.
    components = {
        "wait": float(np.clip(wait_gain, -1.0, 1.0) * 0.5 + 0.5),
        "reward": float(np.clip(reward_gain, -1.0, 1.0) * 0.5 + 0.5),
        "fuel": float(np.clip(metrics["fuel_efficiency"] / 0.25, 0.0, 1.0)),
        "coverage": float(np.clip(metrics["stop_coverage"], 0.0, 1.0)),
        "balance": float(np.clip(metrics.get("route_entropy", 0.0), 0.0, 1.0)),
        "anti_camp": float(
            np.clip(1.0 - metrics.get("max_stop_fraction", 1.0), 0.0, 1.0)
        ),
    }
    weights = {
        "wait": 0.30,
        "reward": 0.35,
        "fuel": 0.05,
        "coverage": 0.15,
        "balance": 0.10,
        "anti_camp": 0.05,
    }
    composite = sum(weights[key] * components[key] for key in weights)

    if not np.isfinite(composite):
        return 0.15
    # Strict (0, 1) range: ensures score is never 0.0 and never 1.0.
    return float(np.clip(composite, 0.05, 0.95))
# ---------------------------------------------------------------------------
# Per-task grading (deterministic) — core OpenEnv requirement
# ---------------------------------------------------------------------------
def _grade_task(
    task_cfg: TaskConfig,
    agent_policy: Callable[[np.ndarray], int],
    episodes: int = 20,
) -> Dict:
    """Grade *agent_policy* on a single task configuration.

    Builds the task's environment, evaluates both the agent and the greedy
    baseline over *episodes* episodes, attaches statistical significance
    results, and returns the full per-task report dict (score under the
    ``"score"`` key).
    """
    env = task_cfg.build_env()

    agent_metrics = _run_eval(env, policy=agent_policy, episodes=episodes)
    greedy_metrics = _run_eval(env, policy=greedy_baseline_policy, episodes=episodes)
    significance = _add_statistical_tests(
        env, agent_policy, greedy_baseline_policy, episodes=episodes
    )

    report = {
        "task": task_cfg.name,
        "difficulty": task_cfg.difficulty,
        "score": _score_0_1(agent_metrics, greedy_metrics),
        "rl_agent": agent_metrics,
        "baseline_greedy": greedy_metrics,
        "statistical_tests": significance,
    }
    return report
# ---------------------------------------------------------------------------
# Per-task grading functions
# ---------------------------------------------------------------------------
# Dynamically generate 30 grading functions (grade_task1 .. grade_task30).
def make_grader(t_name):
    """Build a grader bound to one task id (closure avoids late binding)."""
    def grader(agent_policy: Callable[[np.ndarray], int], episodes: int = 20) -> float:
        return float(_grade_task(TASKS[t_name], agent_policy, episodes)["score"])
    return grader


for i in range(1, 31):
    func_name = f"grade_task{i}"
    globals()[func_name] = make_grader(f"task{i}")
    __all__.append(func_name)
def grade_all_tasks(
    agent_policy: Callable[[np.ndarray], int],
    episodes: int = 20,
) -> Dict:
    """Run every per-task grader and combine the results.

    Returns a dict with one report per task id ("task1" .. "task30") plus:
      * ``"aggregate_score"``: mean task score, clipped to [0.05, 0.95]
      * ``"task_ids"``: the list of graded task ids
    """
    reports = {
        f"task{idx}": _grade_task(TASKS[f"task{idx}"], agent_policy, episodes)
        for idx in range(1, 31)
    }
    mean_score = sum(report["score"] for report in reports.values()) / 30.0
    return {
        **reports,
        "aggregate_score": float(np.clip(mean_score, 0.05, 0.95)),
        "task_ids": list(reports.keys()),
    }
# ---------------------------------------------------------------------------
# CLI entry-point
# ---------------------------------------------------------------------------
def main() -> None:
    """CLI entry point: load a trained DQN agent and print a grade report."""
    from agent import DQNAgent

    parser = argparse.ArgumentParser(description="OpenEnv Bus Routing — Programmatic Grader")
    parser.add_argument("--model-path", type=str, default="models/dqn_bus_v6_best.pt")
    parser.add_argument("--episodes", type=int, default=int(os.getenv("MAX_EVAL_EPISODES", 5)))
    args = parser.parse_args()

    agent = DQNAgent.load(args.model_path)

    def policy(obs):
        # Greedy (argmax) action selection for evaluation.
        return agent.act(obs, greedy=True)

    report = grade_all_tasks(policy, episodes=args.episodes)

    banner = "=" * 60
    print(banner)
    print(" OpenEnv Programmatic Grade Report (30 Tasks)")
    print(banner)
    for task_key in sorted(report.get("task_ids", [])):
        tr = report[task_key]
        print(f" {tr['task']} ({tr['difficulty']}) - score: {tr['score']:.4f}")
    print(f"\n{banner}")
    print(f" Aggregate score (0.05 - 0.95): {report['aggregate_score']:.4f}")
    print(banner)


if __name__ == "__main__":
    main()