File size: 24,834 Bytes
3951517
 
 
 
 
 
 
 
 
2b4ae64
 
 
 
 
 
 
 
 
 
3951517
 
e3ca86e
3951517
 
2b4ae64
3951517
 
2b4ae64
 
 
3951517
2b4ae64
3951517
2b4ae64
 
 
 
 
3951517
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2b4ae64
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e3ca86e
 
 
 
 
 
 
 
 
3951517
 
 
 
 
2b4ae64
3951517
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2b4ae64
 
 
 
 
 
 
3951517
 
 
 
 
 
 
 
 
 
2b4ae64
3951517
2b4ae64
 
 
 
3951517
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2b4ae64
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3951517
 
2b4ae64
 
12ac0fd
 
 
 
 
 
 
 
 
 
2b4ae64
 
 
 
3951517
 
 
 
2b4ae64
 
 
3951517
 
 
 
2b4ae64
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3951517
2b4ae64
 
 
 
 
3951517
2b4ae64
3951517
2b4ae64
 
3951517
2b4ae64
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3951517
 
2b4ae64
 
3951517
 
 
 
2b4ae64
 
 
3951517
2b4ae64
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e3ca86e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3951517
2b4ae64
 
 
 
 
3951517
2b4ae64
 
 
 
 
 
 
3951517
 
2b4ae64
 
 
3951517
2b4ae64
 
 
 
 
 
 
 
 
 
12ac0fd
 
 
 
 
 
2b4ae64
 
 
 
 
 
 
 
 
 
 
 
 
 
12ac0fd
2b4ae64
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3951517
 
 
2b4ae64
3951517
 
 
2b4ae64
3951517
 
2b4ae64
3951517
 
2b4ae64
 
3951517
 
2b4ae64
3951517
 
2b4ae64
3951517
2b4ae64
 
 
 
3951517
 
2b4ae64
 
 
 
 
 
 
 
 
 
fc68368
 
2b4ae64
3951517
 
 
 
0c63334
2b4ae64
3951517
 
 
2b4ae64
3951517
 
 
0c63334
2b4ae64
3951517
2b4ae64
3951517
 
2b4ae64
3951517
9cbe619
 
3951517
2b4ae64
 
3951517
 
 
 
12ac0fd
68acda4
9cbe619
3951517
9cbe619
2b4ae64
3951517
 
 
 
9cbe619
3951517
2b4ae64
 
 
 
 
fc68368
9cbe619
3951517
2b4ae64
12ac0fd
 
2b4ae64
 
 
 
 
3951517
 
 
 
 
 
fc68368
3951517
9cbe619
 
3951517
2b4ae64
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fc68368
2b4ae64
0c63334
3951517
 
 
 
 
2b4ae64
3951517
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""
VeriRL EDA Evaluator.

Wraps iverilog, vvp, yosys, and optionally SymbiYosys to evaluate agent-submitted
Verilog. Supports multi-file projects: all methods accept either a single string
(backward-compatible) or a Dict[str, str] mapping filename → source.

