Spaces:
Sleeping
Sleeping
| # Copyright (c) Meta Platforms, Inc. and affiliates. | |
| # All rights reserved. | |
| # | |
| # This source code is licensed under the BSD-style license found in the | |
| # LICENSE file in the root directory of this source tree. | |
| """ | |
| VeriRL EDA Evaluator. | |
| Wraps iverilog, vvp, yosys, and optionally SymbiYosys to evaluate agent-submitted | |
| Verilog. Supports multi-file projects: all methods accept either a single string | |
| (backward-compatible) or a Dict[str, str] mapping filename → source. | |
| Provides: | |
| - Compilation checking via iverilog | |
| - Functional simulation via iverilog + vvp against task testbenches | |
| - Synthesis via yosys for area estimation | |
| - Formal verification via SymbiYosys (optional, graceful degradation) | |
| - Weighted per-task grading across compile / sim / timing / area / formal | |
| """ | |
| import base64 | |
| import os | |
| import re | |
| import shutil | |
| import subprocess | |
| import tempfile | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| from typing import Dict, Optional, Union | |
# Wall-clock limits (in seconds) applied to every external EDA tool invocation.
# Agent-submitted code is untrusted and may never terminate, so each subprocess
# call is bounded.
TOOL_TIMEOUT = 30  # iverilog / vvp: compile and simulate
SYNTH_TIMEOUT = 60  # yosys: synthesis takes longer than compilation
FORMAL_TIMEOUT = 90  # SymbiYosys: proofs can be slow on complex designs
| # --------------------------------------------------------------------------- | |
| # Result data classes | |
| # --------------------------------------------------------------------------- | |
@dataclass
class CompilationResult:
    """Outcome of an iverilog syntax/elaboration check.

    Declared as a dataclass because the evaluator constructs it with keyword
    arguments (``CompilationResult(success=..., stdout=..., stderr=...)``).
    """

    success: bool  # True when iverilog exited with status 0
    stdout: str  # captured standard output of the tool
    stderr: str  # captured standard error, or an "ERROR: ..." message
@dataclass
class SimulationResult:
    """Outcome of compiling a design with its testbench and running it in vvp.

    Declared as a dataclass because the evaluator constructs it with keyword
    arguments.
    """

    success: bool  # True when the simulator exited with status 0
    stdout: str  # captured simulator (or compiler) standard output
    stderr: str  # captured standard error, or an "ERROR: ..." message
    tests_passed: int  # number of "PASS:" lines printed by the testbench
    tests_total: int  # "PASS:" + "FAIL:" lines; 0 when nothing was run
    timing_cycles: Optional[int] = None  # value of "CYCLES: N" if reported
@dataclass
class SynthesisResult:
    """Outcome of a yosys synthesis run.

    Declared as a dataclass because the evaluator constructs it with keyword
    arguments.
    """

    success: bool  # True when yosys succeeded and reported at least one cell
    stdout: str  # captured yosys standard output (includes the stat report)
    stderr: str  # captured standard error, or an "ERROR: ..." message
    cell_count: int = 0  # parsed "Number of cells"; 0 when unavailable
@dataclass
class FormalResult:
    """Outcome of a SymbiYosys formal-verification run.

    Declared as a dataclass because the evaluator constructs it with keyword
    arguments. ``score`` is a property because the grader reads it as an
    attribute (``formal_result.score``), not as a method call.
    """

    success: bool  # True if all properties proven
    stdout: str  # captured (possibly truncated) sby standard output
    stderr: str  # captured standard error, or an explanatory message
    properties_proven: int = 0
    properties_total: int = 0
    counterexample: Optional[str] = None  # description of first failing trace

    @property
    def score(self) -> float:
        """Fraction of properties proven, or 0.0 when none were checked."""
        if self.properties_total == 0:
            # Avoid division by zero when formal verification was skipped.
            return 0.0
        return self.properties_proven / self.properties_total
