"""
Rust Coder Environment Implementation.

Evaluates LLM-generated Rust code against a sequence of coding problems
(nominally 10) loaded from ``problems.json``.

Multi-dimensional reward system:
Compilation(40%), Correctness(20%), Coverage(20%), Elegance(10%), Efficiency(10%).
"""

import os
import re
import subprocess
import tempfile
import time
import logging
import json
from typing import Dict, List, Optional, Tuple

from openenv.core.env_server.interfaces import Environment
from models import RustCoderAction, RustCoderObservation
from openai import OpenAI

# Resolve problems.json: look in same dir as this file, then parent
_HERE = os.path.dirname(os.path.abspath(__file__))
_PROBLEMS_PATHS = [
    os.path.join(_HERE, "problems.json"),        # server/problems.json
    os.path.join(_HERE, "..", "problems.json"),  # root problems.json
    "problems.json",                             # cwd fallback
]

# A rustc warning header line, with or without a diagnostic code. The trailing
# summary line ("warning: 3 warnings emitted") is excluded so that it is not
# counted as an extra warning.
_WARNING_LINE_RE = re.compile(
    r"(?m)^warning(?:\[[^\]]+\])?:(?! \d+ warnings? emitted)"
)

# Opening of a parameterless `fn main()` (optionally `pub`, optionally with a
# return type), up to and including its opening brace.
_MAIN_FN_RE = re.compile(
    r"(?:pub\s+)?\bfn\s+main\s*\(\s*\)\s*(?:->\s*[^{]+)?\s*\{"
)

# Keyword matches for the elegance heuristics. Word boundaries keep
# identifiers such as `unsafe_code` or `rematch` from matching.
_UNSAFE_KEYWORD_RE = re.compile(r"\bunsafe\b")
_MATCH_KEYWORD_RE = re.compile(r"\bmatch\s")

# Characters that are not safe in both a file name and a Rust format string.
_UNSAFE_NAME_CHARS_RE = re.compile(r"[^A-Za-z0-9_]")


def _find_problems_file() -> str:
    """Return the absolute path of the first existing problems.json.

    Raises:
        FileNotFoundError: if none of the candidate paths exists.
    """
    for path in _PROBLEMS_PATHS:
        if os.path.exists(path):
            return os.path.abspath(path)
    raise FileNotFoundError(
        f"problems.json not found. Searched: {_PROBLEMS_PATHS}"
    )


