from __future__ import annotations
import csv
import html
import io
import json
import re
from typing import Any, Dict, Iterable, List, Sequence, Tuple
from .def_translator import DEFGridConfig
from .design_importer import parse_design_def
from .eda_view import parse_def_blockages, parse_eda_text
from .severity_mapper import SeverityConfig
# Pre-compiled patterns for the companion LEF / Liberty text parsers.
# All patterns are case-insensitive and let "." match newlines ("(?is)").
#
# NOTE: in the numeric character classes the hyphen is deliberately placed
# last so it is a literal minus sign. Written as "+-e" it forms a range from
# "+" to "e", which also admits letters and punctuation such as "/" or ";"
# and lets non-numeric tokens reach float().

# SITE <name> ... END <name>
_LEF_SITE_BLOCK_RE = re.compile(r"(?is)\bSITE\s+([A-Za-z0-9_.$/-]+)(.*?)END\s+\1\b")
# MACRO <name> ... END <name>
_LEF_MACRO_BLOCK_RE = re.compile(r"(?is)\bMACRO\s+([A-Za-z0-9_.$/-]+)(.*?)END\s+\1\b")
# SIZE <width> BY <height> ;
_LEF_SIZE_RE = re.compile(r"(?is)\bSIZE\s+([0-9.eE+-]+)\s+BY\s+([0-9.eE+-]+)\s*;")
# SITE <name> ;   (a site reference, as opposed to a SITE ... END block)
_LEF_SITE_REF_RE = re.compile(r"(?is)\bSITE\s+([A-Za-z0-9_.$/-]+)\s*;")
# library ( <name> )
_LIB_LIBRARY_RE = re.compile(r"(?is)\blibrary\s*\(\s*([^)]+)\s*\)")
# cell ( <name> ) { <body> }  -- body ends at the first "}" on its own line
_LIB_CELL_RE = re.compile(r"(?is)\bcell\s*\(\s*([^)]+)\s*\)\s*\{(.*?)\n\s*\}")
# area : <number> ;
_LIB_AREA_RE = re.compile(r"(?is)\barea\s*:\s*([0-9.eE+-]+)\s*;")
# comment : "<text>" ;
_LIB_COMMENT_RE = re.compile(r'(?is)\bcomment\s*:\s*"([^"]*)"\s*;')
def _float_close(a: Any, b: Any, tol: float = 1e-5) -> bool:
    """Return True when *a* and *b* are numerically equal within *tol*.

    Two blank values (``None`` or ``""``) count as equal. Anything that
    cannot be converted with ``float()`` makes the comparison False rather
    than raising.
    """
    blank_values = (None, "")
    if a in blank_values and b in blank_values:
        return True
    try:
        gap = abs(float(a) - float(b))
        limit = float(tol)
    except Exception:
        # Best-effort comparison: unconvertible operands simply do not match.
        return False
    return gap <= limit
def _status_rank(status: str) -> int:
    """Map a status label to a sortable rank: fail=2, warn=1, anything else=0.

    The label is compared case-insensitively after trimming whitespace;
    falsy input is treated as an empty label.
    """
    rank_by_label = {"fail": 2, "warn": 1}
    label = str(status or "").strip().lower()
    return rank_by_label.get(label, 0)
def _final_status(checks: Sequence[Dict[str, Any]]) -> str:
    """Collapse a list of check records into one overall verdict.

    Returns "FAIL" if any check fails, otherwise "WARN" if any check warns,
    otherwise "PASS" (also for an empty list). A check without a "status"
    key is treated as a pass.
    """
    worst = max(
        (_status_rank(str(entry.get("status", "pass"))) for entry in checks),
        default=0,
    )
    if worst >= 2:
        return "FAIL"
    return "WARN" if worst == 1 else "PASS"
def _add_check(
    checks: List[Dict[str, Any]],
    *,
    status: str,
    code: str,
    artifact: str,
    message: str,
    targets: Sequence[Any] | None = None,
) -> None:
    """Append one normalized check record to *checks* (mutated in place).

    The status is stored upper-cased; code, artifact and message are stored
    as strings. Targets are stringified and entries that are blank after
    stripping are dropped; a missing target list becomes ``[]``.
    """
    kept_targets: List[str] = []
    for candidate in targets or []:
        label = str(candidate)
        if label.strip():
            kept_targets.append(label)
    record: Dict[str, Any] = {
        "status": str(status).upper(),
        "code": str(code),
        "artifact": str(artifact),
        "message": str(message),
        "targets": kept_targets,
    }
    checks.append(record)
def _safe_int(value: Any) -> int | None:
    """Convert *value* to an int via its string form, or return None.

    The text is parsed as a float first, so "4.9" yields 4 (truncation).
    Blank text, non-numeric text, NaN and infinities all yield None.
    """
    cleaned = str(value).strip()
    if not cleaned:
        return None
    try:
        as_float = float(cleaned)
        return int(as_float)
    except (ValueError, OverflowError):
        # ValueError: non-numeric text or NaN; OverflowError: +/- infinity.
        return None
def _safe_float(value: Any) -> float | None:
    """Convert *value* to a float via its string form, or return None.

    Blank or non-numeric text yields None instead of raising.
    """
    cleaned = str(value).strip()
    if not cleaned:
        return None
    try:
        number = float(cleaned)
    except ValueError:
        return None
    return number
def parse_metrics_csv_text(text: str) -> List[Dict[str, Any]]:
    """Parse the per-qubit metrics CSV into typed rows sorted by qubit.

    Args:
        text: Full CSV text including the header line.

    Returns:
        One dict per data row. ``qubit`` and ``hotspot_level`` are ints
        (``hotspot_level`` is parsed through float, so "2.0" is accepted);
        every other required column is a float.

    Raises:
        ValueError: if the text is blank, a required header is missing, or a
            data row has a missing or non-numeric value. Malformed rows are
            reported with their line number within the stripped CSV text.
    """
    raw = str(text or "").strip()
    if not raw:
        raise ValueError("Metrics CSV is empty.")
    float_columns = (
        "activity_count",
        "activity_norm",
        "gate_error",
        "readout_error",
        "fidelity",
        "state_fidelity",
        "process_fidelity",
        "coherence_health",
        "decoherence_risk",
        "composite_risk",
    )
    required = ("qubit", *float_columns, "hotspot_level")
    reader = csv.DictReader(io.StringIO(raw))
    headers = reader.fieldnames
    if headers is None or any(name not in headers for name in required):
        raise ValueError("Metrics CSV missing required headers.")
    rows: List[Dict[str, Any]] = []
    for record in reader:
        try:
            parsed: Dict[str, Any] = {"qubit": int(record["qubit"])}
            for name in float_columns:
                parsed[name] = float(record[name])
            parsed["hotspot_level"] = int(float(record["hotspot_level"]))
        except (TypeError, ValueError, OverflowError) as exc:
            # A short row leaves trailing cells as None (TypeError); bad text
            # raises ValueError; an infinite hotspot level raises
            # OverflowError. Surface all of them uniformly with row context.
            raise ValueError(f"Metrics CSV line {reader.line_num} is malformed: {exc}") from exc
        rows.append(parsed)
    rows.sort(key=lambda item: int(item["qubit"]))
    return rows
