# Source file: rust_coder/server/rust_coder_environment.py
# (Uploaded to the Hugging Face Hub via huggingface_hub; commit 2154988, verified.
#  The original page-header residue has been converted into this comment.)
"""
Rust Coder Environment Implementation.
Evaluates LLM-generated Rust code against 10 sequential coding problems.
Multi-dimensional reward system: Compilation(40%), Correctness(20%),
Coverage(20%), Elegance(10%), Efficiency(10%).
"""
import os
import re
import subprocess
import tempfile
import time
import logging
import json
from typing import Dict, List, Optional, Tuple
from openenv.core.env_server.interfaces import Environment
from models import RustCoderAction, RustCoderObservation
from openai import OpenAI
# Resolve problems.json: look in same dir as this file, then parent.
# Directory containing this module; anchors the relative lookups below.
_HERE = os.path.dirname(os.path.abspath(__file__))
# Candidate locations for the problem definitions, checked in order.
_PROBLEMS_PATHS = [
    os.path.join(_HERE, "problems.json"),        # server/problems.json
    os.path.join(_HERE, "..", "problems.json"),  # root problems.json
    "problems.json",                             # cwd fallback
]
def _find_problems_file() -> str:
    """Locate problems.json among the candidate paths.

    Returns:
        Absolute path of the first existing candidate in ``_PROBLEMS_PATHS``.

    Raises:
        FileNotFoundError: if none of the candidate paths exist.
    """
    for candidate in _PROBLEMS_PATHS:
        if not os.path.exists(candidate):
            continue
        return os.path.abspath(candidate)
    raise FileNotFoundError(
        f"problems.json not found. Searched: {_PROBLEMS_PATHS}"
    )
class RustCoderEnvironment(Environment):
    """
    OpenEnv-compliant environment for evaluating Rust code submissions.

    Manages a sequence of coding problems loaded from problems.json.
    - reset()        -> loads the problem at ``start_index``, returns its description
    - step(action)   -> compiles & tests the submitted code, returns reward,
                        then advances to the next task; the episode ends
                        (done=True) only after the final task is evaluated.

    Reward breakdown (all components normalized to [0, 1]):
        Compilation 40% - code compiles without errors
        Correctness 20% - fraction of test assertions that pass
        Coverage    20% - fraction of tests attempted to run
        Elegance    10% - code quality heuristics
        Efficiency  10% - execution time vs. problem baseline
    """
    # Declares concurrent-session support to the OpenEnv server.
    # NOTE(review): mutable instance state (current_problem_idx, step_count)
    # is not lock-protected here - confirm the server serializes access.
    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    # Compile / run timeouts (seconds) for all rustc / binary invocations.
    COMPILE_TIMEOUT = 30
    RUN_TIMEOUT = 10

    def __init__(self) -> None:
        """Initialize environment and load problems from JSON."""
        self._logger = logging.getLogger("rust_coder.env")
        # Ordered problem list loaded once from problems.json.
        self.problems: List[Dict] = self._load_problems()
        # Index of the problem currently being served/evaluated.
        self.current_problem_idx: int = 0
        # Number of step() calls in the current episode.
        self.step_count: int = 0
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _load_problems(self) -> List[Dict]:
"""Load and validate the problems list from problems.json."""
path = _find_problems_file()
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, list) or len(data) == 0:
raise ValueError("problems.json must be a non-empty JSON array.")
return data
def _current_problem(self) -> Dict:
idx = self.current_problem_idx % len(self.problems)
return self.problems[idx]
# ------------------------------------------------------------------
# OpenEnv interface
# ------------------------------------------------------------------
@property
def state(self):
    """Expose minimal environment state (no episode id, step count only)."""
    # Imported lazily so the server types are only needed when queried.
    from openenv.core.env_server.types import State
    snapshot = State(episode_id=None, step_count=self.step_count)
    return snapshot
def reset(self, start_index: int = 0) -> RustCoderObservation:
    """Begin a new episode on the problem at *start_index* (wrapped into range)."""
    self.step_count = 0
    self.current_problem_idx = start_index % len(self.problems)
    selected = self.problems[self.current_problem_idx]
    # Fresh observation: no compilation/test data yet, zero reward.
    return RustCoderObservation(
        problem_description=selected["description"],
        header_section=selected.get("header_section", ""),
        compilation_success=False,
        compilation_output="",
        test_results=[],
        reward_breakdown={},
        done=False,
        reward=0.0,
    )