Provides:
- Compilation checking via iverilog
- Functional simulation via iverilog + vvp against task testbenches
- Synthesis via yosys for area estimation
- Formal verification via SymbiYosys (optional, graceful degradation)
- Weighted per-task grading across compile / sim / timing / area / formal
"""

import base64
import os
import re
import shutil
import subprocess
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, Optional, Union

# Wall-clock limits, in seconds, handed to subprocess.run for the external tools.
# TOOL_TIMEOUT is the default for VerilogEvaluator(timeout=...), which bounds the
# iverilog and vvp calls; SYNTH_TIMEOUT bounds the yosys calls; FORMAL_TIMEOUT
# bounds the sby call.
TOOL_TIMEOUT = 30   # seconds — prevents infinite loops in agent-submitted code
SYNTH_TIMEOUT = 60  # synthesis is slower than compilation
FORMAL_TIMEOUT = 90 # SymbiYosys can be slow on complex designs

# ---------------------------------------------------------------------------
# Result data classes
# ---------------------------------------------------------------------------


@dataclass
class CompilationResult:
    """Outcome of the iverilog compile check run by VerilogEvaluator.compile."""

    success: bool  # True when iverilog exited with status 0
    stdout: str    # captured standard output of iverilog ("" on timeout / missing tool)
    stderr: str    # captured standard error, or an "ERROR: ..." message on timeout / missing tool


@dataclass
class SimulationResult:
    """Outcome of compiling the design with a testbench and running it under vvp."""

    success: bool      # True when vvp exited with status 0; False on compile failure or timeout
    stdout: str        # simulator output (or iverilog output when the testbench build failed)
    stderr: str        # simulator/compiler stderr, or an "ERROR: ..." message on timeout
    tests_passed: int  # number of output lines that start with "PASS:"
    tests_total: int   # "PASS:" lines plus "FAIL:" lines; 0 when nothing was reported
    timing_cycles: Optional[int] = None  # N from the first "CYCLES: N" in stdout, if any


@dataclass
class SynthesisResult:
    """Outcome of the yosys synthesis run by VerilogEvaluator.synthesize."""

    success: bool        # True when yosys exited with status 0 and reported at least one cell
    stdout: str          # captured yosys standard output ("" on timeout)
    stderr: str          # captured yosys standard error, or an "ERROR: ..." message on timeout
    cell_count: int = 0  # value parsed from "Number of cells:"; 0 when not found


@dataclass
class FormalResult:
    """Outcome of a SymbiYosys run, with a derived proven-fraction score."""

    success: bool  # True only when every property was proven
    stdout: str
    stderr: str
    properties_proven: int = 0
    properties_total: int = 0
    counterexample: Optional[str] = None  # text describing the first failing trace

    @property
    def score(self) -> float:
        """Return proven/total, or 0.0 when no properties were counted."""
        total = self.properties_total
        return self.properties_proven / total if total else 0.0


@dataclass
class VisualizationResult:
    """Outcome of rendering a netlist diagram via VerilogEvaluator.visualize."""

    success: bool  # True when a PNG file was produced
    stdout: str    # captured yosys standard output ("" when yosys was not run)
    stderr: str    # captured yosys standard error, or an "ERROR: ..." message
    image_b64: Optional[str] = None  # base64-encoded PNG
    format: str = "png"  # image format of image_b64


@dataclass
class EvalResult:
    """Bundle of per-stage results plus the weighted score for one submission.

    Only ``compilation`` is required; every other field defaults to ``None``.
    """

    compilation: CompilationResult
    simulation: Optional[SimulationResult] = None
    synthesis: Optional[SynthesisResult] = None
    formal: Optional[FormalResult] = None
    final_score: Optional[float] = None
    score_breakdown: Optional[Dict[str, float]] = None

    def to_agent_feedback(self) -> str:
        """Render the results as a newline-separated report for the agent.

        A failed compilation short-circuits the report: only the compiler's
        stderr is shown and no other stage is mentioned.
        """
        if not self.compilation.success:
            return f"Compilation FAILED:\n{self.compilation.stderr}"

        report = ["Compilation: OK"]

        sim = self.simulation
        if sim:
            # The pass ratio is only meaningful when the testbench reported something.
            if sim.tests_total > 0:
                report.append(
                    f"Simulation: {sim.tests_passed}/{sim.tests_total} tests passed"
                )
            if sim.timing_cycles is not None:
                report.append(f"Timing: {sim.timing_cycles} cycles")
            if sim.stdout:
                report.append(f"Sim output:\n{sim.stdout}")
            if sim.stderr:
                report.append(f"Sim errors:\n{sim.stderr}")

        synth = self.synthesis
        if synth:
            report.append(f"Synthesis: {synth.cell_count} cells")

        formal = self.formal
        if formal:
            report.append(
                f"Formal: {formal.properties_proven}/{formal.properties_total} "
                "properties proven"
            )
            if formal.counterexample:
                report.append(f"Counterexample:\n{formal.counterexample}")

        if self.final_score is not None:
            report.append(f"Score: {self.final_score:.3f}")
            if self.score_breakdown:
                pairs = [f"{k}={v:.2f}" for k, v in self.score_breakdown.items()]
                report.append("Breakdown: " + ", ".join(pairs))

        return "\n".join(report)


# ---------------------------------------------------------------------------
# Per-task grading weights
# Tasks with a "formal" key require SymbiYosys; when sby is not installed or no
# properties file is supplied, that weight is folded into "sim".
# ---------------------------------------------------------------------------

# One row per task: score component -> weight. Each row's weights sum to 1.0.
TASK_WEIGHTS: Dict[str, Dict[str, float]] = {
    "mac_unit": {"compile": 0.10, "sim": 0.60, "timing": 0.20, "area": 0.10},
    "axi_fifo": {"compile": 0.10, "sim": 0.70, "area": 0.20},
    "systolic_array": {"compile": 0.05, "sim": 0.50, "timing": 0.30, "area": 0.15},
    # --- New tasks ---
    "relu_clip": {"compile": 0.10, "sim": 0.75, "formal": 0.10, "area": 0.05},
    "barrel_shifter": {"compile": 0.10, "sim": 0.80, "area": 0.10},
    "register_file": {"compile": 0.10, "sim": 0.70, "formal": 0.10, "area": 0.10},
    "ring_buffer": {"compile": 0.10, "sim": 0.70, "area": 0.20},
    "dot_product": {"compile": 0.10, "sim": 0.60, "timing": 0.20, "area": 0.10},
    "fir_filter": {"compile": 0.10, "sim": 0.60, "timing": 0.20, "area": 0.10},
    "fp16_adder": {"compile": 0.05, "sim": 0.60, "formal": 0.15, "area": 0.20},
}

# Cycle budget for the systolic_array task: a run reporting at most this many
# cycles earns the top timing score, and up to 3 cycles over earns a reduced one.
SYSTOLIC_TIMING_LIMIT = 10

# Yosys stat output uses cell names like $_DFF_P_, $_DFF_N_, $_SDFF_PP0_, etc.
# Each line looks like:  "    $_SDFF_PP0_                     8"
# Sum all DFF-family cell instance counts.
_DFF_CELL_RE = re.compile(r"\$_[A-Z]*DFF[A-Z0-9_]*\s+(\d+)", re.MULTILINE)


def _count_dffs(synth_stdout: str) -> int:
    """Return total DFF instance count from yosys stat output."""
    return sum(int(m) for m in _DFF_CELL_RE.findall(synth_stdout))


# ---------------------------------------------------------------------------
# Evaluator
# ---------------------------------------------------------------------------


class VerilogEvaluator:
    """
    Evaluates agent-submitted Verilog using real EDA tools.

    Accepts both single-file (str) and multi-file (Dict[str, str]) inputs.
    """

    def __init__(self, timeout: int = TOOL_TIMEOUT):
        self.timeout = timeout
        self._sby_available: Optional[bool] = None  # cached check

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _files_to_dict(
        self, source: Union[str, Dict[str, str]]
    ) -> Dict[str, str]:
        """Normalize single-string source to a filename dict."""
        if isinstance(source, str):
            return {"design.v": source}
        return dict(source)

    def _write_files_to_dir(
        self, files: Dict[str, str], tmpdir: str
    ) -> list:
        """Write all source files to tmpdir. Returns list of absolute paths."""
        paths = []
        for fname, src in files.items():
            # Sanitize filename — no path traversal
            safe_name = Path(fname).name
            fpath = os.path.join(tmpdir, safe_name)
            with open(fpath, "w") as f:
                f.write(src)
            paths.append(fpath)
        return paths

    def _check_sby(self) -> bool:
        """Check once whether sby (SymbiYosys) is installed."""
        if self._sby_available is None:
            self._sby_available = shutil.which("sby") is not None
        return self._sby_available

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def compile(
        self, source: Union[str, Dict[str, str]]
    ) -> CompilationResult:
        """Syntax-check Verilog using iverilog. Accepts single file or multi-file dict.

        Args:
            source: Verilog text, or a mapping of filename to Verilog text.

        Returns:
            CompilationResult whose ``success`` is True when iverilog exits
            with status 0. A timeout or a missing iverilog binary is reported
            as a failed result with an "ERROR: ..." message in ``stderr``
            rather than raised.
        """
        files = self._files_to_dict(source)
        with tempfile.TemporaryDirectory() as tmpdir:
            paths = self._write_files_to_dir(files, tmpdir)
            try:
                result = subprocess.run(
                    # The compiled output is not needed, only the diagnostics.
                    ["iverilog", "-o", os.devnull, "-Wall"] + paths,
                    capture_output=True,
                    text=True,
                    timeout=self.timeout,
                )
            except subprocess.TimeoutExpired:
                return CompilationResult(
                    success=False,
                    stdout="",
                    # Report the limit actually in force, not the module default.
                    stderr=f"ERROR: compilation timed out after {self.timeout}s",
                )
            except FileNotFoundError:
                return CompilationResult(
                    success=False,
                    stdout="",
                    stderr="ERROR: iverilog not found — is it installed?",
                )
            return CompilationResult(
                success=(result.returncode == 0),
                stdout=result.stdout,
                stderr=result.stderr,
            )

    def simulate(
        self,
        source: Union[str, Dict[str, str]],
        testbench_path: str,
    ) -> SimulationResult:
        """
        Compile agent files together with the task testbench and run with vvp.

        Testbenches use $display("PASS: ...") / $display("FAIL: ...") conventions.
        Counts lines to compute test score. Extracts "CYCLES: N" for timing.

        Args:
            source: Verilog text, or a mapping of filename to Verilog text.
            testbench_path: Path of the testbench file passed to iverilog.

        Returns:
            SimulationResult. If the combined build fails, the compiler output
            is returned with zero tests counted. A timeout or a missing
            iverilog/vvp binary yields a failed result with an "ERROR: ..."
            message in ``stderr`` instead of an exception.
        """
        files = self._files_to_dict(source)
        with tempfile.TemporaryDirectory() as tmpdir:
            paths = self._write_files_to_dir(files, tmpdir)
            sim_binary = os.path.join(tmpdir, "sim.vvp")
            try:
                compile_result = subprocess.run(
                    ["iverilog", "-o", sim_binary] + paths + [testbench_path],
                    capture_output=True,
                    text=True,
                    timeout=self.timeout,
                )
                if compile_result.returncode != 0:
                    return SimulationResult(
                        success=False,
                        stdout=compile_result.stdout,
                        stderr=compile_result.stderr,
                        tests_passed=0,
                        tests_total=0,
                    )
                sim_result = subprocess.run(
                    ["vvp", sim_binary],
                    capture_output=True,
                    text=True,
                    timeout=self.timeout,
                )
            except subprocess.TimeoutExpired:
                return SimulationResult(
                    success=False,
                    stdout="",
                    stderr="ERROR: simulation timed out — possible infinite loop in submitted code",
                    tests_passed=0,
                    tests_total=0,
                )
            except FileNotFoundError:
                # Same graceful handling compile() gives a missing toolchain.
                return SimulationResult(
                    success=False,
                    stdout="",
                    stderr="ERROR: iverilog or vvp not found — is Icarus Verilog installed?",
                    tests_passed=0,
                    tests_total=0,
                )

            stdout = sim_result.stdout
            # Only lines that *start* with the marker count as a test verdict.
            passes = len(re.findall(r"^PASS:", stdout, re.MULTILINE))
            fails = len(re.findall(r"^FAIL:", stdout, re.MULTILINE))
            timing_match = re.search(r"CYCLES:\s*(\d+)", stdout)
            return SimulationResult(
                success=(sim_result.returncode == 0),
                stdout=stdout,
                stderr=sim_result.stderr,
                tests_passed=passes,
                tests_total=passes + fails,
                timing_cycles=int(timing_match.group(1)) if timing_match else None,
            )

    def synthesize(
        self, source: Union[str, Dict[str, str]]
    ) -> SynthesisResult:
        """
        Run Yosys synthesis. Uses 'synth -flatten; stat' for deterministic cell count.
        Accepts single file or multi-file dict.

        Args:
            source: Verilog text, or a mapping of filename to Verilog text.

        Returns:
            SynthesisResult whose ``cell_count`` is taken from the first
            "Number of cells:" line in the yosys output (0 if absent) and
            whose ``success`` requires both a zero exit status and a non-zero
            cell count. A timeout or a missing yosys binary yields a failed
            result with an "ERROR: ..." message instead of an exception.
        """
        files = self._files_to_dict(source)
        with tempfile.TemporaryDirectory() as tmpdir:
            paths = self._write_files_to_dir(files, tmpdir)
            read_cmds = " ".join(f"read_verilog {p};" for p in paths)
            synth_script = f"{read_cmds} synth -flatten; stat"
            try:
                result = subprocess.run(
                    ["yosys", "-p", synth_script],
                    capture_output=True,
                    text=True,
                    timeout=SYNTH_TIMEOUT,
                )
            except subprocess.TimeoutExpired:
                return SynthesisResult(
                    success=False, stdout="", stderr="ERROR: synthesis timed out",
                    cell_count=0,
                )
            except FileNotFoundError:
                # Same graceful handling compile() gives a missing toolchain.
                return SynthesisResult(
                    success=False,
                    stdout="",
                    stderr="ERROR: yosys not found — is it installed?",
                    cell_count=0,
                )

            # Search both streams for the cell-count line.
            combined = result.stdout + result.stderr
            cell_match = re.search(r"Number of cells:\s*(\d+)", combined)
            cell_count = int(cell_match.group(1)) if cell_match else 0
            return SynthesisResult(
                success=(result.returncode == 0 and cell_count > 0),
                stdout=result.stdout,
                stderr=result.stderr,
                cell_count=cell_count,
            )

    def visualize(
        self,
        source: Union[str, Dict[str, str]],
        top_module: Optional[str] = None,
    ) -> "VisualizationResult":
        """
        Generate a PNG netlist diagram using yosys show + graphviz dot.
        Returns base64-encoded PNG on success.
        Requires graphviz (dot) to be installed alongside yosys.

        Args:
            source: Verilog text, or a mapping of filename to Verilog text.
            top_module: Optional module name appended to the yosys ``show``
                command. It is inserted into the script verbatim, so callers
                should pass a plain identifier.

        Returns:
            VisualizationResult. ``success`` is True only when the PNG file
            exists after the yosys run. Missing graphviz, missing yosys and
            timeouts are reported as failed results with an "ERROR: ..."
            message instead of an exception.
        """
        if not shutil.which("dot"):
            return VisualizationResult(
                success=False,
                stdout="",
                stderr="ERROR: graphviz 'dot' not found — install graphviz to enable visualization.",
            )

        files = self._files_to_dict(source)
        with tempfile.TemporaryDirectory() as tmpdir:
            paths = self._write_files_to_dir(files, tmpdir)
            out_prefix = os.path.join(tmpdir, "netlist")
            read_cmds = " ".join(f"read_verilog {p};" for p in paths)
            show_cmd = f"show -format png -prefix {out_prefix}"
            if top_module:
                show_cmd += f" {top_module}"
            synth_script = f"{read_cmds} proc; opt; {show_cmd}"
            try:
                result = subprocess.run(
                    ["yosys", "-p", synth_script],
                    capture_output=True,
                    text=True,
                    timeout=SYNTH_TIMEOUT,
                )
            except subprocess.TimeoutExpired:
                return VisualizationResult(
                    success=False,
                    stdout="",
                    stderr="ERROR: visualization timed out",
                )
            except FileNotFoundError:
                # Same graceful handling compile() gives a missing toolchain.
                return VisualizationResult(
                    success=False,
                    stdout="",
                    stderr="ERROR: yosys not found — is it installed?",
                )

            png_path = f"{out_prefix}.png"
            # The directory is fresh, so an existing PNG can only come from this run.
            if os.path.exists(png_path):
                with open(png_path, "rb") as f:
                    img_b64 = base64.b64encode(f.read()).decode()
                return VisualizationResult(
                    success=True,
                    stdout=result.stdout,
                    stderr=result.stderr,
                    image_b64=img_b64,
                    format="png",
                )
            return VisualizationResult(
                success=False,
                stdout=result.stdout,
                stderr=result.stderr or "ERROR: yosys produced no PNG output",
            )

    def formal_verify(
        self,
        source: Union[str, Dict[str, str]],
        properties_path: str,
    ) -> FormalResult:
        """
        Run SymbiYosys formal verification on the agent's design + task properties.

        Returns a FormalResult with properties_proven / properties_total and any
        counterexample trace. Returns a graceful failure if sby is not installed.
        """
        if not self._check_sby():
            return FormalResult(
                success=False,
                stdout="",
                stderr="SymbiYosys (sby) not installed — formal verification skipped.",
                properties_proven=0,
                properties_total=0,
            )

        files = self._files_to_dict(source)
        with tempfile.TemporaryDirectory() as tmpdir:
            design_paths = self._write_files_to_dir(files, tmpdir)
            props_dest = os.path.join(tmpdir, "properties.sv")

            # Copy properties file
            import shutil as _shutil
            _shutil.copy(properties_path, props_dest)

            # Parse the top module name from the properties file itself.
            # Cannot use the file stem ("properties") — the module name is
            # defined inside, e.g. "module relu_clip_formal #(...)".
            props_text = Path(properties_path).read_text()
            top_match = re.search(r"^\s*module\s+(\w+)", props_text, re.MULTILINE)
            top_module = top_match.group(1) if top_match else Path(properties_path).stem

            # Build sby config
            read_cmds = "\n".join(f"read -formal {p}" for p in design_paths)
            sby_config = f"""\
