Spaces:
Running
Running
| """ | |
| Rust Coder Environment Implementation. | |
| Evaluates LLM-generated Rust code against 10 sequential coding problems. | |
| Multi-dimensional reward system: Compilation(40%), Correctness(20%), | |
| Coverage(20%), Elegance(10%), Efficiency(10%). | |
| """ | |
| import os | |
| import re | |
| import subprocess | |
| import tempfile | |
| import time | |
| import logging | |
| import json | |
| from typing import Dict, List, Optional, Tuple | |
| from openenv.core.env_server.interfaces import Environment | |
| from models import RustCoderAction, RustCoderObservation | |
| from openai import OpenAI | |
# Resolve problems.json: look in same dir as this file, then parent
# Directory containing this module; used to anchor relative lookups.
_HERE: str = os.path.dirname(os.path.abspath(__file__))
# Candidate locations checked in order by _find_problems_file().
_PROBLEMS_PATHS: List[str] = [
    os.path.join(_HERE, "problems.json"),  # server/problems.json
    os.path.join(_HERE, "..", "problems.json"),  # root problems.json
    "problems.json",  # cwd fallback
]
def _find_problems_file() -> str:
    """Locate problems.json and return its absolute path.

    Candidates in ``_PROBLEMS_PATHS`` are tried in order; the first that
    exists wins.

    Raises:
        FileNotFoundError: if none of the candidate paths exist.
    """
    candidates = (p for p in _PROBLEMS_PATHS if os.path.exists(p))
    found = next(candidates, None)
    if found is None:
        raise FileNotFoundError(
            f"problems.json not found. Searched: {_PROBLEMS_PATHS}"
        )
    return os.path.abspath(found)