def parse_severity_csv_text(text: str) -> List[Dict[str, Any]]:
    """Parse the per-qubit severity CSV into typed rows sorted by qubit.

    Args:
        text: Full CSV text including the header line.

    Returns:
        One dict per data row. ``qubit`` and ``severity_rank`` are ints,
        ``layout_row`` / ``layout_col`` are ints or None when blank,
        ``severity_band`` and ``route_priority`` are upper-cased strings,
        ``source_metric`` and ``severity_mode`` are strings, and the remaining
        required columns are floats.

    Raises:
        ValueError: if the text is blank, a required header is missing, or a
            data row has a missing or non-numeric value in a numeric column.
            Malformed rows are reported with their line number within the
            stripped CSV text.
    """
    raw = str(text or "").strip()
    if not raw:
        raise ValueError("Severity CSV is empty.")
    required = (
        "qubit",
        "layout_row",
        "layout_col",
        "source_metric",
        "severity_mode",
        "base_risk",
        "severity_score",
        "severity_band",
        "severity_rank",
        "severity_percentile",
        "pnr_cost",
        "timing_derate",
        "density_cap",
        "guardband_mv",
        "route_priority",
    )
    reader = csv.DictReader(io.StringIO(raw))
    headers = reader.fieldnames
    if headers is None or any(name not in headers for name in required):
        raise ValueError("Severity CSV missing required headers.")
    rows: List[Dict[str, Any]] = []
    for record in reader:
        try:
            parsed: Dict[str, Any] = {
                "qubit": int(record["qubit"]),
                "layout_row": _safe_int(record.get("layout_row", "")),
                "layout_col": _safe_int(record.get("layout_col", "")),
                "source_metric": str(record.get("source_metric", "")),
                "severity_mode": str(record.get("severity_mode", "")),
                "base_risk": float(record["base_risk"]),
                "severity_score": float(record["severity_score"]),
                "severity_band": str(record.get("severity_band", "")).upper(),
                "severity_rank": int(float(record["severity_rank"])),
                "severity_percentile": float(record["severity_percentile"]),
                "pnr_cost": float(record["pnr_cost"]),
                "timing_derate": float(record["timing_derate"]),
                "density_cap": float(record["density_cap"]),
                "guardband_mv": float(record["guardband_mv"]),
                "route_priority": str(record.get("route_priority", "")).upper(),
            }
        except (TypeError, ValueError, OverflowError) as exc:
            # A short row leaves trailing cells as None (TypeError); bad text
            # raises ValueError; an infinite rank raises OverflowError.
            # Surface all of them uniformly with row context.
            raise ValueError(f"Severity CSV line {reader.line_num} is malformed: {exc}") from exc
        rows.append(parsed)
    rows.sort(key=lambda item: int(item["qubit"]))
    return rows
def parse_companion_lef_text(text: str) -> Dict[str, Any]:
    """Extract SITE and MACRO declarations from companion LEF text.

    Returns:
        ``{"sites": {name: {"width_microns", "height_microns"}},
        "macros": [{"name", "width_microns", "height_microns", "site_name"}]}``.
        SITE blocks without a SIZE statement are skipped. Macros are always
        listed; a macro without SIZE gets None dimensions and one without a
        SITE reference gets an empty ``site_name``.

    Raises:
        ValueError: if the text is blank.
    """
    raw = str(text or "")
    if not raw.strip():
        raise ValueError("Companion LEF is empty.")

    def _dimensions(body: str) -> Tuple[float, float] | None:
        # First "SIZE w BY h ;" statement inside a block body, if present.
        found = _LEF_SIZE_RE.search(body)
        if found is None:
            return None
        return float(found.group(1)), float(found.group(2))

    sites: Dict[str, Dict[str, Any]] = {}
    for site_block in _LEF_SITE_BLOCK_RE.finditer(raw):
        site_name = str(site_block.group(1))
        dims = _dimensions(str(site_block.group(2)))
        if dims is None:
            continue
        sites[site_name] = {"width_microns": dims[0], "height_microns": dims[1]}

    macros: List[Dict[str, Any]] = []
    for macro_block in _LEF_MACRO_BLOCK_RE.finditer(raw):
        macro_name = str(macro_block.group(1))
        body = str(macro_block.group(2))
        dims = _dimensions(body)
        site_ref = _LEF_SITE_REF_RE.search(body)
        width, height = (None, None) if dims is None else dims
        macros.append(
            {
                "name": macro_name,
                "width_microns": width,
                "height_microns": height,
                "site_name": str(site_ref.group(1)) if site_ref else "",
            }
        )
    return {"sites": sites, "macros": macros}
