from __future__ import annotations import csv import io from dataclasses import dataclass, field from typing import Any, Dict, List, Tuple import numpy as np _DIRECT_RISK_METRICS = { "activity_norm", "gate_error", "readout_error", "decoherence_risk", "composite_risk", } _INVERTED_RISK_METRICS = { "coherence_health", "fidelity", "state_fidelity", "process_fidelity", } @dataclass class SeverityWeights: activity: float = 0.25 gate_error: float = 0.20 readout_error: float = 0.15 decoherence: float = 0.25 fidelity: float = 0.15 @dataclass class SeverityThresholds: warning: float = 0.45 critical: float = 0.70 percentile_warning: float = 67.0 percentile_critical: float = 90.0 @dataclass class PNRScaling: max_timing_derate: float = 1.15 base_guardband_mv: float = 5.0 max_guardband_mv: float = 60.0 min_density_cap: float = 40.0 max_density_cap: float = 100.0 pnr_cost_scale: float = 100.0 @dataclass class SeverityConfig: mode: str = "linear" source_metric: str = "composite_risk" thresholds: SeverityThresholds = field(default_factory=SeverityThresholds) weights: SeverityWeights = field(default_factory=SeverityWeights) pnr_scaling: PNRScaling = field(default_factory=PNRScaling) def _clip01(v: float) -> float: return float(np.clip(float(v), 0.0, 1.0)) def _normalize_weights(weights: SeverityWeights) -> SeverityWeights: vals = np.array( [ float(weights.activity), float(weights.gate_error), float(weights.readout_error), float(weights.decoherence), float(weights.fidelity), ], dtype=float, ) total = float(np.sum(vals)) if total <= 0: vals = np.array([0.25, 0.20, 0.15, 0.25, 0.15], dtype=float) total = float(np.sum(vals)) vals = vals / total return SeverityWeights( activity=float(vals[0]), gate_error=float(vals[1]), readout_error=float(vals[2]), decoherence=float(vals[3]), fidelity=float(vals[4]), ) def _clamp_thresholds(thresholds: SeverityThresholds) -> SeverityThresholds: warning = _clip01(float(thresholds.warning)) critical = _clip01(float(thresholds.critical)) if critical < warning: critical = warning pct_warning = float(np.clip(float(thresholds.percentile_warning), 0.0, 100.0)) pct_critical = float(np.clip(float(thresholds.percentile_critical), 0.0, 100.0)) if pct_critical < pct_warning: pct_critical = pct_warning return SeverityThresholds( warning=warning, critical=critical, percentile_warning=pct_warning, percentile_critical=pct_critical, ) def _normalized_metric_risk(metrics: Dict[str, np.ndarray], metric_name: str) -> Tuple[np.ndarray, str]: key = str(metric_name or "composite_risk").strip().lower() if key == "activity_count": if "activity_norm" not in metrics: raise ValueError("Severity mapper requires activity_norm for activity_count mode.") return np.asarray(metrics["activity_norm"], dtype=float), "activity_count" if key in _DIRECT_RISK_METRICS: if key not in metrics: raise ValueError(f"Severity mapper missing metric: {key}") return np.asarray(metrics[key], dtype=float), key if key in _INVERTED_RISK_METRICS: if key not in metrics: raise ValueError(f"Severity mapper missing metric: {key}") return 1.0 - np.asarray(metrics[key], dtype=float), key raise ValueError(f"Unsupported severity source metric: {metric_name}") def _weighted_risk(metrics: Dict[str, np.ndarray], weights: SeverityWeights) -> np.ndarray: for key in ("activity_norm", "gate_error", "readout_error", "decoherence_risk", "fidelity"): if key not in metrics: raise ValueError(f"Severity mapper missing metric: {key}") fidelity_risk = 1.0 - np.asarray(metrics["fidelity"], dtype=float) return np.clip( float(weights.activity) * np.asarray(metrics["activity_norm"], dtype=float) + float(weights.gate_error) * np.asarray(metrics["gate_error"], dtype=float) + float(weights.readout_error) * np.asarray(metrics["readout_error"], dtype=float) + float(weights.decoherence) * np.asarray(metrics["decoherence_risk"], dtype=float) + float(weights.fidelity) * fidelity_risk, 0.0, 1.0, ) def _percentile_desc(values: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: arr = np.asarray(values, dtype=float).reshape(-1) n = int(arr.size) if n == 0: return np.zeros((0,), dtype=float), np.zeros((0,), dtype=int) order = np.argsort(-arr, kind="stable") ranks = np.empty((n,), dtype=int) ranks[order] = np.arange(1, n + 1, dtype=int) if n == 1: pct = np.array([100.0], dtype=float) else: pct = 100.0 * (1.0 - (ranks.astype(float) - 1.0) / float(n - 1)) return pct, ranks def _band_from_score(score: float, thresholds: SeverityThresholds) -> str: if float(score) >= float(thresholds.critical): return "CRITICAL" if float(score) >= float(thresholds.warning): return "WARNING" return "OK" def _band_from_percentile(percentile: float, thresholds: SeverityThresholds) -> str: if float(percentile) >= float(thresholds.percentile_critical): return "CRITICAL" if float(percentile) >= float(thresholds.percentile_warning): return "WARNING" return "OK" def compute_severity_rows( metrics: Dict[str, np.ndarray], *, cfg: SeverityConfig | None = None, qubit_coords: Dict[int, Tuple[int, int]] | None = None, ) -> List[Dict[str, Any]]: cfg = cfg or SeverityConfig() mode = str(cfg.mode or "linear").strip().lower() thresholds = _clamp_thresholds(cfg.thresholds) weights = _normalize_weights(cfg.weights) scale = cfg.pnr_scaling if mode in ("linear", "bucket", "percentile"): base_risk, source_metric = _normalized_metric_risk(metrics, cfg.source_metric) elif mode in ("weighted", "pnr_cost"): base_risk = _weighted_risk(metrics, weights) source_metric = "weighted_blend" else: raise ValueError(f"Unsupported severity mode: {cfg.mode}") base_risk = np.clip(np.asarray(base_risk, dtype=float), 0.0, 1.0) pct, ranks = _percentile_desc(base_risk) if mode == "linear": severity_score = np.asarray(base_risk, dtype=float) bands = np.array([_band_from_score(v, thresholds) for v in severity_score], dtype=object) elif mode == "bucket": bands = np.array([_band_from_score(v, thresholds) for v in base_risk], dtype=object) severity_score = np.array( [0.0 if b == "OK" else (0.5 if b == "WARNING" else 1.0) for b in bands], dtype=float, ) elif mode == "percentile": severity_score = np.asarray(pct, dtype=float) / 100.0 bands = np.array([_band_from_percentile(v, thresholds) for v in pct], dtype=object) else: severity_score = np.asarray(base_risk, dtype=float) bands = np.array([_band_from_score(v, thresholds) for v in severity_score], dtype=object) severity_pct, severity_ranks = _percentile_desc(severity_score) if mode == "pnr_cost": pnr_cost_norm = np.clip(0.7 * severity_score + 0.3 * (severity_pct / 100.0), 0.0, 1.0) else: pnr_cost_norm = np.asarray(severity_score, dtype=float) timing_derate = 1.0 + pnr_cost_norm * (float(scale.max_timing_derate) - 1.0) guardband_mv = float(scale.base_guardband_mv) + pnr_cost_norm * ( float(scale.max_guardband_mv) - float(scale.base_guardband_mv) ) density_cap = np.clip( float(scale.max_density_cap) - pnr_cost_norm * (float(scale.max_density_cap) - float(scale.min_density_cap)), float(scale.min_density_cap), float(scale.max_density_cap), ) pnr_cost = np.clip(pnr_cost_norm, 0.0, 1.0) * float(scale.pnr_cost_scale) rows: List[Dict[str, Any]] = [] n = int(len(base_risk)) for q in range(n): rr = None cc = None if qubit_coords and int(q) in qubit_coords: rr, cc = qubit_coords[int(q)] band = str(bands[q]) route_priority = "HIGH" if band == "CRITICAL" else ("MEDIUM" if band == "WARNING" else "LOW") rows.append( { "qubit": int(q), "layout_row": None if rr is None else int(rr), "layout_col": None if cc is None else int(cc), "source_metric": str(source_metric), "severity_mode": mode, "base_risk": float(base_risk[q]), "severity_score": float(np.clip(severity_score[q], 0.0, 1.0)), "severity_band": band, "severity_rank": int(severity_ranks[q]), "severity_percentile": float(severity_pct[q]), "pnr_cost": float(pnr_cost[q]), "timing_derate": float(timing_derate[q]), "density_cap": float(density_cap[q]), "guardband_mv": float(guardband_mv[q]), "route_priority": route_priority, } ) rows.sort( key=lambda row: ( 1 if str(row["severity_band"]) == "CRITICAL" else (0 if str(row["severity_band"]) == "WARNING" else -1), float(row["severity_score"]), -int(row["severity_rank"]), -int(row["qubit"]), ), reverse=True, ) return rows def severity_rows_to_csv(rows: List[Dict[str, Any]]) -> str: out = io.StringIO() writer = csv.writer(out) writer.writerow( [ "qubit", "layout_row", "layout_col", "source_metric", "severity_mode", "base_risk", "severity_score", "severity_band", "severity_rank", "severity_percentile", "pnr_cost", "timing_derate", "density_cap", "guardband_mv", "route_priority", ] ) for row in rows: writer.writerow( [ int(row["qubit"]), "" if row.get("layout_row") is None else int(row["layout_row"]), "" if row.get("layout_col") is None else int(row["layout_col"]), str(row.get("source_metric", "")), str(row.get("severity_mode", "")), f"{float(row.get('base_risk', 0.0)):.6g}", f"{float(row.get('severity_score', 0.0)):.6g}", str(row.get("severity_band", "")), int(row.get("severity_rank", 0)), f"{float(row.get('severity_percentile', 0.0)):.6g}", f"{float(row.get('pnr_cost', 0.0)):.6g}", f"{float(row.get('timing_derate', 1.0)):.6g}", f"{float(row.get('density_cap', 100.0)):.6g}", f"{float(row.get('guardband_mv', 0.0)):.6g}", str(row.get("route_priority", "")), ] ) return out.getvalue()