| from __future__ import annotations |
|
|
| import csv |
| import html |
| import io |
| import json |
| import re |
| from typing import Any, Dict, Iterable, List, Sequence, Tuple |
|
|
| from .def_translator import DEFGridConfig |
| from .design_importer import parse_design_def |
| from .eda_view import parse_def_blockages, parse_eda_text |
| from .severity_mapper import SeverityConfig |
|
|
|
|
# Pre-compiled patterns used by the companion LEF / Liberty parsers below.
# Every pattern carries (?is): matching is case-insensitive and "." also
# matches newlines, so a captured block body may span several lines.
#
# Numeric captures use the class [0-9.+\-eE]. The hyphen is escaped on
# purpose: an unescaped "+-e" inside a character class is a *range* from "+"
# to "e" that also admits letters and punctuation, which lets non-numeric
# tokens through to float() further down.

# "SITE <name> ... END <name>" block; group 1 = name, group 2 = body.
_LEF_SITE_BLOCK_RE = re.compile(r"(?is)\bSITE\s+([A-Za-z0-9_.$/-]+)(.*?)END\s+\1\b")
# "MACRO <name> ... END <name>" block; group 1 = name, group 2 = body.
_LEF_MACRO_BLOCK_RE = re.compile(r"(?is)\bMACRO\s+([A-Za-z0-9_.$/-]+)(.*?)END\s+\1\b")
# "SIZE <w> BY <h> ;" statement; groups 1 and 2 are the two numbers.
_LEF_SIZE_RE = re.compile(r"(?is)\bSIZE\s+([0-9.+\-eE]+)\s+BY\s+([0-9.+\-eE]+)\s*;")
# "SITE <name> ;" reference (as opposed to a SITE ... END block).
_LEF_SITE_REF_RE = re.compile(r"(?is)\bSITE\s+([A-Za-z0-9_.$/-]+)\s*;")
# "library ( <name> )" declaration; group 1 = raw text between the parentheses.
_LIB_LIBRARY_RE = re.compile(r"(?is)\blibrary\s*\(\s*([^)]+)\s*\)")
# "cell ( <name> ) { ... }" group. The body is captured lazily and stops at the
# first newline followed by optional whitespace and "}", so a nested group that
# closes on its own line would end the captured body early.
_LIB_CELL_RE = re.compile(r"(?is)\bcell\s*\(\s*([^)]+)\s*\)\s*\{(.*?)\n\s*\}")
# "area : <number> ;" attribute; group 1 = the number.
_LIB_AREA_RE = re.compile(r"(?is)\barea\s*:\s*([0-9.+\-eE]+)\s*;")
# 'comment : "<text>" ;' attribute; group 1 = text between the quotes.
_LIB_COMMENT_RE = re.compile(r'(?is)\bcomment\s*:\s*"([^"]*)"\s*;')
|
|
|
|
def _float_close(a: Any, b: Any, tol: float = 1e-5) -> bool:
    """Report whether ``a`` and ``b`` are numerically equal within ``tol``.

    Two "missing" values (``None`` or the empty string) are treated as equal.
    If either value, or ``tol``, cannot be converted with ``float()``, the
    comparison is False rather than an error.
    """
    missing = (None, "")
    if a in missing and b in missing:
        return True
    try:
        delta = float(a) - float(b)
        limit = float(tol)
    except Exception:
        # Best-effort comparison: anything non-numeric simply does not match.
        return False
    return abs(delta) <= limit
|
|
|
|
def _status_rank(status: str) -> int:
    """Translate a status label into a severity rank.

    The label is compared case-insensitively after trimming whitespace:
    ``"fail"`` ranks 2, ``"warn"`` ranks 1, and everything else (including
    empty or ``None``) ranks 0.
    """
    label = str(status or "").strip().lower()
    return {"fail": 2, "warn": 1}.get(label, 0)
|
|
|
|
def _final_status(checks: Sequence[Dict[str, Any]]) -> str:
    """Collapse a list of check records into one overall status label.

    Returns ``"FAIL"`` if any check ranks as a failure, otherwise ``"WARN"``
    if any ranks as a warning, otherwise ``"PASS"`` (also for an empty list).
    A check without a ``"status"`` key counts as a pass.
    """
    ranks = [_status_rank(str(entry.get("status", "pass"))) for entry in checks]
    # Seed with 0 so an empty list of checks resolves to PASS.
    worst = max([0, *ranks])
    if worst >= 2:
        return "FAIL"
    return "WARN" if worst == 1 else "PASS"
|
|
|
|
def _add_check(
    checks: List[Dict[str, Any]],
    *,
    status: str,
    code: str,
    artifact: str,
    message: str,
    targets: Sequence[Any] | None = None,
) -> None:
    """Append one normalized check record to ``checks`` (mutated in place).

    All fields are coerced to ``str``; ``status`` is additionally upper-cased.
    ``targets`` entries are stringified, and entries that are empty or
    whitespace-only after stringification are dropped.
    """
    record: Dict[str, Any] = {
        "status": str(status).upper(),
        "code": str(code),
        "artifact": str(artifact),
        "message": str(message),
    }
    kept: List[str] = []
    for target in targets or []:
        label = str(target)
        if label.strip():
            kept.append(label)
    record["targets"] = kept
    checks.append(record)
|
|
|
|
def _safe_int(value: Any) -> int | None:
    """Convert ``value`` to ``int`` via ``float``, or return ``None``.

    The value is stringified and stripped first. Empty text, and anything
    that ``float()`` / ``int()`` rejects, yields ``None``. Fractional input is
    truncated toward zero by ``int()``.
    """
    cleaned = str(value).strip()
    if cleaned:
        try:
            # Go through float so text such as "3.0" is accepted.
            return int(float(cleaned))
        except Exception:
            pass
    return None
|
|
|
|
def _safe_float(value: Any) -> float | None:
    """Convert ``value`` to ``float``, or return ``None`` when that fails.

    The value is stringified and stripped first; empty text and anything
    ``float()`` rejects both yield ``None``.
    """
    cleaned = str(value).strip()
    if cleaned:
        try:
            return float(cleaned)
        except Exception:
            pass
    return None