@dataclass
class VisualizationResult:
    """Outcome of rendering a netlist diagram with yosys ``show`` + graphviz.

    Declared as a dataclass because the evaluator constructs it with keyword
    arguments.
    """

    success: bool  # True when a PNG file was produced
    stdout: str  # captured yosys standard output
    stderr: str  # captured standard error, or an "ERROR: ..." message
    image_b64: Optional[str] = None  # base64-encoded PNG
    format: str = "png"  # image format of ``image_b64``
@dataclass
class EvalResult:
    """Aggregated result of the full grading pipeline.

    Declared as a dataclass because the evaluator constructs it with keyword
    arguments. Only ``compilation`` is mandatory; later stages stay ``None``
    when the pipeline stopped early.
    """

    compilation: CompilationResult
    simulation: Optional[SimulationResult] = None
    synthesis: Optional[SynthesisResult] = None
    formal: Optional[FormalResult] = None
    final_score: Optional[float] = None
    score_breakdown: Optional[Dict[str, float]] = None

    def to_agent_feedback(self) -> str:
        """Format evaluation results as human-readable feedback for the agent.

        Returns:
            A newline-joined report. When compilation failed, only the
            compiler's stderr is reported; otherwise one section per stage
            that produced a result, followed by the score and its breakdown.
        """
        if not self.compilation.success:
            # Nothing else ran, so the compiler diagnostics are the whole story.
            return f"Compilation FAILED:\n{self.compilation.stderr}"

        parts = ["Compilation: OK"]

        sim = self.simulation
        if sim:
            if sim.tests_total > 0:
                parts.append(
                    f"Simulation: {sim.tests_passed}/{sim.tests_total} "
                    f"tests passed"
                )
            if sim.timing_cycles is not None:
                parts.append(f"Timing: {sim.timing_cycles} cycles")
            if sim.stdout:
                parts.append(f"Sim output:\n{sim.stdout}")
            if sim.stderr:
                parts.append(f"Sim errors:\n{sim.stderr}")

        if self.synthesis:
            parts.append(f"Synthesis: {self.synthesis.cell_count} cells")

        formal = self.formal
        if formal:
            parts.append(
                f"Formal: {formal.properties_proven}/{formal.properties_total} "
                f"properties proven"
            )
            if formal.counterexample:
                parts.append(f"Counterexample:\n{formal.counterexample}")

        if self.final_score is not None:
            parts.append(f"Score: {self.final_score:.3f}")

        if self.score_breakdown:
            breakdown_str = ", ".join(
                f"{k}={v:.2f}" for k, v in self.score_breakdown.items()
            )
            parts.append(f"Breakdown: {breakdown_str}")

        return "\n".join(parts)
