hchevva commited on
Commit
87ea89f
·
verified ·
1 Parent(s): d49a735

Upload trends.py

Browse files
Files changed (1) hide show
  1. quread/trends.py +160 -0
quread/trends.py ADDED
@@ -0,0 +1,160 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from __future__ import annotations
2
+
3
+ import json
4
+ from typing import Any, Dict, List, Tuple
5
+
6
+ import numpy as np
7
+
8
+ from .metrics import (
9
+ MetricThresholds,
10
+ MetricWeights,
11
+ compute_metrics_from_csv,
12
+ )
13
+
14
+ _TREND_METRIC_ALIASES = {
15
+ "composite": "composite_risk",
16
+ "composite_risk": "composite_risk",
17
+ "gate_error": "gate_error",
18
+ "readout_error": "readout_error",
19
+ "decoherence_risk": "decoherence_risk",
20
+ "fidelity": "fidelity",
21
+ "state_fidelity": "state_fidelity",
22
+ "process_fidelity": "process_fidelity",
23
+ "coherence_health": "coherence_health",
24
+ }
25
+
26
+
27
+ def _resolve_metric(metric: str) -> str:
28
+ key = str(metric or "composite_risk").strip().lower()
29
+ return _TREND_METRIC_ALIASES.get(key, "composite_risk")
30
+
31
+
32
+ def _snapshot_timestamp(snapshot: Dict[str, Any], idx: int) -> str:
33
+ ts = snapshot.get("timestamp")
34
+ if ts is None:
35
+ ts = snapshot.get("ts")
36
+ if ts is None:
37
+ ts = snapshot.get("date")
38
+ text = str(ts).strip() if ts is not None else ""
39
+ if text:
40
+ return text
41
+ return f"snapshot_{idx + 1}"
42
+
43
+
44
+ def _normalize_snapshot(snapshot: Dict[str, Any], idx: int) -> Dict[str, Any] | None:
45
+ if not isinstance(snapshot, dict):
46
+ return None
47
+
48
+ ts = _snapshot_timestamp(snapshot, idx)
49
+ if isinstance(snapshot.get("calibration"), dict):
50
+ payload = snapshot["calibration"]
51
+ elif isinstance(snapshot.get("qubits"), dict):
52
+ payload = {"qubits": snapshot["qubits"]}
53
+ else:
54
+ qubit_like = {
55
+ str(k): v
56
+ for k, v in snapshot.items()
57
+ if str(k).strip().isdigit() and isinstance(v, dict)
58
+ }
59
+ if not qubit_like:
60
+ return None
61
+ payload = {"qubits": qubit_like}
62
+
63
+ return {
64
+ "timestamp": ts,
65
+ "calibration_json": json.dumps(payload),
66
+ }
67
+
68
+
69
def parse_calibration_snapshots_text(text: str) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
    """Parse calibration-history text into normalized snapshot records.

    Accepts a single JSON document (a ``{"snapshots": [...]}`` wrapper,
    a bare list, or one object) or JSON-lines input. Each candidate is
    normalized via ``_normalize_snapshot``; entries that fail to parse
    or normalize are counted, not raised.

    Returns:
        ``(snapshots, meta)`` where ``meta`` records the detected
        ``format`` plus ``parsed`` and ``skipped`` counts.
    """
    raw = str(text or "").strip()
    if not raw:
        return [], {"format": "empty", "parsed": 0, "skipped": 0}

    skipped = 0
    fmt = "unknown"
    try:
        decoded = json.loads(raw)
    except Exception:
        # Not one JSON document: fall back to line-delimited JSON,
        # counting (but otherwise ignoring) unparsable lines.
        fmt = "jsonl"
        candidates: List[Any] = []
        for line in raw.splitlines():
            chunk = line.strip()
            if not chunk:
                continue
            try:
                candidates.append(json.loads(chunk))
            except Exception:
                skipped += 1
    else:
        if isinstance(decoded, dict) and isinstance(decoded.get("snapshots"), list):
            fmt, candidates = "json:snapshots", decoded["snapshots"]
        elif isinstance(decoded, list):
            fmt, candidates = "json:list", decoded
        elif isinstance(decoded, dict):
            fmt, candidates = "json:single", [decoded]
        else:
            # Valid JSON but a bare scalar: nothing usable.
            candidates = []
            skipped += 1

    snapshots: List[Dict[str, Any]] = []
    for idx, candidate in enumerate(candidates):
        normalized = _normalize_snapshot(candidate, idx)
        if normalized is None:
            skipped += 1
        else:
            snapshots.append(normalized)

    return snapshots, {"format": fmt, "parsed": len(snapshots), "skipped": int(skipped)}
112
+
113
+
114
def compute_metric_trends(
    csv_text: str,
    n_qubits: int,
    snapshots_text: str,
    *,
    metric: str = "composite_risk",
    state_vector: np.ndarray | None = None,
    weights: MetricWeights | None = None,
    thresholds: MetricThresholds | None = None,
) -> Tuple[np.ndarray, List[str], List[Dict[str, float]], Dict[str, Any]]:
    """Recompute one per-qubit metric across calibration snapshots.

    For every snapshot parsed from ``snapshots_text`` the metrics are
    recomputed from ``csv_text`` with that snapshot's calibration, and
    the requested metric's per-qubit values are stacked into a
    ``(n_snapshots, n_qubits)`` array.

    Returns:
        ``(values, labels, ranking, meta)`` — ``labels`` holds one
        timestamp per snapshot; ``ranking`` lists per-qubit
        latest/baseline/delta values sorted by latest value
        (descending); ``meta`` is the parser metadata augmented with
        the resolved ``metric`` key and ``points`` count.

    Raises:
        ValueError: if no valid snapshot could be parsed.
    """
    metric_key = _resolve_metric(metric)
    snapshots, meta = parse_calibration_snapshots_text(snapshots_text)
    if not snapshots:
        raise ValueError("No valid calibration snapshots found.")

    qubit_count = int(n_qubits)
    labels: List[str] = []
    rows: List[np.ndarray] = []
    for snap in snapshots:
        labels.append(str(snap["timestamp"]))
        metrics, _ = compute_metrics_from_csv(
            csv_text,
            qubit_count,
            calibration_json=str(snap["calibration_json"]),
            state_vector=state_vector,
            weights=weights,
            thresholds=thresholds,
        )
        rows.append(np.asarray(metrics[metric_key], dtype=float))

    values = np.vstack(rows)
    baseline_row = values[0]
    latest_row = values[-1]

    ranking: List[Dict[str, float]] = [
        {
            "qubit": float(q),
            "latest": float(latest_row[q]),
            "baseline": float(baseline_row[q]),
            "delta": float(latest_row[q] - baseline_row[q]),
        }
        for q in range(qubit_count)
    ]
    # Largest latest value first.
    ranking.sort(key=lambda entry: entry["latest"], reverse=True)

    meta["metric"] = metric_key
    meta["points"] = int(values.shape[0])
    return values, labels, ranking, meta