|
|
|
|
def parse_metrics_csv_text(text: str) -> List[Dict[str, Any]]:
    """Parse the per-qubit metrics CSV into typed rows ordered by qubit.

    Args:
        text: CSV content whose header must contain ``qubit``, the ten float
            metric columns listed below, and ``hotspot_level``.

    Returns:
        One dict per data row: ``qubit`` as ``int``, the metric columns as
        ``float``, and ``hotspot_level`` as ``int`` (parsed through ``float``).
        Rows are sorted by ``qubit``.

    Raises:
        ValueError: If the text is empty/blank or a required header is absent.
            Conversion errors raised by ``int()`` / ``float()`` on individual
            cells propagate unchanged.
    """
    raw = str(text or "").strip()
    if not raw:
        raise ValueError("Metrics CSV is empty.")

    float_columns = (
        "activity_count",
        "activity_norm",
        "gate_error",
        "readout_error",
        "fidelity",
        "state_fidelity",
        "process_fidelity",
        "coherence_health",
        "decoherence_risk",
        "composite_risk",
    )
    required = ("qubit", *float_columns, "hotspot_level")

    reader = csv.DictReader(io.StringIO(raw))
    header = reader.fieldnames
    if header is None or not all(column in header for column in required):
        raise ValueError("Metrics CSV missing required headers.")

    parsed: List[Dict[str, Any]] = []
    for record in reader:
        entry: Dict[str, Any] = {"qubit": int(record["qubit"])}
        for column in float_columns:
            entry[column] = float(record[column])
        # Tolerate values such as "2.0" in the integer column.
        entry["hotspot_level"] = int(float(record["hotspot_level"]))
        parsed.append(entry)

    parsed.sort(key=lambda entry: int(entry["qubit"]))
    return parsed
|
|
|
|
def parse_severity_csv_text(text: str) -> List[Dict[str, Any]]:
    """Parse the per-qubit severity CSV into typed rows ordered by qubit.

    Args:
        text: CSV content whose header must contain every severity column
            listed in ``required`` below.

    Returns:
        One dict per data row, sorted by ``qubit``. Numeric columns are
        converted with ``int()`` / ``float()``; ``layout_row`` and
        ``layout_col`` go through ``_safe_int`` and may therefore be ``None``;
        ``severity_band`` and ``route_priority`` are upper-cased.

    Raises:
        ValueError: If the text is empty/blank or a required header is absent.
            Conversion errors on individual cells propagate unchanged.
    """
    raw = str(text or "").strip()
    if not raw:
        raise ValueError("Severity CSV is empty.")

    required = (
        "qubit",
        "layout_row",
        "layout_col",
        "source_metric",
        "severity_mode",
        "base_risk",
        "severity_score",
        "severity_band",
        "severity_rank",
        "severity_percentile",
        "pnr_cost",
        "timing_derate",
        "density_cap",
        "guardband_mv",
        "route_priority",
    )
    reader = csv.DictReader(io.StringIO(raw))
    header = reader.fieldnames
    if header is None or not all(column in header for column in required):
        raise ValueError("Severity CSV missing required headers.")

    def _typed(record: Dict[str, Any]) -> Dict[str, Any]:
        """Convert one raw CSV record into its typed form."""
        return {
            "qubit": int(record["qubit"]),
            "layout_row": _safe_int(record.get("layout_row", "")),
            "layout_col": _safe_int(record.get("layout_col", "")),
            "source_metric": str(record.get("source_metric", "")),
            "severity_mode": str(record.get("severity_mode", "")),
            "base_risk": float(record["base_risk"]),
            "severity_score": float(record["severity_score"]),
            "severity_band": str(record.get("severity_band", "")).upper(),
            "severity_rank": int(float(record["severity_rank"])),
            "severity_percentile": float(record["severity_percentile"]),
            "pnr_cost": float(record["pnr_cost"]),
            "timing_derate": float(record["timing_derate"]),
            "density_cap": float(record["density_cap"]),
            "guardband_mv": float(record["guardband_mv"]),
            "route_priority": str(record.get("route_priority", "")).upper(),
        }

    typed_rows = [_typed(record) for record in reader]
    typed_rows.sort(key=lambda entry: int(entry["qubit"]))
    return typed_rows
|
|
|
|
def parse_companion_lef_text(text: str) -> Dict[str, Any]:
    """Extract SITE and MACRO declarations from companion LEF text.

    Args:
        text: LEF content to scan with the module-level ``_LEF_*`` patterns.

    Returns:
        ``{"sites": {...}, "macros": [...]}``. ``sites`` maps a site name to
        its ``width_microns`` / ``height_microns`` (sites without a ``SIZE``
        statement are omitted). ``macros`` lists every MACRO block with its
        ``name``, size (``None`` when there is no ``SIZE`` statement) and
        ``site_name`` (empty string when there is no ``SITE`` reference).

    Raises:
        ValueError: If the text is empty or whitespace-only.
    """
    raw = str(text or "")
    if not raw.strip():
        raise ValueError("Companion LEF is empty.")

    def _block_size(body: str) -> Tuple[float, float] | None:
        """Return (width, height) from the first SIZE statement, if any."""
        hit = _LEF_SIZE_RE.search(body)
        if hit is None:
            return None
        return float(hit.group(1)), float(hit.group(2))

    sites: Dict[str, Dict[str, Any]] = {}
    for site_hit in _LEF_SITE_BLOCK_RE.finditer(raw):
        dims = _block_size(str(site_hit.group(2)))
        if dims is not None:
            sites[str(site_hit.group(1))] = {
                "width_microns": dims[0],
                "height_microns": dims[1],
            }

    macros: List[Dict[str, Any]] = []
    for macro_hit in _LEF_MACRO_BLOCK_RE.finditer(raw):
        body = str(macro_hit.group(2))
        dims = _block_size(body)
        site_ref = _LEF_SITE_REF_RE.search(body)
        width, height = dims if dims is not None else (None, None)
        macros.append(
            {
                "name": str(macro_hit.group(1)),
                "width_microns": width,
                "height_microns": height,
                "site_name": str(site_ref.group(1)) if site_ref else "",
            }
        )

    return {"sites": sites, "macros": macros}
|
|
|
|
def parse_companion_lib_text(text: str) -> Dict[str, Any]:
    """Extract the library name and annotated cells from companion LIB text.

    Each cell's ``comment`` attribute is read as ``;``-separated
    ``key=value`` pairs (keys lower-cased, both sides stripped). The keys
    ``qubit``, ``band``, ``source``, ``mode``, ``row`` and ``col`` feed the
    returned cell record; a ``qubit`` value is accepted only in the form
    ``q<digits>``.

    Args:
        text: Liberty content to scan with the module-level ``_LIB_*`` patterns.

    Returns:
        ``{"library_name": str, "cells": [...]}`` where every cell dict holds
        ``name``, ``area`` (``None`` if absent), the raw ``comment``, and the
        decoded ``qubit``, ``band``, ``source_metric``, ``severity_mode``,
        ``layout_row`` and ``layout_col``.

    Raises:
        ValueError: If the text is blank or has no ``library (...)`` declaration.
    """
    raw = str(text or "")
    if not raw.strip():
        raise ValueError("Companion LIB is empty.")
    library_hit = _LIB_LIBRARY_RE.search(raw)
    if library_hit is None:
        raise ValueError("Companion LIB missing library declaration.")

    def _comment_fields(comment: str) -> Dict[str, str]:
        """Split ``k=v;k=v`` comment text into a lower-cased key map."""
        pairs: Dict[str, str] = {}
        for chunk in comment.split(";"):
            key, separator, value = chunk.partition("=")
            if separator:
                pairs[str(key).strip().lower()] = str(value).strip()
        return pairs

    cells: List[Dict[str, Any]] = []
    for cell_hit in _LIB_CELL_RE.finditer(raw):
        body = str(cell_hit.group(2))
        area_hit = _LIB_AREA_RE.search(body)
        comment_hit = _LIB_COMMENT_RE.search(body)
        comment_text = str(comment_hit.group(1)) if comment_hit else ""
        fields = _comment_fields(comment_text)

        qubit_label = fields.get("qubit", "")
        has_qubit = qubit_label.startswith("q") and qubit_label[1:].isdigit()

        cells.append(
            {
                "name": str(cell_hit.group(1)).strip(),
                "area": float(area_hit.group(1)) if area_hit else None,
                "comment": comment_text,
                "qubit": int(qubit_label[1:]) if has_qubit else None,
                "band": str(fields.get("band", "")).upper(),
                "source_metric": str(fields.get("source", "")),
                "severity_mode": str(fields.get("mode", "")),
                "layout_row": _safe_int(fields.get("row", "")),
                "layout_col": _safe_int(fields.get("col", "")),
            }
        )

    return {
        "library_name": str(library_hit.group(1)).strip(),
        "cells": cells,
    }
