| """ |
Self-Reflect Pipeline — Autonomous Retry with Reflection
| ========================================================== |
| |
| Implements the "Self-Reflect and Retry" pattern for the AgentIC pipeline. |
| |
| When OpenLane synthesis/hardening fails, the system: |
| 1. Captures the failure log and error category |
| 2. Reflects on WHY it failed (structured root-cause analysis) |
| 3. Generates a corrective action plan |
| 4. Applies the fix and retries (up to 5 times) |
| 5. Tracks a convergence history to avoid repeating failed approaches |
| |
| The reflection uses gradient feedback from: |
| - Synthesis timing (WNS/TNS) |
| - Area utilization |
| - DRC/LVS violations |
| - Routing congestion |
| - Formal property results |
| """ |
|
|
| import os |
| import re |
| import json |
| import time |
| import hashlib |
| import logging |
| from dataclasses import dataclass, field, asdict |
| from typing import Any, Callable, Dict, List, Optional, Tuple |
| from enum import Enum |
|
|
# Module-wide logger; handlers/levels are configured by the host application.
logger = logging.getLogger(__name__)
|
|
|
|
| |
|
|
class FailureCategory(Enum):
    """Categories of failures the self-reflect pipeline handles.

    Values are matched against lowercased error-log text by the regex
    patterns in SelfReflectPipeline._categorize_failure; UNKNOWN is the
    fallback when no pattern matches.
    """
    SYNTAX_ERROR = "syntax_error"                   # parse/unexpected-token errors
    SIMULATION_FAIL = "simulation_fail"             # failed tests / sim mismatches
    FORMAL_PROPERTY_FAIL = "formal_property_fail"   # assertion/property failures
    SYNTHESIS_ERROR = "synthesis_error"             # synthesis/yosys errors
    TIMING_VIOLATION = "timing_violation"           # WNS/TNS, setup/hold slack
    ROUTING_CONGESTION = "routing_congestion"       # routing overflow/congestion
    DRC_VIOLATION = "drc_violation"                 # design-rule-check violations
    LVS_MISMATCH = "lvs_mismatch"                   # layout-vs-schematic mismatch
    AREA_OVERFLOW = "area_overflow"                 # die area/utilization exceeded
    POWER_VIOLATION = "power_violation"             # power budget / IR drop
    UNKNOWN = "unknown"                             # no known pattern matched
|
|
|
|
@dataclass
class FailureAnalysis:
    """Structured analysis of a pipeline failure."""
    category: FailureCategory     # coarse classification (see FailureCategory)
    error_message: str            # captured error log (caller truncates to 2000 chars)
    root_cause: str               # filled in by _parse_reflection; starts empty
    impact: str                   # impact description; not populated in this module
    similar_past_failures: int    # occurrence count of this error's fingerprint
    is_repeating: bool            # True once the same fingerprint is seen >= 2 times
|
|
|
|
@dataclass
class CorrectionAction:
    """A fix action to attempt."""
    action_type: str      # e.g. "modify_rtl" or "adjust_config" (see _default_actions)
    description: str      # human-readable summary of the fix
    target_file: str = ""  # optional file the fix applies to ("" = unspecified)
    parameters: Dict[str, Any] = field(default_factory=dict)  # extra fix parameters
|
|
|
|
@dataclass
class ReflectionEntry:
    """A single self-reflection cycle."""
    attempt: int                    # 1-based attempt number within the retry loop
    failure: FailureAnalysis        # the analyzed failure that triggered reflection
    reflection: str                 # LLM (or fallback) root-cause narrative
    proposed_actions: List[CorrectionAction]  # fixes to try before the next attempt
    outcome: str = ""               # result of the fixes; left empty unless set by the caller
    timestamp: float = field(default_factory=time.time)  # creation time, epoch seconds
    metrics_before: Dict[str, Any] = field(default_factory=dict)  # metrics at failure time, if recorded
    metrics_after: Dict[str, Any] = field(default_factory=dict)   # metrics after fixes, if recorded