| # --------------------------------------------------------------------------- | |
| # Per-task grading weights | |
| # Tasks with a "formal" key require SymbiYosys; weight is redistributed if | |
| # sby is not installed. | |
| # --------------------------------------------------------------------------- | |
# Maps task id -> {score component -> weight}. Each task's weights sum to 1.0.
# Key order is significant: it determines the order of the reported breakdown.
TASK_WEIGHTS: Dict[str, Dict[str, float]] = {
    "mac_unit": {"compile": 0.10, "sim": 0.60, "timing": 0.20, "area": 0.10},
    "axi_fifo": {"compile": 0.10, "sim": 0.70, "area": 0.20},
    "systolic_array": {"compile": 0.05, "sim": 0.50, "timing": 0.30, "area": 0.15},
    # --- New tasks ---
    "relu_clip": {"compile": 0.10, "sim": 0.75, "formal": 0.10, "area": 0.05},
    "barrel_shifter": {"compile": 0.10, "sim": 0.80, "area": 0.10},
    "register_file": {"compile": 0.10, "sim": 0.70, "formal": 0.10, "area": 0.10},
    "ring_buffer": {"compile": 0.10, "sim": 0.70, "area": 0.20},
    "dot_product": {"compile": 0.10, "sim": 0.60, "timing": 0.20, "area": 0.10},
    "fir_filter": {"compile": 0.10, "sim": 0.60, "timing": 0.20, "area": 0.10},
    "fp16_adder": {"compile": 0.05, "sim": 0.60, "formal": 0.15, "area": 0.20},
}
| SYSTOLIC_TIMING_LIMIT = 10 | |
| # Yosys stat output uses cell names like $_DFF_P_, $_DFF_N_, $_SDFF_PP0_, etc. | |
| # Each line looks like: " $_SDFF_PP0_ 8" | |
| # Sum all DFF-family cell instance counts. | |
| _DFF_CELL_RE = re.compile(r"\$_[A-Z]*DFF[A-Z0-9_]*\s+(\d+)", re.MULTILINE) | |
| def _count_dffs(synth_stdout: str) -> int: | |
| """Return total DFF instance count from yosys stat output.""" | |
| return sum(int(m) for m in _DFF_CELL_RE.findall(synth_stdout)) | |
| # --------------------------------------------------------------------------- | |
| # Evaluator | |
| # --------------------------------------------------------------------------- | |
class VerilogEvaluator:
    """
    Evaluates agent-submitted Verilog using real EDA tools.

    Accepts both single-file (str) and multi-file (Dict[str, str]) inputs.
    Every tool runs in a fresh temporary directory under a timeout, and tool
    failures (timeout, missing executable) are reported through the returned
    result objects rather than raised.
    """

    def __init__(self, timeout: int = TOOL_TIMEOUT):
        """Create an evaluator.

        Args:
            timeout: Seconds allowed for each iverilog / vvp invocation.
        """
        self.timeout = timeout
        self._sby_available: Optional[bool] = None  # cached check

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _files_to_dict(
        self, source: Union[str, Dict[str, str]]
    ) -> Dict[str, str]:
        """Normalize single-string source to a filename dict.

        A plain string becomes ``{"design.v": source}``; a mapping is
        shallow-copied so the caller's dict is never mutated.
        """
        if isinstance(source, str):
            return {"design.v": source}
        return dict(source)

    def _write_files_to_dir(
        self, files: Dict[str, str], tmpdir: str
    ) -> list:
        """Write all source files to tmpdir. Returns list of absolute paths.

        Filenames come from the agent and are later interpolated into yosys
        scripts, so they are reduced to their final path component and every
        character outside ``[A-Za-z0-9._-]`` is replaced with ``_``. This
        blocks path traversal and script injection via ``;`` or whitespace.
        Empty or dot-only names get a generated name, and names that collide
        after sanitization are prefixed with a counter so no file silently
        overwrites another.

        Args:
            files: Mapping of filename to source text.
            tmpdir: Existing directory to write into.

        Returns:
            Paths of the written files, in the iteration order of ``files``.
        """
        paths = []
        used_names = set()
        for index, (fname, src) in enumerate(files.items()):
            # Sanitize filename — no path traversal, no script metacharacters
            base_name = re.sub(r"[^A-Za-z0-9._-]", "_", Path(fname).name)
            if not base_name.strip("."):
                # "", "." and ".." are not usable file names.
                base_name = f"file_{index}.v"
            safe_name = base_name
            counter = 1
            while safe_name in used_names:
                safe_name = f"{counter}_{base_name}"
                counter += 1
            used_names.add(safe_name)

            fpath = os.path.join(tmpdir, safe_name)
            with open(fpath, "w", encoding="utf-8") as f:
                f.write(src)
            paths.append(fpath)
        return paths

    def _check_sby(self) -> bool:
        """Check once whether sby (SymbiYosys) is installed."""
        if self._sby_available is None:
            self._sby_available = shutil.which("sby") is not None
        return self._sby_available

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def compile(
        self, source: Union[str, Dict[str, str]]
    ) -> CompilationResult:
        """Syntax-check Verilog using iverilog. Accepts single file or multi-file dict.

        Returns:
            CompilationResult whose ``success`` reflects iverilog's exit
            status. Timeouts and a missing iverilog binary are reported as
            failures with an explanatory ``stderr``.
        """
        files = self._files_to_dict(source)
        with tempfile.TemporaryDirectory() as tmpdir:
            paths = self._write_files_to_dir(files, tmpdir)
            try:
                result = subprocess.run(
                    # The compiled output is discarded; only diagnostics matter.
                    ["iverilog", "-o", os.devnull, "-Wall"] + paths,
                    capture_output=True,
                    text=True,
                    timeout=self.timeout,
                )
            except subprocess.TimeoutExpired:
                return CompilationResult(
                    success=False,
                    stdout="",
                    # Report the configured limit, not a hard-coded value.
                    stderr=f"ERROR: compilation timed out after {self.timeout}s",
                )
            except FileNotFoundError:
                return CompilationResult(
                    success=False,
                    stdout="",
                    stderr="ERROR: iverilog not found — is it installed?",
                )
            return CompilationResult(
                success=(result.returncode == 0),
                stdout=result.stdout,
                stderr=result.stderr,
            )

    def simulate(
        self,
        source: Union[str, Dict[str, str]],
        testbench_path: str,
    ) -> SimulationResult:
        """
        Compile agent files together with the task testbench and run with vvp.

        Testbenches use $display("PASS: ...") / $display("FAIL: ...") conventions.
        Counts lines to compute test score. Extracts "CYCLES: N" for timing.

        Args:
            source: Agent-submitted Verilog (str or Dict[filename, src]).
            testbench_path: Path to the task testbench file.

        Returns:
            SimulationResult. Compile errors, timeouts and missing tools are
            reported as failures with zero tests counted.
        """
        files = self._files_to_dict(source)
        with tempfile.TemporaryDirectory() as tmpdir:
            paths = self._write_files_to_dir(files, tmpdir)
            sim_binary = os.path.join(tmpdir, "sim.vvp")
            try:
                compile_result = subprocess.run(
                    ["iverilog", "-o", sim_binary] + paths + [testbench_path],
                    capture_output=True,
                    text=True,
                    timeout=self.timeout,
                )
                if compile_result.returncode != 0:
                    return SimulationResult(
                        success=False,
                        stdout=compile_result.stdout,
                        stderr=compile_result.stderr,
                        tests_passed=0,
                        tests_total=0,
                    )
                sim_result = subprocess.run(
                    ["vvp", sim_binary],
                    capture_output=True,
                    text=True,
                    timeout=self.timeout,
                )
            except subprocess.TimeoutExpired:
                return SimulationResult(
                    success=False,
                    stdout="",
                    stderr="ERROR: simulation timed out — possible infinite loop in submitted code",
                    tests_passed=0,
                    tests_total=0,
                )
            except FileNotFoundError:
                # Mirror compile(): a missing tool is a reported failure, not a crash.
                return SimulationResult(
                    success=False,
                    stdout="",
                    stderr="ERROR: iverilog/vvp not found — is it installed?",
                    tests_passed=0,
                    tests_total=0,
                )

            stdout = sim_result.stdout
            passes = len(re.findall(r"^PASS:", stdout, re.MULTILINE))
            fails = len(re.findall(r"^FAIL:", stdout, re.MULTILINE))
            timing_match = re.search(r"CYCLES:\s*(\d+)", stdout)
            return SimulationResult(
                success=(sim_result.returncode == 0),
                stdout=stdout,
                stderr=sim_result.stderr,
                tests_passed=passes,
                tests_total=passes + fails,
                timing_cycles=int(timing_match.group(1)) if timing_match else None,
            )

    def synthesize(
        self, source: Union[str, Dict[str, str]]
    ) -> SynthesisResult:
        """
        Run Yosys synthesis. Uses 'synth -flatten; stat' for deterministic cell count.

        Accepts single file or multi-file dict.

        Returns:
            SynthesisResult; ``success`` requires a zero exit status and a
            non-zero parsed cell count. Timeouts and a missing yosys binary
            are reported as failures.
        """
        files = self._files_to_dict(source)
        with tempfile.TemporaryDirectory() as tmpdir:
            paths = self._write_files_to_dir(files, tmpdir)
            # yosys runs inside tmpdir, so the script only ever contains the
            # sanitized file names, never the temp-directory path itself.
            read_cmds = " ".join(
                f"read_verilog {os.path.basename(p)};" for p in paths
            )
            synth_script = f"{read_cmds} synth -flatten; stat"
            try:
                result = subprocess.run(
                    ["yosys", "-p", synth_script],
                    capture_output=True,
                    text=True,
                    timeout=SYNTH_TIMEOUT,
                    cwd=tmpdir,
                )
            except subprocess.TimeoutExpired:
                return SynthesisResult(
                    success=False, stdout="", stderr="ERROR: synthesis timed out",
                    cell_count=0,
                )
            except FileNotFoundError:
                return SynthesisResult(
                    success=False,
                    stdout="",
                    stderr="ERROR: yosys not found — is it installed?",
                    cell_count=0,
                )

            combined = result.stdout + result.stderr
            cell_match = re.search(r"Number of cells:\s*(\d+)", combined)
            cell_count = int(cell_match.group(1)) if cell_match else 0
            return SynthesisResult(
                success=(result.returncode == 0 and cell_count > 0),
                stdout=result.stdout,
                stderr=result.stderr,
                cell_count=cell_count,
            )

    def visualize(
        self,
        source: Union[str, Dict[str, str]],
        top_module: Optional[str] = None,
    ) -> "VisualizationResult":
        """
        Generate a PNG netlist diagram using yosys show + graphviz dot.

        Returns base64-encoded PNG on success.
        Requires graphviz (dot) to be installed alongside yosys.

        Args:
            source: Verilog to render (str or Dict[filename, src]).
            top_module: Optional module to show. Names containing whitespace
                or ``;`` are rejected because they would alter the yosys
                script.
        """
        if not shutil.which("dot"):
            return VisualizationResult(
                success=False,
                stdout="",
                stderr="ERROR: graphviz 'dot' not found — install graphviz to enable visualization.",
            )
        if top_module and re.search(r"[;\s]", top_module):
            return VisualizationResult(
                success=False,
                stdout="",
                stderr=f"ERROR: invalid top module name: {top_module!r}",
            )

        files = self._files_to_dict(source)
        with tempfile.TemporaryDirectory() as tmpdir:
            paths = self._write_files_to_dir(files, tmpdir)
            # yosys runs inside tmpdir; the relative prefix keeps the
            # temp-directory path out of the script.
            out_prefix = "netlist"
            read_cmds = " ".join(
                f"read_verilog {os.path.basename(p)};" for p in paths
            )
            show_cmd = f"show -format png -prefix {out_prefix}"
            if top_module:
                show_cmd += f" {top_module}"
            synth_script = f"{read_cmds} proc; opt; {show_cmd}"
            try:
                result = subprocess.run(
                    ["yosys", "-p", synth_script],
                    capture_output=True,
                    text=True,
                    timeout=SYNTH_TIMEOUT,
                    cwd=tmpdir,
                )
            except subprocess.TimeoutExpired:
                return VisualizationResult(
                    success=False,
                    stdout="",
                    stderr="ERROR: visualization timed out",
                )
            except FileNotFoundError:
                return VisualizationResult(
                    success=False,
                    stdout="",
                    stderr="ERROR: yosys not found — is it installed?",
                )

            png_path = os.path.join(tmpdir, f"{out_prefix}.png")
            if os.path.exists(png_path):
                with open(png_path, "rb") as f:
                    img_b64 = base64.b64encode(f.read()).decode()
                return VisualizationResult(
                    success=True,
                    stdout=result.stdout,
                    stderr=result.stderr,
                    image_b64=img_b64,
                    format="png",
                )
            return VisualizationResult(
                success=False,
                stdout=result.stdout,
                stderr=result.stderr or "ERROR: yosys produced no PNG output",
            )

    def formal_verify(
        self,
        source: Union[str, Dict[str, str]],
        properties_path: str,
    ) -> FormalResult:
        """
        Run SymbiYosys formal verification on the agent's design + task properties.

        Returns a FormalResult with properties_proven / properties_total and any
        counterexample trace. Returns a graceful failure if sby is not installed.

        The proven/total counts are a heuristic: they are derived from the
        number of "PASS" / "FAIL" occurrences in sby's output.

        Args:
            source: Agent-submitted Verilog (str or Dict[filename, src]).
            properties_path: Path to the task's formal properties file.

        Raises:
            OSError: If the properties file cannot be read or copied.
        """
        if not self._check_sby():
            return FormalResult(
                success=False,
                stdout="",
                stderr="SymbiYosys (sby) not installed — formal verification skipped.",
                properties_proven=0,
                properties_total=0,
            )

        files = self._files_to_dict(source)
        with tempfile.TemporaryDirectory() as tmpdir:
            design_paths = self._write_files_to_dir(files, tmpdir)

            # Copy properties file under a name no design file already uses,
            # so neither file overwrites the other.
            taken = {os.path.basename(p) for p in design_paths}
            props_name = "properties.sv"
            while props_name in taken:
                props_name = "_" + props_name
            props_dest = os.path.join(tmpdir, props_name)
            shutil.copy(properties_path, props_dest)

            # Parse the top module name from the properties file itself.
            # Cannot use the file stem ("properties") — the module name is
            # defined inside, e.g. "module relu_clip_formal #(...)".
            props_text = Path(properties_path).read_text(encoding="utf-8")
            top_match = re.search(r"^\s*module\s+(\w+)", props_text, re.MULTILINE)
            top_module = top_match.group(1) if top_match else Path(properties_path).stem

            # Build sby config
            read_cmds = "\n".join(f"read -formal {p}" for p in design_paths)
            sby_config = "\n".join(
                [
                    "[options]",
                    "mode prove",
                    "depth 20",
                    "[engines]",
                    "smtbmc",
                    "[script]",
                    read_cmds,
                    f"read -formal {props_dest}",
                    f"prep -top {top_module}",
                    "",
                ]
            )
            sby_file = os.path.join(tmpdir, "verify.sby")
            with open(sby_file, "w", encoding="utf-8") as f:
                f.write(sby_config)

            try:
                result = subprocess.run(
                    ["sby", "-f", sby_file],
                    capture_output=True,
                    text=True,
                    timeout=FORMAL_TIMEOUT,
                    cwd=tmpdir,
                )
            except subprocess.TimeoutExpired:
                return FormalResult(
                    success=False,
                    stdout="",
                    stderr=f"ERROR: formal verification timed out after {FORMAL_TIMEOUT}s",
                    properties_proven=0,
                    properties_total=1,
                )

            combined = result.stdout + result.stderr
            # Parse sby output for PASS / FAIL lines
            proven = len(re.findall(r"PASS", combined, re.IGNORECASE))
            failed_matches = re.findall(r"FAIL", combined, re.IGNORECASE)
            # Heuristic: count unique assertion checks mentioned
            assert_checks = re.findall(r"assert\s+\w+", combined)
            total = max(proven + len(failed_matches), len(assert_checks), 1)

            counterexample = None
            if failed_matches:
                # Extract first counterexample trace (after "Counterexample:")
                cex_match = re.search(
                    r"(Counterexample.*?)(?=\n\n|\Z)", combined, re.DOTALL
                )
                if cex_match:
                    counterexample = cex_match.group(1)[:500]  # cap at 500 chars

            success = (result.returncode == 0) and not failed_matches
            return FormalResult(
                success=success,
                stdout=result.stdout[:2000],
                stderr=result.stderr[:500],
                properties_proven=proven if success else max(0, proven - len(failed_matches)),
                properties_total=total,
                counterexample=counterexample,
            )

    def grade(
        self,
        source: Union[str, Dict[str, str]],
        task_id: str,
        testbench_path: str,
        reference_cells: int,
        properties_path: Optional[str] = None,
    ) -> EvalResult:
        """
        Full grading pipeline: compile → simulate → synthesize → [formal] → weighted score.

        Args:
            source: Agent-submitted Verilog (str or Dict[filename, src])
            task_id: Task identifier
            testbench_path: Absolute path to the task testbench file
            reference_cells: Reference synthesis cell count for area scoring
            properties_path: Optional path to .sv formal properties file

        Returns:
            EvalResult with final_score in [0.01, 0.99] and all breakdown values
        """
        files = self._files_to_dict(source)
        primary_src = "\n".join(files.values())

        # Unknown tasks are graded on compilation alone.
        weights = dict(TASK_WEIGHTS.get(task_id, {"compile": 1.0}))

        # If formal is in weights but sby is unavailable (or no properties file),
        # redistribute formal weight to sim.
        if "formal" in weights:
            formal_avail = bool(properties_path) and self._check_sby()
            if not formal_avail:
                weights["sim"] = weights.get("sim", 0) + weights.pop("formal")

        breakdown: Dict[str, float] = {k: 0.0 for k in weights}

        def _clamp(d: Dict[str, float]) -> Dict[str, float]:
            return {k: max(0.01, min(0.99, v)) for k, v in d.items()}

        if not primary_src.strip():
            return EvalResult(
                compilation=CompilationResult(
                    success=False, stdout="", stderr="Empty submission"
                ),
                final_score=0.01,
                score_breakdown=_clamp(breakdown),
            )

        # Step 1: Compilation
        comp = self.compile(files)
        if not comp.success:
            return EvalResult(
                compilation=comp,
                final_score=0.01,
                score_breakdown=_clamp(breakdown),
            )
        breakdown["compile"] = 0.99

        # Step 2: Simulation
        sim = self.simulate(files, testbench_path)
        if sim.tests_total > 0:
            ratio = sim.tests_passed / sim.tests_total
            breakdown["sim"] = max(0.01, min(ratio, 0.99))

        # Step 3: Synthesis (always run if code compiles)
        synth = self.synthesize(files)

        # Step 4: Timing scoring (task-specific)
        if "timing" in breakdown:
            if task_id == "mac_unit":
                # Registers present are taken as evidence of pipelining.
                if _count_dffs(synth.stdout) >= 2:
                    breakdown["timing"] = 0.99
                elif synth.cell_count >= 10:
                    breakdown["timing"] = 0.50
            elif task_id == "systolic_array":
                cycles = sim.timing_cycles
                if cycles is not None and cycles <= SYSTOLIC_TIMING_LIMIT:
                    breakdown["timing"] = 0.99
                elif cycles is not None and cycles <= SYSTOLIC_TIMING_LIMIT + 3:
                    # Lose 0.2 per cycle over the limit, up to 3 cycles.
                    breakdown["timing"] = max(
                        0.01, min(1.0 - (cycles - SYSTOLIC_TIMING_LIMIT) * 0.2, 0.99)
                    )
                elif sim.tests_passed > 0:
                    # Too slow or no cycle count, but functionally alive.
                    breakdown["timing"] = 0.20
            elif task_id in ("dot_product", "fir_filter"):
                if _count_dffs(synth.stdout) >= 2:
                    breakdown["timing"] = 0.99
                elif synth.cell_count >= 5:
                    breakdown["timing"] = 0.40

        # Step 5: Area scoring (gated on functional correctness)
        if (
            "area" in breakdown
            and synth.success
            and synth.cell_count > 0
            and reference_cells > 0
            and sim.tests_passed > 0
        ):
            ratio = reference_cells / synth.cell_count
            breakdown["area"] = max(0.01, min(ratio, 0.99))

        # Step 6: Formal verification (optional)
        formal_result = None
        if "formal" in breakdown and properties_path:
            formal_result = self.formal_verify(files, properties_path)
            breakdown["formal"] = max(0.01, min(formal_result.score, 0.99))

        breakdown = _clamp(breakdown)

        # Normalize over the weights that actually have a breakdown entry.
        active_weights = {k: weights[k] for k in weights if k in breakdown}
        weight_sum = sum(active_weights.values())
        if weight_sum > 0:
            norm_weights = {k: v / weight_sum for k, v in active_weights.items()}
        else:
            norm_weights = active_weights

        raw_score = round(sum(norm_weights[k] * breakdown[k] for k in norm_weights), 4)
        final_score = max(0.01, min(0.99, raw_score))
        return EvalResult(
            compilation=comp,
            simulation=sim,
            synthesis=synth,
            formal=formal_result,
            final_score=final_score,
            score_breakdown=breakdown,
        )