|
|
|
|
def _rows_by_qubit(rows: Iterable[Dict[str, Any]]) -> Dict[int, Dict[str, Any]]:
    """Index rows by their integer ``qubit`` value.

    Rows whose ``qubit`` is missing or ``None`` are skipped. Each stored row
    is a shallow copy, and a later row with the same qubit replaces an
    earlier one.
    """
    indexed: Dict[int, Dict[str, Any]] = {}
    for row in rows:
        if row.get("qubit") is None:
            continue
        indexed[int(row["qubit"])] = dict(row)
    return indexed
|
|
|
|
def _expected_band_for_row(row: Dict[str, Any], severity_cfg: SeverityConfig) -> str:
    """Derive the severity band a row should carry under ``severity_cfg``.

    When the configured mode (trimmed, lower-cased, defaulting to
    ``"linear"``) is ``"percentile"``, the row's ``severity_percentile`` is
    compared with the percentile thresholds; for any other mode the row's
    ``severity_score`` is compared with the plain thresholds. A missing row
    value counts as 0.0.

    Returns:
        ``"CRITICAL"``, ``"WARNING"`` or ``"OK"``.
    """
    mode = str(severity_cfg.mode or "linear").strip().lower()
    if mode == "percentile":
        value_key = "severity_percentile"
        critical_attr, warning_attr = "percentile_critical", "percentile_warning"
    else:
        value_key = "severity_score"
        critical_attr, warning_attr = "critical", "warning"

    value = float(row.get(value_key, 0.0))
    # Thresholds are read lazily, highest band first.
    if value >= float(getattr(severity_cfg.thresholds, critical_attr)):
        return "CRITICAL"
    if value >= float(getattr(severity_cfg.thresholds, warning_attr)):
        return "WARNING"
    return "OK"
|
|
|
|
def _compare_expected_row_maps(
    checks: List[Dict[str, Any]],
    *,
    artifact: str,
    observed: Dict[int, Dict[str, Any]],
    expected: Dict[int, Dict[str, Any]],
    float_fields: Sequence[str],
    text_fields: Sequence[str],
    int_fields: Sequence[str],
) -> None:
    """Compare observed rows with expected rows and record one check.

    First the qubit key sets must be identical; otherwise a
    ``<artifact>_qubits`` failure is recorded and no field comparison runs.
    Then, per qubit, fields are compared in the order float, text
    (case-insensitive), int, and only the first differing field of each qubit
    is reported. The outcome is one ``<artifact>_content`` check, pass or fail.

    Args:
        checks: Check list, extended in place.
        artifact: Label used for the check's ``artifact`` and code prefix.
        observed: Parsed rows keyed by qubit.
        expected: Reference rows keyed by qubit.
        float_fields: Fields compared with ``_float_close``.
        text_fields: Fields compared as upper-cased strings.
        int_fields: Fields compared after ``_safe_int`` conversion.
    """
    observed_qubits = sorted(observed.keys())
    expected_qubits = sorted(expected.keys())
    if observed_qubits != expected_qubits:
        only_one_side = sorted(set(observed_qubits) ^ set(expected_qubits))
        _add_check(
            checks,
            status="fail",
            code=f"{artifact}_qubits",
            artifact=artifact,
            message=f"Qubit coverage mismatch. observed={observed_qubits} expected={expected_qubits}",
            targets=[f"q{q}" for q in only_one_side],
        )
        return

    def _first_divergence(obs: Dict[str, Any], exp: Dict[str, Any]) -> str | None:
        """Return the first field on which the two rows disagree, if any."""
        for field in float_fields:
            if not _float_close(obs.get(field), exp.get(field)):
                return field
        for field in text_fields:
            if str(obs.get(field, "")).upper() != str(exp.get(field, "")).upper():
                return field
        for field in int_fields:
            if _safe_int(obs.get(field)) != _safe_int(exp.get(field)):
                return field
        return None

    mismatches: List[str] = []
    for q in expected_qubits:
        divergent = _first_divergence(observed[q], expected[q])
        if divergent is not None:
            mismatches.append(f"q{q}:{divergent}")

    if not mismatches:
        _add_check(
            checks,
            status="pass",
            code=f"{artifact}_content",
            artifact=artifact,
            message=f"{artifact} rows match the current session state for {len(expected_qubits)} qubits.",
        )
        return

    # The message shows at most 8 mismatches and the targets at most 16.
    _add_check(
        checks,
        status="fail",
        code=f"{artifact}_content",
        artifact=artifact,
        message=f"Observed rows diverge from expected export state at {', '.join(mismatches[:8])}.",
        targets=mismatches[:16],
    )
