| from __future__ import annotations |
|
|
| import csv |
| import io |
| from dataclasses import dataclass, field |
| from typing import Any, Dict, List, Tuple |
|
|
| import numpy as np |
|
|
|
|
| _DIRECT_RISK_METRICS = { |
| "activity_norm", |
| "gate_error", |
| "readout_error", |
| "decoherence_risk", |
| "composite_risk", |
| } |
|
|
| _INVERTED_RISK_METRICS = { |
| "coherence_health", |
| "fidelity", |
| "state_fidelity", |
| "process_fidelity", |
| } |
|
|
|
|
| @dataclass |
| class SeverityWeights: |
| activity: float = 0.25 |
| gate_error: float = 0.20 |
| readout_error: float = 0.15 |
| decoherence: float = 0.25 |
| fidelity: float = 0.15 |
|
|
|
|
| @dataclass |
| class SeverityThresholds: |
| warning: float = 0.45 |
| critical: float = 0.70 |
| percentile_warning: float = 67.0 |
| percentile_critical: float = 90.0 |
|
|
|
|
| @dataclass |
| class PNRScaling: |
| max_timing_derate: float = 1.15 |
| base_guardband_mv: float = 5.0 |
| max_guardband_mv: float = 60.0 |
| min_density_cap: float = 40.0 |
| max_density_cap: float = 100.0 |
| pnr_cost_scale: float = 100.0 |
|
|
|
|
| @dataclass |
| class SeverityConfig: |
| mode: str = "linear" |
| source_metric: str = "composite_risk" |
| thresholds: SeverityThresholds = field(default_factory=SeverityThresholds) |
| weights: SeverityWeights = field(default_factory=SeverityWeights) |
| pnr_scaling: PNRScaling = field(default_factory=PNRScaling) |
|
|
|
|
| def _clip01(v: float) -> float: |
| return float(np.clip(float(v), 0.0, 1.0)) |
|
|
|
|
| def _normalize_weights(weights: SeverityWeights) -> SeverityWeights: |
| vals = np.array( |
| [ |
| float(weights.activity), |
| float(weights.gate_error), |
| float(weights.readout_error), |
| float(weights.decoherence), |
| float(weights.fidelity), |
| ], |
| dtype=float, |
| ) |
| total = float(np.sum(vals)) |
| if total <= 0: |
| vals = np.array([0.25, 0.20, 0.15, 0.25, 0.15], dtype=float) |
| total = float(np.sum(vals)) |
| vals = vals / total |
| return SeverityWeights( |
| activity=float(vals[0]), |
| gate_error=float(vals[1]), |
| readout_error=float(vals[2]), |
| decoherence=float(vals[3]), |
| fidelity=float(vals[4]), |
| ) |
|
|
|
|
| def _clamp_thresholds(thresholds: SeverityThresholds) -> SeverityThresholds: |
| warning = _clip01(float(thresholds.warning)) |
| critical = _clip01(float(thresholds.critical)) |
| if critical < warning: |
| critical = warning |
| pct_warning = float(np.clip(float(thresholds.percentile_warning), 0.0, 100.0)) |
| pct_critical = float(np.clip(float(thresholds.percentile_critical), 0.0, 100.0)) |
| if pct_critical < pct_warning: |
| pct_critical = pct_warning |
| return SeverityThresholds( |
| warning=warning, |
| critical=critical, |
| percentile_warning=pct_warning, |
| percentile_critical=pct_critical, |
| ) |
|
|
|
|
| def _normalized_metric_risk(metrics: Dict[str, np.ndarray], metric_name: str) -> Tuple[np.ndarray, str]: |
| key = str(metric_name or "composite_risk").strip().lower() |
| if key == "activity_count": |
| if "activity_norm" not in metrics: |
| raise ValueError("Severity mapper requires activity_norm for activity_count mode.") |
| return np.asarray(metrics["activity_norm"], dtype=float), "activity_count" |
| if key in _DIRECT_RISK_METRICS: |
| if key not in metrics: |
| raise ValueError(f"Severity mapper missing metric: {key}") |
| return np.asarray(metrics[key], dtype=float), key |
| if key in _INVERTED_RISK_METRICS: |
| if key not in metrics: |
| raise ValueError(f"Severity mapper missing metric: {key}") |
| return 1.0 - np.asarray(metrics[key], dtype=float), key |
| raise ValueError(f"Unsupported severity source metric: {metric_name}") |
|
|
|
|
| def _weighted_risk(metrics: Dict[str, np.ndarray], weights: SeverityWeights) -> np.ndarray: |
| for key in ("activity_norm", "gate_error", "readout_error", "decoherence_risk", "fidelity"): |
| if key not in metrics: |
| raise ValueError(f"Severity mapper missing metric: {key}") |
| fidelity_risk = 1.0 - np.asarray(metrics["fidelity"], dtype=float) |
| return np.clip( |
| float(weights.activity) * np.asarray(metrics["activity_norm"], dtype=float) |
| + float(weights.gate_error) * np.asarray(metrics["gate_error"], dtype=float) |
| + float(weights.readout_error) * np.asarray(metrics["readout_error"], dtype=float) |
| + float(weights.decoherence) * np.asarray(metrics["decoherence_risk"], dtype=float) |
| + float(weights.fidelity) * fidelity_risk, |
| 0.0, |
| 1.0, |
| ) |
|
|
|
|
| def _percentile_desc(values: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: |
| arr = np.asarray(values, dtype=float).reshape(-1) |
| n = int(arr.size) |
| if n == 0: |
| return np.zeros((0,), dtype=float), np.zeros((0,), dtype=int) |
| order = np.argsort(-arr, kind="stable") |
| ranks = np.empty((n,), dtype=int) |
| ranks[order] = np.arange(1, n + 1, dtype=int) |
| if n == 1: |
| pct = np.array([100.0], dtype=float) |
| else: |
| pct = 100.0 * (1.0 - (ranks.astype(float) - 1.0) / float(n - 1)) |
| return pct, ranks |
|
|
|
|
| def _band_from_score(score: float, thresholds: SeverityThresholds) -> str: |
| if float(score) >= float(thresholds.critical): |
| return "CRITICAL" |
| if float(score) >= float(thresholds.warning): |
| return "WARNING" |
| return "OK" |
|
|
|
|
| def _band_from_percentile(percentile: float, thresholds: SeverityThresholds) -> str: |
| if float(percentile) >= float(thresholds.percentile_critical): |
| return "CRITICAL" |
| if float(percentile) >= float(thresholds.percentile_warning): |
| return "WARNING" |
| return "OK" |
|
|
|
|
| def compute_severity_rows( |
| metrics: Dict[str, np.ndarray], |
| *, |
| cfg: SeverityConfig | None = None, |
| qubit_coords: Dict[int, Tuple[int, int]] | None = None, |
| ) -> List[Dict[str, Any]]: |
| cfg = cfg or SeverityConfig() |
| mode = str(cfg.mode or "linear").strip().lower() |
| thresholds = _clamp_thresholds(cfg.thresholds) |
| weights = _normalize_weights(cfg.weights) |
| scale = cfg.pnr_scaling |
|
|
| if mode in ("linear", "bucket", "percentile"): |
| base_risk, source_metric = _normalized_metric_risk(metrics, cfg.source_metric) |
| elif mode in ("weighted", "pnr_cost"): |
| base_risk = _weighted_risk(metrics, weights) |
| source_metric = "weighted_blend" |
| else: |
| raise ValueError(f"Unsupported severity mode: {cfg.mode}") |
|
|
| base_risk = np.clip(np.asarray(base_risk, dtype=float), 0.0, 1.0) |
| pct, ranks = _percentile_desc(base_risk) |
|
|
| if mode == "linear": |
| severity_score = np.asarray(base_risk, dtype=float) |
| bands = np.array([_band_from_score(v, thresholds) for v in severity_score], dtype=object) |
| elif mode == "bucket": |
| bands = np.array([_band_from_score(v, thresholds) for v in base_risk], dtype=object) |
| severity_score = np.array( |
| [0.0 if b == "OK" else (0.5 if b == "WARNING" else 1.0) for b in bands], |
| dtype=float, |
| ) |
| elif mode == "percentile": |
| severity_score = np.asarray(pct, dtype=float) / 100.0 |
| bands = np.array([_band_from_percentile(v, thresholds) for v in pct], dtype=object) |
| else: |
| severity_score = np.asarray(base_risk, dtype=float) |
| bands = np.array([_band_from_score(v, thresholds) for v in severity_score], dtype=object) |
|
|
| severity_pct, severity_ranks = _percentile_desc(severity_score) |
| if mode == "pnr_cost": |
| pnr_cost_norm = np.clip(0.7 * severity_score + 0.3 * (severity_pct / 100.0), 0.0, 1.0) |
| else: |
| pnr_cost_norm = np.asarray(severity_score, dtype=float) |
|
|
| timing_derate = 1.0 + pnr_cost_norm * (float(scale.max_timing_derate) - 1.0) |
| guardband_mv = float(scale.base_guardband_mv) + pnr_cost_norm * ( |
| float(scale.max_guardband_mv) - float(scale.base_guardband_mv) |
| ) |
| density_cap = np.clip( |
| float(scale.max_density_cap) |
| - pnr_cost_norm * (float(scale.max_density_cap) - float(scale.min_density_cap)), |
| float(scale.min_density_cap), |
| float(scale.max_density_cap), |
| ) |
| pnr_cost = np.clip(pnr_cost_norm, 0.0, 1.0) * float(scale.pnr_cost_scale) |
|
|
| rows: List[Dict[str, Any]] = [] |
| n = int(len(base_risk)) |
| for q in range(n): |
| rr = None |
| cc = None |
| if qubit_coords and int(q) in qubit_coords: |
| rr, cc = qubit_coords[int(q)] |
|
|
| band = str(bands[q]) |
| route_priority = "HIGH" if band == "CRITICAL" else ("MEDIUM" if band == "WARNING" else "LOW") |
| rows.append( |
| { |
| "qubit": int(q), |
| "layout_row": None if rr is None else int(rr), |
| "layout_col": None if cc is None else int(cc), |
| "source_metric": str(source_metric), |
| "severity_mode": mode, |
| "base_risk": float(base_risk[q]), |
| "severity_score": float(np.clip(severity_score[q], 0.0, 1.0)), |
| "severity_band": band, |
| "severity_rank": int(severity_ranks[q]), |
| "severity_percentile": float(severity_pct[q]), |
| "pnr_cost": float(pnr_cost[q]), |
| "timing_derate": float(timing_derate[q]), |
| "density_cap": float(density_cap[q]), |
| "guardband_mv": float(guardband_mv[q]), |
| "route_priority": route_priority, |
| } |
| ) |
|
|
| rows.sort( |
| key=lambda row: ( |
| 1 if str(row["severity_band"]) == "CRITICAL" else (0 if str(row["severity_band"]) == "WARNING" else -1), |
| float(row["severity_score"]), |
| -int(row["severity_rank"]), |
| -int(row["qubit"]), |
| ), |
| reverse=True, |
| ) |
| return rows |
|
|
|
|
| def severity_rows_to_csv(rows: List[Dict[str, Any]]) -> str: |
| out = io.StringIO() |
| writer = csv.writer(out) |
| writer.writerow( |
| [ |
| "qubit", |
| "layout_row", |
| "layout_col", |
| "source_metric", |
| "severity_mode", |
| "base_risk", |
| "severity_score", |
| "severity_band", |
| "severity_rank", |
| "severity_percentile", |
| "pnr_cost", |
| "timing_derate", |
| "density_cap", |
| "guardband_mv", |
| "route_priority", |
| ] |
| ) |
| for row in rows: |
| writer.writerow( |
| [ |
| int(row["qubit"]), |
| "" if row.get("layout_row") is None else int(row["layout_row"]), |
| "" if row.get("layout_col") is None else int(row["layout_col"]), |
| str(row.get("source_metric", "")), |
| str(row.get("severity_mode", "")), |
| f"{float(row.get('base_risk', 0.0)):.6g}", |
| f"{float(row.get('severity_score', 0.0)):.6g}", |
| str(row.get("severity_band", "")), |
| int(row.get("severity_rank", 0)), |
| f"{float(row.get('severity_percentile', 0.0)):.6g}", |
| f"{float(row.get('pnr_cost', 0.0)):.6g}", |
| f"{float(row.get('timing_derate', 1.0)):.6g}", |
| f"{float(row.get('density_cap', 100.0)):.6g}", |
| f"{float(row.get('guardband_mv', 0.0)):.6g}", |
| str(row.get("route_priority", "")), |
| ] |
| ) |
| return out.getvalue() |
|
|