def step(self, action: RustCoderAction) -> RustCoderObservation:
    """Evaluate the submitted code and advance the task index within the single episode.

    Args:
        action: Carries the submitted Rust source in ``action.code``. If empty,
            an LLM may auto-generate code (see AUTO_LLM_ON_EMPTY_STEP below).

    Returns:
        Observation with compilation output, per-test results, the weighted
        reward, and the NEXT task's description/header. ``done`` becomes True
        only after the final problem has been evaluated.
    """
    self.step_count += 1
    problem = self.problems[self.current_problem_idx]
    code = action.code
    # NOTE(review): this assignment is dead - `header` is only read inside the
    # auto-LLM branch below, which re-reads it before use.
    header = problem.get("header_section", "")
    if not code.strip():
        # Some UIs may "step" without providing an action payload.
        # Optionally auto-generate code via LLM so the UI can still progress.
        # Default ON (because the hosted UI "Step" provides no code).
        # To disable: set AUTO_LLM_ON_EMPTY_STEP=0/false/no.
        auto_llm_cfg = (os.getenv("AUTO_LLM_ON_EMPTY_STEP") or "1").strip().lower()
        auto_llm = auto_llm_cfg not in {"0", "false", "no", "n", "off"}
        if auto_llm:
            # Model, endpoint and credentials all come from environment variables.
            model = os.getenv("MODEL_NAME") or "Qwen/Qwen2.5-72B-Instruct"
            base_url = os.getenv("API_BASE_URL") or "https://router.huggingface.co/v1"
            token = os.getenv("HF_TOKEN") or os.getenv("API_KEY")
            prompt = problem.get("description", "")
            header = problem.get("header_section", "")
            if header:
                # Instruct the model to include the scaffold verbatim.
                prompt += f"\n\nHeader Section (must be included verbatim in your final code):\n```rust\n{header}\n```"
            self._logger.info(
                "Auto-LLM on empty step: model=%s base_url=%s prompt_chars=%d token_present=%s",
                model,
                base_url,
                len(prompt),
                bool(token),
            )
            if not token:
                # Without credentials we cannot call the LLM; report the error
                # observation without advancing or ending the episode.
                self._logger.error("AUTO_LLM_ON_EMPTY_STEP enabled but HF_TOKEN/API_KEY missing.")
                return RustCoderObservation(
                    problem_description=problem.get("description", ""),
                    header_section=problem.get("header_section", ""),
                    compilation_success=False,
                    compilation_output="Error: AUTO_LLM_ON_EMPTY_STEP enabled but HF_TOKEN/API_KEY is missing.",
                    test_results=[],
                    reward_breakdown={
                        "compilation": 0.0,
                        "correctness": 0.0,
                        "coverage": 0.0,
                        "elegance": 0.0,
                        "efficiency": 0.0,
                    },
                    done=False,
                    reward=0.0,
                )
            try:
                client_llm = OpenAI(base_url=base_url, api_key=token)
                completion = client_llm.chat.completions.create(
                    model=model,
                    messages=[
                        {"role": "system", "content": "You are a senior Rust engineer. Return ONLY the complete Rust code file. No explanation."},
                        {"role": "user", "content": prompt + "\n\nReturn the COMPLETE Rust code, including the header section above."},
                    ],
                    temperature=0.1,
                )
                text = (completion.choices[0].message.content or "").strip()
                # Strip a markdown code fence if the model wrapped its answer.
                if "```rust" in text:
                    text = text.split("```rust")[1].split("```")[0]
                elif "```" in text:
                    text = text.split("```")[1].split("```")[0]
                text = text.strip()
                if text:
                    code = text
                    self._logger.info("Auto-LLM generated code chars=%d", len(code))
                else:
                    self._logger.warning("Auto-LLM returned empty after cleanup.")
            except Exception as e:  # NOTE(review): `e` unused; exception() logs the traceback
                # Best-effort: on any LLM failure, fall through to the
                # empty-code handling below so the caller can retry.
                self._logger.exception("Auto-LLM call failed.")
    if not code.strip():
        # Invalid/empty submission: do not advance the problem index.
        self._logger.warning(
            "Empty code submitted step_count=%d problem_id=%s title=%s",
            self.step_count,
            problem.get("id"),
            problem.get("title"),
        )
        # Episode is not finished; allow retry on same problem.
        done = False
        return RustCoderObservation(
            problem_description=problem["description"],
            header_section=problem.get("header_section", ""),
            compilation_success=False,
            compilation_output="Error: no code submitted.",
            test_results=[],
            reward_breakdown={
                "compilation": 0.0,
                "correctness": 0.0,
                "coverage": 0.0,
                "elegance": 0.0,
                "efficiency": 0.0,
            },
            done=done,
            reward=0.0,
        )
    # LeetCode-style header_section is provided as a scaffold/hint in the observation
    # and in the prompt, but we do NOT require it to appear verbatim in submissions.
    # Correctness is enforced by compilation + the problem's tests.

    # -- 1. Compilation (40%) ------------------------------------------
    compilation_success, compilation_output = self._compile_check(code)
    r_compilation = 1.0 if compilation_success else 0.0
    # Warnings are not compilation errors in Rust, but they indicate lower quality.
    # Penalize compilation score slightly when warnings are present.
    warning_count = 0
    if compilation_output:
        warning_count = len(re.findall(r'(?m)^warning:', compilation_output))
    if compilation_success and warning_count > 0:
        # -0.05 per warning, capped at -0.4, floored at 0.6.
        r_compilation = max(0.6, 1.0 - min(0.05 * warning_count, 0.4))

    # -- 2. Correctness + Coverage (20% each) --------------------------
    test_results: List[Dict] = []
    r_correctness = 0.0
    r_coverage = 0.0
    if compilation_success:
        tests = problem.get("tests", [])
        if tests:
            test_results = self._run_tests(code, tests)
            passed = sum(1 for t in test_results if t.get("passed", False))
            ran = sum(1 for t in test_results if t.get("ran", False))
            r_correctness = passed / len(tests)
            r_coverage = ran / len(tests)
        else:
            # No tests defined - give full credit to both dimensions.
            r_correctness = 1.0
            r_coverage = 1.0

    # -- 3. Elegance (10%) ---------------------------------------------
    # Only score elegance for code that compiles; otherwise it can
    # incorrectly award points for non-compiling submissions.
    r_elegance = self._score_elegance(code) if compilation_success else 0.0
    if compilation_success and warning_count > 0:
        # Small per-warning deduction (-0.02 each, capped at -0.2).
        r_elegance = max(0.0, round(r_elegance - min(0.02 * warning_count, 0.2), 4))

    # -- 4. Efficiency (10%) -------------------------------------------
    baseline_ms: float = problem.get("performance_baseline_ms", 100.0)
    r_efficiency = 0.0
    if compilation_success:
        r_efficiency = self._score_efficiency(code, baseline_ms)
        if warning_count > 0:
            # Same per-warning deduction as elegance.
            r_efficiency = max(0.0, round(r_efficiency - min(0.02 * warning_count, 0.2), 4))

    # -- Total reward ---------------------------------------------------
    reward_breakdown = {
        "compilation": round(r_compilation, 4),
        "correctness": round(r_correctness, 4),
        "coverage": round(r_coverage, 4),
        "elegance": round(r_elegance, 4),
        "efficiency": round(r_efficiency, 4),
    }
    # Calculate weighted total reward.
    # Hard rule: if it doesn't compile, total reward must be 0.0.
    if not compilation_success:
        total_reward = 0.0
    else:
        total_reward = round(
            r_compilation * 0.40
            + r_correctness * 0.20
            + r_coverage * 0.20
            + r_elegance * 0.10
            + r_efficiency * 0.10,
            4,
        )

    # -- Advance Logic ---------------------------------------------------
    # One step = one evaluated task. We advance to the next task, and the episode
    # ends only after the final task has been evaluated.
    self.current_problem_idx += 1
    done = self.current_problem_idx >= len(self.problems)
    next_prob_desc = "--- ALL TASKS COMPLETED in this episode ---"
    next_header = ""
    if not done:
        next_prob = self.problems[self.current_problem_idx]
        next_prob_desc = f"--- NEXT TASK: {next_prob['title']} ---\n\n{next_prob['description']}"
        next_header = next_prob.get("header_section", "")
    # IMPORTANT: The compilation/test results correspond to the code evaluated
    # on `problem` (the current task), while the UI should also know what's next.
    # To avoid confusion, include both "evaluated" and "next" in the description.
    response_problem_desc = (
        f"--- EVALUATED TASK: {problem.get('title', '')} ---\n\n"
        f"{problem.get('description', '')}\n\n"
        f"{next_prob_desc}"
    )
    return RustCoderObservation(
        problem_description=response_problem_desc,
        header_section=next_header,
        compilation_success=compilation_success,
        compilation_output=compilation_output[:2000],  # cap length
        test_results=test_results,
        reward_breakdown=reward_breakdown,
        done=done,
        reward=total_reward,
    )