|
|
|
|
def _collect_geometry_checks(
    checks: List[Dict[str, Any]],
    *,
    blockage_rows: Sequence[Dict[str, Any]],
    grid_cfg: DEFGridConfig,
    chip_rows: int,
    chip_cols: int,
    imported_design_def_text: str = "",
) -> None:
    """Record geometric sanity checks for parsed DEF blockage rectangles.

    Five checks are appended to ``checks`` in this order: positive area
    (``def_malformed``), site/row grid alignment (``def_alignment``),
    one-cell size (``def_box_size``), containment in the core extent derived
    from the grid and chip dimensions (``def_core_bounds``), and containment
    in the imported DEF's DIEAREA (``def_diearea_bounds``). The last one is a
    warning when no DIEAREA is available.

    Args:
        checks: Check list, extended in place.
        blockage_rows: Rows carrying ``rect_dbu`` (four values: xl, yl, xh,
            yh) and optionally ``qubit`` (defaults to -1).
        grid_cfg: Grid origin, site/row pitch and per-cell multipliers, in DBU.
        chip_rows: Number of cell rows in the core.
        chip_cols: Number of cell columns in the core.
        imported_design_def_text: Optional DEF text handed to
            ``parse_design_def`` to obtain ``diearea``.
    """
    site_pitch = int(grid_cfg.site_width_dbu)
    cell_width = site_pitch * int(grid_cfg.sites_per_cell_x)
    row_pitch = int(grid_cfg.row_height_dbu)
    cell_height = row_pitch * int(grid_cfg.rows_per_cell_y)
    core_x0 = int(grid_cfg.origin_x_dbu)
    core_y0 = int(grid_cfg.origin_y_dbu)
    core_x1 = core_x0 + int(chip_cols) * cell_width
    core_y1 = core_y0 + int(chip_rows) * cell_height

    def _off_grid(coordinate: int, origin: int, pitch: int) -> bool:
        """True when ``coordinate`` is not a whole number of pitches from origin."""
        return (coordinate - origin) % pitch != 0

    def _report(offenders: List[int], code: str, fail_message: str, pass_message: str) -> None:
        """Append a fail check listing ``offenders``, or a pass check if none."""
        if offenders:
            _add_check(
                checks,
                status="fail",
                code=code,
                artifact="def",
                message=fail_message,
                targets=[f"q{q}" for q in offenders],
            )
        else:
            _add_check(checks, status="pass", code=code, artifact="def", message=pass_message)

    boxes: List[Tuple[int, int, int, int, int]] = []
    malformed: List[int] = []
    misaligned: List[int] = []
    wrong_size: List[int] = []
    out_of_core: List[int] = []
    for row in blockage_rows:
        xl, yl, xh, yh = [int(v) for v in row["rect_dbu"]]
        q = int(row.get("qubit", -1))
        boxes.append((q, xl, yl, xh, yh))  # kept for the DIEAREA pass below
        if not (xh > xl and yh > yl):
            malformed.append(q)
        if (
            _off_grid(xl, core_x0, site_pitch)
            or _off_grid(xh, core_x0, site_pitch)
            or _off_grid(yl, core_y0, row_pitch)
            or _off_grid(yh, core_y0, row_pitch)
        ):
            misaligned.append(q)
        if (xh - xl) != cell_width or (yh - yl) != cell_height:
            wrong_size.append(q)
        if xl < core_x0 or yl < core_y0 or xh > core_x1 or yh > core_y1:
            out_of_core.append(q)

    _report(
        malformed,
        "def_malformed",
        f"Malformed DEF rectangles for qubits {malformed}.",
        "All DEF blockage rectangles have positive area.",
    )
    _report(
        misaligned,
        "def_alignment",
        f"DEF rectangles are off the site/row grid for qubits {misaligned}.",
        "All DEF rectangles snap to the configured site/row grid.",
    )
    _report(
        wrong_size,
        "def_box_size",
        f"DEF rectangles do not match one heatmap cell span for qubits {wrong_size}.",
        "Each DEF blockage covers exactly one heatmap cell span.",
    )
    _report(
        out_of_core,
        "def_core_bounds",
        f"DEF rectangles exceed the current core extent for qubits {out_of_core}.",
        "All DEF rectangles stay within the current core extent.",
    )

    # The imported DEF is optional; without it there is no die area to test.
    if str(imported_design_def_text or "").strip():
        imported_def_meta = parse_design_def(imported_design_def_text)
    else:
        imported_def_meta = {}
    diearea = imported_def_meta.get("diearea")
    if diearea is None:
        _add_check(
            checks,
            status="warn",
            code="def_diearea_bounds",
            artifact="def",
            message="Imported DEF DIEAREA not available; external floorplan-bound check was skipped.",
        )
        return

    die_x0, die_y0, die_x1, die_y1 = [int(v) for v in diearea]
    outside_die = [
        q
        for q, xl, yl, xh, yh in boxes
        if xl < die_x0 or yl < die_y0 or xh > die_x1 or yh > die_y1
    ]
    _report(
        outside_die,
        "def_diearea_bounds",
        f"DEF rectangles exceed imported DEF DIEAREA for qubits {outside_die}.",
        "All DEF rectangles stay within the imported DEF DIEAREA.",
    )
