verirl-env / server /evaluator.py
Supreeth's picture
Upload folder using huggingface_hub
e3ca86e verified
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""
VeriRL EDA Evaluator.
Wraps iverilog, vvp, yosys, and optionally SymbiYosys to evaluate agent-submitted
Verilog. Supports multi-file projects: all methods accept either a single string
(backward-compatible) or a Dict[str, str] mapping filename → source.
Provides:
- Compilation checking via iverilog
- Functional simulation via iverilog + vvp against task testbenches
- Synthesis via yosys for area estimation
- Formal verification via SymbiYosys (optional, graceful degradation)
- Weighted per-task grading across compile / sim / timing / area / formal
"""
import base64
import os
import re
import shutil
import subprocess
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, Optional, Union
TOOL_TIMEOUT = 30 # seconds — prevents infinite loops in agent-submitted code
SYNTH_TIMEOUT = 60 # synthesis is slower than compilation
FORMAL_TIMEOUT = 90 # SymbiYosys can be slow on complex designs
# ---------------------------------------------------------------------------
# Result data classes
# ---------------------------------------------------------------------------
@dataclass
class CompilationResult:
success: bool
stdout: str
stderr: str
@dataclass
class SimulationResult:
success: bool
stdout: str
stderr: str
tests_passed: int
tests_total: int
timing_cycles: Optional[int] = None
@dataclass
class SynthesisResult:
success: bool
stdout: str
stderr: str
cell_count: int = 0
@dataclass
class FormalResult:
success: bool # True if all properties proven
stdout: str
stderr: str
properties_proven: int = 0
properties_total: int = 0
counterexample: Optional[str] = None # description of first failing trace
@property
def score(self) -> float:
if self.properties_total == 0:
return 0.0
return self.properties_proven / self.properties_total
@dataclass
class VisualizationResult:
success: bool
stdout: str
stderr: str
image_b64: Optional[str] = None # base64-encoded PNG
format: str = "png"
@dataclass
class EvalResult:
compilation: CompilationResult
simulation: Optional[SimulationResult] = None
synthesis: Optional[SynthesisResult] = None
formal: Optional[FormalResult] = None
final_score: Optional[float] = None
score_breakdown: Optional[Dict[str, float]] = None
def to_agent_feedback(self) -> str:
"""Format evaluation results as human-readable feedback for the agent."""
parts = []
if not self.compilation.success:
parts.append(f"Compilation FAILED:\n{self.compilation.stderr}")
return "\n".join(parts)
parts.append("Compilation: OK")
if self.simulation:
if self.simulation.tests_total > 0:
parts.append(
f"Simulation: {self.simulation.tests_passed}/{self.simulation.tests_total} "
f"tests passed"
)
if self.simulation.timing_cycles is not None:
parts.append(f"Timing: {self.simulation.timing_cycles} cycles")
if self.simulation.stdout:
parts.append(f"Sim output:\n{self.simulation.stdout}")
if self.simulation.stderr:
parts.append(f"Sim errors:\n{self.simulation.stderr}")
if self.synthesis:
parts.append(f"Synthesis: {self.synthesis.cell_count} cells")
if self.formal:
parts.append(
f"Formal: {self.formal.properties_proven}/{self.formal.properties_total} "
f"properties proven"
)
if self.formal.counterexample:
parts.append(f"Counterexample:\n{self.formal.counterexample}")
if self.final_score is not None:
parts.append(f"Score: {self.final_score:.3f}")
if self.score_breakdown:
breakdown_str = ", ".join(
f"{k}={v:.2f}" for k, v in self.score_breakdown.items()
)
parts.append(f"Breakdown: {breakdown_str}")
return "\n".join(parts)
# ---------------------------------------------------------------------------
# Per-task grading weights
# Tasks with a "formal" key require SymbiYosys; weight is redistributed if
# sby is not installed.
# ---------------------------------------------------------------------------
TASK_WEIGHTS: Dict[str, Dict[str, float]] = {
"mac_unit": {
"compile": 0.10,
"sim": 0.60,
"timing": 0.20,
"area": 0.10,
},
"axi_fifo": {
"compile": 0.10,
"sim": 0.70,
"area": 0.20,
},
"systolic_array": {
"compile": 0.05,
"sim": 0.50,
"timing": 0.30,
"area": 0.15,
},
# --- New tasks ---
"relu_clip": {
"compile": 0.10,
"sim": 0.75,
"formal": 0.10,
"area": 0.05,
},
"barrel_shifter": {
"compile": 0.10,
"sim": 0.80,
"area": 0.10,
},
"register_file": {
"compile": 0.10,
"sim": 0.70,
"formal": 0.10,
"area": 0.10,
},
"ring_buffer": {
"compile": 0.10,
"sim": 0.70,
"area": 0.20,
},
"dot_product": {
"compile": 0.10,
"sim": 0.60,
"timing": 0.20,
"area": 0.10,
},
"fir_filter": {
"compile": 0.10,
"sim": 0.60,
"timing": 0.20,
"area": 0.10,
},
"fp16_adder": {
"compile": 0.05,
"sim": 0.60,
"formal": 0.15,
"area": 0.20,
},
}
SYSTOLIC_TIMING_LIMIT = 10
# Yosys stat output uses cell names like $_DFF_P_, $_DFF_N_, $_SDFF_PP0_, etc.
# Each line looks like: " $_SDFF_PP0_ 8"
# Sum all DFF-family cell instance counts.
_DFF_CELL_RE = re.compile(r"\$_[A-Z]*DFF[A-Z0-9_]*\s+(\d+)", re.MULTILINE)
def _count_dffs(synth_stdout: str) -> int:
"""Return total DFF instance count from yosys stat output."""
return sum(int(m) for m in _DFF_CELL_RE.findall(synth_stdout))
# ---------------------------------------------------------------------------
# Evaluator
# ---------------------------------------------------------------------------
class VerilogEvaluator:
"""
Evaluates agent-submitted Verilog using real EDA tools.
Accepts both single-file (str) and multi-file (Dict[str, str]) inputs.
"""
def __init__(self, timeout: int = TOOL_TIMEOUT):
self.timeout = timeout
self._sby_available: Optional[bool] = None # cached check
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _files_to_dict(
self, source: Union[str, Dict[str, str]]
) -> Dict[str, str]:
"""Normalize single-string source to a filename dict."""
if isinstance(source, str):
return {"design.v": source}
return dict(source)
def _write_files_to_dir(
self, files: Dict[str, str], tmpdir: str
) -> list:
"""Write all source files to tmpdir. Returns list of absolute paths."""
paths = []
for fname, src in files.items():
# Sanitize filename — no path traversal
safe_name = Path(fname).name
fpath = os.path.join(tmpdir, safe_name)
with open(fpath, "w") as f:
f.write(src)
paths.append(fpath)
return paths
def _check_sby(self) -> bool:
"""Check once whether sby (SymbiYosys) is installed."""
if self._sby_available is None:
self._sby_available = shutil.which("sby") is not None
return self._sby_available
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def compile(
self, source: Union[str, Dict[str, str]]
) -> CompilationResult:
"""Syntax-check Verilog using iverilog. Accepts single file or multi-file dict."""
files = self._files_to_dict(source)
with tempfile.TemporaryDirectory() as tmpdir:
paths = self._write_files_to_dir(files, tmpdir)
try:
result = subprocess.run(
["iverilog", "-o", "/dev/null", "-Wall"] + paths,
capture_output=True,
text=True,
timeout=self.timeout,
)
return CompilationResult(
success=(result.returncode == 0),
stdout=result.stdout,
stderr=result.stderr,
)
except subprocess.TimeoutExpired:
return CompilationResult(
success=False,
stdout="",
stderr="ERROR: compilation timed out after 30s",
)
except FileNotFoundError:
return CompilationResult(
success=False,
stdout="",
stderr="ERROR: iverilog not found — is it installed?",
)
def simulate(
self,
source: Union[str, Dict[str, str]],
testbench_path: str,
) -> SimulationResult:
"""
Compile agent files together with the task testbench and run with vvp.
Testbenches use $display("PASS: ...") / $display("FAIL: ...") conventions.
Counts lines to compute test score. Extracts "CYCLES: N" for timing.
"""
files = self._files_to_dict(source)
with tempfile.TemporaryDirectory() as tmpdir:
paths = self._write_files_to_dir(files, tmpdir)
sim_binary = os.path.join(tmpdir, "sim.vvp")
try:
compile_result = subprocess.run(
["iverilog", "-o", sim_binary] + paths + [testbench_path],
capture_output=True,
text=True,
timeout=self.timeout,
)
if compile_result.returncode != 0:
return SimulationResult(
success=False,
stdout=compile_result.stdout,
stderr=compile_result.stderr,
tests_passed=0,
tests_total=0,
)
sim_result = subprocess.run(
["vvp", sim_binary],
capture_output=True,
text=True,
timeout=self.timeout,
)
stdout = sim_result.stdout
passes = len(re.findall(r"^PASS:", stdout, re.MULTILINE))
fails = len(re.findall(r"^FAIL:", stdout, re.MULTILINE))
timing_match = re.search(r"CYCLES:\s*(\d+)", stdout)
return SimulationResult(
success=(sim_result.returncode == 0),
stdout=stdout,
stderr=sim_result.stderr,
tests_passed=passes,
tests_total=passes + fails,
timing_cycles=int(timing_match.group(1)) if timing_match else None,
)
except subprocess.TimeoutExpired:
return SimulationResult(
success=False,
stdout="",
stderr="ERROR: simulation timed out — possible infinite loop in submitted code",
tests_passed=0,
tests_total=0,
)
def synthesize(
self, source: Union[str, Dict[str, str]]
) -> SynthesisResult:
"""
Run Yosys synthesis. Uses 'synth -flatten; stat' for deterministic cell count.
Accepts single file or multi-file dict.
"""
files = self._files_to_dict(source)
with tempfile.TemporaryDirectory() as tmpdir:
paths = self._write_files_to_dir(files, tmpdir)
read_cmds = " ".join(f"read_verilog {p};" for p in paths)
synth_script = f"{read_cmds} synth -flatten; stat"
try:
result = subprocess.run(
["yosys", "-p", synth_script],
capture_output=True,
text=True,
timeout=SYNTH_TIMEOUT,
)
combined = result.stdout + result.stderr
cell_match = re.search(r"Number of cells:\s*(\d+)", combined)
cell_count = int(cell_match.group(1)) if cell_match else 0
return SynthesisResult(
success=(result.returncode == 0 and cell_count > 0),
stdout=result.stdout,
stderr=result.stderr,
cell_count=cell_count,
)
except subprocess.TimeoutExpired:
return SynthesisResult(
success=False, stdout="", stderr="ERROR: synthesis timed out",
cell_count=0,
)
def visualize(
self,
source: Union[str, Dict[str, str]],
top_module: Optional[str] = None,
) -> "VisualizationResult":
"""
Generate a PNG netlist diagram using yosys show + graphviz dot.
Returns base64-encoded PNG on success.
Requires graphviz (dot) to be installed alongside yosys.
"""
if not shutil.which("dot"):
return VisualizationResult(
success=False,
stdout="",
stderr="ERROR: graphviz 'dot' not found — install graphviz to enable visualization.",
)
files = self._files_to_dict(source)
with tempfile.TemporaryDirectory() as tmpdir:
paths = self._write_files_to_dir(files, tmpdir)
out_prefix = os.path.join(tmpdir, "netlist")
read_cmds = " ".join(f"read_verilog {p};" for p in paths)
show_cmd = f"show -format png -prefix {out_prefix}"
if top_module:
show_cmd += f" {top_module}"
synth_script = f"{read_cmds} proc; opt; {show_cmd}"
try:
result = subprocess.run(
["yosys", "-p", synth_script],
capture_output=True,
text=True,
timeout=SYNTH_TIMEOUT,
)
png_path = f"{out_prefix}.png"
if os.path.exists(png_path):
with open(png_path, "rb") as f:
img_b64 = base64.b64encode(f.read()).decode()
return VisualizationResult(
success=True,
stdout=result.stdout,
stderr=result.stderr,
image_b64=img_b64,
format="png",
)
return VisualizationResult(
success=False,
stdout=result.stdout,
stderr=result.stderr or "ERROR: yosys produced no PNG output",
)
except subprocess.TimeoutExpired:
return VisualizationResult(
success=False,
stdout="",
stderr="ERROR: visualization timed out",
)
def formal_verify(
self,
source: Union[str, Dict[str, str]],
properties_path: str,
) -> FormalResult:
"""
Run SymbiYosys formal verification on the agent's design + task properties.
Returns a FormalResult with properties_proven / properties_total and any
counterexample trace. Returns a graceful failure if sby is not installed.
"""
if not self._check_sby():
return FormalResult(
success=False,
stdout="",
stderr="SymbiYosys (sby) not installed — formal verification skipped.",
properties_proven=0,
properties_total=0,
)
files = self._files_to_dict(source)
with tempfile.TemporaryDirectory() as tmpdir:
design_paths = self._write_files_to_dir(files, tmpdir)
props_dest = os.path.join(tmpdir, "properties.sv")
# Copy properties file
import shutil as _shutil
_shutil.copy(properties_path, props_dest)
# Parse the top module name from the properties file itself.
# Cannot use the file stem ("properties") — the module name is
# defined inside, e.g. "module relu_clip_formal #(...)".
props_text = Path(properties_path).read_text()
top_match = re.search(r"^\s*module\s+(\w+)", props_text, re.MULTILINE)
top_module = top_match.group(1) if top_match else Path(properties_path).stem
# Build sby config
read_cmds = "\n".join(f"read -formal {p}" for p in design_paths)
sby_config = f"""\
[options]
mode prove
depth 20
[engines]
smtbmc
[script]
{read_cmds}
read -formal {props_dest}
prep -top {top_module}
"""
sby_file = os.path.join(tmpdir, "verify.sby")
with open(sby_file, "w") as f:
f.write(sby_config)
try:
result = subprocess.run(
["sby", "-f", sby_file],
capture_output=True,
text=True,
timeout=FORMAL_TIMEOUT,
cwd=tmpdir,
)
combined = result.stdout + result.stderr
# Parse sby output for PASS / FAIL lines
proven = len(re.findall(r"PASS", combined, re.IGNORECASE))
failed_matches = re.findall(r"FAIL", combined, re.IGNORECASE)
# Heuristic: count unique assertion checks mentioned
assert_checks = re.findall(r"assert\s+\w+", combined)
total = max(proven + len(failed_matches), len(assert_checks), 1)
counterexample = None
if failed_matches:
# Extract first counterexample trace (after "Counterexample:")
cex_match = re.search(
r"(Counterexample.*?)(?=\n\n|\Z)", combined, re.DOTALL
)
if cex_match:
counterexample = cex_match.group(1)[:500] # cap at 500 chars
success = (result.returncode == 0) and not failed_matches
return FormalResult(
success=success,
stdout=result.stdout[:2000],
stderr=result.stderr[:500],
properties_proven=proven if success else max(0, proven - len(failed_matches)),
properties_total=total,
counterexample=counterexample,
)
except subprocess.TimeoutExpired:
return FormalResult(
success=False,
stdout="",
stderr="ERROR: formal verification timed out after 90s",
properties_proven=0,
properties_total=1,
)
def grade(
self,
source: Union[str, Dict[str, str]],
task_id: str,
testbench_path: str,
reference_cells: int,
properties_path: Optional[str] = None,
) -> EvalResult:
"""
Full grading pipeline: compile → simulate → synthesize → [formal] → weighted score.
Args:
source: Agent-submitted Verilog (str or Dict[filename, src])
task_id: Task identifier
testbench_path: Absolute path to the task testbench file
reference_cells: Reference synthesis cell count for area scoring
properties_path: Optional path to .sv formal properties file
Returns:
EvalResult with final_score in [0.01, 0.99] and all breakdown values
"""
files = self._files_to_dict(source)
primary_src = "\n".join(files.values())
weights = dict(TASK_WEIGHTS.get(task_id, {"compile": 1.0}))
breakdown: Dict[str, float] = {k: 0.0 for k in weights}
# If formal is in weights but sby is unavailable (or no properties file),
# redistribute formal weight to sim.
if "formal" in weights:
formal_avail = bool(properties_path) and self._check_sby()
if not formal_avail:
weights["sim"] = weights.get("sim", 0) + weights.pop("formal")
breakdown.pop("formal", None)
breakdown = {k: 0.0 for k in weights}
def _clamp(d: Dict[str, float]) -> Dict[str, float]:
return {k: max(0.01, min(0.99, v)) for k, v in d.items()}
if not primary_src.strip():
return EvalResult(
compilation=CompilationResult(
success=False, stdout="", stderr="Empty submission"
),
final_score=0.01,
score_breakdown=_clamp(breakdown),
)
# Step 1: Compilation
comp = self.compile(files)
if not comp.success:
return EvalResult(
compilation=comp,
final_score=0.01,
score_breakdown=_clamp(breakdown),
)
breakdown["compile"] = 0.99
# Step 2: Simulation
sim = self.simulate(files, testbench_path)
if sim.tests_total > 0:
ratio = sim.tests_passed / sim.tests_total
breakdown["sim"] = max(0.01, min(ratio, 0.99))
# Step 3: Synthesis (always run if code compiles)
synth = self.synthesize(files)
# Step 4: Timing scoring (task-specific)
if "timing" in breakdown:
if task_id == "mac_unit" and synth is not None:
dff_count = _count_dffs(synth.stdout)
if dff_count >= 2:
breakdown["timing"] = 0.99
elif synth.cell_count >= 10:
breakdown["timing"] = 0.50
elif task_id == "systolic_array":
if sim.timing_cycles is not None:
cycles = sim.timing_cycles
if cycles <= SYSTOLIC_TIMING_LIMIT:
breakdown["timing"] = 0.99
elif cycles <= SYSTOLIC_TIMING_LIMIT + 3:
breakdown["timing"] = max(
0.01, min(1.0 - (cycles - SYSTOLIC_TIMING_LIMIT) * 0.2, 0.99)
)
elif sim.tests_passed > 0:
breakdown["timing"] = 0.20
elif sim.tests_passed > 0:
breakdown["timing"] = 0.20
elif task_id in ("dot_product", "fir_filter"):
dff_count = _count_dffs(synth.stdout)
if dff_count >= 2:
breakdown["timing"] = 0.99
elif synth.cell_count >= 5:
breakdown["timing"] = 0.40
# Step 5: Area scoring (gated on functional correctness)
if (
"area" in breakdown
and synth is not None
and synth.success
and synth.cell_count > 0
and reference_cells > 0
and sim.tests_passed > 0
):
ratio = reference_cells / synth.cell_count
breakdown["area"] = max(0.01, min(ratio, 0.99))
# Step 6: Formal verification (optional)
formal_result = None
if "formal" in breakdown and properties_path:
formal_result = self.formal_verify(files, properties_path)
breakdown["formal"] = max(0.01, min(formal_result.score, 0.99))
breakdown = _clamp(breakdown)
# Recompute weights for any keys that may have been redistributed
active_weights = {k: weights[k] for k in weights if k in breakdown}
weight_sum = sum(active_weights.values())
if weight_sum > 0:
norm_weights = {k: v / weight_sum for k, v in active_weights.items()}
else:
norm_weights = active_weights
raw_score = round(sum(norm_weights[k] * breakdown[k] for k in norm_weights), 4)
final_score = max(0.01, min(0.99, raw_score))
return EvalResult(
compilation=comp,
simulation=sim,
synthesis=synth,
formal=formal_result,
final_score=final_score,
score_breakdown=breakdown,
)