|
|
|
|
@dataclass
class ConvergenceMetrics:
    """Metrics tracked across retry iterations for convergence analysis."""
    wns: float = 0.0
    tns: float = 0.0
    area_um2: float = 0.0
    power_w: float = 0.0
    congestion_pct: float = 0.0
    drc_count: int = 0
    lvs_ok: bool = False
    formal_pass: bool = False
    sim_pass: bool = False

    def is_improving(self, previous: "ConvergenceMetrics") -> bool:
        """Return True when more tracked metrics improved than regressed
        versus *previous*.

        Higher WNS is better; lower DRC count and congestion are better;
        sim/formal flipping from fail to pass each count as one improvement.
        A tie (equal improvements and regressions) is not an improvement.
        """
        # Signed tally: +1 per metric that got better, -1 per metric that
        # got worse; unchanged metrics contribute nothing.
        score = 0
        score += (self.wns > previous.wns) - (self.wns < previous.wns)
        score += (self.drc_count < previous.drc_count) - (self.drc_count > previous.drc_count)
        score += (self.congestion_pct < previous.congestion_pct) - (self.congestion_pct > previous.congestion_pct)
        # Pass flags only count in the positive direction (fail -> pass).
        score += 1 if (self.sim_pass and not previous.sim_pass) else 0
        score += 1 if (self.formal_pass and not previous.formal_pass) else 0
        return score > 0

    def to_dict(self) -> dict:
        """Serialize to a plain dict via dataclasses.asdict."""
        return asdict(self)