|
|
|
|
def validate_design_exports(
    *,
    metrics_csv_text: str,
    severity_csv_text: str,
    synopsys_tcl_text: str,
    cadence_skill_text: str,
    def_fragment_text: str,
    companion_lef_text: str,
    companion_lib_text: str,
    expected_metrics: Dict[str, Any],
    expected_severity_rows: Sequence[Dict[str, Any]],
    expected_mapping_rows: Sequence[Dict[str, Any]],
    expected_blockage_rows: Sequence[Dict[str, Any]],
    severity_cfg: SeverityConfig,
    grid_cfg: DEFGridConfig,
    chip_rows: int,
    chip_cols: int,
    dbu_per_micron: int,
    site_name: str,
    imported_design_def_text: str = "",
) -> Dict[str, Any]:
    """Re-parse every exported artifact and compare it with the expected state.

    The exported texts (metrics CSV, severity CSV, Synopsys TCL, Cadence
    SKILL, DEF fragment, companion LEF and LIB) are parsed back and checked
    against the ``expected_*`` arguments and the current severity/grid
    configuration. Every comparison adds one record to the report's
    ``checks`` list.

    Args:
        metrics_csv_text: Exported metrics CSV.
        severity_csv_text: Exported severity CSV.
        synopsys_tcl_text: Exported Synopsys TCL.
        cadence_skill_text: Exported Cadence SKILL.
        def_fragment_text: Exported DEF blockage fragment.
        companion_lef_text: Exported companion LEF.
        companion_lib_text: Exported companion LIB.
        expected_metrics: Mapping of metric name to a per-qubit sequence; the
            length of ``activity_count`` defines the qubit count.
        expected_severity_rows: Reference severity rows (one per qubit).
        expected_mapping_rows: Reference rows for the TCL and SKILL exports.
        expected_blockage_rows: Reference rows for the DEF export, including
            ``rect_dbu``.
        severity_cfg: Severity mode, source metric and thresholds.
        grid_cfg: DEF grid origin, pitches and per-cell multipliers (DBU).
        chip_rows: Number of cell rows.
        chip_cols: Number of cell columns.
        dbu_per_micron: DBU-to-micron scale; values below 1 are treated as 1.
        site_name: Expected LEF site name; falls back to ``"QUREAD_SITE"``
            when empty.
        imported_design_def_text: Optional imported DEF used for the DIEAREA
            bound check.

    Returns:
        A dict with ``summary`` (overall status and pass/warn/fail counts),
        ``settings`` (the configuration that was validated against),
        ``artifact_counts`` (parsed row counts per artifact) and ``checks``.

    Raises:
        ValueError: From the CSV / LEF / LIB parsers in this module when an
            input is empty or lacks required structure. Errors raised by the
            imported EDA/DEF parsers propagate unchanged.
    """
    checks: List[Dict[str, Any]] = []

    # ---- Metrics CSV ------------------------------------------------------
    metric_float_fields = [
        "activity_count",
        "activity_norm",
        "gate_error",
        "readout_error",
        "fidelity",
        "state_fidelity",
        "process_fidelity",
        "coherence_health",
        "decoherence_risk",
        "composite_risk",
    ]
    qubit_count = len(expected_metrics.get("activity_count", []))
    expected_metrics_rows: List[Dict[str, Any]] = []
    for q in range(qubit_count):
        expected_row: Dict[str, Any] = {"qubit": int(q)}
        for field in metric_float_fields:
            expected_row[field] = float(expected_metrics[field][q])
        expected_row["hotspot_level"] = int(expected_metrics["hotspot_level"][q])
        expected_metrics_rows.append(expected_row)

    parsed_metrics = parse_metrics_csv_text(metrics_csv_text)
    _compare_expected_row_maps(
        checks,
        artifact="metrics_csv",
        observed=_rows_by_qubit(parsed_metrics),
        expected=_rows_by_qubit(expected_metrics_rows),
        float_fields=metric_float_fields,
        text_fields=[],
        int_fields=["hotspot_level"],
    )

    # ---- Severity CSV -----------------------------------------------------
    parsed_severity = parse_severity_csv_text(severity_csv_text)
    expected_severity_map = _rows_by_qubit(expected_severity_rows)
    _compare_expected_row_maps(
        checks,
        artifact="severity_csv",
        observed=_rows_by_qubit(parsed_severity),
        expected=expected_severity_map,
        float_fields=[
            "base_risk",
            "severity_score",
            "severity_percentile",
            "pnr_cost",
            "timing_derate",
            "density_cap",
            "guardband_mv",
        ],
        text_fields=["source_metric", "severity_mode", "severity_band", "route_priority"],
        int_fields=["layout_row", "layout_col", "severity_rank"],
    )

    # Independently of the expected rows, each exported band must agree with
    # the band the current threshold model assigns to that row.
    threshold_mismatches = []
    for row in parsed_severity:
        expected_band = _expected_band_for_row(row, severity_cfg)
        if str(row.get("severity_band", "")).upper() != str(expected_band).upper():
            threshold_mismatches.append(int(row["qubit"]))
    if threshold_mismatches:
        _add_check(
            checks,
            status="fail",
            code="severity_thresholds",
            artifact="severity_csv",
            message=f"Severity bands do not match the current threshold model for qubits {threshold_mismatches}.",
            targets=[f"q{q}" for q in threshold_mismatches],
        )
    else:
        _add_check(
            checks,
            status="pass",
            code="severity_thresholds",
            artifact="severity_csv",
            message="Severity bands align with the current threshold model.",
        )

    # ---- EDA scripts and DEF fragment ---------------------------------------
    parsed_tcl_rows, tcl_meta = parse_eda_text(synopsys_tcl_text, "synopsys_tcl")
    parsed_skill_rows, skill_meta = parse_eda_text(cadence_skill_text, "cadence_skill")
    parsed_def_rows, def_meta = parse_def_blockages(def_fragment_text)
    expected_mapping_map = _rows_by_qubit(expected_mapping_rows)
    expected_def_map = _rows_by_qubit(expected_blockage_rows)

    # TCL and SKILL are compared against the same mapping rows and fields.
    mapping_float_fields = [
        "composite_risk",
        "severity_score",
        "severity_percentile",
        "pnr_cost",
        "timing_derate",
        "density_cap",
        "guardband_mv",
    ]
    mapping_text_fields = ["tier", "route_priority", "source_metric", "severity_mode"]
    mapping_int_fields = ["layout_row", "layout_col"]

    _compare_expected_row_maps(
        checks,
        artifact="synopsys_tcl",
        observed=_rows_by_qubit(parsed_tcl_rows),
        expected=expected_mapping_map,
        float_fields=mapping_float_fields,
        text_fields=mapping_text_fields,
        int_fields=mapping_int_fields,
    )
    _compare_expected_row_maps(
        checks,
        artifact="cadence_skill",
        observed=_rows_by_qubit(parsed_skill_rows),
        expected=expected_mapping_map,
        float_fields=mapping_float_fields,
        text_fields=mapping_text_fields,
        int_fields=mapping_int_fields,
    )
    _compare_expected_row_maps(
        checks,
        artifact="def",
        observed=_rows_by_qubit(parsed_def_rows),
        expected=expected_def_map,
        float_fields=["density", "risk_value", "severity_percentile", "pnr_cost", "density_cap"],
        text_fields=["severity", "mode", "source_metric", "severity_mode", "route_priority"],
        int_fields=["layout_row", "layout_col"],
    )

    observed_def_map = _rows_by_qubit(parsed_def_rows)
    def_rect_mismatch = []
    for q, expected_row in expected_def_map.items():
        observed_row = observed_def_map.get(q)
        if observed_row is None or tuple(observed_row.get("rect_dbu", ())) != tuple(expected_row.get("rect_dbu", ())):
            def_rect_mismatch.append(q)
    if def_rect_mismatch:
        _add_check(checks, status="fail", code="def_rectangles", artifact="def", message=f"DEF rectangle mismatch for qubits {def_rect_mismatch}.", targets=[f"q{q}" for q in def_rect_mismatch])
    else:
        _add_check(checks, status="pass", code="def_rectangles", artifact="def", message="DEF rectangles match the current exported blockage geometry.")

    # A declared count of 0 is a legitimate value and must compare as 0; only
    # a missing or unparseable declaration counts as "no declaration" (None),
    # which can never equal a row count.
    declared_raw = def_meta.get("declared_blockages")
    declared_count = None if declared_raw is None else _safe_int(declared_raw)
    if declared_count != len(parsed_def_rows):
        _add_check(
            checks,
            status="fail",
            code="def_declared_count",
            artifact="def",
            message=f"DEF declares {def_meta.get('declared_blockages')} blockages but parses {len(parsed_def_rows)} rows.",
        )
    else:
        _add_check(checks, status="pass", code="def_declared_count", artifact="def", message=f"DEF declaration count matches parsed rows ({len(parsed_def_rows)}).")

    _collect_geometry_checks(
        checks,
        blockage_rows=parsed_def_rows,
        grid_cfg=grid_cfg,
        chip_rows=int(chip_rows),
        chip_cols=int(chip_cols),
        imported_design_def_text=imported_design_def_text,
    )

    # ---- Companion LEF ----------------------------------------------------
    parsed_lef = parse_companion_lef_text(companion_lef_text)
    parsed_lib = parse_companion_lib_text(companion_lib_text)
    expected_site = str(site_name or "QUREAD_SITE")
    site_info = parsed_lef["sites"].get(expected_site)
    # Guard the DBU scale so a zero/negative setting cannot divide by zero.
    dbu_scale = float(max(1, int(dbu_per_micron)))
    site_width_dbu = int(grid_cfg.site_width_dbu)
    row_height_dbu = int(grid_cfg.row_height_dbu)
    expected_site_width_um = float(site_width_dbu) / dbu_scale
    expected_row_height_um = float(row_height_dbu) / dbu_scale
    expected_cell_width_um = float(site_width_dbu * int(grid_cfg.sites_per_cell_x)) / dbu_scale
    expected_cell_height_um = float(row_height_dbu * int(grid_cfg.rows_per_cell_y)) / dbu_scale

    if not site_info:
        _add_check(checks, status="fail", code="lef_site", artifact="lef", message=f"Companion LEF is missing expected site `{expected_site}`.")
    elif not (_float_close(site_info["width_microns"], expected_site_width_um) and _float_close(site_info["height_microns"], expected_row_height_um)):
        _add_check(checks, status="fail", code="lef_site", artifact="lef", message="Companion LEF site size does not match current DBU/grid settings.")
    else:
        _add_check(checks, status="pass", code="lef_site", artifact="lef", message="Companion LEF site declaration matches current DBU/grid settings.")

    macros_by_name = {str(item["name"]): item for item in parsed_lef.get("macros", [])}
    # NOTE(review): the "+ 3" presumably accounts for three macros the LEF
    # exporter emits in addition to the per-qubit ones - confirm against it.
    macro_expected_count = len(expected_severity_rows) + 3
    if len(macros_by_name) != macro_expected_count:
        _add_check(checks, status="fail", code="lef_macro_count", artifact="lef", message=f"Companion LEF macro count mismatch. observed={len(macros_by_name)} expected={macro_expected_count}.")
    else:
        _add_check(checks, status="pass", code="lef_macro_count", artifact="lef", message=f"Companion LEF contains the expected {macro_expected_count} macros.")

    missing_lef_macros = []
    bad_lef_macros = []
    for row in expected_severity_rows:
        name = f"QUREAD_Q{int(row['qubit'])}_{str(row.get('severity_band', 'OK')).upper()}"
        macro = macros_by_name.get(name)
        if macro is None:
            missing_lef_macros.append(name)
            continue
        if not (
            _float_close(macro.get("width_microns"), expected_cell_width_um)
            and _float_close(macro.get("height_microns"), expected_cell_height_um)
            and str(macro.get("site_name", "")) == expected_site
        ):
            bad_lef_macros.append(name)
    if missing_lef_macros or bad_lef_macros:
        _add_check(
            checks,
            status="fail",
            code="lef_macro_content",
            artifact="lef",
            message=f"Companion LEF macro mismatch. missing={missing_lef_macros[:6]} bad={bad_lef_macros[:6]}",
            targets=missing_lef_macros[:6] + bad_lef_macros[:6],
        )
    else:
        _add_check(checks, status="pass", code="lef_macro_content", artifact="lef", message="Companion LEF qubit macros match current severity rows.")

    # ---- Companion LIB ----------------------------------------------------
    cells_by_name = {str(item["name"]): item for item in parsed_lib.get("cells", [])}
    if len(cells_by_name) != len(expected_severity_rows):
        _add_check(checks, status="fail", code="lib_cell_count", artifact="lib", message=f"Companion LIB cell count mismatch. observed={len(cells_by_name)} expected={len(expected_severity_rows)}.")
    else:
        _add_check(checks, status="pass", code="lib_cell_count", artifact="lib", message=f"Companion LIB contains the expected {len(expected_severity_rows)} cells.")

    lib_mismatches = []
    for row in expected_severity_rows:
        name = f"QUREAD_Q{int(row['qubit'])}_{str(row.get('severity_band', 'OK')).upper()}"
        cell = cells_by_name.get(name)
        if cell is None:
            lib_mismatches.append(name)
            continue
        if (
            # The parser stores qubit=None when the comment has no usable
            # "qubit=q<N>" entry; _safe_int turns that into a mismatch instead
            # of letting int(None) raise.
            _safe_int(cell.get("qubit")) != int(row["qubit"])
            or str(cell.get("band", "")).upper() != str(row.get("severity_band", "")).upper()
            or str(cell.get("source_metric", "")) != str(row.get("source_metric", ""))
            or str(cell.get("severity_mode", "")) != str(row.get("severity_mode", ""))
            or _safe_int(cell.get("layout_row")) != _safe_int(row.get("layout_row"))
            or _safe_int(cell.get("layout_col")) != _safe_int(row.get("layout_col"))
        ):
            lib_mismatches.append(name)
    if lib_mismatches:
        _add_check(checks, status="fail", code="lib_cell_content", artifact="lib", message=f"Companion LIB comments diverge from current severity rows for {lib_mismatches[:8]}.", targets=lib_mismatches[:8])
    else:
        _add_check(checks, status="pass", code="lib_cell_content", artifact="lib", message="Companion LIB annotations match current severity rows.")

    # ---- Cross-artifact parity ----------------------------------------------
    if int(tcl_meta.get("parsed_rows", 0)) != int(skill_meta.get("parsed_rows", 0)):
        _add_check(checks, status="fail", code="eda_row_parity", artifact="eda", message="Synopsys TCL and Cadence SKILL parse different row counts.")
    else:
        _add_check(checks, status="pass", code="eda_row_parity", artifact="eda", message=f"Synopsys TCL and Cadence SKILL both parse {int(tcl_meta.get('parsed_rows', 0))} rows.")

    # ---- Report assembly ----------------------------------------------------
    summary = {
        "overall_status": _final_status(checks),
        "check_count": len(checks),
        "pass_count": sum(1 for check in checks if str(check["status"]) == "PASS"),
        "warn_count": sum(1 for check in checks if str(check["status"]) == "WARN"),
        "fail_count": sum(1 for check in checks if str(check["status"]) == "FAIL"),
    }
    artifact_counts = {
        "metrics_rows": len(parsed_metrics),
        "severity_rows": len(parsed_severity),
        "synopsys_rows": len(parsed_tcl_rows),
        "cadence_rows": len(parsed_skill_rows),
        "def_rows": len(parsed_def_rows),
        "lef_macros": len(parsed_lef.get("macros", [])),
        "lib_cells": len(parsed_lib.get("cells", [])),
    }
    return {
        "summary": summary,
        "settings": {
            "severity_mode": str(severity_cfg.mode),
            "severity_source_metric": str(severity_cfg.source_metric),
            "warning_threshold": float(severity_cfg.thresholds.warning),
            "critical_threshold": float(severity_cfg.thresholds.critical),
            "percentile_warning": float(severity_cfg.thresholds.percentile_warning),
            "percentile_critical": float(severity_cfg.thresholds.percentile_critical),
            "grid": {
                "origin_x_dbu": int(grid_cfg.origin_x_dbu),
                "origin_y_dbu": int(grid_cfg.origin_y_dbu),
                "site_width_dbu": int(grid_cfg.site_width_dbu),
                "row_height_dbu": int(grid_cfg.row_height_dbu),
                "sites_per_cell_x": int(grid_cfg.sites_per_cell_x),
                "rows_per_cell_y": int(grid_cfg.rows_per_cell_y),
            },
            "chip_rows": int(chip_rows),
            "chip_cols": int(chip_cols),
            "dbu_per_micron": int(dbu_per_micron),
            "site_name": str(site_name or "QUREAD_SITE"),
        },
        "artifact_counts": artifact_counts,
        "checks": checks,
    }