# ------------------------------------------------------------------
# Compilation
# ------------------------------------------------------------------
def _compile_check(self, code: str) -> Tuple[bool, str]:
    """Compile *code* as a Rust library crate.

    Returns:
        (success, combined rustc stdout+stderr, stripped).
    """
    with tempfile.TemporaryDirectory() as workdir:
        source_path = os.path.join(workdir, "submission.rs")
        rlib_path = os.path.join(workdir, "submission.rlib")
        with open(source_path, "w", encoding="utf-8") as handle:
            handle.write(code)
        command = [
            "rustc", "--crate-type=lib", source_path, "-o", rlib_path,
            "--edition=2021",
        ]
        try:
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                timeout=self.COMPILE_TIMEOUT,
            )
        except subprocess.TimeoutExpired:
            return False, "Compilation timed out."
        except FileNotFoundError:
            # rustc binary missing entirely.
            return False, "rustc not found — is the Rust toolchain installed?"
        combined_output = (result.stdout + result.stderr).strip()
        return result.returncode == 0, combined_output
# ------------------------------------------------------------------
# Correctness / Coverage
# ------------------------------------------------------------------
def _strip_main(self, code: str) -> str:
"""
Remove fn main() { ... } blocks from submitted code so we can
inject our own test main. Handles simple single-level braces.
"""
# Remove pub/private fn main() { ... }
pattern = re.compile(
r'(pub\s+)?fn\s+main\s*\(\s*\)\s*(?:->\s*[^{]+)?\s*\{',
re.MULTILINE,
)
match = pattern.search(code)
if not match:
return code
start = match.start()
depth = 0
i = match.end() - 1 # position of the opening '{'
while i < len(code):
if code[i] == '{':
depth += 1
elif code[i] == '}':
depth -= 1
if depth == 0:
return code[:start] + code[i + 1:]
i += 1
return code # malformed; return as-is
def _build_test_binary(
    self, code: str, assertion: str, tmpdir: str, test_name: str
) -> Tuple[bool, str, str]:
    """
    Build a runnable Rust binary that executes one test assertion.

    The submission's own ``fn main`` (if any) is stripped and replaced by a
    generated main that runs *assertion* and prints ``PASS:<test_name>`` -
    the marker _run_tests looks for on stdout.

    Args:
        code: Full submitted Rust source.
        assertion: A single Rust statement/expression to execute in main.
        tmpdir: Working directory for the source file and binary.
        test_name: Basename for the generated .rs file and binary.

    Returns:
        (compiled_ok, binary_path, compiler_output). On failure the binary
        path is "" and the third element describes the error.
    """
    body = self._strip_main(code)
    # Wrap the (now main-less) submission with a test-harness main.
    src_code = f"""
#[allow(unused_imports, dead_code, unused_variables, unused_mut)]
{body}
fn main() {{
{assertion};
println!("PASS:{test_name}");
}}
"""
    src_path = os.path.join(tmpdir, f"{test_name}.rs")
    bin_path = os.path.join(tmpdir, test_name)
    with open(src_path, "w", encoding="utf-8") as f:
        f.write(src_code)
    try:
        proc = subprocess.run(
            ["rustc", src_path, "-o", bin_path, "--edition=2021"],
            capture_output=True,
            text=True,
            timeout=self.COMPILE_TIMEOUT,
        )
        return proc.returncode == 0, bin_path, (proc.stdout + proc.stderr).strip()
    except subprocess.TimeoutExpired:
        return False, "", "Compile timed out for test."
    except FileNotFoundError:
        return False, "", "rustc not found."