def parse_companion_lib_text(text: str) -> Dict[str, Any]:
    """Extract the library name and per-cell annotations from Liberty text.

    Each cell's ``comment`` attribute is read as ``key=value`` pairs separated
    by ";" (keys lower-cased, items without "=" ignored). The keys used are
    ``qubit`` (expected as "q<digits>"), ``band``, ``source``, ``mode``,
    ``row`` and ``col``.

    Returns:
        ``{"library_name": str, "cells": [ {...}, ... ]}`` where each cell
        has ``name``, ``area`` (float or None), ``comment``, ``qubit`` (int
        or None), ``band`` (upper-cased), ``source_metric``,
        ``severity_mode``, ``layout_row`` and ``layout_col`` (int or None).

    Raises:
        ValueError: if the text is blank or has no ``library (...)`` header.
    """
    raw = str(text or "")
    if not raw.strip():
        raise ValueError("Companion LIB is empty.")
    header = _LIB_LIBRARY_RE.search(raw)
    if not header:
        raise ValueError("Companion LIB missing library declaration.")

    cells: List[Dict[str, Any]] = []
    for cell_match in _LIB_CELL_RE.finditer(raw):
        cell_name = str(cell_match.group(1)).strip()
        body = str(cell_match.group(2))
        area_hit = _LIB_AREA_RE.search(body)
        comment_hit = _LIB_COMMENT_RE.search(body)
        comment = str(comment_hit.group(1)) if comment_hit else ""

        # Split the comment payload into lower-cased key -> value pairs.
        fields: Dict[str, str] = {}
        for fragment in comment.split(";"):
            key, separator, value = fragment.partition("=")
            if not separator:
                continue
            fields[str(key).strip().lower()] = str(value).strip()

        qubit_label = fields.get("qubit", "")
        qubit_index = None
        if qubit_label.startswith("q") and qubit_label[1:].isdigit():
            qubit_index = int(qubit_label[1:])

        cells.append(
            {
                "name": cell_name,
                "area": float(area_hit.group(1)) if area_hit else None,
                "comment": comment,
                "qubit": qubit_index,
                "band": str(fields.get("band", "")).upper(),
                "source_metric": str(fields.get("source", "")),
                "severity_mode": str(fields.get("mode", "")),
                "layout_row": _safe_int(fields.get("row", "")),
                "layout_col": _safe_int(fields.get("col", "")),
            }
        )
    return {"library_name": str(header.group(1)).strip(), "cells": cells}
def _rows_by_qubit(rows: Iterable[Dict[str, Any]]) -> Dict[int, Dict[str, Any]]:
    """Index rows by their integer ``qubit`` value.

    Rows whose ``qubit`` is missing or None are skipped. Each stored row is a
    shallow copy; on duplicate qubits the later row wins.
    """
    indexed: Dict[int, Dict[str, Any]] = {}
    for entry in rows:
        if entry.get("qubit") is None:
            continue
        indexed[int(entry["qubit"])] = dict(entry)
    return indexed
def _expected_band_for_row(row: Dict[str, Any], severity_cfg: SeverityConfig) -> str:
    """Return the band ("CRITICAL", "WARNING" or "OK") the config implies for *row*.

    In "percentile" mode the row's ``severity_percentile`` is compared with
    the percentile thresholds; in any other mode ``severity_score`` is
    compared with the plain warning/critical thresholds. Comparisons are
    inclusive (``>=``) and a missing value counts as 0.0.
    """
    mode = str(severity_cfg.mode or "linear").strip().lower()
    if mode == "percentile":
        value = float(row.get("severity_percentile", 0.0))
        critical_attr, warning_attr = "percentile_critical", "percentile_warning"
    else:
        value = float(row.get("severity_score", 0.0))
        critical_attr, warning_attr = "critical", "warning"

    limits = severity_cfg.thresholds
    if value >= float(getattr(limits, critical_attr)):
        return "CRITICAL"
    if value >= float(getattr(limits, warning_attr)):
        return "WARNING"
    return "OK"
def _compare_expected_row_maps(
    checks: List[Dict[str, Any]],
    *,
    artifact: str,
    observed: Dict[int, Dict[str, Any]],
    expected: Dict[int, Dict[str, Any]],
    float_fields: Sequence[str],
    text_fields: Sequence[str],
    int_fields: Sequence[str],
) -> None:
    """Compare parsed rows with expected rows and record exactly one check.

    If the two maps cover different qubits, a ``<artifact>_qubits`` failure is
    recorded and no field comparison is done. Otherwise each qubit is compared
    field by field (floats with tolerance, then text case-insensitively, then
    ints) and only its first diverging field is reported. The outcome is a
    ``<artifact>_content`` check, pass or fail, appended to *checks*.
    """
    seen_qubits = sorted(observed.keys())
    wanted_qubits = sorted(expected.keys())
    if seen_qubits != wanted_qubits:
        uncovered = sorted(set(seen_qubits) ^ set(wanted_qubits))
        _add_check(
            checks,
            status="fail",
            code=f"{artifact}_qubits",
            artifact=artifact,
            message=f"Qubit coverage mismatch. observed={seen_qubits} expected={wanted_qubits}",
            targets=[f"q{q}" for q in uncovered],
        )
        return

    def _first_divergent_field(got: Dict[str, Any], want: Dict[str, Any]) -> str | None:
        # Field groups are checked in a fixed order: floats, text, ints.
        for name in float_fields:
            if not _float_close(got.get(name), want.get(name)):
                return name
        for name in text_fields:
            if str(got.get(name, "")).upper() != str(want.get(name, "")).upper():
                return name
        for name in int_fields:
            if _safe_int(got.get(name)) != _safe_int(want.get(name)):
                return name
        return None

    mismatches: List[str] = []
    for q in wanted_qubits:
        divergent = _first_divergent_field(observed[q], expected[q])
        if divergent is not None:
            mismatches.append(f"q{q}:{divergent}")

    if not mismatches:
        _add_check(
            checks,
            status="pass",
            code=f"{artifact}_content",
            artifact=artifact,
            message=f"{artifact} rows match the current session state for {len(wanted_qubits)} qubits.",
        )
        return
    # The message shows at most 8 mismatches; the targets list carries up to 16.
    _add_check(
        checks,
        status="fail",
        code=f"{artifact}_content",
        artifact=artifact,
        message=f"Observed rows diverge from expected export state at {', '.join(mismatches[:8])}.",
        targets=mismatches[:16],
    )