|
|
|
|
def validation_table_rows(report: Dict[str, Any]) -> List[List[Any]]:
    """Flatten a report's checks into table rows, most severe first.

    Each row is ``[status, artifact, code, message]`` with every cell as a
    string (missing keys become ``""``). Rows are ordered descending by
    status rank, then artifact, then code.
    """
    columns = ("status", "artifact", "code", "message")
    table = [
        [str(check.get(column, "")) for column in columns]
        for check in report.get("checks", [])
    ]
    return sorted(
        table,
        key=lambda cells: (_status_rank(cells[0]), cells[1], cells[2]),
        reverse=True,
    )
|
|
|
|
def validation_artifact_summary_rows(report: Dict[str, Any]) -> List[List[Any]]:
    """Summarize check outcomes per artifact.

    Returns one row per artifact: ``[artifact, pass_count, warn_count,
    fail_count, overall]`` where ``overall`` is ``"FAIL"`` if any check
    failed, else ``"WARN"`` if any warned, else ``"PASS"``. A missing or
    empty artifact is grouped under ``"unknown"`` and a missing or empty
    status counts as ``PASS``. Rows are sorted descending by overall rank,
    fail count, warn count and artifact name.
    """
    tallies: Dict[str, Dict[str, int]] = {}
    for check in report.get("checks", []):
        artifact = str(check.get("artifact", "unknown") or "unknown")
        status = str(check.get("status", "PASS") or "PASS").upper()
        if artifact not in tallies:
            tallies[artifact] = {"PASS": 0, "WARN": 0, "FAIL": 0}
        per_artifact = tallies[artifact]
        # Unrecognized statuses are still tallied under their own key.
        per_artifact[status] = per_artifact.get(status, 0) + 1

    summary_rows: List[List[Any]] = []
    for artifact, counts in tallies.items():
        passed = int(counts.get("PASS", 0))
        warned = int(counts.get("WARN", 0))
        failed = int(counts.get("FAIL", 0))
        if failed > 0:
            overall = "FAIL"
        else:
            overall = "WARN" if warned > 0 else "PASS"
        summary_rows.append([artifact, passed, warned, failed, overall])

    summary_rows.sort(
        key=lambda cells: (_status_rank(cells[4]), cells[3], cells[2], cells[0]),
        reverse=True,
    )
    return summary_rows