def _run_tests(self, code: str, tests: List[Dict]) -> List[Dict]:
    """
    Run each test assertion as a separate Rust binary.

    Args:
        code: The submitted Rust source.
        tests: Problem test specs; each dict may carry "name",
            "test_assertion" and "should_compile".

    Returns:
        List of result dicts with keys: name, passed, ran, error.
    """
    results = []
    with tempfile.TemporaryDirectory() as tmpdir:
        for i, test in enumerate(tests):
            name = test.get("name", f"test_{i}")
            assertion = test.get("test_assertion", "")
            should_compile = test.get("should_compile", True)
            result: Dict = {
                "name": name,
                "passed": False,
                "ran": False,
                "error": None,
            }
            if not assertion:
                # Spec without an assertion can never run; record and move on.
                result["error"] = "No test assertion defined."
                results.append(result)
                continue
            # Some tests are expected to fail compilation (should_compile=False);
            # treat successful compilation + correct output as pass.
            # Name is truncated to keep generated file names short.
            bin_test_name = f"t{i}_{name[:20]}"
            compiled, bin_path, compiler_out = self._build_test_binary(
                code, assertion, tmpdir, bin_test_name
            )
            if not compiled:
                if not should_compile:
                    # The problem's starter code deliberately doesn't compile;
                    # if the submission also doesn't compile this test -> skip.
                    result["error"] = "Binary compile failed (expected for broken starter)."
                else:
                    result["error"] = f"Compile error: {compiler_out[:300]}"
                result["ran"] = False
                results.append(result)
                continue
            # Run the binary.
            result["ran"] = True
            try:
                run_proc = subprocess.run(
                    [bin_path],
                    capture_output=True,
                    text=True,
                    timeout=self.RUN_TIMEOUT,
                )
                stdout = run_proc.stdout.strip()
                # Pass requires both exit code 0 and the PASS marker emitted
                # by the injected harness main (see _build_test_binary).
                if run_proc.returncode == 0 and f"PASS:{bin_test_name}" in stdout:
                    result["passed"] = True
                else:
                    result["error"] = (
                        f"Test failed. Exit={run_proc.returncode}. "
                        f"stderr={run_proc.stderr[:200]}"
                    )
            except subprocess.TimeoutExpired:
                result["error"] = "Test execution timed out."
            except Exception as exc:
                # Unexpected runner failure: surface the message, count as not passed.
                result["error"] = str(exc)
            results.append(result)
    return results