def _collect_geometry_checks(
    checks: List[Dict[str, Any]],
    *,
    blockage_rows: Sequence[Dict[str, Any]],
    grid_cfg: DEFGridConfig,
    chip_rows: int,
    chip_cols: int,
    imported_design_def_text: str = "",
) -> None:
    """Append geometric sanity checks for parsed DEF blockage rectangles.

    Five checks are recorded, each pass or fail unless noted:
    ``def_malformed`` (positive area), ``def_alignment`` (edges on the
    site/row grid relative to the grid origin), ``def_box_size`` (exactly one
    cell span), ``def_core_bounds`` (inside the chip_rows x chip_cols core)
    and ``def_diearea_bounds`` (inside the imported DEF's DIEAREA; recorded
    as a warning when no DIEAREA is available).

    Each row must carry ``rect_dbu`` as four integers (xl, yl, xh, yh); a
    missing ``qubit`` is reported as -1.
    """
    site_pitch = int(grid_cfg.site_width_dbu)
    cell_width = site_pitch * int(grid_cfg.sites_per_cell_x)
    row_pitch = int(grid_cfg.row_height_dbu)
    cell_height = row_pitch * int(grid_cfg.rows_per_cell_y)
    core_left = int(grid_cfg.origin_x_dbu)
    core_bottom = int(grid_cfg.origin_y_dbu)
    core_right = core_left + int(chip_cols) * cell_width
    core_top = core_bottom + int(chip_rows) * cell_height

    def _record(code: str, offenders: List[int], fail_text: str, pass_text: str) -> None:
        # One check per code: fail with qubit targets, or a plain pass.
        if offenders:
            _add_check(
                checks,
                status="fail",
                code=code,
                artifact="def",
                message=fail_text,
                targets=[f"q{q}" for q in offenders],
            )
        else:
            _add_check(checks, status="pass", code=code, artifact="def", message=pass_text)

    malformed: List[int] = []
    misaligned: List[int] = []
    wrong_size: List[int] = []
    out_of_core: List[int] = []
    boxes: List[Tuple[int, int, int, int, int]] = []
    for entry in blockage_rows:
        xl, yl, xh, yh = [int(v) for v in entry["rect_dbu"]]
        q = int(entry.get("qubit", -1))
        boxes.append((q, xl, yl, xh, yh))
        if xh <= xl or yh <= yl:
            malformed.append(q)
        on_grid = all(
            offset % pitch == 0
            for offset, pitch in (
                (xl - core_left, site_pitch),
                (xh - core_left, site_pitch),
                (yl - core_bottom, row_pitch),
                (yh - core_bottom, row_pitch),
            )
        )
        if not on_grid:
            misaligned.append(q)
        if (xh - xl) != cell_width or (yh - yl) != cell_height:
            wrong_size.append(q)
        if xl < core_left or yl < core_bottom or xh > core_right or yh > core_top:
            out_of_core.append(q)

    _record(
        "def_malformed",
        malformed,
        f"Malformed DEF rectangles for qubits {malformed}.",
        "All DEF blockage rectangles have positive area.",
    )
    _record(
        "def_alignment",
        misaligned,
        f"DEF rectangles are off the site/row grid for qubits {misaligned}.",
        "All DEF rectangles snap to the configured site/row grid.",
    )
    _record(
        "def_box_size",
        wrong_size,
        f"DEF rectangles do not match one heatmap cell span for qubits {wrong_size}.",
        "Each DEF blockage covers exactly one heatmap cell span.",
    )
    _record(
        "def_core_bounds",
        out_of_core,
        f"DEF rectangles exceed the current core extent for qubits {out_of_core}.",
        "All DEF rectangles stay within the current core extent.",
    )

    # The die-area check needs an imported design DEF; without one it is skipped.
    if str(imported_design_def_text or "").strip():
        imported_def_meta = parse_design_def(imported_design_def_text)
    else:
        imported_def_meta = {}
    diearea = imported_def_meta.get("diearea")
    if diearea is None:
        _add_check(
            checks,
            status="warn",
            code="def_diearea_bounds",
            artifact="def",
            message="Imported DEF DIEAREA not available; external floorplan-bound check was skipped.",
        )
        return

    die_x0, die_y0, die_x1, die_y1 = [int(v) for v in diearea]
    outside_die = [
        q
        for q, xl, yl, xh, yh in boxes
        if xl < die_x0 or yl < die_y0 or xh > die_x1 or yh > die_y1
    ]
    _record(
        "def_diearea_bounds",
        outside_die,
        f"DEF rectangles exceed imported DEF DIEAREA for qubits {outside_die}.",
        "All DEF rectangles stay within the imported DEF DIEAREA.",
    )