|
|
|
|
def validation_focus_rows(report: Dict[str, Any], top_k: int = 12) -> List[List[Any]]:
    """List the non-passing checks of a report, most severe first.

    Each row is ``[status, artifact, code, targets, message]``. ``targets``
    shows up to eight non-blank targets joined by ``", "`` (with ``", ..."``
    appended when more exist) or ``"-"`` when there are none. At most
    ``max(1, top_k)`` rows are returned; the result is empty when every check
    passed.
    """
    focus: List[List[Any]] = []
    for check in report.get("checks", []):
        status = str(check.get("status", "PASS") or "PASS").upper()
        if status != "PASS":
            labels = [str(item) for item in (check.get("targets") or []) if str(item).strip()]
            shown = ", ".join(labels[:8]) if labels else "-"
            if len(labels) > 8:
                shown += ", ..."
            focus.append(
                [
                    status,
                    str(check.get("artifact", "")),
                    str(check.get("code", "")),
                    shown,
                    str(check.get("message", "")),
                ]
            )

    if not focus:
        return []
    focus.sort(key=lambda cells: (_status_rank(cells[0]), cells[1], cells[2]), reverse=True)
    return focus[: max(1, int(top_k))]
|
|
|
|
def validation_focus_markdown(report: Dict[str, Any], top_k: int = 4) -> str:
    """Render the top non-passing checks as a Markdown bullet list.

    Uses ``validation_focus_rows`` to select at most ``max(1, top_k)`` rows.
    When nothing warned or failed, a single "clean" bullet is returned
    instead. The ``targets=`` fragment is omitted for rows without targets.
    """
    focus = validation_focus_rows(report, top_k=top_k)
    if not focus:
        return "### Mismatch Focus\n- No warnings or failures. Current package checks are clean."

    limit = max(1, int(top_k))
    bullets: List[str] = []
    for status, artifact, code, targets, message in focus[:limit]:
        target_part = f" targets=`{targets}`" if targets != "-" else ""
        bullets.append(f"- `{status}` `{artifact}` `{code}`" + target_part + f": {message}")
    return "\n".join(["### Mismatch Focus", *bullets])
|
|
|
|
def validation_summary_markdown(report: Dict[str, Any]) -> str:
    """Render the report's summary, settings and artifact counts as Markdown.

    Reads ``summary``, ``settings`` (including its nested ``grid``) and
    ``artifact_counts`` from ``report``; missing values fall back to the
    defaults visible below (``UNKNOWN``, ``0``, ``linear``,
    ``composite_risk`` or an empty string).
    """
    summary = dict(report.get("summary") or {})
    settings = dict(report.get("settings") or {})
    counts = dict(report.get("artifact_counts") or {})
    grid = settings.get("grid", {})

    status_line = f"- Overall status: `{summary.get('overall_status', 'UNKNOWN')}`"
    checks_line = (
        f"- Checks: `{summary.get('check_count', 0)}` total | "
        f"`{summary.get('pass_count', 0)}` pass | `{summary.get('warn_count', 0)}` warn | `{summary.get('fail_count', 0)}` fail"
    )
    policy_line = (
        f"- Severity policy: mode=`{settings.get('severity_mode', 'linear')}` "
        f"source=`{settings.get('severity_source_metric', 'composite_risk')}`"
    )
    grid_line = (
        f"- Grid: site=`{grid.get('site_width_dbu', '')} x {grid.get('row_height_dbu', '')}` DBU | "
        f"cell=`{grid.get('sites_per_cell_x', '')} x {grid.get('rows_per_cell_y', '')}` | "
        f"chip=`{settings.get('chip_rows', '')} x {settings.get('chip_cols', '')}`"
    )
    artifacts_line = (
        f"- Parsed artifacts: metrics=`{counts.get('metrics_rows', 0)}`, severity=`{counts.get('severity_rows', 0)}`, "
        f"TCL=`{counts.get('synopsys_rows', 0)}`, SKILL=`{counts.get('cadence_rows', 0)}`, DEF=`{counts.get('def_rows', 0)}`, "
        f"LEF macros=`{counts.get('lef_macros', 0)}`, LIB cells=`{counts.get('lib_cells', 0)}`"
    )
    return "\n".join(
        [
            "### Validation Summary",
            status_line,
            checks_line,
            policy_line,
            grid_line,
            artifacts_line,
        ]
    )
