| | import enum |
| | import json |
| | import re |
| | from dataclasses import asdict, dataclass, field |
| | from typing import Any, Dict, List, Optional |
| |
|
| |
|
| | class FailureClass(str, enum.Enum): |
| | EDA_TOOL_ERROR = "EDA_TOOL_ERROR" |
| | LLM_FORMAT_ERROR = "LLM_FORMAT_ERROR" |
| | LLM_SEMANTIC_ERROR = "LLM_SEMANTIC_ERROR" |
| | ORCHESTRATOR_ROUTING_ERROR = "ORCHESTRATOR_ROUTING_ERROR" |
| | RETRY_BUDGET_ERROR = "RETRY_BUDGET_ERROR" |
| | INFRASTRUCTURE_ERROR = "INFRASTRUCTURE_ERROR" |
| | UNKNOWN = "UNKNOWN" |
| |
|
| |
|
| | class StageStatus(str, enum.Enum): |
| | PASS = "PASS" |
| | FAIL = "FAIL" |
| | RETRY = "RETRY" |
| | SKIP = "SKIP" |
| | ERROR = "ERROR" |
| |
|
| |
|
| | @dataclass |
| | class ArtifactRef: |
| | key: str |
| | producer: str |
| | consumer: str = "" |
| | required: bool = False |
| | blocking: bool = False |
| | value: Any = None |
| |
|
| | def to_dict(self) -> Dict[str, Any]: |
| | return asdict(self) |
| |
|
| |
|
| | @dataclass |
| | class FailureRecord: |
| | failure_class: FailureClass |
| | producer: str |
| | message: str |
| | diagnostics: List[str] = field(default_factory=list) |
| | raw_excerpt: str = "" |
| |
|
| | def to_dict(self) -> Dict[str, Any]: |
| | data = asdict(self) |
| | data["failure_class"] = self.failure_class.value |
| | return data |
| |
|
| |
|
| | @dataclass |
| | class AgentResult: |
| | agent: str |
| | ok: bool |
| | producer: str |
| | payload: Dict[str, Any] = field(default_factory=dict) |
| | diagnostics: List[str] = field(default_factory=list) |
| | failure_class: FailureClass = FailureClass.UNKNOWN |
| | raw_output: str = "" |
| |
|
| | def to_dict(self) -> Dict[str, Any]: |
| | data = asdict(self) |
| | data["failure_class"] = self.failure_class.value |
| | return data |
| |
|
| |
|
| | @dataclass |
| | class StageResult: |
| | stage: str |
| | status: StageStatus |
| | producer: str |
| | failure_class: FailureClass = FailureClass.UNKNOWN |
| | consumable_payload: Dict[str, Any] = field(default_factory=dict) |
| | diagnostics: List[str] = field(default_factory=list) |
| | artifacts_written: List[str] = field(default_factory=list) |
| | next_action: str = "" |
| |
|
| | def to_dict(self) -> Dict[str, Any]: |
| | data = asdict(self) |
| | data["status"] = self.status.value |
| | data["failure_class"] = self.failure_class.value |
| | return data |
| |
|
| |
|
| | _JSON_FENCE_RE = re.compile(r"```(?:json)?\s*(\{.*?\})\s*```", re.DOTALL) |
| |
|
| |
|
| | def extract_json_object(raw_text: str) -> Optional[Dict[str, Any]]: |
| | if not raw_text: |
| | return None |
| | match = _JSON_FENCE_RE.search(raw_text) |
| | candidates = [match.group(1)] if match else [] |
| | stripped = raw_text.strip() |
| | if stripped.startswith("{") and stripped.endswith("}"): |
| | candidates.append(stripped) |
| | first = stripped.find("{") |
| | last = stripped.rfind("}") |
| | if first != -1 and last != -1 and last > first: |
| | candidates.append(stripped[first:last + 1]) |
| | for candidate in candidates: |
| | try: |
| | parsed = json.loads(candidate) |
| | except json.JSONDecodeError: |
| | continue |
| | if isinstance(parsed, dict): |
| | return parsed |
| | return None |
| |
|
| |
|
| | def validate_agent_payload(payload: Dict[str, Any], required_keys: List[str]) -> List[str]: |
| | errors: List[str] = [] |
| | if not isinstance(payload, dict): |
| | return ["Payload is not a JSON object."] |
| | for key in required_keys: |
| | if key not in payload: |
| | errors.append(f"Missing required key '{key}'.") |
| | return errors |
| |
|
| |
|
| | def infer_failure_class( |
| | *, |
| | producer: str, |
| | raw_output: str = "", |
| | diagnostics: Optional[List[str]] = None, |
| | tool_result: Optional[Dict[str, Any]] = None, |
| | ) -> FailureClass: |
| | diag_text = "\n".join(diagnostics or []) |
| | text = f"{raw_output}\n{diag_text}".lower() |
| | if tool_result: |
| | if tool_result.get("infra_failure"): |
| | return FailureClass.INFRASTRUCTURE_ERROR |
| | if tool_result.get("tool"): |
| | return FailureClass.EDA_TOOL_ERROR |
| | if "not valid json" in text or "missing required key" in text or "prose" in text: |
| | return FailureClass.LLM_FORMAT_ERROR |
| | if "timed out" in text or "tool missing" in text or "binary not found" in text: |
| | return FailureClass.INFRASTRUCTURE_ERROR |
| | if "cannot find" in text or "%error" in text or "warning" in text or "yosys" in text or "verilator" in text: |
| | return FailureClass.EDA_TOOL_ERROR |
| | if "handoff" in text or "missing artifact" in text or "routing" in text: |
| | return FailureClass.ORCHESTRATOR_ROUTING_ERROR |
| | if "retry" in text and "budget" in text: |
| | return FailureClass.RETRY_BUDGET_ERROR |
| | if producer.startswith("llm") or producer.startswith("agent"): |
| | return FailureClass.LLM_SEMANTIC_ERROR |
| | return FailureClass.UNKNOWN |
| |
|
| |
|
| | def materially_changed(before: str, after: str) -> bool: |
| | if before == after: |
| | return False |
| | if not before or not after: |
| | return True |
| | before_norm = "\n".join(line.rstrip() for line in before.splitlines()).strip() |
| | after_norm = "\n".join(line.rstrip() for line in after.splitlines()).strip() |
| | return before_norm != after_norm |
| |
|