| from __future__ import annotations |
|
|
| import csv |
| import io |
| from typing import Any, Dict, Optional, Tuple |
|
|
|
|
def default_qubit_coords(n_qubits: int, rows: int, cols: int) -> Dict[int, Tuple[int, int]]:
    """Place qubits on a rows x cols grid in row-major order.

    Qubit q goes to (q // cols, q % cols). A non-positive ``cols`` is
    treated as a single-column grid. Qubits that would land past the last
    row are left unmapped, so the result may cover fewer than ``n_qubits``
    entries when the grid is too small.
    """
    total = int(n_qubits)
    row_limit = int(rows)
    width = max(1, int(cols))  # guard against cols <= 0
    coords: Dict[int, Tuple[int, int]] = {}
    for index in range(total):
        r, c = divmod(index, width)
        if r >= row_limit:
            break  # grid exhausted; remaining qubits stay unmapped
        coords[index] = (r, c)
    return coords
|
|
|
|
| def _to_int(v: Any) -> Optional[int]: |
| try: |
| s = str(v).strip() |
| if s == "": |
| return None |
| return int(s) |
| except Exception: |
| return None |
|
|
|
|
def parse_layout_csv_text(
    csv_text: str,
    n_qubits: int,
    *,
    rows: int,
    cols: int,
) -> Tuple[Dict[int, Tuple[int, int]], Dict[str, Any]]:
    """
    Parse qubit-to-layout CSV.
    Expected headers (case-insensitive):
        qubit,row,col
    Also accepts aliases:
        qubit_id/q/id, r/y for row, c/x/column for col.

    Returns ``(coords, stats)`` where ``coords`` maps every qubit index in
    ``range(n_qubits)`` (as far as the default grid covers it) to a
    ``(row, col)`` pair — uploaded rows override the default row-major
    placement — and ``stats`` reports counts:
        parsed_rows  data rows seen in the CSV
        mapped       rows accepted from the CSV
        fallback     qubits left on the default placement
        skipped      rows dropped (unparseable or out of range)
        duplicates   repeated qubit ids (the last occurrence wins)
    """
    n = int(n_qubits)
    rmax = int(rows)
    cmax = int(cols)
    fallback = default_qubit_coords(n, rmax, cmax)

    text = (csv_text or "").strip()
    if not text:
        # No upload at all: report the pure-default mapping.
        return fallback, {
            "source": "default",
            "parsed_rows": 0,
            "mapped": len(fallback),
            "fallback": len(fallback),
            "skipped": 0,
            "duplicates": 0,
        }

    reader = csv.DictReader(io.StringIO(text))
    coords: Dict[int, Tuple[int, int]] = {}
    skipped = 0
    duplicates = 0
    parsed_rows = 0

    def _pick(row_lc: Dict[str, Any], *keys: str) -> Any:
        # First alias present in the already-normalized row, else None.
        for k in keys:
            if k in row_lc:
                return row_lc[k]
        return None

    for raw in reader:
        parsed_rows += 1
        # Normalize headers ONCE per row; the original rebuilt this dict
        # inside _pick, i.e. three times per row.
        row_lc = {str(k).strip().lower(): v for k, v in raw.items()}
        q = _to_int(_pick(row_lc, "qubit", "qubit_id", "q", "id"))
        rr = _to_int(_pick(row_lc, "row", "r", "y"))
        cc = _to_int(_pick(row_lc, "col", "column", "c", "x"))
        if q is None or rr is None or cc is None:
            skipped += 1
            continue
        if not (0 <= q < n):
            skipped += 1
            continue
        if not (0 <= rr < rmax and 0 <= cc < cmax):
            skipped += 1
            continue
        if q in coords:
            duplicates += 1  # last occurrence overwrites earlier ones
        coords[q] = (rr, cc)

    # Uploaded coordinates take precedence over the default grid.
    merged = dict(fallback)
    merged.update(coords)
    return merged, {
        "source": "uploaded",
        "parsed_rows": parsed_rows,
        "mapped": len(coords),
        "fallback": len(merged) - len(coords),
        "skipped": skipped,
        "duplicates": duplicates,
    }
|
|