|
|
|
|
def build_validation_report_html(report: Dict[str, Any]) -> str:
    """Build a standalone HTML page presenting a validation report.

    The page has a summary card, an artifact-count card and three tables:
    per-artifact summary, mismatch focus (up to 12 rows requested from
    ``validation_focus_rows``) and the full check list. Table cells and the
    textual summary fields are passed through ``html.escape``; count fields
    are coerced with ``int``, so a non-numeric count raises.

    Args:
        report: Validation report dictionary with optional ``summary``,
            ``settings`` and ``artifact_counts`` sections (missing or
            ``None`` sections are treated as empty).

    Returns:
        The complete HTML document as a string.
    """

    def _table_body(table: Iterable[Sequence[Any]], fallback: str) -> str:
        # One <tr> per row with every cell escaped; an empty table collapses
        # to the supplied placeholder row.
        rendered = []
        for row in table:
            cells = "".join(f"<td>{html.escape(str(cell))}</td>" for cell in row)
            rendered.append("<tr>" + cells + "</tr>")
        return "".join(rendered) or fallback

    summary = dict(report.get("summary") or {})
    settings = dict(report.get("settings") or {})
    counts = dict(report.get("artifact_counts") or {})

    check_rows = validation_table_rows(report)
    per_artifact_rows = validation_artifact_summary_rows(report)
    focus_rows = validation_focus_rows(report, top_k=12)

    table_rows = _table_body(check_rows, "<tr><td colspan='4'>No checks.</td></tr>")
    artifact_table_rows = _table_body(
        per_artifact_rows, "<tr><td colspan='5'>No artifacts.</td></tr>"
    )
    focus_table_rows = _table_body(
        focus_rows, "<tr><td colspan='5'>No warnings or failures.</td></tr>"
    )

    # Values are prepared in the same order in which they appear in the page.
    overall_status = html.escape(str(summary.get("overall_status", "UNKNOWN")))
    check_count = int(summary.get("check_count", 0))
    pass_count = int(summary.get("pass_count", 0))
    warn_count = int(summary.get("warn_count", 0))
    fail_count = int(summary.get("fail_count", 0))
    severity_mode = html.escape(str(settings.get("severity_mode", "")))
    severity_source = html.escape(str(settings.get("severity_source_metric", "")))
    metrics_rows = int(counts.get("metrics_rows", 0))
    severity_rows = int(counts.get("severity_rows", 0))
    synopsys_rows = int(counts.get("synopsys_rows", 0))
    cadence_rows = int(counts.get("cadence_rows", 0))
    def_rows = int(counts.get("def_rows", 0))
    lef_macros = int(counts.get("lef_macros", 0))
    lib_cells = int(counts.get("lib_cells", 0))

    return f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>Quread Validation Report</title>
<style>
body {{
margin: 0;
font-family: "Avenir Next", "Segoe UI", sans-serif;
background: linear-gradient(180deg, #f6f9ff 0%, #ffffff 100%);
color: #0f172a;
}}
.wrap {{
max-width: 1200px;
margin: 0 auto;
padding: 28px 24px 40px;
}}
.hero, .card {{
background: #ffffff;
border: 1px solid #dbe3ef;
border-radius: 18px;
box-shadow: 0 12px 36px rgba(15, 23, 42, 0.06);
}}
.hero {{
padding: 24px 28px;
}}
.card {{
padding: 18px;
margin-top: 18px;
}}
.grid {{
display: grid;
grid-template-columns: repeat(auto-fit, minmax(320px, 1fr));
gap: 18px;
margin-top: 18px;
}}
h1 {{
margin: 0 0 8px;
font-size: 32px;
}}
h2 {{
margin: 0 0 12px;
font-size: 20px;
}}
table {{
width: 100%;
border-collapse: collapse;
font-size: 14px;
}}
th, td {{
border-bottom: 1px solid #e5eaf2;
padding: 8px 10px;
text-align: left;
vertical-align: top;
}}
th {{
background: #f4f7fb;
}}
p, li {{
line-height: 1.55;
color: #475569;
}}
</style>
</head>
<body>
<div class="wrap">
<section class="hero">
<h1>Quread Validation Report</h1>
<p>Internal package validation for the current heatmap, severity, EDA, and DEF export state.</p>
</section>
<section class="grid">
<div class="card">
<h2>Summary</h2>
<ul>
<li>Overall status: <strong>{overall_status}</strong></li>
<li>Checks: {check_count} total, {pass_count} pass, {warn_count} warn, {fail_count} fail</li>
<li>Severity policy: mode={severity_mode}, source={severity_source}</li>
</ul>
</div>
<div class="card">
<h2>Artifact Counts</h2>
<ul>
<li>Metrics rows: {metrics_rows}</li>
<li>Severity rows: {severity_rows}</li>
<li>TCL rows: {synopsys_rows}</li>
<li>SKILL rows: {cadence_rows}</li>
<li>DEF rows: {def_rows}</li>
<li>LEF macros: {lef_macros}</li>
<li>LIB cells: {lib_cells}</li>
</ul>
</div>
</section>
<section class="card">
<h2>Artifact Summary</h2>
<table>
<thead>
<tr><th>Artifact</th><th>Pass</th><th>Warn</th><th>Fail</th><th>Overall</th></tr>
</thead>
<tbody>
{artifact_table_rows}
</tbody>
</table>
</section>
<section class="card">
<h2>Mismatch Focus</h2>
<table>
<thead>
<tr><th>Status</th><th>Artifact</th><th>Code</th><th>Targets</th><th>Message</th></tr>
</thead>
<tbody>
{focus_table_rows}
</tbody>
</table>
</section>
<section class="card">
<h2>Checks</h2>
<table>
<thead>
<tr><th>Status</th><th>Artifact</th><th>Code</th><th>Message</th></tr>
</thead>
<tbody>
{table_rows}
</tbody>
</table>
</section>
</div>
</body>
</html>
"""
|
|
|
|
def validation_report_json(report: Dict[str, Any]) -> str:
    """Serialise a validation report to pretty-printed JSON text.

    Args:
        report: Validation report dictionary; it must contain only values
            the standard ``json`` encoder can handle.

    Returns:
        JSON text with two-space indentation and keys sorted, so the key
        order does not depend on dictionary insertion order.
    """
    encoder_options = {"indent": 2, "sort_keys": True}
    return json.dumps(report, **encoder_options)
|
|