class RustCoderEnvironment(Environment):
    """
    OpenEnv-compliant environment for evaluating Rust code submissions.

    A single episode walks through all problems in sequence:
      - reset()       -> loads the first (or requested) problem and returns it
      - step(action)  -> compiles & tests the submitted code, returns the
                         reward, and advances to the next problem
      - ``done`` becomes True only after the final problem has been evaluated.
    Empty submissions (when auto-LLM is disabled or fails) do NOT advance the
    problem index, so the same problem can be retried.

    Reward breakdown (all components normalized to [0, 1]):
        Compilation 40% - code compiles without errors
        Correctness 20% - fraction of test assertions that pass
        Coverage    20% - fraction of tests attempted to run
        Elegance    10% - code quality heuristics
        Efficiency  10% - execution time vs. problem baseline
    """

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    # Compile / run timeouts (seconds)
    COMPILE_TIMEOUT = 30
    RUN_TIMEOUT = 10

    def __init__(self) -> None:
        """Initialize environment and load problems from JSON."""
        self._logger = logging.getLogger("rust_coder.env")
        self.problems: List[Dict] = self._load_problems()
        self.current_problem_idx: int = 0
        self.step_count: int = 0

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _load_problems(self) -> List[Dict]:
        """Load and validate the problems list from problems.json.

        Raises:
            ValueError: if the file is not a non-empty JSON array.
        """
        path = _find_problems_file()
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        if not isinstance(data, list) or len(data) == 0:
            raise ValueError("problems.json must be a non-empty JSON array.")
        return data

    def _current_problem(self) -> Dict:
        """Return the problem at the current index (modulo as a safety net)."""
        idx = self.current_problem_idx % len(self.problems)
        return self.problems[idx]

    # ------------------------------------------------------------------
    # OpenEnv interface
    # ------------------------------------------------------------------
    def state(self):
        """Return minimal state info (step count, problem index)."""
        # Local import keeps the openenv types dependency off module import.
        from openenv.core.env_server.types import State

        return State(episode_id=None, step_count=self.step_count)

    def reset(self, start_index: int = 0) -> RustCoderObservation:
        """Start a new episode, defaulting to the first problem.

        Args:
            start_index: Problem index to begin at; wrapped modulo the
                problem count so out-of-range values are safe.
        """
        self.current_problem_idx = start_index % len(self.problems)
        self.step_count = 0
        problem = self.problems[self.current_problem_idx]
        return RustCoderObservation(
            problem_description=problem["description"],
            header_section=problem.get("header_section", ""),
            compilation_success=False,
            compilation_output="",
            test_results=[],
            reward_breakdown={},
            done=False,
            reward=0.0,
        )

    def step(self, action: RustCoderAction) -> RustCoderObservation:
        """Evaluate the submitted code and advance the task index within the single episode.

        Empty submissions are either auto-filled via an LLM (when
        AUTO_LLM_ON_EMPTY_STEP is enabled, the default) or rejected without
        advancing the problem index.
        """
        self.step_count += 1
        problem = self.problems[self.current_problem_idx]
        code = action.code
        if not code.strip():
            # Some UIs may "step" without providing an action payload.
            # Optionally auto-generate code via LLM so the UI can still progress.
            # Default ON (because the hosted UI "Step" provides no code).
            # To disable: set AUTO_LLM_ON_EMPTY_STEP=0/false/no.
            auto_llm_cfg = (os.getenv("AUTO_LLM_ON_EMPTY_STEP") or "1").strip().lower()
            auto_llm = auto_llm_cfg not in {"0", "false", "no", "n", "off"}
            if auto_llm:
                model = os.getenv("MODEL_NAME") or "Qwen/Qwen2.5-72B-Instruct"
                base_url = os.getenv("API_BASE_URL") or "https://router.huggingface.co/v1"
                token = os.getenv("HF_TOKEN") or os.getenv("API_KEY")
                prompt = problem.get("description", "")
                header = problem.get("header_section", "")
                if header:
                    prompt += f"\n\nHeader Section (must be included verbatim in your final code):\n```rust\n{header}\n```"
                self._logger.info(
                    "Auto-LLM on empty step: model=%s base_url=%s prompt_chars=%d token_present=%s",
                    model,
                    base_url,
                    len(prompt),
                    bool(token),
                )
                if not token:
                    self._logger.error("AUTO_LLM_ON_EMPTY_STEP enabled but HF_TOKEN/API_KEY missing.")
                    return RustCoderObservation(
                        problem_description=problem.get("description", ""),
                        header_section=problem.get("header_section", ""),
                        compilation_success=False,
                        compilation_output="Error: AUTO_LLM_ON_EMPTY_STEP enabled but HF_TOKEN/API_KEY is missing.",
                        test_results=[],
                        reward_breakdown={
                            "compilation": 0.0,
                            "correctness": 0.0,
                            "coverage": 0.0,
                            "elegance": 0.0,
                            "efficiency": 0.0,
                        },
                        done=False,
                        reward=0.0,
                    )
                try:
                    client_llm = OpenAI(base_url=base_url, api_key=token)
                    completion = client_llm.chat.completions.create(
                        model=model,
                        messages=[
                            {"role": "system", "content": "You are a senior Rust engineer. Return ONLY the complete Rust code file. No explanation."},
                            {"role": "user", "content": prompt + "\n\nReturn the COMPLETE Rust code, including the header section above."},
                        ],
                        temperature=0.1,
                    )
                    text = (completion.choices[0].message.content or "").strip()
                    # Strip a markdown code fence if the model added one.
                    if "```rust" in text:
                        text = text.split("```rust")[1].split("```")[0]
                    elif "```" in text:
                        text = text.split("```")[1].split("```")[0]
                    text = text.strip()
                    if text:
                        code = text
                        self._logger.info("Auto-LLM generated code chars=%d", len(code))
                    else:
                        self._logger.warning("Auto-LLM returned empty after cleanup.")
                except Exception:
                    # logger.exception records the traceback; nothing else to do —
                    # the empty-code path below handles the fallout.
                    self._logger.exception("Auto-LLM call failed.")
            if not code.strip():
                # Invalid/empty submission: do not advance the problem index.
                self._logger.warning(
                    "Empty code submitted step_count=%d problem_id=%s title=%s",
                    self.step_count,
                    problem.get("id"),
                    problem.get("title"),
                )
                # Episode is not finished; allow retry on same problem.
                done = False
                return RustCoderObservation(
                    problem_description=problem["description"],
                    header_section=problem.get("header_section", ""),
                    compilation_success=False,
                    compilation_output="Error: no code submitted.",
                    test_results=[],
                    reward_breakdown={
                        "compilation": 0.0,
                        "correctness": 0.0,
                        "coverage": 0.0,
                        "elegance": 0.0,
                        "efficiency": 0.0,
                    },
                    done=done,
                    reward=0.0,
                )
        # LeetCode-style header_section is provided as a scaffold/hint in the observation
        # and in the prompt, but we do NOT require it to appear verbatim in submissions.
        # Correctness is enforced by compilation + the problem's tests.
        # -- 1. Compilation (40%) --------------------------------------
        compilation_success, compilation_output = self._compile_check(code)
        r_compilation = 1.0 if compilation_success else 0.0
        # Warnings are not compilation errors in Rust, but they indicate lower quality.
        # Penalize compilation score slightly when warnings are present.
        warning_count = 0
        if compilation_output:
            warning_count = len(re.findall(r'(?m)^warning:', compilation_output))
        if compilation_success and warning_count > 0:
            r_compilation = max(0.6, 1.0 - min(0.05 * warning_count, 0.4))
        # -- 2. Correctness + Coverage (20% each) ----------------------
        test_results: List[Dict] = []
        r_correctness = 0.0
        r_coverage = 0.0
        if compilation_success:
            tests = problem.get("tests", [])
            if tests:
                test_results = self._run_tests(code, tests)
                passed = sum(1 for t in test_results if t.get("passed", False))
                ran = sum(1 for t in test_results if t.get("ran", False))
                r_correctness = passed / len(tests)
                r_coverage = ran / len(tests)
            else:
                # No tests defined -> give full credit to both dimensions
                r_correctness = 1.0
                r_coverage = 1.0
        # -- 3. Elegance (10%) -----------------------------------------
        # Only score elegance for code that compiles; otherwise it can
        # incorrectly award points for non-compiling submissions.
        r_elegance = self._score_elegance(code) if compilation_success else 0.0
        if compilation_success and warning_count > 0:
            r_elegance = max(0.0, round(r_elegance - min(0.02 * warning_count, 0.2), 4))
        # -- 4. Efficiency (10%) ---------------------------------------
        baseline_ms: float = problem.get("performance_baseline_ms", 100.0)
        r_efficiency = 0.0
        if compilation_success:
            r_efficiency = self._score_efficiency(code, baseline_ms)
            if warning_count > 0:
                r_efficiency = max(0.0, round(r_efficiency - min(0.02 * warning_count, 0.2), 4))
        # -- Total reward ----------------------------------------------
        reward_breakdown = {
            "compilation": round(r_compilation, 4),
            "correctness": round(r_correctness, 4),
            "coverage": round(r_coverage, 4),
            "elegance": round(r_elegance, 4),
            "efficiency": round(r_efficiency, 4),
        }
        # Calculate weighted total reward.
        # Hard rule: if it doesn't compile, total reward must be 0.0.
        if not compilation_success:
            total_reward = 0.0
        else:
            total_reward = round(
                r_compilation * 0.40
                + r_correctness * 0.20
                + r_coverage * 0.20
                + r_elegance * 0.10
                + r_efficiency * 0.10,
                4,
            )
        # -- Advance Logic ---------------------------------------------
        # One step = one evaluated task. We advance to the next task, and the episode
        # ends only after the final task has been evaluated.
        self.current_problem_idx += 1
        done = self.current_problem_idx >= len(self.problems)
        next_prob_desc = "--- ALL TASKS COMPLETED in this episode ---"
        next_header = ""
        if not done:
            next_prob = self.problems[self.current_problem_idx]
            next_prob_desc = f"--- NEXT TASK: {next_prob['title']} ---\n\n{next_prob['description']}"
            next_header = next_prob.get("header_section", "")
        # IMPORTANT: The compilation/test results correspond to the code evaluated
        # on `problem` (the current task), while the UI should also know what's next.
        # To avoid confusion, include both "evaluated" and "next" in the description.
        response_problem_desc = (
            f"--- EVALUATED TASK: {problem.get('title', '')} ---\n\n"
            f"{problem.get('description', '')}\n\n"
            f"{next_prob_desc}"
        )
        return RustCoderObservation(
            problem_description=response_problem_desc,
            header_section=next_header,
            compilation_success=compilation_success,
            compilation_output=compilation_output[:2000],  # cap length
            test_results=test_results,
            reward_breakdown=reward_breakdown,
            done=done,
            reward=total_reward,
        )

    # ------------------------------------------------------------------
    # Compilation
    # ------------------------------------------------------------------
    def _compile_check(self, code: str) -> Tuple[bool, str]:
        """
        Compile code as a Rust library crate.
        Returns (success, compiler output).
        """
        with tempfile.TemporaryDirectory() as tmpdir:
            src = os.path.join(tmpdir, "submission.rs")
            out = os.path.join(tmpdir, "submission.rlib")
            with open(src, "w", encoding="utf-8") as f:
                f.write(code)
            try:
                proc = subprocess.run(
                    ["rustc", "--crate-type=lib", src, "-o", out,
                     "--edition=2021"],
                    capture_output=True,
                    text=True,
                    timeout=self.COMPILE_TIMEOUT,
                )
                return proc.returncode == 0, (proc.stdout + proc.stderr).strip()
            except subprocess.TimeoutExpired:
                return False, "Compilation timed out."
            except FileNotFoundError:
                return False, "rustc not found β is the Rust toolchain installed?"

    # ------------------------------------------------------------------
    # Correctness / Coverage
    # ------------------------------------------------------------------
    def _strip_main(self, code: str) -> str:
        """
        Remove fn main() { ... } blocks from submitted code so we can
        inject our own test main. Handles simple single-level braces.

        NOTE(review): brace matching here does not account for braces inside
        string literals or comments; acceptable as a heuristic.
        """
        # Remove pub/private fn main() { ... }
        pattern = re.compile(
            r'(pub\s+)?fn\s+main\s*\(\s*\)\s*(?:->\s*[^{]+)?\s*\{',
            re.MULTILINE,
        )
        match = pattern.search(code)
        if not match:
            return code
        start = match.start()
        depth = 0
        i = match.end() - 1  # position of the opening '{'
        while i < len(code):
            if code[i] == '{':
                depth += 1
            elif code[i] == '}':
                depth -= 1
                if depth == 0:
                    return code[:start] + code[i + 1:]
            i += 1
        return code  # malformed; return as-is

    def _build_test_binary(
        self, code: str, assertion: str, tmpdir: str, test_name: str
    ) -> Tuple[bool, str, str]:
        """
        Build a runnable Rust binary that executes one test assertion.
        Returns (compiled_ok, binary_path, compiler_output).
        """
        body = self._strip_main(code)
        src_code = f"""
#[allow(unused_imports, dead_code, unused_variables, unused_mut)]
{body}
fn main() {{
    {assertion};
    println!("PASS:{test_name}");
}}
"""
        src_path = os.path.join(tmpdir, f"{test_name}.rs")
        bin_path = os.path.join(tmpdir, test_name)
        with open(src_path, "w", encoding="utf-8") as f:
            f.write(src_code)
        try:
            proc = subprocess.run(
                ["rustc", src_path, "-o", bin_path, "--edition=2021"],
                capture_output=True,
                text=True,
                timeout=self.COMPILE_TIMEOUT,
            )
            return proc.returncode == 0, bin_path, (proc.stdout + proc.stderr).strip()
        except subprocess.TimeoutExpired:
            return False, "", "Compile timed out for test."
        except FileNotFoundError:
            return False, "", "rustc not found."

    def _run_tests(self, code: str, tests: List[Dict]) -> List[Dict]:
        """
        Run each test assertion as a separate Rust binary.
        Returns list of result dicts with keys: name, passed, ran, error.
        """
        results = []
        with tempfile.TemporaryDirectory() as tmpdir:
            for i, test in enumerate(tests):
                name = test.get("name", f"test_{i}")
                assertion = test.get("test_assertion", "")
                should_compile = test.get("should_compile", True)
                result: Dict = {
                    "name": name,
                    "passed": False,
                    "ran": False,
                    "error": None,
                }
                if not assertion:
                    result["error"] = "No test assertion defined."
                    results.append(result)
                    continue
                # Some tests are expected to fail compilation (should_compile=False)
                # treat successful compilation + correct output as pass
                # Sanitize the name: it becomes a file name and rustc -o target,
                # so spaces/punctuation would break compilation.
                bin_test_name = "t{}_{}".format(i, re.sub(r"\W", "_", name[:20]))
                compiled, bin_path, compiler_out = self._build_test_binary(
                    code, assertion, tmpdir, bin_test_name
                )
                if not compiled:
                    if not should_compile:
                        # The problem's starter code deliberately doesn't compile;
                        # if the submission also doesn't compile this test β skip
                        result["error"] = "Binary compile failed (expected for broken starter)."
                    else:
                        result["error"] = f"Compile error: {compiler_out[:300]}"
                    results.append(result)
                    continue
                # Run the binary
                result["ran"] = True
                try:
                    run_proc = subprocess.run(
                        [bin_path],
                        capture_output=True,
                        text=True,
                        timeout=self.RUN_TIMEOUT,
                    )
                    stdout = run_proc.stdout.strip()
                    if run_proc.returncode == 0 and f"PASS:{bin_test_name}" in stdout:
                        result["passed"] = True
                    else:
                        result["error"] = (
                            f"Test failed. Exit={run_proc.returncode}. "
                            f"stderr={run_proc.stderr[:200]}"
                        )
                except subprocess.TimeoutExpired:
                    result["error"] = "Test execution timed out."
                except Exception as exc:
                    result["error"] = str(exc)
                results.append(result)
        return results

    # ------------------------------------------------------------------
    # Elegance scoring
    # ------------------------------------------------------------------
    def _score_elegance(self, code: str) -> float:
        """
        Heuristic code-quality score in [0, 1].
        Penalties:
            - Each `.unwrap()` call  -> -0.15 (max -0.45)
            - Each `.expect(` call   -> -0.05 (max -0.15)
            - Lines > 100 chars      -> -0.05 per violation (max -0.20)
            - `unsafe` anywhere      -> -0.20 (substring match; unconditional)
        Bonuses:
            - Contains `?` (proxy for the `?` operator) -> +0.10
            - Uses `match` expressions                  -> +0.05
            - Has doc comments (`///`)                  -> +0.05
        """
        score = 1.0
        unwrap_count = len(re.findall(r'\.unwrap\(\)', code))
        score -= min(unwrap_count * 0.15, 0.45)
        expect_count = len(re.findall(r'\.expect\(', code))
        score -= min(expect_count * 0.05, 0.15)
        long_lines = sum(1 for line in code.splitlines() if len(line) > 100)
        score -= min(long_lines * 0.05, 0.20)
        if "unsafe" in code:
            score -= 0.20
        if "?" in code:
            score += 0.10
        if "match " in code or "match\n" in code:
            score += 0.05
        if "///" in code:
            score += 0.05
        return round(max(0.0, min(1.0, score)), 4)

    # ------------------------------------------------------------------
    # Efficiency scoring
    # ------------------------------------------------------------------
    def _score_efficiency(self, code: str, baseline_ms: float) -> float:
        """
        Time the execution by compiling + running a minimal binary.
        Score = min(1.0, baseline_ms / actual_ms).
        Returns 0.0 if compilation or execution fails.
        """
        body = self._strip_main(code)
        # Build a binary with an empty main to measure startup + run overhead
        test_src = f"""
#[allow(unused_imports, dead_code, unused_variables)]
{body}
fn main() {{}}
"""
        with tempfile.TemporaryDirectory() as tmpdir:
            src_path = os.path.join(tmpdir, "eff.rs")
            bin_path = os.path.join(tmpdir, "eff")
            with open(src_path, "w", encoding="utf-8") as f:
                f.write(test_src)
            try:
                # Compile
                proc = subprocess.run(
                    ["rustc", src_path, "-o", bin_path, "--edition=2021"],
                    capture_output=True, text=True, timeout=self.COMPILE_TIMEOUT,
                )
                if proc.returncode != 0:
                    return 0.0
                # Time the run
                t0 = time.monotonic()
                run_proc = subprocess.run(
                    [bin_path], capture_output=True, timeout=self.RUN_TIMEOUT
                )
                elapsed_ms = (time.monotonic() - t0) * 1000.0
                if run_proc.returncode != 0:
                    return 0.0
                # Floor at 0.1 ms to avoid division blow-up on instant runs.
                return round(min(1.0, baseline_ms / max(elapsed_ms, 0.1)), 4)
            except Exception:
                return 0.0