[options]
mode prove
depth 20

[engines]
smtbmc

[script]
{read_cmds}
read -formal {props_dest}
prep -top {top_module}

"""
            sby_file = os.path.join(tmpdir, "verify.sby")
            with open(sby_file, "w") as f:
                f.write(sby_config)

            try:
                result = subprocess.run(
                    ["sby", "-f", sby_file],
                    capture_output=True,
                    text=True,
                    timeout=FORMAL_TIMEOUT,
                    cwd=tmpdir,
                )
                combined = result.stdout + result.stderr

                # Parse sby output for PASS / FAIL lines
                proven = len(re.findall(r"PASS", combined, re.IGNORECASE))
                failed_matches = re.findall(r"FAIL", combined, re.IGNORECASE)

                # Heuristic: count unique assertion checks mentioned
                assert_checks = re.findall(r"assert\s+\w+", combined)
                total = max(proven + len(failed_matches), len(assert_checks), 1)

                counterexample = None
                if failed_matches:
                    # Extract first counterexample trace (after "Counterexample:")
                    cex_match = re.search(
                        r"(Counterexample.*?)(?=\n\n|\Z)", combined, re.DOTALL
                    )
                    if cex_match:
                        counterexample = cex_match.group(1)[:500]  # cap at 500 chars

                success = (result.returncode == 0) and not failed_matches

                return FormalResult(
                    success=success,
                    stdout=result.stdout[:2000],
                    stderr=result.stderr[:500],
                    properties_proven=proven if success else max(0, proven - len(failed_matches)),
                    properties_total=total,
                    counterexample=counterexample,
                )

            except subprocess.TimeoutExpired:
                return FormalResult(
                    success=False,
                    stdout="",
                    stderr="ERROR: formal verification timed out after 90s",
                    properties_proven=0,
                    properties_total=1,
                )

    def grade(
        self,
        source: Union[str, Dict[str, str]],
        task_id: str,
        testbench_path: str,
        reference_cells: int,
        properties_path: Optional[str] = None,
    ) -> EvalResult:
        """
        Full grading pipeline: compile → simulate → synthesize → [formal] → weighted score.

        Each component in the task's TASK_WEIGHTS entry starts at 0.0, is raised
        by the stage that owns it, and is finally clamped to [0.01, 0.99]. The
        final score is the weight-normalized sum of the clamped components.
        An empty submission or a compile failure returns immediately with a
        final score of 0.01 and no simulation/synthesis/formal results.

        Args:
            source: Agent-submitted Verilog (str or Dict[filename, src])
            task_id: Task identifier; an id missing from TASK_WEIGHTS is scored
                on compilation alone
            testbench_path: Absolute path to the task testbench file
            reference_cells: Reference synthesis cell count for area scoring
            properties_path: Optional path to .sv formal properties file

        Returns:
            EvalResult with final_score in [0.01, 0.99] and every
            score_breakdown value clamped to the same range
        """
        files = self._files_to_dict(source)
        # Concatenation is only used to detect an all-whitespace submission.
        primary_src = "\n".join(files.values())

        # Copy so the redistribution below never mutates the module-level table.
        weights = dict(TASK_WEIGHTS.get(task_id, {"compile": 1.0}))
        breakdown: Dict[str, float] = {k: 0.0 for k in weights}

        # If formal is in weights but sby is unavailable (or no properties file),
        # redistribute formal weight to sim.
        if "formal" in weights:
            formal_avail = bool(properties_path) and self._check_sby()
            if not formal_avail:
                weights["sim"] = weights.get("sim", 0) + weights.pop("formal")
                breakdown.pop("formal", None)
                breakdown = {k: 0.0 for k in weights}

        def _clamp(d: Dict[str, float]) -> Dict[str, float]:
            # Keep every component strictly inside (0, 1).
            return {k: max(0.01, min(0.99, v)) for k, v in d.items()}

        if not primary_src.strip():
            return EvalResult(
                compilation=CompilationResult(
                    success=False, stdout="", stderr="Empty submission"
                ),
                final_score=0.01,
                score_breakdown=_clamp(breakdown),
            )

        # Step 1: Compilation
        comp = self.compile(files)
        if not comp.success:
            return EvalResult(
                compilation=comp,
                final_score=0.01,
                score_breakdown=_clamp(breakdown),
            )
        breakdown["compile"] = 0.99

        # Step 2: Simulation
        sim = self.simulate(files, testbench_path)
        # With no PASS:/FAIL: lines reported, the sim component stays at 0.0.
        if sim.tests_total > 0:
            ratio = sim.tests_passed / sim.tests_total
            breakdown["sim"] = max(0.01, min(ratio, 0.99))

        # Step 3: Synthesis (always run if code compiles)
        synth = self.synthesize(files)

        # Step 4: Timing scoring (task-specific)
        if "timing" in breakdown:
            if task_id == "mac_unit" and synth is not None:
                # Flip-flops in the netlist are used as the signal for a registered design.
                dff_count = _count_dffs(synth.stdout)
                if dff_count >= 2:
                    breakdown["timing"] = 0.99
                elif synth.cell_count >= 10:
                    breakdown["timing"] = 0.50

            elif task_id == "systolic_array":
                if sim.timing_cycles is not None:
                    cycles = sim.timing_cycles
                    if cycles <= SYSTOLIC_TIMING_LIMIT:
                        breakdown["timing"] = 0.99
                    elif cycles <= SYSTOLIC_TIMING_LIMIT + 3:
                        # Lose 0.2 per cycle over the limit (0.8, 0.6, 0.4).
                        breakdown["timing"] = max(
                            0.01, min(1.0 - (cycles - SYSTOLIC_TIMING_LIMIT) * 0.2, 0.99)
                        )
                    elif sim.tests_passed > 0:
                        breakdown["timing"] = 0.20
                elif sim.tests_passed > 0:
                    # No cycle count reported, but at least one test passed.
                    breakdown["timing"] = 0.20

            elif task_id in ("dot_product", "fir_filter"):
                dff_count = _count_dffs(synth.stdout)
                if dff_count >= 2:
                    breakdown["timing"] = 0.99
                elif synth.cell_count >= 5:
                    breakdown["timing"] = 0.40

        # Step 5: Area scoring (gated on functional correctness)
        if (
            "area" in breakdown
            and synth is not None
            and synth.success
            and synth.cell_count > 0
            and reference_cells > 0
            and sim.tests_passed > 0
        ):
            # Fewer cells than the reference scores higher, capped at 0.99.
            ratio = reference_cells / synth.cell_count
            breakdown["area"] = max(0.01, min(ratio, 0.99))

        # Step 6: Formal verification (optional)
        formal_result = None
        if "formal" in breakdown and properties_path:
            formal_result = self.formal_verify(files, properties_path)
            breakdown["formal"] = max(0.01, min(formal_result.score, 0.99))

        breakdown = _clamp(breakdown)

        # Recompute weights for any keys that may have been redistributed
        active_weights = {k: weights[k] for k in weights if k in breakdown}
        weight_sum = sum(active_weights.values())
        if weight_sum > 0:
            norm_weights = {k: v / weight_sum for k, v in active_weights.items()}
        else:
            norm_weights = active_weights

        raw_score = round(sum(norm_weights[k] * breakdown[k] for k in norm_weights), 4)
        final_score = max(0.01, min(0.99, raw_score))

        return EvalResult(
            compilation=comp,
            simulation=sim,
            synthesis=synth,
            formal=formal_result,
            final_score=final_score,
            score_breakdown=breakdown,
        )