Spaces:
Sleeping
Sleeping
| """PolyglotOptimaEnvironment — MCPEnvironment subclass with explicit Gym API. | |
| Implements: | |
| - reset(seed=None) -> Observation # samples a Python function + hardware profile | |
| - step(action) -> StepResult # routes tool calls, advances rounds, computes reward | |
| - state() -> State # episode_id, step_count, round_number | |
| - close() # releases compiler subprocesses, fuzzer pool | |
| Round structure per episode: | |
| round 1: agent has up to N tool calls, then submits via submit_optimization → R1 reward | |
| round 2: same, with R1 result available in observation → R2 reward | |
| round 3: same, FINAL strict gate (≥95% fuzz pass) → R3 reward | |
| episode_reward = 0.3 * R1_reward + 0.7 * R3_reward (R2 is informational) | |
| The four difficulty axes are frozen at reset() time for each episode but the | |
| adaptive_curriculum module updates them across batches based on success rates. | |
| """ | |
| from __future__ import annotations | |
| import random | |
| import uuid | |
| import os | |
| from dataclasses import dataclass | |
| from typing import Any | |
| # OpenEnv imports — actual class names per the framework docs. | |
| # We accept that some specific imports may need to be adjusted at integration time; | |
| # all are documented as confirmed in §12 of the plan. | |
| try: | |
| from openenv.core import MCPEnvironment, StepResult # type: ignore | |
| from openenv.core.exceptions import OpenEnvError # type: ignore | |
| except ImportError: | |
| # Allow stubs for local development before openenv is installed | |
| class MCPEnvironment: # type: ignore | |
| SUPPORTS_CONCURRENT_SESSIONS = True | |
| async def reset_async(self, seed=None): raise NotImplementedError | |
| async def step_async(self, action): raise NotImplementedError | |
| class StepResult: # type: ignore | |
| observation: Any | |
| reward: float | |
| done: bool | |
| info: dict[str, Any] | None = None | |
| class OpenEnvError(Exception): # type: ignore | |
| pass | |
| from models import ( | |
| OptimizationAction, | |
| OptimizationObservation, | |
| OptimizationState, | |
| ) | |
| # Reserved names that MUST NOT be used as MCP tool names per OpenEnv spec | |
| _RESERVED_TOOL_NAMES = {"reset", "step", "state", "close"} | |
| class PolyglotOptimaEnvironment(MCPEnvironment): | |
| """The hardware-aware Python→C++ optimization environment. | |
| Public API: | |
| env.reset(seed=...) -> OptimizationObservation | |
| env.step(action: OptimizationAction) -> StepResult | |
| env.state() -> OptimizationState | |
| env.close() | |
| """ | |
| SUPPORTS_CONCURRENT_SESSIONS = True | |
| def __init__( | |
| self, | |
| max_rounds: int = 3, | |
| max_calls_per_round: int = 5, | |
| adaptive_axes: dict[str, int] | None = None, | |
| enable_adaptive_curriculum: bool = True, | |
| curriculum_batch_size: int = 8, | |
| ): | |
| super().__init__() | |
| self.max_rounds = max_rounds | |
| self.max_calls_per_round = max_calls_per_round | |
| self.enable_adaptive_curriculum = enable_adaptive_curriculum | |
| self.curriculum_batch_size = max(1, int(curriculum_batch_size)) | |
| # Default axes — overridden by adaptive_curriculum across batches | |
| self._global_axes = adaptive_axes or { | |
| "function_tier": 0, | |
| "hardware_class": 0, | |
| "fuzzer_strictness": 0, | |
| "portability_required": 0, | |
| } | |
| self._sessions: dict[str, OptimizationState] = {} | |
| self._active_episode_id: str | None = None | |
| # Lazy imports — modules built in subsequent hours | |
| self._tool_registry: dict[str, Any] = {} | |
| self._dataset_loader = None | |
| self._hardware_profiles = None | |
| self._reward_dag = None | |
| self._curriculum = None | |
| self._episode_success_buffer: list[float] = [] | |
| # -------------------- Gym-style explicit API -------------------- | |
| def reset(self, seed: int | None = None) -> OptimizationObservation: | |
| """Initialize a new episode. | |
| Samples (Python function, hardware profile, difficulty axes) deterministically | |
| from `seed` if provided. Returns the initial Observation. | |
| """ | |
| rng = random.Random(seed) | |
| episode_id = str(uuid.uuid4()) | |
| # Lazy init of subsystems (built in later hours; placeholders for now) | |
| self._ensure_subsystems_loaded() | |
| # Sample the problem instance | |
| problem = self._sample_problem(rng) | |
| state = OptimizationState( | |
| episode_id=episode_id, | |
| step_count=0, | |
| round_number=1, | |
| is_terminal=False, | |
| python_code=problem["python_code"], | |
| function_signature_cpp=problem["cpp_signature"], | |
| hardware_profile=problem["hardware_profile"], | |
| bottleneck_ground_truth=problem["bottleneck_labels"], | |
| bottleneck_distractors=problem["bottleneck_distractors"], | |
| rtol_override=problem.get("rtol_override"), | |
| difficulty_axes=dict(self._global_axes), | |
| is_trap=problem.get("is_trap", False), | |
| trap_id=problem.get("trap_id"), | |
| ) | |
| self._sessions[episode_id] = state | |
| self._active_episode_id = episode_id | |
| return OptimizationObservation( | |
| done=False, | |
| reward=0.0, | |
| tool_result={"event": "episode_start", "episode_id": episode_id}, | |
| python_code=state.python_code, | |
| hardware_profile=state.hardware_profile, | |
| round_number=1, | |
| rounds_remaining=self.max_rounds - 1, | |
| best_speedup_so_far=0.0, | |
| metadata={ | |
| "episode_id": episode_id, | |
| "difficulty_axes": state.difficulty_axes, | |
| # NOTE: bottleneck_ground_truth is NOT exposed to the agent — | |
| # only used by the server when scoring DiagnosisRubric | |
| }, | |
| ) | |
| def step(self, action: OptimizationAction) -> StepResult: | |
| """Execute one tool call or final submission. | |
| The action.tool_name routes to a registered MCP tool. If the tool is | |
| `submit_optimization`, the current round closes — reward is computed, | |
| round advances, and on round 3 the episode terminates. | |
| """ | |
| if not self._sessions: | |
| raise OpenEnvError("No active episode. Call reset() first.") | |
| requested_episode_id = None | |
| if isinstance(action.tool_args, dict): | |
| maybe_id = action.tool_args.get("episode_id") | |
| if isinstance(maybe_id, str) and maybe_id in self._sessions: | |
| requested_episode_id = maybe_id | |
| if requested_episode_id is not None: | |
| self._active_episode_id = requested_episode_id | |
| state = self._sessions[requested_episode_id] | |
| elif self._active_episode_id and self._active_episode_id in self._sessions: | |
| state = self._sessions[self._active_episode_id] | |
| else: | |
| # Fall back to the most recently created episode. | |
| latest_episode_id = next(reversed(self._sessions)) | |
| self._active_episode_id = latest_episode_id | |
| state = self._sessions[latest_episode_id] | |
| if state.is_terminal: | |
| raise OpenEnvError("Episode is already terminal. Call reset() to start a new one.") | |
| forced_submit = False | |
| effective_tool_name = action.tool_name | |
| effective_tool_args = dict(action.tool_args or {}) | |
| if ( | |
| action.tool_name != "submit_optimization" | |
| and len(state.current_round_tool_calls) >= self.max_calls_per_round | |
| ): | |
| forced_submit = True | |
| effective_tool_name = "submit_optimization" | |
| effective_tool_args = { | |
| "cpp_code": effective_tool_args.get("cpp_code", "// auto-forced submit: call budget reached"), | |
| "reasoning_trace": action.reasoning_trace or "auto forced submit after max tool calls", | |
| } | |
| if effective_tool_name in _RESERVED_TOOL_NAMES: | |
| raise OpenEnvError( | |
| f"Tool name '{effective_tool_name}' is reserved. " | |
| f"Reserved names: {sorted(_RESERVED_TOOL_NAMES)}" | |
| ) | |
| # Track tool call + reasoning trace for this round | |
| state.step_count += 1 | |
| state.current_round_tool_calls.append(effective_tool_name) | |
| if action.reasoning_trace: | |
| state.current_round_reasoning += action.reasoning_trace + "\n" | |
| # Route to the named tool — full implementation in Hour 4–10 | |
| tool_result = self._dispatch_tool(effective_tool_name, effective_tool_args, state) | |
| # Is this a round-closing submission? | |
| is_submit = effective_tool_name == "submit_optimization" | |
| round_reward = 0.0 | |
| terminal = False | |
| if is_submit: | |
| # Compute reward for this round (Hour 10–16 implementation) | |
| round_reward = self._compute_round_reward(state, tool_result) | |
| if self._dataset_loader is not None and hasattr(self._dataset_loader, "record_submission_outcome"): | |
| self._dataset_loader.record_submission_outcome(state, tool_result) | |
| state.round_results.append({ | |
| "round": state.round_number, | |
| "reward": round_reward, | |
| "tool_calls": list(state.current_round_tool_calls), | |
| "reasoning": state.current_round_reasoning, | |
| "submission": tool_result, | |
| }) | |
| # Reset per-round buffers | |
| state.current_round_tool_calls.clear() | |
| state.current_round_reasoning = "" | |
| # Advance round | |
| state.round_number += 1 | |
| if state.round_number > self.max_rounds: | |
| terminal = True | |
| state.is_terminal = True | |
| observation = OptimizationObservation( | |
| done=terminal, | |
| reward=round_reward, | |
| tool_result=tool_result, | |
| python_code=state.python_code, | |
| hardware_profile=state.hardware_profile, | |
| round_number=min(state.round_number, self.max_rounds), | |
| rounds_remaining=max(0, self.max_rounds - state.round_number), | |
| best_speedup_so_far=state.best_speedup, | |
| last_compile_status=tool_result.get("compile_status", "pending"), | |
| last_correctness_pass_rate=tool_result.get("correctness_pass_rate", tool_result.get("pass_rate", 0.0)), | |
| metadata={ | |
| "episode_id": state.episode_id, | |
| "step_count": state.step_count, | |
| "tool_called": effective_tool_name, | |
| "forced_submit": forced_submit, | |
| "target_isa": state.hardware_profile.get("target", "scalar_only"), | |
| "round_reward_breakdown": tool_result.get("_rubric_breakdown", {}), | |
| "round_readiness_score": tool_result.get("readiness_score"), | |
| "round_correctness_pass_rate": tool_result.get("correctness_pass_rate"), | |
| "round_compile_status": tool_result.get("compile_status"), | |
| }, | |
| ) | |
| # Final episode reward = 0.3*R1 + 0.7*R3 (per plan §10) | |
| if terminal: | |
| r1 = next((r["reward"] for r in state.round_results if r["round"] == 1), 0.0) | |
| r3 = next((r["reward"] for r in state.round_results if r["round"] == 3), 0.0) | |
| observation.reward = 0.3 * r1 + 0.7 * r3 | |
| observation.metadata["episode_reward_breakdown"] = { | |
| "r1": r1, | |
| "r3": r3, | |
| "episode_total": observation.reward, | |
| } | |
| self._record_episode_outcome(state, observation) | |
| return StepResult( | |
| observation=observation, | |
| reward=observation.reward, | |
| done=terminal, | |
| info={"state_snapshot_id": state.episode_id, "step": state.step_count}, | |
| ) | |
| def state(self) -> OptimizationState: | |
| """Return current episode state (Gym-style state introspection).""" | |
| if not self._sessions: | |
| raise OpenEnvError("No active episode.") | |
| if self._active_episode_id and self._active_episode_id in self._sessions: | |
| return self._sessions[self._active_episode_id] | |
| latest_episode_id = next(reversed(self._sessions)) | |
| self._active_episode_id = latest_episode_id | |
| return self._sessions[latest_episode_id] | |
| def close(self) -> None: | |
| """Release all resources (compiler subprocesses, fuzzer pool).""" | |
| self._sessions.clear() | |
| self._active_episode_id = None | |
| # Subsystem-specific cleanup — implemented as tools come online | |
| if self._tool_registry: | |
| for tool in self._tool_registry.values(): | |
| if hasattr(tool, "close"): | |
| tool.close() | |
| # -------------------- Async variants for parallel rollouts ---- | |
| async def reset_async(self, seed: int | None = None) -> OptimizationObservation: | |
| return self.reset(seed) | |
| async def step_async(self, action: OptimizationAction) -> StepResult: | |
| return self.step(action) | |
| async def close_async(self) -> None: | |
| self.close() | |
| # -------------------- Internal scaffolding -------------------- | |
| def _ensure_subsystems_loaded(self) -> None: | |
| """Lazy-load tools/dataset/profiles. Real implementations land at Hour 16.""" | |
| # Tools registry | |
| if not self._tool_registry: | |
| try: | |
| from server.tools import TOOL_REGISTRY | |
| self._tool_registry = TOOL_REGISTRY | |
| except ImportError: | |
| self._tool_registry = {} | |
| # Dataset loader (real, post-Hour 16) | |
| if self._dataset_loader is None: | |
| try: | |
| from server.scenarios import DatasetLoader | |
| prefer_real = os.environ.get("POLYGLOT_OPTIMA_PREFER_REAL_DATASETS", "1") == "1" | |
| self._dataset_loader = DatasetLoader(prefer_real_datasets=prefer_real) | |
| except ImportError: | |
| self._dataset_loader = _StubDatasetLoader() | |
| # Hardware profiles (full 8-profile set, post-Hour 16) | |
| if self._hardware_profiles is None: | |
| try: | |
| from server.scenarios.hardware_profiles import HARDWARE_PROFILES | |
| # Filter held-out for training; eval scripts override this | |
| self._hardware_profiles = [p for p in HARDWARE_PROFILES if not p.get("held_out")] | |
| except ImportError: | |
| self._hardware_profiles = _STUB_PROFILES | |
| if self._curriculum is None and self.enable_adaptive_curriculum: | |
| try: | |
| from server.scenarios import AdaptiveCurriculum | |
| self._curriculum = AdaptiveCurriculum(initial_axes=dict(self._global_axes)) | |
| except ImportError: | |
| self._curriculum = None | |
| def _sample_problem(self, rng: random.Random) -> dict[str, Any]: | |
| """Sample (function, hw_profile, ground_truth_labels) for an episode. | |
| Uses the DatasetLoader to draw a (function, hardware) tuple weighted by | |
| the current global difficulty axes. Falls back to a built-in stub if | |
| the loader is the local dev fallback. | |
| """ | |
| # Real loader path (post-Hour 16) | |
| if isinstance(self._dataset_loader, _StubDatasetLoader): | |
| hw = rng.choice(self._hardware_profiles) | |
| return { | |
| "python_code": _STUB_PYTHON_FUNCTION, | |
| "cpp_signature": 'extern "C" double agent_function(const double* arr, size_t n);', | |
| "hardware_profile": hw, | |
| "bottleneck_labels": ["compute-bound", "vectorizable"], | |
| "bottleneck_distractors": ["memory-bound", "branch-heavy", "io-bound"], | |
| "rtol_override": None, | |
| "is_trap": False, | |
| } | |
| return self._dataset_loader.sample(self._global_axes, rng) | |
| def _record_episode_outcome(self, state: OptimizationState, observation: OptimizationObservation) -> None: | |
| """Update adaptive curriculum after fixed-size batches of completed episodes.""" | |
| if not self.enable_adaptive_curriculum or self._curriculum is None: | |
| return | |
| final_submission = state.round_results[-1]["submission"] if state.round_results else {} | |
| pass_rate = float(final_submission.get("correctness_pass_rate", 0.0)) | |
| compile_ok = final_submission.get("compile_status") == "success" | |
| episode_success = 1.0 if (compile_ok and pass_rate >= 0.8) else 0.0 | |
| self._episode_success_buffer.append(episode_success) | |
| observation.metadata["curriculum_pending_batch_count"] = len(self._episode_success_buffer) | |
| if len(self._episode_success_buffer) < self.curriculum_batch_size: | |
| return | |
| success_rate = sum(self._episode_success_buffer) / len(self._episode_success_buffer) | |
| action = self._curriculum.observe_batch(success_rate) | |
| self._global_axes = dict(self._curriculum.axes) | |
| self._episode_success_buffer.clear() | |
| observation.metadata["curriculum"] = { | |
| "success_rate": success_rate, | |
| "action": action, | |
| "axes": dict(self._global_axes), | |
| "batches_seen": self._curriculum.n_batches_seen, | |
| } | |
| def _dispatch_tool(self, tool_name: str, tool_args: dict[str, Any], state: OptimizationState) -> dict[str, Any]: | |
| """Route a tool call to the registered handler. | |
| Real implementations land in Hour 4–10. Until then, stub responses keep the | |
| Gym API live for smoke tests. | |
| """ | |
| if tool_name not in self._tool_registry: | |
| return { | |
| "stub": True, | |
| "tool": tool_name, | |
| "message": f"Tool '{tool_name}' not yet implemented (Hour 4-10).", | |
| } | |
| return self._tool_registry[tool_name](tool_args, state) | |
| def _compute_round_reward(self, state: OptimizationState, submission: dict[str, Any]) -> float: | |
| """Apply the round-appropriate Sequential(Gate, Gate, WeightedSum) rubric. | |
| Per plan §10: | |
| R1: soft gate (60% correctness), 3 components | |
| R2: medium gate (80%), informational | |
| R3: strict gate (95%), 5 components incl. portability + self-correction | |
| Returns the rubric DAG's score in [0, 1], or 0.0 if any gate fails. | |
| """ | |
| try: | |
| from server.rewards import build_round_reward_dag | |
| except ImportError: | |
| return 0.0 | |
| # Append a synthetic round_result entry NOW so DiagnosisRubric / SelfCorrectionRubric | |
| # can read the just-completed round's tool calls. The caller (step()) appends the | |
| # *real* round_results entry after this returns; we only need a temp lookup. | |
| # Note: we already appended state.round_results in step() BEFORE computing reward, | |
| # so this is fine. Diagnosis and SelfCorrection both read state.round_results. | |
| dag = build_round_reward_dag(state.round_number) | |
| score = dag.score(state, submission) | |
| # Stash breakdown in submission for telemetry / wandb logging | |
| submission["_rubric_breakdown"] = getattr(dag, "last_breakdown", {}) | |
| return score | |
| # --------------------------- Stubs (Hour 0–4 only) ------------------- | |
| class _StubDatasetLoader: | |
| """Placeholder. Replaced in Hour 16 by server.scenarios.dataset_loader.""" | |
| def sample(self, axes: dict[str, int], rng: random.Random) -> dict[str, Any]: | |
| return {"python_code": _STUB_PYTHON_FUNCTION} | |
| _STUB_PROFILES = [ | |
| { | |
| "id": "desktop_avx2", | |
| "cores": 8, | |
| "freq_ghz": 3.8, | |
| "l1_kb": 32, | |
| "simd": "AVX2", | |
| "bw_gbs": 51, | |
| "roofline_bound_gflops": 25.5, | |
| "target": "x86_AVX2", | |
| }, | |
| ] | |
| _STUB_PYTHON_FUNCTION = '''def sum_squares(arr): | |
| """Compute the sum of squares of an array — placeholder during Hour 0-4.""" | |
| total = 0.0 | |
| for x in arr: | |
| total += x * x | |
| return total | |
| ''' | |
| __all__ = [ | |
| "PolyglotOptimaEnvironment", | |
| ] | |