# ------------------------------------------------------------------
# Elegance scoring
# ------------------------------------------------------------------
def _score_elegance(self, code: str) -> float:
"""
Heuristic code-quality score in [0, 1].
Penalties:
- Each `.unwrap()` call β†’ -0.15 (max -0.45)
- Each `.expect(` call β†’ -0.05 (max -0.15)
- Lines > 100 chars β†’ -0.05 per violation (max -0.20)
- `unsafe` blocks β†’ -0.20 unless problem requires FFI
Bonuses:
- Uses `?` operator β†’ +0.10
- Uses `match` expressions β†’ +0.05
- Has doc comments (`///`) β†’ +0.05
"""
score = 1.0
unwrap_count = len(re.findall(r'\.unwrap\(\)', code))
score -= min(unwrap_count * 0.15, 0.45)
expect_count = len(re.findall(r'\.expect\(', code))
score -= min(expect_count * 0.05, 0.15)
long_lines = sum(1 for line in code.splitlines() if len(line) > 100)
score -= min(long_lines * 0.05, 0.20)
if "unsafe" in code:
score -= 0.20
if "?" in code:
score += 0.10
if "match " in code or "match\n" in code:
score += 0.05
if "///" in code:
score += 0.05
return round(max(0.0, min(1.0, score)), 4)
# ------------------------------------------------------------------
# Efficiency scoring
# ------------------------------------------------------------------
def _score_efficiency(self, code: str, baseline_ms: float) -> float:
    """
    Time the execution by compiling + running a minimal binary.

    Score = min(1.0, baseline_ms / actual_ms).
    Returns 0.0 if compilation or execution fails.

    NOTE(review): the generated main is empty, so the wall-clock time here
    is dominated by process spawn/startup overhead, not the submission's
    algorithm - confirm baseline_ms values are calibrated accordingly.
    """
    body = self._strip_main(code)
    # Build a binary with an empty main to measure startup + run overhead.
    test_src = f"""
#[allow(unused_imports, dead_code, unused_variables)]
{body}
fn main() {{}}
"""
    with tempfile.TemporaryDirectory() as tmpdir:
        src_path = os.path.join(tmpdir, "eff.rs")
        bin_path = os.path.join(tmpdir, "eff")
        with open(src_path, "w", encoding="utf-8") as f:
            f.write(test_src)
        try:
            # Compile.
            proc = subprocess.run(
                ["rustc", src_path, "-o", bin_path, "--edition=2021"],
                capture_output=True, text=True, timeout=self.COMPILE_TIMEOUT,
            )
            if proc.returncode != 0:
                return 0.0
            # Time the run (wall clock around the subprocess call).
            t0 = time.monotonic()
            run_proc = subprocess.run(
                [bin_path], capture_output=True, timeout=self.RUN_TIMEOUT
            )
            elapsed_ms = (time.monotonic() - t0) * 1000.0
            if run_proc.returncode != 0:
                return 0.0
            # 0.1 ms floor prevents division blow-up on very fast runs.
            return round(min(1.0, baseline_ms / max(elapsed_ms, 0.1)), 4)
        except Exception:
            # Any failure (timeout, missing rustc, ...) scores zero.
            return 0.0