class RustCoderEnvironment(Environment):
    """
    OpenEnv-compliant environment for evaluating Rust code submissions.

    One episode walks through the loaded problems in order:
    - reset()      → selects a starting problem and returns its description
    - step(action) → compiles & tests the submitted code for the current
                     problem, returns the reward, and advances to the next
                     problem
    - The episode is done once the last problem has been evaluated.

    An empty submission does not advance the problem index, so the same
    problem can be retried.

    Reward breakdown (all components normalized to [0, 1]):
        Compilation  40%  — code compiles (reduced when warnings are emitted)
        Correctness  20%  — fraction of test assertions that pass
        Coverage     20%  — fraction of tests whose binary built and ran
        Elegance     10%  — code quality heuristics
        Efficiency   10%  — execution time vs. problem baseline

    NOTE(review): submitted code is compiled and executed as a local
    subprocess with only a timeout as protection; run this environment
    inside a sandbox when submissions are untrusted.
    """

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    # Compile / run timeouts (seconds)
    COMPILE_TIMEOUT = 30
    RUN_TIMEOUT = 10

    def __init__(self) -> None:
        """Initialize environment and load problems from JSON."""
        self._logger = logging.getLogger("rust_coder.env")
        self.problems: List[Dict] = self._load_problems()
        self.current_problem_idx: int = 0
        self.step_count: int = 0

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _load_problems(self) -> List[Dict]:
        """Load and validate the problems list from problems.json.

        Raises:
            FileNotFoundError: if problems.json cannot be located.
            ValueError: if the file is not a non-empty JSON array.
        """
        path = _find_problems_file()
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        if not isinstance(data, list) or not data:
            raise ValueError("problems.json must be a non-empty JSON array.")
        return data

    def _current_problem(self) -> Dict:
        """Return the problem at the current index, wrapping past the end."""
        idx = self.current_problem_idx % len(self.problems)
        return self.problems[idx]

    @staticmethod
    def _zero_breakdown() -> Dict[str, float]:
        """Return a fresh reward breakdown with every component at 0.0."""
        return {
            "compilation": 0.0,
            "correctness": 0.0,
            "coverage": 0.0,
            "elegance": 0.0,
            "efficiency": 0.0,
        }

    def _failure_observation(
        self, problem: Dict, message: str
    ) -> RustCoderObservation:
        """Build a zero-reward, not-done observation for `problem`.

        Used when nothing could be evaluated; the problem index is left
        untouched so the caller can retry the same problem.
        """
        return RustCoderObservation(
            problem_description=problem.get("description", ""),
            header_section=problem.get("header_section", ""),
            compilation_success=False,
            compilation_output=message,
            test_results=[],
            reward_breakdown=self._zero_breakdown(),
            done=False,
            reward=0.0,
        )

    @staticmethod
    def _auto_llm_enabled() -> bool:
        """Return True unless AUTO_LLM_ON_EMPTY_STEP is set to a falsy value.

        Default ON (because the hosted UI "Step" provides no code).
        To disable: set AUTO_LLM_ON_EMPTY_STEP=0/false/no/n/off.
        """
        cfg = (os.getenv("AUTO_LLM_ON_EMPTY_STEP") or "1").strip().lower()
        return cfg not in {"0", "false", "no", "n", "off"}

    @staticmethod
    def _extract_code_fence(text: str) -> str:
        """Return the contents of the first Markdown code fence in `text`.

        A ```rust fence is preferred over an unlabeled one. Text without any
        fence is returned stripped but otherwise unchanged.
        """
        if "```rust" in text:
            text = text.split("```rust")[1].split("```")[0]
        elif "```" in text:
            text = text.split("```")[1].split("```")[0]
        return text.strip()

    def _request_llm_code(
        self, model: str, base_url: str, token: str, prompt: str
    ) -> str:
        """Ask the configured chat model for a solution.

        Returns the generated Rust code, or "" when the call fails or the
        reply is empty after fence extraction. Failures are logged, never
        raised: this is a best-effort fallback.
        """
        try:
            client_llm = OpenAI(base_url=base_url, api_key=token)
            completion = client_llm.chat.completions.create(
                model=model,
                messages=[
                    {
                        "role": "system",
                        "content": "You are a senior Rust engineer. Return ONLY the complete Rust code file. No explanation.",
                    },
                    {
                        "role": "user",
                        "content": prompt
                        + "\n\nReturn the COMPLETE Rust code, including the header section above.",
                    },
                ],
                temperature=0.1,
            )
            text = (completion.choices[0].message.content or "").strip()
            text = self._extract_code_fence(text)
            if text:
                self._logger.info("Auto-LLM generated code chars=%d", len(text))
                return text
            self._logger.warning("Auto-LLM returned empty after cleanup.")
        except Exception:
            # Network/auth/SDK errors must not break the step; fall through
            # to the "no code submitted" path instead.
            self._logger.exception("Auto-LLM call failed.")
        return ""

    @staticmethod
    def _count_warnings(compiler_output: str) -> int:
        """Count individual rustc warnings in `compiler_output`.

        The "N warnings emitted" summary line is not counted.
        """
        if not compiler_output:
            return 0
        return len(_WARNING_LINE_RE.findall(compiler_output))

    # ------------------------------------------------------------------
    # OpenEnv interface
    # ------------------------------------------------------------------

    @property
    def state(self):
        """Return minimal state info (the step count; episode_id is None)."""
        from openenv.core.env_server.types import State
        return State(episode_id=None, step_count=self.step_count)

    def reset(self, start_index: int = 0) -> RustCoderObservation:
        """Start a new episode, defaulting to the first problem.

        `start_index` wraps modulo the number of problems.
        """
        self.current_problem_idx = start_index % len(self.problems)
        self.step_count = 0
        problem = self.problems[self.current_problem_idx]
        return RustCoderObservation(
            problem_description=problem["description"],
            header_section=problem.get("header_section", ""),
            compilation_success=False,
            compilation_output="",
            test_results=[],
            reward_breakdown={},
            done=False,
            reward=0.0,
        )

    def step(self, action: RustCoderAction) -> RustCoderObservation:
        """Evaluate the submitted code and advance the task index within the single episode.

        Returns an observation carrying the compile/test results for the
        evaluated task together with the description and header of the next
        task. `done` becomes True once the last task has been evaluated.
        Calling step() again after that returns a terminal zero-reward
        observation instead of raising.
        """
        if self.current_problem_idx >= len(self.problems):
            # Episode already finished: there is no problem left to index.
            return RustCoderObservation(
                problem_description="--- ALL TASKS COMPLETED in this episode ---",
                header_section="",
                compilation_success=False,
                compilation_output="Error: episode already finished; call reset() to start a new one.",
                test_results=[],
                reward_breakdown=self._zero_breakdown(),
                done=True,
                reward=0.0,
            )

        self.step_count += 1
        problem = self.problems[self.current_problem_idx]
        code = action.code or ""

        if not code.strip() and self._auto_llm_enabled():
            # Some UIs may "step" without providing an action payload.
            # Auto-generate code via LLM so the UI can still progress.
            model = os.getenv("MODEL_NAME") or "Qwen/Qwen2.5-72B-Instruct"
            base_url = os.getenv("API_BASE_URL") or "https://router.huggingface.co/v1"
            token = os.getenv("HF_TOKEN") or os.getenv("API_KEY")

            prompt = problem.get("description", "")
            header = problem.get("header_section", "")
            if header:
                prompt += f"\n\nHeader Section (must be included verbatim in your final code):\n```rust\n{header}\n```"

            self._logger.info(
                "Auto-LLM on empty step: model=%s base_url=%s prompt_chars=%d token_present=%s",
                model,
                base_url,
                len(prompt),
                bool(token),
            )

            if not token:
                self._logger.error("AUTO_LLM_ON_EMPTY_STEP enabled but HF_TOKEN/API_KEY missing.")
                return self._failure_observation(
                    problem,
                    "Error: AUTO_LLM_ON_EMPTY_STEP enabled but HF_TOKEN/API_KEY is missing.",
                )

            code = self._request_llm_code(model, base_url, token, prompt) or code

        if not code.strip():
            # Invalid/empty submission: do not advance the problem index.
            # The episode is not finished; allow retry on the same problem.
            self._logger.warning(
                "Empty code submitted step_count=%d problem_id=%s title=%s",
                self.step_count,
                problem.get("id"),
                problem.get("title"),
            )
            return self._failure_observation(problem, "Error: no code submitted.")

        # LeetCode-style header_section is provided as a scaffold/hint in the observation
        # and in the prompt, but we do NOT require it to appear verbatim in submissions.
        # Correctness is enforced by compilation + the problem's tests.

        # ── 1. Compilation (40%) ──────────────────────────────────────
        compilation_success, compilation_output = self._compile_check(code)
        r_compilation = 1.0 if compilation_success else 0.0

        # Warnings are not compilation errors in Rust, but they indicate lower quality.
        # Penalize compilation score slightly when warnings are present.
        warning_count = self._count_warnings(compilation_output)
        if compilation_success and warning_count > 0:
            r_compilation = max(0.6, 1.0 - min(0.05 * warning_count, 0.4))

        # ── 2. Correctness + Coverage (20% each) ─────────────────────
        test_results: List[Dict] = []
        r_correctness = 0.0
        r_coverage = 0.0

        if compilation_success:
            tests = problem.get("tests", [])
            if tests:
                test_results = self._run_tests(code, tests)
                passed = sum(1 for t in test_results if t.get("passed", False))
                ran = sum(1 for t in test_results if t.get("ran", False))
                r_correctness = passed / len(tests)
                r_coverage = ran / len(tests)
            else:
                # No tests defined — give full credit to both dimensions
                r_correctness = 1.0
                r_coverage = 1.0

        # ── 3. Elegance (10%) ─────────────────────────────────────────
        # Only score elegance for code that compiles; otherwise it can
        # incorrectly award points for non-compiling submissions.
        r_elegance = self._score_elegance(code) if compilation_success else 0.0
        if compilation_success and warning_count > 0:
            r_elegance = max(0.0, round(r_elegance - min(0.02 * warning_count, 0.2), 4))

        # ── 4. Efficiency (10%) ───────────────────────────────────────
        baseline_ms: float = problem.get("performance_baseline_ms", 100.0)
        r_efficiency = 0.0
        if compilation_success:
            r_efficiency = self._score_efficiency(code, baseline_ms)
            if warning_count > 0:
                r_efficiency = max(0.0, round(r_efficiency - min(0.02 * warning_count, 0.2), 4))

        # ── Total reward ──────────────────────────────────────────────
        reward_breakdown = {
            "compilation": round(r_compilation, 4),
            "correctness": round(r_correctness, 4),
            "coverage": round(r_coverage, 4),
            "elegance": round(r_elegance, 4),
            "efficiency": round(r_efficiency, 4),
        }

        # Calculate weighted total reward.
        # Hard rule: if it doesn't compile, total reward must be 0.0.
        if not compilation_success:
            total_reward = 0.0
        else:
            total_reward = round(
                r_compilation * 0.40
                + r_correctness * 0.20
                + r_coverage * 0.20
                + r_elegance * 0.10
                + r_efficiency * 0.10,
                4,
            )

        # ── Advance Logic ─────────────────────────────────────────────
        # One step = one evaluated task. We advance to the next task, and the episode
        # ends only after the final task has been evaluated.
        self.current_problem_idx += 1
        done = self.current_problem_idx >= len(self.problems)

        next_prob_desc = "--- ALL TASKS COMPLETED in this episode ---"
        next_header = ""
        if not done:
            next_prob = self.problems[self.current_problem_idx]
            next_prob_desc = (
                f"--- NEXT TASK: {next_prob.get('title', '')} ---\n\n"
                f"{next_prob['description']}"
            )
            next_header = next_prob.get("header_section", "")

        # IMPORTANT: The compilation/test results correspond to the code evaluated
        # on `problem` (the current task), while the UI should also know what's next.
        # To avoid confusion, include both "evaluated" and "next" in the description.
        response_problem_desc = (
            f"--- EVALUATED TASK: {problem.get('title', '')} ---\n\n"
            f"{problem.get('description', '')}\n\n"
            f"{next_prob_desc}"
        )

        return RustCoderObservation(
            problem_description=response_problem_desc,
            header_section=next_header,
            compilation_success=compilation_success,
            compilation_output=compilation_output[:2000],  # cap length
            test_results=test_results,
            reward_breakdown=reward_breakdown,
            done=done,
            reward=total_reward,
        )

    # ------------------------------------------------------------------
    # Compilation
    # ------------------------------------------------------------------

    def _compile_check(self, code: str) -> Tuple[bool, str]:
        """
        Compile code as a Rust library crate.

        Returns (success, compiler output). A timeout or a missing `rustc`
        is reported as a failure with an explanatory message.
        """
        with tempfile.TemporaryDirectory() as tmpdir:
            src = os.path.join(tmpdir, "submission.rs")
            out = os.path.join(tmpdir, "submission.rlib")
            with open(src, "w", encoding="utf-8") as f:
                f.write(code)
            try:
                proc = subprocess.run(
                    ["rustc", "--crate-type=lib", src, "-o", out, "--edition=2021"],
                    capture_output=True,
                    text=True,
                    timeout=self.COMPILE_TIMEOUT,
                )
                return proc.returncode == 0, (proc.stdout + proc.stderr).strip()
            except subprocess.TimeoutExpired:
                return False, "Compilation timed out."
            except FileNotFoundError:
                return False, "rustc not found — is the Rust toolchain installed?"

    # ------------------------------------------------------------------
    # Correctness / Coverage
    # ------------------------------------------------------------------

    def _strip_main(self, code: str) -> str:
        """
        Remove the first `fn main() { ... }` block from submitted code so we
        can inject our own test main.

        The end of the block is found by counting nested braces. Braces
        inside string literals or comments are counted too, so an unbalanced
        brace there can confuse the scan; in that case, or when no `main` is
        found, the code is returned unchanged.
        """
        match = _MAIN_FN_RE.search(code)
        if not match:
            return code

        depth = 0
        # Start on the opening '{' that terminates the regex match.
        for pos in range(match.end() - 1, len(code)):
            ch = code[pos]
            if ch == "{":
                depth += 1
            elif ch == "}":
                depth -= 1
                if depth == 0:
                    return code[:match.start()] + code[pos + 1:]
        return code  # malformed; return as-is

    def _build_test_binary(
        self, code: str, assertion: str, tmpdir: str, test_name: str
    ) -> Tuple[bool, str, str]:
        """
        Build a runnable Rust binary that executes one test assertion.

        `test_name` is used as the file stem and is embedded in the PASS
        marker the binary prints, so it must be safe for both (callers
        should pass only `[A-Za-z0-9_]` characters).

        Returns (compiled_ok, binary_path, compiler_output). On a timeout or
        a missing `rustc`, binary_path is "".
        """
        body = self._strip_main(code)
        # `#![allow(...)]` is a crate-level (inner) attribute: it silences the
        # lints for the whole submission rather than only its first item.
        src_code = f"""
#![allow(unused_imports, dead_code, unused_variables, unused_mut)]
{body}

fn main() {{
    {assertion};
    println!("PASS:{test_name}");
}}
"""
        src_path = os.path.join(tmpdir, f"{test_name}.rs")
        bin_path = os.path.join(tmpdir, test_name)
        with open(src_path, "w", encoding="utf-8") as f:
            f.write(src_code)
        try:
            proc = subprocess.run(
                ["rustc", src_path, "-o", bin_path, "--edition=2021"],
                capture_output=True,
                text=True,
                timeout=self.COMPILE_TIMEOUT,
            )
            return proc.returncode == 0, bin_path, (proc.stdout + proc.stderr).strip()
        except subprocess.TimeoutExpired:
            return False, "", "Compile timed out for test."
        except FileNotFoundError:
            return False, "", "rustc not found."

    def _run_tests(self, code: str, tests: List[Dict]) -> List[Dict]:
        """
        Run each test assertion as a separate Rust binary.

        Returns list of result dicts with keys: name, passed, ran, error.
        `ran` is True once the test binary compiled and was executed;
        `passed` additionally requires exit code 0 and the PASS marker on
        stdout.
        """
        results = []
        with tempfile.TemporaryDirectory() as tmpdir:
            for i, test in enumerate(tests):
                name = test.get("name", f"test_{i}")
                assertion = test.get("test_assertion", "")
                should_compile = test.get("should_compile", True)

                result: Dict = {
                    "name": name,
                    "passed": False,
                    "ran": False,
                    "error": None,
                }

                if not assertion:
                    result["error"] = "No test assertion defined."
                    results.append(result)
                    continue

                # The name becomes a file name and part of a Rust format
                # string, so reduce it to characters that are safe in both.
                # The index prefix keeps names unique after sanitizing.
                safe_name = _UNSAFE_NAME_CHARS_RE.sub("_", str(name)[:20])
                bin_test_name = f"t{i}_{safe_name}"
                compiled, bin_path, compiler_out = self._build_test_binary(
                    code, assertion, tmpdir, bin_test_name
                )

                if not compiled:
                    if not should_compile:
                        # Tests flagged should_compile=False belong to a
                        # starter that deliberately doesn't compile; record
                        # the failure without the compiler output.
                        result["error"] = "Binary compile failed (expected for broken starter)."
                    else:
                        result["error"] = f"Compile error: {compiler_out[:300]}"
                    result["ran"] = False
                    results.append(result)
                    continue

                # Run the binary
                result["ran"] = True
                try:
                    run_proc = subprocess.run(
                        [bin_path],
                        capture_output=True,
                        text=True,
                        timeout=self.RUN_TIMEOUT,
                    )
                    stdout = run_proc.stdout.strip()
                    if run_proc.returncode == 0 and f"PASS:{bin_test_name}" in stdout:
                        result["passed"] = True
                    else:
                        result["error"] = (
                            f"Test failed. Exit={run_proc.returncode}. "
                            f"stderr={run_proc.stderr[:200]}"
                        )
                except subprocess.TimeoutExpired:
                    result["error"] = "Test execution timed out."
                except Exception as exc:
                    # Any other launch/decoding failure is reported on the
                    # individual test rather than aborting the whole run.
                    result["error"] = str(exc)

                results.append(result)

        return results

    # ------------------------------------------------------------------
    # Elegance scoring
    # ------------------------------------------------------------------

    def _score_elegance(self, code: str) -> float:
        """
        Heuristic code-quality score in [0, 1].

        Starts from 1.0, applies the adjustments below, then clamps.

        Penalties:
          - Each `.unwrap()` call        → -0.15 (max -0.45)
          - Each `.expect(` call         → -0.05 (max -0.15)
          - Lines > 100 chars            → -0.05 per violation (max -0.20)
          - `unsafe` keyword present     → -0.20

        Bonuses:
          - Contains a `?` character     → +0.10
          - Uses the `match` keyword     → +0.05
          - Has doc comments (`///`)     → +0.05

        The checks are purely textual, so occurrences inside comments or
        string literals count as well.
        """
        score = 1.0

        unwrap_count = len(re.findall(r'\.unwrap\(\)', code))
        score -= min(unwrap_count * 0.15, 0.45)

        expect_count = len(re.findall(r'\.expect\(', code))
        score -= min(expect_count * 0.05, 0.15)

        long_lines = sum(1 for line in code.splitlines() if len(line) > 100)
        score -= min(long_lines * 0.05, 0.20)

        # Keyword match only: `#![forbid(unsafe_code)]` must not be penalized.
        if _UNSAFE_KEYWORD_RE.search(code):
            score -= 0.20

        if "?" in code:
            score += 0.10

        if _MATCH_KEYWORD_RE.search(code):
            score += 0.05

        if "///" in code:
            score += 0.05

        return round(max(0.0, min(1.0, score)), 4)

    # ------------------------------------------------------------------
    # Efficiency scoring
    # ------------------------------------------------------------------

    def _score_efficiency(self, code: str, baseline_ms: float) -> float:
        """
        Time the execution by compiling + running a minimal binary.

        The binary contains the submission (minus its own `main`) and an
        empty `main`, so the measurement covers process start-up and exit
        rather than the submitted functions themselves.

        Score = min(1.0, baseline_ms / actual_ms).
        Returns 0.0 if compilation or execution fails.
        """
        body = self._strip_main(code)
        # Build a binary with an empty main to measure startup + run overhead
        test_src = f"""
#![allow(unused_imports, dead_code, unused_variables)]
{body}

fn main() {{}}
"""
        with tempfile.TemporaryDirectory() as tmpdir:
            src_path = os.path.join(tmpdir, "eff.rs")
            bin_path = os.path.join(tmpdir, "eff")
            try:
                with open(src_path, "w", encoding="utf-8") as f:
                    f.write(test_src)

                # Compile
                proc = subprocess.run(
                    ["rustc", src_path, "-o", bin_path, "--edition=2021"],
                    capture_output=True,
                    text=True,
                    timeout=self.COMPILE_TIMEOUT,
                )
                if proc.returncode != 0:
                    return 0.0

                # Time the run
                t0 = time.monotonic()
                run_proc = subprocess.run(
                    [bin_path], capture_output=True, timeout=self.RUN_TIMEOUT
                )
                elapsed_ms = (time.monotonic() - t0) * 1000.0

                if run_proc.returncode != 0:
                    return 0.0

                # Floor the elapsed time so the division is always defined.
                return round(min(1.0, baseline_ms / max(elapsed_ms, 0.1)), 4)
            except Exception:
                # Scoring is best-effort: a timeout, a missing toolchain or
                # a non-numeric baseline yields no efficiency credit.
                self._logger.warning("Efficiency scoring failed.", exc_info=True)
                return 0.0