def _expected_metrics_rows(expected_metrics: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Turn the column-oriented session metrics into one row dict per qubit."""
    float_columns = (
        "activity_count",
        "activity_norm",
        "gate_error",
        "readout_error",
        "fidelity",
        "state_fidelity",
        "process_fidelity",
        "coherence_health",
        "decoherence_risk",
        "composite_risk",
    )
    # The qubit count is taken from the length of the "activity_count" column.
    qubit_count = len(expected_metrics.get("activity_count", []))
    rows: List[Dict[str, Any]] = []
    for q in range(qubit_count):
        row: Dict[str, Any] = {"qubit": int(q)}
        for column in float_columns:
            row[column] = float(expected_metrics[column][q])
        row["hotspot_level"] = int(expected_metrics["hotspot_level"][q])
        rows.append(row)
    return rows


def _export_cell_name(row: Dict[str, Any]) -> str:
    """Return the LEF macro / LIB cell name expected for one severity row."""
    return f"QUREAD_Q{int(row['qubit'])}_{str(row.get('severity_band', 'OK')).upper()}"


def validate_design_exports(
    *,
    metrics_csv_text: str,
    severity_csv_text: str,
    synopsys_tcl_text: str,
    cadence_skill_text: str,
    def_fragment_text: str,
    companion_lef_text: str,
    companion_lib_text: str,
    expected_metrics: Dict[str, Any],
    expected_severity_rows: Sequence[Dict[str, Any]],
    expected_mapping_rows: Sequence[Dict[str, Any]],
    expected_blockage_rows: Sequence[Dict[str, Any]],
    severity_cfg: SeverityConfig,
    grid_cfg: DEFGridConfig,
    chip_rows: int,
    chip_cols: int,
    dbu_per_micron: int,
    site_name: str,
    imported_design_def_text: str = "",
) -> Dict[str, Any]:
    """Re-parse every exported artifact and check it against session state.

    Args:
        metrics_csv_text, severity_csv_text: Exported CSV texts.
        synopsys_tcl_text, cadence_skill_text: Exported EDA script texts.
        def_fragment_text: Exported DEF blockage fragment.
        companion_lef_text, companion_lib_text: Exported companion LEF / LIB.
        expected_metrics: Column-oriented metrics (metric name -> per-qubit
            sequence); the qubit count comes from ``activity_count``.
        expected_severity_rows, expected_mapping_rows, expected_blockage_rows:
            Row dicts (each with a ``qubit`` key) the exports should match.
        severity_cfg: Severity mode and thresholds used to re-derive bands.
        grid_cfg: DEF grid origin, site/row pitch and cell span in DBU.
        chip_rows, chip_cols: Heatmap grid size, used for the core extent.
        dbu_per_micron: DBU-to-micron factor (values below 1 are treated as 1).
        site_name: Expected LEF site name; falsy means "QUREAD_SITE".
        imported_design_def_text: Optional imported design DEF whose DIEAREA
            bounds the blockages; when blank that check is a warning.

    Returns:
        A report dict with ``summary`` (overall status and pass/warn/fail
        counts), ``settings`` (echo of the configuration), ``artifact_counts``
        (parsed row counts per artifact) and ``checks`` (ordered check records).

    Raises:
        ValueError: propagated from the CSV / LEF / LIB parsers on blank or
            structurally invalid input.
    """
    checks: List[Dict[str, Any]] = []

    # --- Metrics CSV -------------------------------------------------------
    expected_metrics_rows = _expected_metrics_rows(expected_metrics)
    parsed_metrics = parse_metrics_csv_text(metrics_csv_text)
    _compare_expected_row_maps(
        checks,
        artifact="metrics_csv",
        observed=_rows_by_qubit(parsed_metrics),
        expected=_rows_by_qubit(expected_metrics_rows),
        float_fields=[
            "activity_count",
            "activity_norm",
            "gate_error",
            "readout_error",
            "fidelity",
            "state_fidelity",
            "process_fidelity",
            "coherence_health",
            "decoherence_risk",
            "composite_risk",
        ],
        text_fields=[],
        int_fields=["hotspot_level"],
    )

    # --- Severity CSV ------------------------------------------------------
    parsed_severity = parse_severity_csv_text(severity_csv_text)
    expected_severity_map = _rows_by_qubit(expected_severity_rows)
    _compare_expected_row_maps(
        checks,
        artifact="severity_csv",
        observed=_rows_by_qubit(parsed_severity),
        expected=expected_severity_map,
        float_fields=[
            "base_risk",
            "severity_score",
            "severity_percentile",
            "pnr_cost",
            "timing_derate",
            "density_cap",
            "guardband_mv",
        ],
        text_fields=["source_metric", "severity_mode", "severity_band", "route_priority"],
        int_fields=["layout_row", "layout_col", "severity_rank"],
    )

    # Re-derive each band from the configured thresholds and compare.
    threshold_mismatches = [
        int(row["qubit"])
        for row in parsed_severity
        if str(row.get("severity_band", "")).upper()
        != str(_expected_band_for_row(row, severity_cfg)).upper()
    ]
    if threshold_mismatches:
        _add_check(
            checks,
            status="fail",
            code="severity_thresholds",
            artifact="severity_csv",
            message=f"Severity bands do not match the current threshold model for qubits {threshold_mismatches}.",
            targets=[f"q{q}" for q in threshold_mismatches],
        )
    else:
        _add_check(
            checks,
            status="pass",
            code="severity_thresholds",
            artifact="severity_csv",
            message="Severity bands align with the current threshold model.",
        )

    # --- EDA scripts and DEF fragment --------------------------------------
    parsed_tcl_rows, tcl_meta = parse_eda_text(synopsys_tcl_text, "synopsys_tcl")
    parsed_skill_rows, skill_meta = parse_eda_text(cadence_skill_text, "cadence_skill")
    parsed_def_rows, def_meta = parse_def_blockages(def_fragment_text)
    expected_mapping_map = _rows_by_qubit(expected_mapping_rows)
    expected_def_map = _rows_by_qubit(expected_blockage_rows)

    # TCL and SKILL carry the same mapping rows, so they share field lists.
    mapping_float_fields = [
        "composite_risk",
        "severity_score",
        "severity_percentile",
        "pnr_cost",
        "timing_derate",
        "density_cap",
        "guardband_mv",
    ]
    mapping_text_fields = ["tier", "route_priority", "source_metric", "severity_mode"]
    mapping_int_fields = ["layout_row", "layout_col"]
    for eda_artifact, eda_rows in (
        ("synopsys_tcl", parsed_tcl_rows),
        ("cadence_skill", parsed_skill_rows),
    ):
        _compare_expected_row_maps(
            checks,
            artifact=eda_artifact,
            observed=_rows_by_qubit(eda_rows),
            expected=expected_mapping_map,
            float_fields=mapping_float_fields,
            text_fields=mapping_text_fields,
            int_fields=mapping_int_fields,
        )

    observed_def_map = _rows_by_qubit(parsed_def_rows)
    _compare_expected_row_maps(
        checks,
        artifact="def",
        observed=observed_def_map,
        expected=expected_def_map,
        float_fields=["density", "risk_value", "severity_percentile", "pnr_cost", "density_cap"],
        text_fields=["severity", "mode", "source_metric", "severity_mode", "route_priority"],
        int_fields=["layout_row", "layout_col"],
    )

    def_rect_mismatch = []
    for q, expected_row in expected_def_map.items():
        observed_row = observed_def_map.get(q)
        if observed_row is None or tuple(observed_row.get("rect_dbu", ())) != tuple(
            expected_row.get("rect_dbu", ())
        ):
            def_rect_mismatch.append(q)
    if def_rect_mismatch:
        _add_check(
            checks,
            status="fail",
            code="def_rectangles",
            artifact="def",
            message=f"DEF rectangle mismatch for qubits {def_rect_mismatch}.",
            targets=[f"q{q}" for q in def_rect_mismatch],
        )
    else:
        _add_check(
            checks,
            status="pass",
            code="def_rectangles",
            artifact="def",
            message="DEF rectangles match the current exported blockage geometry.",
        )

    # A declared count of 0 is a real value and must compare equal to zero
    # parsed rows; only a missing or unparseable declaration is a mismatch.
    declared_blockages = _safe_int(def_meta.get("declared_blockages"))
    if declared_blockages != len(parsed_def_rows):
        _add_check(
            checks,
            status="fail",
            code="def_declared_count",
            artifact="def",
            message=f"DEF declares {def_meta.get('declared_blockages')} blockages but parses {len(parsed_def_rows)} rows.",
        )
    else:
        _add_check(
            checks,
            status="pass",
            code="def_declared_count",
            artifact="def",
            message=f"DEF declaration count matches parsed rows ({len(parsed_def_rows)}).",
        )

    _collect_geometry_checks(
        checks,
        blockage_rows=parsed_def_rows,
        grid_cfg=grid_cfg,
        chip_rows=int(chip_rows),
        chip_cols=int(chip_cols),
        imported_design_def_text=imported_design_def_text,
    )

    # --- Companion LEF -----------------------------------------------------
    parsed_lef = parse_companion_lef_text(companion_lef_text)
    parsed_lib = parse_companion_lib_text(companion_lib_text)
    expected_site = str(site_name or "QUREAD_SITE")
    site_info = parsed_lef["sites"].get(expected_site)
    dbu_scale = float(max(1, int(dbu_per_micron)))  # guard against a zero divisor
    site_width_dbu = int(grid_cfg.site_width_dbu)
    row_height_dbu = int(grid_cfg.row_height_dbu)
    expected_site_width_um = float(site_width_dbu) / dbu_scale
    expected_row_height_um = float(row_height_dbu) / dbu_scale
    expected_cell_width_um = float(site_width_dbu * int(grid_cfg.sites_per_cell_x)) / dbu_scale
    expected_cell_height_um = float(row_height_dbu * int(grid_cfg.rows_per_cell_y)) / dbu_scale
    if not site_info:
        _add_check(
            checks,
            status="fail",
            code="lef_site",
            artifact="lef",
            message=f"Companion LEF is missing expected site `{expected_site}`.",
        )
    elif not (
        _float_close(site_info["width_microns"], expected_site_width_um)
        and _float_close(site_info["height_microns"], expected_row_height_um)
    ):
        _add_check(
            checks,
            status="fail",
            code="lef_site",
            artifact="lef",
            message="Companion LEF site size does not match current DBU/grid settings.",
        )
    else:
        _add_check(
            checks,
            status="pass",
            code="lef_site",
            artifact="lef",
            message="Companion LEF site declaration matches current DBU/grid settings.",
        )

    macros_by_name = {str(item["name"]): item for item in parsed_lef.get("macros", [])}
    # NOTE(review): the "+ 3" presumably accounts for three non-qubit macros
    # emitted by the exporter; confirm against the LEF writer.
    macro_expected_count = len(expected_severity_rows) + 3
    if len(macros_by_name) != macro_expected_count:
        _add_check(
            checks,
            status="fail",
            code="lef_macro_count",
            artifact="lef",
            message=f"Companion LEF macro count mismatch. observed={len(macros_by_name)} expected={macro_expected_count}.",
        )
    else:
        _add_check(
            checks,
            status="pass",
            code="lef_macro_count",
            artifact="lef",
            message=f"Companion LEF contains the expected {macro_expected_count} macros.",
        )

    missing_lef_macros = []
    bad_lef_macros = []
    for row in expected_severity_rows:
        name = _export_cell_name(row)
        macro = macros_by_name.get(name)
        if macro is None:
            missing_lef_macros.append(name)
            continue
        macro_ok = (
            _float_close(macro.get("width_microns"), expected_cell_width_um)
            and _float_close(macro.get("height_microns"), expected_cell_height_um)
            and str(macro.get("site_name", "")) == expected_site
        )
        if not macro_ok:
            bad_lef_macros.append(name)
    if missing_lef_macros or bad_lef_macros:
        _add_check(
            checks,
            status="fail",
            code="lef_macro_content",
            artifact="lef",
            message=f"Companion LEF macro mismatch. missing={missing_lef_macros[:6]} bad={bad_lef_macros[:6]}",
            targets=missing_lef_macros[:6] + bad_lef_macros[:6],
        )
    else:
        _add_check(
            checks,
            status="pass",
            code="lef_macro_content",
            artifact="lef",
            message="Companion LEF qubit macros match current severity rows.",
        )

    # --- Companion LIB -----------------------------------------------------
    cells_by_name = {str(item["name"]): item for item in parsed_lib.get("cells", [])}
    if len(cells_by_name) != len(expected_severity_rows):
        _add_check(
            checks,
            status="fail",
            code="lib_cell_count",
            artifact="lib",
            message=f"Companion LIB cell count mismatch. observed={len(cells_by_name)} expected={len(expected_severity_rows)}.",
        )
    else:
        _add_check(
            checks,
            status="pass",
            code="lib_cell_count",
            artifact="lib",
            message=f"Companion LIB contains the expected {len(expected_severity_rows)} cells.",
        )

    lib_mismatches = []
    for row in expected_severity_rows:
        name = _export_cell_name(row)
        cell = cells_by_name.get(name)
        if cell is None:
            lib_mismatches.append(name)
            continue
        # The LIB parser stores qubit=None when the comment has no usable
        # "qubit=q<N>" field; treat that as a mismatch instead of letting
        # int(None) raise.
        if (
            _safe_int(cell.get("qubit")) != int(row["qubit"])
            or str(cell.get("band", "")).upper() != str(row.get("severity_band", "")).upper()
            or str(cell.get("source_metric", "")) != str(row.get("source_metric", ""))
            or str(cell.get("severity_mode", "")) != str(row.get("severity_mode", ""))
            or _safe_int(cell.get("layout_row")) != _safe_int(row.get("layout_row"))
            or _safe_int(cell.get("layout_col")) != _safe_int(row.get("layout_col"))
        ):
            lib_mismatches.append(name)
    if lib_mismatches:
        _add_check(
            checks,
            status="fail",
            code="lib_cell_content",
            artifact="lib",
            message=f"Companion LIB comments diverge from current severity rows for {lib_mismatches[:8]}.",
            targets=lib_mismatches[:8],
        )
    else:
        _add_check(
            checks,
            status="pass",
            code="lib_cell_content",
            artifact="lib",
            message="Companion LIB annotations match current severity rows.",
        )

    # --- Cross-tool parity -------------------------------------------------
    tcl_row_count = int(tcl_meta.get("parsed_rows", 0))
    skill_row_count = int(skill_meta.get("parsed_rows", 0))
    if tcl_row_count != skill_row_count:
        _add_check(
            checks,
            status="fail",
            code="eda_row_parity",
            artifact="eda",
            message="Synopsys TCL and Cadence SKILL parse different row counts.",
        )
    else:
        _add_check(
            checks,
            status="pass",
            code="eda_row_parity",
            artifact="eda",
            message=f"Synopsys TCL and Cadence SKILL both parse {tcl_row_count} rows.",
        )

    # --- Report assembly ---------------------------------------------------
    summary = {
        "overall_status": _final_status(checks),
        "check_count": len(checks),
        "pass_count": sum(1 for check in checks if str(check["status"]) == "PASS"),
        "warn_count": sum(1 for check in checks if str(check["status"]) == "WARN"),
        "fail_count": sum(1 for check in checks if str(check["status"]) == "FAIL"),
    }
    artifact_counts = {
        "metrics_rows": len(parsed_metrics),
        "severity_rows": len(parsed_severity),
        "synopsys_rows": len(parsed_tcl_rows),
        "cadence_rows": len(parsed_skill_rows),
        "def_rows": len(parsed_def_rows),
        "lef_macros": len(parsed_lef.get("macros", [])),
        "lib_cells": len(parsed_lib.get("cells", [])),
    }
    return {
        "summary": summary,
        "settings": {
            "severity_mode": str(severity_cfg.mode),
            "severity_source_metric": str(severity_cfg.source_metric),
            "warning_threshold": float(severity_cfg.thresholds.warning),
            "critical_threshold": float(severity_cfg.thresholds.critical),
            "percentile_warning": float(severity_cfg.thresholds.percentile_warning),
            "percentile_critical": float(severity_cfg.thresholds.percentile_critical),
            "grid": {
                "origin_x_dbu": int(grid_cfg.origin_x_dbu),
                "origin_y_dbu": int(grid_cfg.origin_y_dbu),
                "site_width_dbu": int(grid_cfg.site_width_dbu),
                "row_height_dbu": int(grid_cfg.row_height_dbu),
                "sites_per_cell_x": int(grid_cfg.sites_per_cell_x),
                "rows_per_cell_y": int(grid_cfg.rows_per_cell_y),
            },
            "chip_rows": int(chip_rows),
            "chip_cols": int(chip_cols),
            "dbu_per_micron": int(dbu_per_micron),
            "site_name": str(site_name or "QUREAD_SITE"),
        },
        "artifact_counts": artifact_counts,
        "checks": checks,
    }
def validation_table_rows(report: Dict[str, Any]) -> List[List[Any]]:
    """Flatten the report's checks into [status, artifact, code, message] rows.

    Rows are ordered most severe first (fail, warn, then the rest); ties are
    broken by artifact and then code, both in descending order.
    """
    columns = ("status", "artifact", "code", "message")
    table = [
        [str(entry.get(column, "")) for column in columns]
        for entry in report.get("checks", [])
    ]
    return sorted(
        table,
        key=lambda cells: (_status_rank(cells[0]), cells[1], cells[2]),
        reverse=True,
    )
def validation_artifact_summary_rows(report: Dict[str, Any]) -> List[List[Any]]:
    """Summarize checks per artifact as [artifact, pass, warn, fail, overall].

    ``overall`` is "FAIL" if the artifact has any failure, else "WARN" if it
    has any warning, else "PASS". A missing artifact name is grouped under
    "unknown" and a missing status counts as a pass. Rows are sorted worst
    first, then by fail count, warn count and artifact name (all descending).
    """
    tallies: Dict[str, Dict[str, int]] = {}
    for entry in report.get("checks", []):
        artifact_name = str(entry.get("artifact", "unknown") or "unknown")
        label = str(entry.get("status", "PASS") or "PASS").upper()
        tally = tallies.setdefault(artifact_name, {"PASS": 0, "WARN": 0, "FAIL": 0})
        # Unrecognized labels are tallied too, but never reach the output rows.
        tally[label] = tally.get(label, 0) + 1

    summary_rows: List[List[Any]] = []
    for artifact_name, tally in tallies.items():
        passed = int(tally.get("PASS", 0))
        warned = int(tally.get("WARN", 0))
        failed = int(tally.get("FAIL", 0))
        if failed > 0:
            verdict = "FAIL"
        elif warned > 0:
            verdict = "WARN"
        else:
            verdict = "PASS"
        summary_rows.append([artifact_name, passed, warned, failed, verdict])

    return sorted(
        summary_rows,
        key=lambda cells: (_status_rank(cells[4]), cells[3], cells[2], cells[0]),
        reverse=True,
    )
def validation_focus_rows(report: Dict[str, Any], top_k: int = 12) -> List[List[Any]]:
    """Return the non-passing checks as [status, artifact, code, targets, message].

    At most eight targets are shown per row (", ..." marks truncation, "-"
    means no targets). Rows are sorted most severe first, then by artifact
    and code in descending order, and at most ``max(1, top_k)`` are returned.
    """
    focus: List[List[Any]] = []
    for entry in report.get("checks", []):
        label = str(entry.get("status", "PASS") or "PASS").upper()
        if label == "PASS":
            continue
        names = [str(item) for item in (entry.get("targets") or []) if str(item).strip()]
        if names:
            shown = ", ".join(names[:8])
        else:
            shown = "-"
        if len(names) > 8:
            shown += ", ..."
        focus.append(
            [
                label,
                str(entry.get("artifact", "")),
                str(entry.get("code", "")),
                shown,
                str(entry.get("message", "")),
            ]
        )
    if not focus:
        return []
    ordered = sorted(
        focus,
        key=lambda cells: (_status_rank(cells[0]), cells[1], cells[2]),
        reverse=True,
    )
    return ordered[: max(1, int(top_k))]
def validation_focus_markdown(report: Dict[str, Any], top_k: int = 4) -> str:
    """Render the top non-passing checks as a Markdown bullet list.

    Returns a "clean" notice when the report has no warnings or failures;
    otherwise one bullet per focus row, limited to ``max(1, top_k)`` rows.
    """
    focus = validation_focus_rows(report, top_k=top_k)
    if not focus:
        return "### Mismatch Focus\n- No warnings or failures. Current package checks are clean."
    limit = max(1, int(top_k))
    bullets = []
    for status, artifact, code, targets, message in focus[:limit]:
        # "-" is the placeholder for "no targets"; omit the clause entirely.
        target_note = "" if targets == "-" else f" targets=`{targets}`"
        bullets.append(f"- `{status}` `{artifact}` `{code}`{target_note}: {message}")
    return "\n".join(["### Mismatch Focus", *bullets])
def validation_summary_markdown(report: Dict[str, Any]) -> str:
    """Render the report's summary, settings and artifact counts as Markdown.

    Missing sections or keys fall back to defaults ("UNKNOWN" status, zero
    counts, "linear" / "composite_risk" policy, empty grid values).
    """
    summary = dict(report.get("summary") or {})
    settings = dict(report.get("settings") or {})
    counts = dict(report.get("artifact_counts") or {})
    grid = settings.get("grid", {})

    overall = summary.get("overall_status", "UNKNOWN")
    total = summary.get("check_count", 0)
    passed = summary.get("pass_count", 0)
    warned = summary.get("warn_count", 0)
    failed = summary.get("fail_count", 0)
    mode = settings.get("severity_mode", "linear")
    source = settings.get("severity_source_metric", "composite_risk")
    site_w = grid.get("site_width_dbu", "")
    row_h = grid.get("row_height_dbu", "")
    sites_x = grid.get("sites_per_cell_x", "")
    rows_y = grid.get("rows_per_cell_y", "")
    chip_r = settings.get("chip_rows", "")
    chip_c = settings.get("chip_cols", "")

    lines = [
        "### Validation Summary",
        f"- Overall status: `{overall}`",
        f"- Checks: `{total}` total | `{passed}` pass | `{warned}` warn | `{failed}` fail",
        f"- Severity policy: mode=`{mode}` source=`{source}`",
        f"- Grid: site=`{site_w} x {row_h}` DBU | cell=`{sites_x} x {rows_y}` | chip=`{chip_r} x {chip_c}`",
        (
            f"- Parsed artifacts: metrics=`{counts.get('metrics_rows', 0)}`, "
            f"severity=`{counts.get('severity_rows', 0)}`, "
            f"TCL=`{counts.get('synopsys_rows', 0)}`, "
            f"SKILL=`{counts.get('cadence_rows', 0)}`, "
            f"DEF=`{counts.get('def_rows', 0)}`, "
            f"LEF macros=`{counts.get('lef_macros', 0)}`, "
            f"LIB cells=`{counts.get('lib_cells', 0)}`"
        ),
    ]
    return "\n".join(lines)
def build_validation_report_html(report: Dict[str, Any]) -> str:
    """Render the validation report as a standalone HTML page.

    The page has a summary list, artifact counts, and three tables: the
    per-artifact summary, the non-passing "mismatch focus" rows (up to 12)
    and the full check list. Every dynamic value is HTML-escaped.

    NOTE(review): the markup in this block had been stripped down to text and
    pipe-delimited residue, leaving unterminated string literals. The tags
    below are a minimal reconstruction of that structure; any styling or
    attributes the original page carried are not recoverable from here.
    """
    summary = dict(report.get("summary") or {})
    settings = dict(report.get("settings") or {})
    counts = dict(report.get("artifact_counts") or {})
    rows = validation_table_rows(report)
    artifact_rows = validation_artifact_summary_rows(report)
    focus_rows = validation_focus_rows(report, top_k=12)

    def _table_body(body_rows: List[List[Any]], column_count: int, empty_text: str) -> str:
        # One <tr> per row; a single spanning cell when there is nothing to show.
        if not body_rows:
            return f'<tr><td colspan="{column_count}">{html.escape(empty_text)}</td></tr>\n'
        return "".join(
            "<tr>"
            + "".join(f"<td>{html.escape(str(cell))}</td>" for cell in row)
            + "</tr>\n"
            for row in body_rows
        )

    table_rows = _table_body(rows, 4, "No checks.")
    artifact_table_rows = _table_body(artifact_rows, 5, "No artifacts.")
    focus_table_rows = _table_body(focus_rows, 5, "No warnings or failures.")

    overall_status = html.escape(str(summary.get("overall_status", "UNKNOWN")))
    check_count = int(summary.get("check_count", 0))
    pass_count = int(summary.get("pass_count", 0))
    warn_count = int(summary.get("warn_count", 0))
    fail_count = int(summary.get("fail_count", 0))
    severity_mode = html.escape(str(settings.get("severity_mode", "")))
    severity_source = html.escape(str(settings.get("severity_source_metric", "")))
    metrics_rows = int(counts.get("metrics_rows", 0))
    severity_rows = int(counts.get("severity_rows", 0))
    synopsys_rows = int(counts.get("synopsys_rows", 0))
    cadence_rows = int(counts.get("cadence_rows", 0))
    def_rows = int(counts.get("def_rows", 0))
    lef_macros = int(counts.get("lef_macros", 0))
    lib_cells = int(counts.get("lib_cells", 0))

    return f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Quread Validation Report</title>
</head>
<body>
<h1>Quread Validation Report</h1>
<p>Internal package validation for the current heatmap, severity, EDA, and DEF export state.</p>
<h2>Summary</h2>
<ul>
<li>Overall status: {overall_status}</li>
<li>Checks: {check_count} total, {pass_count} pass, {warn_count} warn, {fail_count} fail</li>
<li>Severity policy: mode={severity_mode}, source={severity_source}</li>
</ul>
<h2>Artifact Counts</h2>
<ul>
<li>Metrics rows: {metrics_rows}</li>
<li>Severity rows: {severity_rows}</li>
<li>TCL rows: {synopsys_rows}</li>
<li>SKILL rows: {cadence_rows}</li>
<li>DEF rows: {def_rows}</li>
<li>LEF macros: {lef_macros}</li>
<li>LIB cells: {lib_cells}</li>
</ul>
<h2>Artifact Summary</h2>
<table>
<thead><tr><th>Artifact</th><th>Pass</th><th>Warn</th><th>Fail</th><th>Overall</th></tr></thead>
<tbody>
{artifact_table_rows}</tbody>
</table>
<h2>Mismatch Focus</h2>
<table>
<thead><tr><th>Status</th><th>Artifact</th><th>Code</th><th>Targets</th><th>Message</th></tr></thead>
<tbody>
{focus_table_rows}</tbody>
</table>
<h2>Checks</h2>
<table>
<thead><tr><th>Status</th><th>Artifact</th><th>Code</th><th>Message</th></tr></thead>
<tbody>
{table_rows}</tbody>
</table>
</body>
</html>
"""
def validation_report_json(report: Dict[str, Any]) -> str:
    """Serialize the report as JSON with two-space indentation and sorted keys."""
    dump_options: Dict[str, Any] = {"indent": 2, "sort_keys": True}
    serialized = json.dumps(report, **dump_options)
    return serialized