QureadAI / quread /severity_mapper.py
hchevva's picture
Upload 4 files
7b310d8 verified
from __future__ import annotations
import csv
import io
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple
import numpy as np
_DIRECT_RISK_METRICS = {
"activity_norm",
"gate_error",
"readout_error",
"decoherence_risk",
"composite_risk",
}
_INVERTED_RISK_METRICS = {
"coherence_health",
"fidelity",
"state_fidelity",
"process_fidelity",
}
@dataclass
class SeverityWeights:
activity: float = 0.25
gate_error: float = 0.20
readout_error: float = 0.15
decoherence: float = 0.25
fidelity: float = 0.15
@dataclass
class SeverityThresholds:
warning: float = 0.45
critical: float = 0.70
percentile_warning: float = 67.0
percentile_critical: float = 90.0
@dataclass
class PNRScaling:
max_timing_derate: float = 1.15
base_guardband_mv: float = 5.0
max_guardband_mv: float = 60.0
min_density_cap: float = 40.0
max_density_cap: float = 100.0
pnr_cost_scale: float = 100.0
@dataclass
class SeverityConfig:
mode: str = "linear"
source_metric: str = "composite_risk"
thresholds: SeverityThresholds = field(default_factory=SeverityThresholds)
weights: SeverityWeights = field(default_factory=SeverityWeights)
pnr_scaling: PNRScaling = field(default_factory=PNRScaling)
def _clip01(v: float) -> float:
return float(np.clip(float(v), 0.0, 1.0))
def _normalize_weights(weights: SeverityWeights) -> SeverityWeights:
vals = np.array(
[
float(weights.activity),
float(weights.gate_error),
float(weights.readout_error),
float(weights.decoherence),
float(weights.fidelity),
],
dtype=float,
)
total = float(np.sum(vals))
if total <= 0:
vals = np.array([0.25, 0.20, 0.15, 0.25, 0.15], dtype=float)
total = float(np.sum(vals))
vals = vals / total
return SeverityWeights(
activity=float(vals[0]),
gate_error=float(vals[1]),
readout_error=float(vals[2]),
decoherence=float(vals[3]),
fidelity=float(vals[4]),
)
def _clamp_thresholds(thresholds: SeverityThresholds) -> SeverityThresholds:
warning = _clip01(float(thresholds.warning))
critical = _clip01(float(thresholds.critical))
if critical < warning:
critical = warning
pct_warning = float(np.clip(float(thresholds.percentile_warning), 0.0, 100.0))
pct_critical = float(np.clip(float(thresholds.percentile_critical), 0.0, 100.0))
if pct_critical < pct_warning:
pct_critical = pct_warning
return SeverityThresholds(
warning=warning,
critical=critical,
percentile_warning=pct_warning,
percentile_critical=pct_critical,
)
def _normalized_metric_risk(metrics: Dict[str, np.ndarray], metric_name: str) -> Tuple[np.ndarray, str]:
key = str(metric_name or "composite_risk").strip().lower()
if key == "activity_count":
if "activity_norm" not in metrics:
raise ValueError("Severity mapper requires activity_norm for activity_count mode.")
return np.asarray(metrics["activity_norm"], dtype=float), "activity_count"
if key in _DIRECT_RISK_METRICS:
if key not in metrics:
raise ValueError(f"Severity mapper missing metric: {key}")
return np.asarray(metrics[key], dtype=float), key
if key in _INVERTED_RISK_METRICS:
if key not in metrics:
raise ValueError(f"Severity mapper missing metric: {key}")
return 1.0 - np.asarray(metrics[key], dtype=float), key
raise ValueError(f"Unsupported severity source metric: {metric_name}")
def _weighted_risk(metrics: Dict[str, np.ndarray], weights: SeverityWeights) -> np.ndarray:
for key in ("activity_norm", "gate_error", "readout_error", "decoherence_risk", "fidelity"):
if key not in metrics:
raise ValueError(f"Severity mapper missing metric: {key}")
fidelity_risk = 1.0 - np.asarray(metrics["fidelity"], dtype=float)
return np.clip(
float(weights.activity) * np.asarray(metrics["activity_norm"], dtype=float)
+ float(weights.gate_error) * np.asarray(metrics["gate_error"], dtype=float)
+ float(weights.readout_error) * np.asarray(metrics["readout_error"], dtype=float)
+ float(weights.decoherence) * np.asarray(metrics["decoherence_risk"], dtype=float)
+ float(weights.fidelity) * fidelity_risk,
0.0,
1.0,
)
def _percentile_desc(values: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
arr = np.asarray(values, dtype=float).reshape(-1)
n = int(arr.size)
if n == 0:
return np.zeros((0,), dtype=float), np.zeros((0,), dtype=int)
order = np.argsort(-arr, kind="stable")
ranks = np.empty((n,), dtype=int)
ranks[order] = np.arange(1, n + 1, dtype=int)
if n == 1:
pct = np.array([100.0], dtype=float)
else:
pct = 100.0 * (1.0 - (ranks.astype(float) - 1.0) / float(n - 1))
return pct, ranks
def _band_from_score(score: float, thresholds: SeverityThresholds) -> str:
if float(score) >= float(thresholds.critical):
return "CRITICAL"
if float(score) >= float(thresholds.warning):
return "WARNING"
return "OK"
def _band_from_percentile(percentile: float, thresholds: SeverityThresholds) -> str:
if float(percentile) >= float(thresholds.percentile_critical):
return "CRITICAL"
if float(percentile) >= float(thresholds.percentile_warning):
return "WARNING"
return "OK"
def compute_severity_rows(
metrics: Dict[str, np.ndarray],
*,
cfg: SeverityConfig | None = None,
qubit_coords: Dict[int, Tuple[int, int]] | None = None,
) -> List[Dict[str, Any]]:
cfg = cfg or SeverityConfig()
mode = str(cfg.mode or "linear").strip().lower()
thresholds = _clamp_thresholds(cfg.thresholds)
weights = _normalize_weights(cfg.weights)
scale = cfg.pnr_scaling
if mode in ("linear", "bucket", "percentile"):
base_risk, source_metric = _normalized_metric_risk(metrics, cfg.source_metric)
elif mode in ("weighted", "pnr_cost"):
base_risk = _weighted_risk(metrics, weights)
source_metric = "weighted_blend"
else:
raise ValueError(f"Unsupported severity mode: {cfg.mode}")
base_risk = np.clip(np.asarray(base_risk, dtype=float), 0.0, 1.0)
pct, ranks = _percentile_desc(base_risk)
if mode == "linear":
severity_score = np.asarray(base_risk, dtype=float)
bands = np.array([_band_from_score(v, thresholds) for v in severity_score], dtype=object)
elif mode == "bucket":
bands = np.array([_band_from_score(v, thresholds) for v in base_risk], dtype=object)
severity_score = np.array(
[0.0 if b == "OK" else (0.5 if b == "WARNING" else 1.0) for b in bands],
dtype=float,
)
elif mode == "percentile":
severity_score = np.asarray(pct, dtype=float) / 100.0
bands = np.array([_band_from_percentile(v, thresholds) for v in pct], dtype=object)
else:
severity_score = np.asarray(base_risk, dtype=float)
bands = np.array([_band_from_score(v, thresholds) for v in severity_score], dtype=object)
severity_pct, severity_ranks = _percentile_desc(severity_score)
if mode == "pnr_cost":
pnr_cost_norm = np.clip(0.7 * severity_score + 0.3 * (severity_pct / 100.0), 0.0, 1.0)
else:
pnr_cost_norm = np.asarray(severity_score, dtype=float)
timing_derate = 1.0 + pnr_cost_norm * (float(scale.max_timing_derate) - 1.0)
guardband_mv = float(scale.base_guardband_mv) + pnr_cost_norm * (
float(scale.max_guardband_mv) - float(scale.base_guardband_mv)
)
density_cap = np.clip(
float(scale.max_density_cap)
- pnr_cost_norm * (float(scale.max_density_cap) - float(scale.min_density_cap)),
float(scale.min_density_cap),
float(scale.max_density_cap),
)
pnr_cost = np.clip(pnr_cost_norm, 0.0, 1.0) * float(scale.pnr_cost_scale)
rows: List[Dict[str, Any]] = []
n = int(len(base_risk))
for q in range(n):
rr = None
cc = None
if qubit_coords and int(q) in qubit_coords:
rr, cc = qubit_coords[int(q)]
band = str(bands[q])
route_priority = "HIGH" if band == "CRITICAL" else ("MEDIUM" if band == "WARNING" else "LOW")
rows.append(
{
"qubit": int(q),
"layout_row": None if rr is None else int(rr),
"layout_col": None if cc is None else int(cc),
"source_metric": str(source_metric),
"severity_mode": mode,
"base_risk": float(base_risk[q]),
"severity_score": float(np.clip(severity_score[q], 0.0, 1.0)),
"severity_band": band,
"severity_rank": int(severity_ranks[q]),
"severity_percentile": float(severity_pct[q]),
"pnr_cost": float(pnr_cost[q]),
"timing_derate": float(timing_derate[q]),
"density_cap": float(density_cap[q]),
"guardband_mv": float(guardband_mv[q]),
"route_priority": route_priority,
}
)
rows.sort(
key=lambda row: (
1 if str(row["severity_band"]) == "CRITICAL" else (0 if str(row["severity_band"]) == "WARNING" else -1),
float(row["severity_score"]),
-int(row["severity_rank"]),
-int(row["qubit"]),
),
reverse=True,
)
return rows
def severity_rows_to_csv(rows: List[Dict[str, Any]]) -> str:
out = io.StringIO()
writer = csv.writer(out)
writer.writerow(
[
"qubit",
"layout_row",
"layout_col",
"source_metric",
"severity_mode",
"base_risk",
"severity_score",
"severity_band",
"severity_rank",
"severity_percentile",
"pnr_cost",
"timing_derate",
"density_cap",
"guardband_mv",
"route_priority",
]
)
for row in rows:
writer.writerow(
[
int(row["qubit"]),
"" if row.get("layout_row") is None else int(row["layout_row"]),
"" if row.get("layout_col") is None else int(row["layout_col"]),
str(row.get("source_metric", "")),
str(row.get("severity_mode", "")),
f"{float(row.get('base_risk', 0.0)):.6g}",
f"{float(row.get('severity_score', 0.0)):.6g}",
str(row.get("severity_band", "")),
int(row.get("severity_rank", 0)),
f"{float(row.get('severity_percentile', 0.0)):.6g}",
f"{float(row.get('pnr_cost', 0.0)):.6g}",
f"{float(row.get('timing_derate', 1.0)):.6g}",
f"{float(row.get('density_cap', 100.0)):.6g}",
f"{float(row.get('guardband_mv', 0.0)):.6g}",
str(row.get("route_priority", "")),
]
)
return out.getvalue()