|
|
|
|
| |
|
|
# Prompt template for the reflection LLM call. All placeholders are filled by
# SelfReflectPipeline._reflect via str.format. The backslash after the opening
# quotes suppresses a leading newline. (Fixed: a mojibake "β" — a corrupted
# em dash — was previously sent verbatim to the LLM in step 1.)
SELF_REFLECT_PROMPT = """\
You are a Self-Reflecting VLSI Agent. A pipeline stage has FAILED.

Your job is to:
1. ANALYZE the failure — identify the root cause
2. REFLECT on whether this is a repeating pattern
3. PROPOSE concrete corrective actions
4. ASSESS the risk of each action

FAILURE CONTEXT:
Category: {category}
Error: {error_message}
Attempt: {attempt}/{max_attempts}

CONVERGENCE HISTORY:
{convergence_history}

PREVIOUS REFLECTIONS (do NOT repeat the same fix):
{previous_reflections}

CURRENT RTL SUMMARY:
{rtl_summary}

Respond in this EXACT format:
ROOT_CAUSE: <one sentence>
REFLECTION: <2-3 sentences about what went wrong and why>
ACTION_1: <type>|<description>|<target_file>
ACTION_2: <type>|<description>|<target_file>
RISK_ASSESSMENT: <one sentence about what could go wrong with these fixes>
CONVERGENCE_TREND: IMPROVING | STAGNATING | DIVERGING
"""
|
|
|
|
| |
|
|
class SelfReflectPipeline:
    """
    Self-reflection retry pipeline for OpenLane synthesis convergence.

    When any stage fails, the pipeline:
    1. Categorizes the failure
    2. Reflects on root cause (using LLM)
    3. Proposes and applies corrective actions
    4. Retries up to max_retries times
    5. Tracks convergence to detect stagnation

    The reflection history prevents the agent from repeating the same
    failed approach — each retry must try something different.
    """

    def __init__(
        self,
        llm,
        max_retries: int = 5,
        verbose: bool = False,
        on_reflection: Optional[Callable] = None,
    ):
        """
        Args:
            llm: LLM handle forwarded to the crewai Agent used for reflection.
            max_retries: Maximum number of attempts per stage.
            verbose: Forwarded to the crewai Agent.
            on_reflection: Optional best-effort observer callback; called with
                a dict describing each reflection cycle. Exceptions it raises
                are swallowed (logged at debug level).
        """
        self.llm = llm
        self.max_retries = max_retries
        self.verbose = verbose
        self.on_reflection = on_reflection

        # State accumulated across attempts (and across stages if reused).
        self.reflections: List[ReflectionEntry] = []
        self.convergence_history: List[ConvergenceMetrics] = []
        # Normalized-error fingerprint -> number of times it has been seen.
        self.failure_fingerprints: Dict[str, int] = {}

    def run_with_retry(
        self,
        stage_name: str,
        action_fn: Callable[[], Tuple[bool, str, Dict[str, Any]]],
        fix_fn: Callable[[CorrectionAction], bool],
        rtl_summary: str = "",
    ) -> Tuple[bool, str, List[ReflectionEntry]]:
        """
        Execute a pipeline stage with self-reflective retry.

        Args:
            stage_name: Human-readable stage name (e.g., "OpenLane Hardening")
            action_fn: The stage function. Returns (success, error_msg, metrics_dict)
            fix_fn: Function that applies a CorrectionAction. Returns True if applied
            rtl_summary: Current RTL code summary for context

        Returns:
            (success, final_message, reflection_history)
        """
        logger.info("[SelfReflect] Starting %s with up to %d retries",
                    stage_name, self.max_retries)

        for attempt in range(1, self.max_retries + 1):
            logger.info("[SelfReflect] %s attempt %d/%d",
                        stage_name, attempt, self.max_retries)

            # Run the stage; an exception counts as a failed attempt rather
            # than escaping the retry loop.
            try:
                success, error_msg, metrics = action_fn()
            except Exception as e:
                success = False
                error_msg = f"Stage exception: {str(e)}"
                metrics = {}

            cm = self._parse_metrics(metrics)
            self.convergence_history.append(cm)

            # Close out the previous reflection cycle: the metrics observed
            # now are the "after" state of the fixes applied last attempt.
            # (Fix: metrics_after was declared on ReflectionEntry but never
            # populated.)
            if self.reflections and not self.reflections[-1].metrics_after:
                self.reflections[-1].metrics_after = dict(metrics)

            if success:
                if self.reflections:
                    # Record that the most recent fixes resolved the failure.
                    self.reflections[-1].outcome = (
                        f"resolved: stage passed on attempt {attempt}"
                    )
                logger.info("[SelfReflect] %s PASSED on attempt %d",
                            stage_name, attempt)
                return True, f"Passed on attempt {attempt}", self.reflections

            # Deduplicate errors via a normalized fingerprint so repeated
            # failures of the same kind are recognized.
            fp = self._fingerprint(error_msg)
            self.failure_fingerprints[fp] = self.failure_fingerprints.get(fp, 0) + 1
            is_repeating = self.failure_fingerprints[fp] >= 2

            category = self._categorize_failure(error_msg)
            analysis = FailureAnalysis(
                category=category,
                error_message=error_msg[:2000],  # cap the log fed downstream
                root_cause="",  # filled in by _parse_reflection
                impact="",
                similar_past_failures=self.failure_fingerprints[fp],
                is_repeating=is_repeating,
            )

            reflection_entry = self._reflect(analysis, attempt, rtl_summary)
            # Snapshot the failing metrics. (Fix: metrics_before was declared
            # on ReflectionEntry but never populated.)
            reflection_entry.metrics_before = dict(metrics)
            self.reflections.append(reflection_entry)

            # Best-effort observer notification; never break the retry loop.
            if self.on_reflection:
                try:
                    self.on_reflection({
                        "type": "self_reflection",
                        "stage": stage_name,
                        "attempt": attempt,
                        "category": category.value,
                        "reflection": reflection_entry.reflection,
                        "actions": [a.description for a in reflection_entry.proposed_actions],
                    })
                except Exception:
                    logger.debug("[SelfReflect] on_reflection callback raised",
                                 exc_info=True)

            # Give up early when metrics are clearly trending worse.
            if attempt >= 3 and self._is_diverging():
                logger.warning("[SelfReflect] Convergence diverging after %d attempts — aborting",
                               attempt)
                return False, f"Diverging after {attempt} attempts — aborting", self.reflections

            # Apply the proposed corrective actions before the next attempt.
            applied = 0
            for action in reflection_entry.proposed_actions:
                try:
                    if fix_fn(action):
                        applied += 1
                        logger.info("[SelfReflect] Applied fix: %s", action.description)
                except Exception as e:
                    logger.warning("[SelfReflect] Fix failed: %s: %s",
                                   action.description, e)

            # Record the cycle outcome. (Fix: outcome was declared on
            # ReflectionEntry but never populated, so the "previous
            # reflections" prompt section always showed it empty.)
            if applied:
                reflection_entry.outcome = f"applied {applied} fix(es), retrying"
            else:
                reflection_entry.outcome = "no fixes applied"
                logger.warning("[SelfReflect] No fixes could be applied on attempt %d",
                               attempt)

        return False, f"Failed after {self.max_retries} attempts", self.reflections

    def _categorize_failure(self, error_msg: str) -> FailureCategory:
        """Categorize a failure based on error message patterns.

        Patterns are checked in order against the lowercased message;
        the first match wins. Returns UNKNOWN when nothing matches.
        """
        msg = error_msg.lower()

        patterns = [
            (r"syntax error|parse error|unexpected token", FailureCategory.SYNTAX_ERROR),
            (r"test failed|simulation.*fail|mismatch", FailureCategory.SIMULATION_FAIL),
            (r"assert.*fail|property.*fail|formal.*fail", FailureCategory.FORMAL_PROPERTY_FAIL),
            (r"synthesis.*error|synth.*fail|yosys.*error", FailureCategory.SYNTHESIS_ERROR),
            (r"timing|slack|wns|tns|setup.*violation|hold.*violation", FailureCategory.TIMING_VIOLATION),
            (r"congestion|overflow|routing.*fail", FailureCategory.ROUTING_CONGESTION),
            (r"drc.*violation|design rule", FailureCategory.DRC_VIOLATION),
            (r"lvs.*mismatch|layout.*vs.*schematic", FailureCategory.LVS_MISMATCH),
            (r"area.*overflow|die.*area|utilization.*exceed", FailureCategory.AREA_OVERFLOW),
            (r"power.*violation|power.*exceed|ir.*drop", FailureCategory.POWER_VIOLATION),
        ]

        for pattern, category in patterns:
            if re.search(pattern, msg):
                return category

        return FailureCategory.UNKNOWN

    def _reflect(
        self,
        analysis: FailureAnalysis,
        attempt: int,
        rtl_summary: str,
    ) -> ReflectionEntry:
        """Run LLM self-reflection on the failure.

        Builds the prompt from recent convergence history and prior
        reflections, runs a single-agent crewai crew, and parses the reply.
        Any failure on the LLM path degrades to _fallback_reflection.
        """
        # Summarize the last five convergence data points for the prompt.
        conv_lines = []
        for i, cm in enumerate(self.convergence_history):
            conv_lines.append(
                f" [{i+1}] WNS={cm.wns:.3f}ns DRC={cm.drc_count} "
                f"cong={cm.congestion_pct:.1f}% sim={'PASS' if cm.sim_pass else 'FAIL'}"
            )
        conv_str = "\n".join(conv_lines[-5:]) or " No history yet"

        # Show recent reflections so the agent does not repeat a failed fix.
        prev_lines = []
        for r in self.reflections[-3:]:
            prev_lines.append(
                f" [Attempt {r.attempt}] {r.failure.category.value}: "
                f"{r.reflection[:100]}... — {r.outcome}"
            )
        prev_str = "\n".join(prev_lines) or " No previous reflections"

        prompt = SELF_REFLECT_PROMPT.format(
            category=analysis.category.value,
            error_message=analysis.error_message[:1500],
            attempt=attempt,
            max_attempts=self.max_retries,
            convergence_history=conv_str,
            previous_reflections=prev_str,
            rtl_summary=rtl_summary[:3000],
        )

        try:
            # Imported lazily so the module loads without crewai installed.
            from crewai import Agent, Task, Crew

            agent = Agent(
                role="Self-Reflecting VLSI Agent",
                goal="Analyze the failure and propose corrective actions",
                backstory=(
                    "You are an expert at diagnosing ASIC design failures. "
                    "You analyze error patterns, identify root causes, and propose "
                    "targeted fixes. You never repeat a fix that already failed."
                ),
                llm=self.llm,
                verbose=self.verbose,
            )

            task = Task(
                description=prompt,
                expected_output="ROOT_CAUSE, REFLECTION, ACTION_1, ACTION_2, RISK_ASSESSMENT, CONVERGENCE_TREND",
                agent=agent,
            )

            raw = str(Crew(agents=[agent], tasks=[task]).kickoff())
            return self._parse_reflection(raw, analysis, attempt)

        except Exception as e:
            logger.warning("[SelfReflect] LLM reflection failed: %s", e)
            return self._fallback_reflection(analysis, attempt)

    def _parse_reflection(
        self, raw: str, analysis: FailureAnalysis, attempt: int
    ) -> ReflectionEntry:
        """Parse LLM reflection response into a ReflectionEntry.

        Missing sections degrade gracefully: root cause defaults to
        "Unknown", the reflection to "Analysis inconclusive", and absent
        actions fall back to category-based defaults.
        """
        # ROOT_CAUSE: single line.
        m = re.search(r'ROOT_CAUSE\s*:\s*(.+?)(?:\n|$)', raw, re.IGNORECASE)
        root_cause = m.group(1).strip() if m else "Unknown"
        analysis.root_cause = root_cause

        # REFLECTION: free text up to the first ACTION_ section.
        m = re.search(r'REFLECTION\s*:\s*(.+?)(?=ACTION|\Z)', raw,
                      re.DOTALL | re.IGNORECASE)
        reflection = m.group(1).strip() if m else "Analysis inconclusive"

        # ACTION_i lines: "<type>|<description>|<target_file>", pipe-separated;
        # a bare description (no pipes) is reused as both type and description.
        actions: List[CorrectionAction] = []
        for i in (1, 2, 3):
            m = re.search(rf'ACTION_{i}\s*:\s*(.+?)(?:\n|$)', raw, re.IGNORECASE)
            if m:
                parts = m.group(1).strip().split("|")
                action_type = parts[0].strip() if len(parts) > 0 else "modify_rtl"
                desc = parts[1].strip() if len(parts) > 1 else parts[0].strip()
                target = parts[2].strip() if len(parts) > 2 else ""
                actions.append(CorrectionAction(
                    action_type=action_type,
                    description=desc,
                    target_file=target,
                ))

        if not actions:
            # The LLM produced no parseable actions; use category defaults.
            actions = self._default_actions(analysis.category)

        return ReflectionEntry(
            attempt=attempt,
            failure=analysis,
            reflection=reflection,
            proposed_actions=actions,
        )

    def _fallback_reflection(
        self, analysis: FailureAnalysis, attempt: int
    ) -> ReflectionEntry:
        """Generate fallback reflection when LLM is unavailable."""
        actions = self._default_actions(analysis.category)
        return ReflectionEntry(
            attempt=attempt,
            failure=analysis,
            reflection=f"Fallback reflection: {analysis.category.value} detected",
            proposed_actions=actions,
        )

    def _default_actions(self, category: FailureCategory) -> List[CorrectionAction]:
        """Generate default corrective actions based on failure category.

        Categories without a specific entry fall back to a generic RTL fix.
        """
        defaults = {
            FailureCategory.SYNTAX_ERROR: [
                CorrectionAction("modify_rtl", "Fix Verilog syntax errors"),
            ],
            FailureCategory.SIMULATION_FAIL: [
                CorrectionAction("modify_rtl", "Fix RTL logic to match expected behavior"),
                CorrectionAction("modify_rtl", "Adjust testbench timing and reset sequence"),
            ],
            FailureCategory.TIMING_VIOLATION: [
                CorrectionAction("adjust_config", "Increase clock period"),
                CorrectionAction("modify_rtl", "Pipeline critical path"),
            ],
            FailureCategory.ROUTING_CONGESTION: [
                CorrectionAction("adjust_config", "Reduce utilization target"),
                CorrectionAction("adjust_config", "Increase die area by 20%"),
            ],
            FailureCategory.DRC_VIOLATION: [
                CorrectionAction("adjust_config", "Reduce placement density"),
                CorrectionAction("adjust_config", "Enable DRC repair scripts"),
            ],
            FailureCategory.AREA_OVERFLOW: [
                CorrectionAction("adjust_config", "Increase die area"),
                CorrectionAction("modify_rtl", "Reduce design complexity"),
            ],
        }
        return defaults.get(category, [
            CorrectionAction("modify_rtl", "General RTL fix based on error log"),
        ])

    def _parse_metrics(self, metrics: Dict[str, Any]) -> ConvergenceMetrics:
        """Parse raw metrics dict into ConvergenceMetrics.

        Missing keys default to 0/False, so an empty dict is valid input.
        """
        return ConvergenceMetrics(
            wns=float(metrics.get("wns", 0)),
            tns=float(metrics.get("tns", 0)),
            area_um2=float(metrics.get("area_um2", 0)),
            power_w=float(metrics.get("power_w", 0)),
            congestion_pct=float(metrics.get("congestion_pct", 0)),
            drc_count=int(metrics.get("drc_count", 0)),
            lvs_ok=bool(metrics.get("lvs_ok", False)),
            formal_pass=bool(metrics.get("formal_pass", False)),
            sim_pass=bool(metrics.get("sim_pass", False)),
        )

    def _fingerprint(self, error_msg: str) -> str:
        """Generate a fingerprint for deduplicating errors.

        Digits and file paths are normalized so the same error at a
        different line number or in a different file hashes identically.
        """
        normalized = re.sub(r'\d+', 'N', error_msg[:500])
        normalized = re.sub(r'/[\w/]+\.', 'FILE.', normalized)
        return hashlib.sha256(normalized.encode()).hexdigest()[:16]

    def _is_diverging(self) -> bool:
        """Check if the convergence history shows divergence (getting worse).

        Looks at the last three data points; divergence means DRC count is
        monotonically non-decreasing (and still non-zero), or WNS is
        monotonically non-increasing and below -1.0 ns.
        """
        if len(self.convergence_history) < 3:
            return False

        recent = self.convergence_history[-3:]

        # DRC count never improved across the window and is still non-zero.
        if all(recent[i].drc_count >= recent[i-1].drc_count
               for i in range(1, len(recent))) and recent[-1].drc_count > 0:
            return True

        # WNS never improved across the window and is deeply negative.
        if all(recent[i].wns <= recent[i-1].wns
               for i in range(1, len(recent))) and recent[-1].wns < -1.0:
            return True

        return False

    def get_summary(self) -> str:
        """Get a human-readable summary of the reflection history."""
        if not self.reflections:
            return "No reflections recorded."

        lines = [f"Self-Reflection Summary ({len(self.reflections)} attempts):"]
        for r in self.reflections:
            lines.append(
                f" [{r.attempt}] {r.failure.category.value}: {r.reflection[:80]}... "
                f"— {r.outcome or 'pending'}"
            )

        if self.convergence_history:
            last = self.convergence_history[-1]
            lines.append(
                f" Latest metrics: WNS={last.wns:.3f} DRC={last.drc_count} "
                f"cong={last.congestion_pct:.1f}%"
            )

        return "\n".join(lines)
|
|