Spaces:
Running
Running
| """Data loaders for the sync_pilot dashboard. | |
| All loaders are Streamlit-cached so multiple pages and reruns are cheap. The | |
| ``SYNC_PILOT_DATA_SOURCE`` env var picks between ``"local"`` (default — | |
| read from the on-disk ``data/outputs/median`` directory under the package | |
| project root) and ``"hf"`` (snapshot-download from the private HuggingFace | |
| dataset and read from the local cache). Page code never inspects the source. | |
| Lightweight markdown parsing for the taxonomy lives here too — we extract | |
| structured fields per dimension so the taxonomy browser can render rich | |
| controls rather than just dumping the raw text. The parser falls back to | |
| raw markdown for any section it can't decompose, so a malformed section | |
| never blocks rendering of the rest. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import os | |
| import re | |
| from dataclasses import dataclass, field | |
| from datetime import datetime, timezone | |
| from functools import lru_cache | |
| from pathlib import Path | |
| from typing import Any, Literal | |
| import streamlit as st | |
| from sync_pilot import config | |
| from sync_pilot.schema import TrackRecord | |
# Closed set of summary names accepted by ``load_summary``.
SummaryName = Literal["tagging", "clap", "description", "transcription"]

# Summary name -> sidecar filename looked up inside the outputs directory.
_SUMMARY_FILENAMES: dict[str, str] = dict(
    tagging="_batch_summary.json",
    clap="_clap_summary.json",
    description="_description_summary.json",
    transcription="_transcription_summary.json",
)
| # --------------------------------------------------------------------------- | |
| # Local-vs-HF data source resolution | |
| # --------------------------------------------------------------------------- | |
def _data_source() -> str:
    """Read the configured data-source mode from the environment.

    The value of ``SYNC_PILOT_DATA_SOURCE`` is stripped and lower-cased;
    when the variable is unset the result is ``"local"``. No validation
    happens here — ``_resolved_data_root`` rejects anything other than
    ``"local"`` or ``"hf"``.
    """
    configured = os.environ.get("SYNC_PILOT_DATA_SOURCE", "local")
    normalised = configured.strip()
    return normalised.lower()
def _resolved_data_root() -> Path:
    """Return the directory that acts as ``data/`` for this session.

    * ``local`` mode: ``config.DATA_DIR``.
    * ``hf`` mode: the dataset named by ``PRIVATE_DATASET_REPO`` is fetched
      with ``huggingface_hub.snapshot_download`` (revision from
      ``PRIVATE_DATASET_REVISION``, default ``"main"``; token from
      ``HF_TOKEN`` when set) into
      ``config.HF_CACHE_DIR / "sync_pilot_dashboard" / <repo id, "/" -> "__">``
      and the ``sync_pilot`` subdirectory of that snapshot is returned, so
      callers see the same relative layout as in local mode.

    Raises:
        RuntimeError: the mode is neither ``"local"`` nor ``"hf"``, or
            ``hf`` mode is requested while ``PRIVATE_DATASET_REPO`` is
            empty/unset.

    NOTE(review): as written here every ``hf``-mode call goes through
    ``snapshot_download``; any memoising decorator is not visible in this
    view — confirm.
    """
    source = _data_source()
    if source not in ("local", "hf"):
        raise RuntimeError(
            f"SYNC_PILOT_DATA_SOURCE must be 'local' or 'hf', got {source!r}"
        )
    if source == "local":
        return config.DATA_DIR

    # Imported lazily so local-only sessions never load huggingface_hub.
    from huggingface_hub import snapshot_download

    dataset_repo = os.getenv("PRIVATE_DATASET_REPO", "").strip()
    if dataset_repo == "":
        raise RuntimeError(
            "SYNC_PILOT_DATA_SOURCE=hf but PRIVATE_DATASET_REPO is unset"
        )

    # A blank revision falls back to "main"; an empty token becomes None.
    dataset_revision = os.getenv("PRIVATE_DATASET_REVISION", "main").strip() or "main"
    hf_token = os.getenv("HF_TOKEN") or None

    cache_root = config.HF_CACHE_DIR / "sync_pilot_dashboard"
    download_dir = cache_root / dataset_repo.replace("/", "__")
    download_dir.mkdir(parents=True, exist_ok=True)

    wanted_patterns = [
        "sync_pilot/outputs/median/*.json",
        "sync_pilot/groundtruth/taxonomy.md",
        "sync_pilot/groundtruth/median/*.json",
        # Space-only audit sidecar: fetched so later writebacks append to
        # the existing history instead of replacing it.
        "sync_pilot/groundtruth/median/_audit_hf.jsonl",
        "sync_pilot/audio/median/manifest.jsonl",
        "sync_pilot/audio/median/*.m4a",
        # Extension (GT-expansion) set shown in the GT-review "Extension"
        # tab; its ``_audit_hf.jsonl`` and ``_review.json`` sidecars are
        # fetched for the same append-not-overwrite reason as above.
        "sync_pilot/outputs/gt_expansion/median_adjacent_combined_500/*.json",
        "sync_pilot/gt_expansion/median_adjacent_combined_500/groundtruth/*.json",
        "sync_pilot/gt_expansion/median_adjacent_combined_500/groundtruth/_audit_hf.jsonl",
        "sync_pilot/gt_expansion/median_adjacent_combined_500/manifest.jsonl",
        "sync_pilot/gt_expansion/median_adjacent_combined_500/_review.json",
        # Cohort map consumed by ``load_cohorts``.
        "sync_pilot/gt_ingest/cohorts.json",
    ]

    snapshot_location = snapshot_download(
        repo_id=dataset_repo,
        repo_type="dataset",
        revision=dataset_revision,
        token=hf_token,
        local_dir=str(download_dir),
        allow_patterns=wanted_patterns,
    )

    # The snapshot reproduces the repo tree, so the data root equivalent is
    # its ``sync_pilot`` subdirectory.
    return Path(snapshot_location) / "sync_pilot"
def _outputs_dir() -> Path:
    """Return ``<data root>/outputs/median`` (per-track JSONs and summaries)."""
    data_root = _resolved_data_root()
    return data_root.joinpath("outputs", "median")
def _groundtruth_dir() -> Path:
    """Return ``<data root>/groundtruth/median`` (GT records and audit logs)."""
    data_root = _resolved_data_root()
    return data_root.joinpath("groundtruth", "median")
def _hf_writeback(
    files: list[tuple[Path, str]],
    *,
    commit_message: str,
) -> bool:
    """Commit locally edited files to the private HuggingFace dataset.

    Args:
        files: ``(local_path, path_in_repo)`` pairs. Pairs whose local file
            does not exist are dropped without a message.
        commit_message: message for the single ``create_commit`` call that
            carries every remaining file.

    Returns:
        ``True`` when the commit call completes. ``False`` when
        ``PRIVATE_DATASET_REPO`` or ``HF_TOKEN`` is missing (a Streamlit
        warning is shown), when ``huggingface_hub`` cannot be imported or
        the commit raises (a Streamlit error is shown), or when no listed
        file exists (no message).

    ``huggingface_hub`` is imported inside the function so sessions that
    never write back do not import it.
    """
    repo_id = os.getenv("PRIVATE_DATASET_REPO", "").strip()
    token = os.getenv("HF_TOKEN") or None
    if not (repo_id and token):
        st.warning(
            "HF writeback skipped — PRIVATE_DATASET_REPO or HF_TOKEN not set "
            "in the Space environment. Edit persisted to the snapshot cache "
            "only and will be lost on Space restart."
        )
        return False

    target_revision = os.getenv("PRIVATE_DATASET_REVISION", "main").strip() or "main"

    try:
        from huggingface_hub import CommitOperationAdd, HfApi
    except Exception as exc:  # noqa: BLE001
        st.error(f"HF writeback failed: huggingface_hub import error ({exc})")
        return False

    # One add-operation per file that is actually present on disk.
    pending = []
    for local_path, path_in_repo in files:
        if not local_path.exists():
            continue
        pending.append(
            CommitOperationAdd(
                path_in_repo=path_in_repo,
                path_or_fileobj=str(local_path),
            )
        )
    if len(pending) == 0:
        return False

    try:
        HfApi(token=token).create_commit(
            repo_id=repo_id,
            repo_type="dataset",
            revision=target_revision,
            operations=pending,
            commit_message=commit_message,
        )
    except Exception as exc:  # noqa: BLE001
        st.error(f"HF writeback failed: {exc}")
        return False
    return True
def _review_local_path() -> Path:
    """Return ``config.DATA_DIR/groundtruth/median/_review.json``.

    This is the write target ``save_review`` uses in local mode. It always
    points into the local data dir regardless of the data-source mode; in
    ``hf`` mode ``save_review`` writes to ``_review_read_path()`` instead.
    """
    median_gt_dir = config.DATA_DIR / "groundtruth" / "median"
    return median_gt_dir / "_review.json"
def _review_read_path() -> Path:
    """Return ``<data root>/groundtruth/median/_review.json`` (read side)."""
    data_root = _resolved_data_root()
    return data_root.joinpath("groundtruth", "median", "_review.json")
def _manifest_path() -> Path:
    """Return ``<data root>/audio/median/manifest.jsonl``."""
    data_root = _resolved_data_root()
    return data_root.joinpath("audio", "median", "manifest.jsonl")
| # Expansion paths resolve via ``_resolved_data_root()`` so the GT-review | |
| # "Extension" tab reads the same way the catalog does: from the on-disk | |
| # ``data/`` tree locally, and from the downloaded HF snapshot in ``hf`` mode. | |
| # In local mode ``_resolved_data_root()`` == ``config.DATA_DIR``, so existing | |
| # local read/write behaviour is unchanged. | |
def _expansion_outputs_dir() -> Path:
    """Return the expansion set's per-track outputs directory."""
    data_root = _resolved_data_root()
    return data_root.joinpath(
        "outputs", "gt_expansion", "median_adjacent_combined_500"
    )
def _expansion_groundtruth_dir() -> Path:
    """Return the expansion set's ground-truth directory."""
    expansion_root = (
        _resolved_data_root() / "gt_expansion" / "median_adjacent_combined_500"
    )
    return expansion_root / "groundtruth"
def _expansion_review_path() -> Path:
    """Return the path of the expansion set's ``_review.json`` sidecar."""
    expansion_root = (
        _resolved_data_root() / "gt_expansion" / "median_adjacent_combined_500"
    )
    return expansion_root / "_review.json"
def _expansion_manifest_path() -> Path:
    """Return the path of the expansion set's ``manifest.jsonl``."""
    expansion_root = (
        _resolved_data_root() / "gt_expansion" / "median_adjacent_combined_500"
    )
    return expansion_root / "manifest.jsonl"
def _taxonomy_path() -> Path:
    """Locate ``taxonomy.md``.

    The copy under ``config.PACKAGE_ROOT / "groundtruth"`` wins whenever it
    exists. Otherwise the path under the resolved data root is returned
    (not checked for existence here) — per the original author's note, the
    publish step places a copy there so the dashboard works without a
    source checkout.
    """
    packaged = config.PACKAGE_ROOT / "groundtruth" / "taxonomy.md"
    if not packaged.exists():
        # No packaged copy: fall back to the one shipped with the data.
        return _resolved_data_root() / "groundtruth" / "taxonomy.md"
    return packaged
| # --------------------------------------------------------------------------- | |
| # Track + summary loaders | |
| # --------------------------------------------------------------------------- | |
def load_tracks() -> list[dict[str, Any]]:
    """Load every per-track JSON from the outputs directory.

    Files whose name starts with ``_`` (sidecars such as the summaries) are
    ignored. Each remaining file is parsed and passed through
    ``TrackRecord.model_validate`` so a structurally bad file is reported
    here; the *raw dict* is what gets returned, not the model instance.
    Files that fail to read, parse or validate are skipped with a Streamlit
    warning.

    Returns:
        The raw track dicts sorted by ``track_id``; ``[]`` when the outputs
        directory does not exist.
    """
    out: list[dict[str, Any]] = []
    outputs_dir = _outputs_dir()
    if not outputs_dir.exists():
        return out
    for p in sorted(outputs_dir.glob("*.json")):
        if p.name.startswith("_"):
            continue
        try:
            # The save_* functions write these files as UTF-8 with
            # ensure_ascii=False, so decode as UTF-8 explicitly instead of
            # relying on the platform's locale encoding.
            raw = json.loads(p.read_text(encoding="utf-8"))
            TrackRecord.model_validate(raw)
            out.append(raw)
        except Exception as e:  # noqa: BLE001 — render a warning, keep going
            st.warning(f"Skipping unparseable track {p.name}: {e}")
            continue
    out.sort(key=lambda r: r.get("track_id", ""))
    return out
def load_summary(name: SummaryName) -> dict[str, Any]:
    """Load one of the ``_*_summary.json`` files from the outputs directory.

    Args:
        name: key into ``_SUMMARY_FILENAMES``.

    Returns:
        The parsed JSON object. ``{}`` when the file is missing, cannot be
        read/parsed, or does not contain a JSON object (the latter two with
        a Streamlit warning), so pages can degrade gracefully.

    Raises:
        KeyError: ``name`` is not a known summary name.
    """
    fname = _SUMMARY_FILENAMES[name]
    p = _outputs_dir() / fname
    if not p.exists():
        return {}
    try:
        # Explicit UTF-8: do not depend on the platform's locale encoding.
        data = json.loads(p.read_text(encoding="utf-8"))
    except Exception as e:  # noqa: BLE001
        st.warning(f"Could not parse {fname}: {e}")
        return {}
    if not isinstance(data, dict):
        # Honour the declared return type instead of leaking a list/scalar.
        st.warning(f"{fname} is not a JSON object — ignoring")
        return {}
    return data
def load_groundtruth() -> dict[str, dict[str, Any]]:
    """Load every ground-truth JSON under ``groundtruth/median/``.

    Files whose name starts with ``_`` are ignored. Records are returned as
    plain dicts and are *not* validated against a schema here. A file that
    cannot be read or parsed, or whose top level is not a JSON object, is
    skipped with a Streamlit warning; a record without a truthy
    ``track_id`` is dropped silently.

    Returns:
        ``{track_id: record}``; ``{}`` when the directory does not exist.
    """
    out: dict[str, dict[str, Any]] = {}
    gt_dir = _groundtruth_dir()
    if not gt_dir.exists():
        return out
    for p in sorted(gt_dir.glob("*.json")):
        if p.name.startswith("_"):
            continue
        try:
            # Explicit UTF-8 to match how save_gt_edit writes these files.
            raw = json.loads(p.read_text(encoding="utf-8"))
        except Exception as e:  # noqa: BLE001
            st.warning(f"Skipping unparseable ground-truth {p.name}: {e}")
            continue
        if not isinstance(raw, dict):
            # Valid JSON but not an object: ``raw.get`` would raise, so skip
            # it the same way an unparseable file is skipped.
            st.warning(
                f"Skipping unparseable ground-truth {p.name}: not a JSON object"
            )
            continue
        tid = raw.get("track_id")
        if tid:
            out[tid] = raw
    return out
def load_expansion_tracks() -> list[dict[str, Any]]:
    """Load the expansion set's per-track JSONs for the GT-review display.

    Same contract as ``load_tracks`` but rooted at the expansion outputs
    directory: ``_``-prefixed files are ignored, each file is checked with
    ``TrackRecord.model_validate``, failures are skipped with a Streamlit
    warning, and the raw dicts are returned sorted by ``track_id``.
    Returns ``[]`` when the directory does not exist.
    """
    out: list[dict[str, Any]] = []
    outputs_dir = _expansion_outputs_dir()
    if not outputs_dir.exists():
        return out
    for p in sorted(outputs_dir.glob("*.json")):
        if p.name.startswith("_"):
            continue
        try:
            # save_expansion_description writes UTF-8 with
            # ensure_ascii=False; read with the same encoding rather than
            # the platform's locale default.
            raw = json.loads(p.read_text(encoding="utf-8"))
            TrackRecord.model_validate(raw)
            out.append(raw)
        except Exception as e:  # noqa: BLE001
            st.warning(f"Skipping unparseable expansion track {p.name}: {e}")
            continue
    out.sort(key=lambda r: r.get("track_id", ""))
    return out
def load_expansion_groundtruth() -> dict[str, dict[str, Any]]:
    """Load the ground-truth records of the expansion set.

    ``_``-prefixed files are ignored. A file that cannot be read or parsed,
    or whose top level is not a JSON object, is skipped with a Streamlit
    warning; a record without a truthy ``track_id`` is dropped silently.

    Returns:
        ``{track_id: record}``; ``{}`` when the directory does not exist.
    """
    out: dict[str, dict[str, Any]] = {}
    gt_dir = _expansion_groundtruth_dir()
    if not gt_dir.exists():
        return out
    for p in sorted(gt_dir.glob("*.json")):
        if p.name.startswith("_"):
            continue
        try:
            # Explicit UTF-8 to match how save_expansion_gt_edit writes.
            raw = json.loads(p.read_text(encoding="utf-8"))
        except Exception as e:  # noqa: BLE001
            st.warning(f"Skipping unparseable expansion ground-truth {p.name}: {e}")
            continue
        if not isinstance(raw, dict):
            # ``raw.get`` below would raise on a list/scalar payload.
            st.warning(
                f"Skipping unparseable expansion ground-truth {p.name}: "
                "not a JSON object"
            )
            continue
        tid = raw.get("track_id")
        if tid:
            out[tid] = raw
    return out
def load_expansion_triage() -> dict[str, dict[str, Any]]:
    """Load the ``_triage.json`` sidecar of the expansion outputs directory.

    The file is expected to be a JSON object with a ``"ranked"`` list of
    per-track rows; each row carrying a truthy ``track_id`` is returned
    under that id. Per the original author's note the rows hold a review
    priority produced by ``scripts/triage_disagreement.py`` and are used to
    order the GT-review queue.

    Returns:
        ``{track_id: row}``. ``{}`` when the sidecar is absent, cannot be
        read/parsed, or is not a JSON object (the latter two with a
        Streamlit warning). Rows that are not JSON objects are skipped.
    """
    path = _expansion_outputs_dir() / "_triage.json"
    if not path.exists():
        return {}
    try:
        # Explicit UTF-8: do not depend on the platform's locale encoding.
        data = json.loads(path.read_text(encoding="utf-8"))
    except Exception as e:  # noqa: BLE001
        st.warning(f"Could not parse _triage.json: {e}")
        return {}
    if not isinstance(data, dict):
        st.warning("_triage.json is not a JSON object — ignoring")
        return {}
    ranked = data.get("ranked", [])
    if not isinstance(ranked, list):
        st.warning("_triage.json 'ranked' is not a list — ignoring")
        return {}
    out: dict[str, dict[str, Any]] = {}
    for row in ranked:
        if not isinstance(row, dict):
            # A malformed row must not take the whole page down.
            continue
        tid = row.get("track_id")
        if tid:
            out[tid] = row
    return out
def load_cohorts() -> dict[str, str]:
    """Load the track -> cohort map from ``gt_ingest/cohorts.json``.

    The file is expected to be a JSON object whose ``"cohorts"`` key holds
    the ``{track_id: cohort}`` mapping. Per the original author's note the
    cohort labels are ``catalog``, ``ext`` and ``ext-median`` and the file
    is built by ``scripts/build_cohorts.py``.

    Returns:
        The ``"cohorts"`` mapping. ``{}`` when the file is absent, cannot
        be read/parsed, or does not have the expected object shape (the
        latter two with a Streamlit warning).
    """
    path = _resolved_data_root() / "gt_ingest" / "cohorts.json"
    if not path.exists():
        return {}
    try:
        # Explicit UTF-8: do not depend on the platform's locale encoding.
        data = json.loads(path.read_text(encoding="utf-8"))
    except Exception as e:  # noqa: BLE001
        st.warning(f"Could not parse cohorts.json: {e}")
        return {}
    if not isinstance(data, dict):
        st.warning("cohorts.json is not a JSON object — ignoring")
        return {}
    cohorts = data.get("cohorts", {})
    if not isinstance(cohorts, dict):
        st.warning("cohorts.json 'cohorts' is not a JSON object — ignoring")
        return {}
    return cohorts
def load_subtypes_by_family() -> dict[str, list[str]]:
    """Parse the "Dimension 2" section of ``taxonomy.md`` grouped by family.

    The section body runs from the ``### Dimension 2`` heading up to the
    ``### Dimension 3`` heading. Inside it, every ``#### Under `<family>```
    sub-heading opens a group; bullet lines of the form ``- `term``` inside
    the group become that family's terms. Terms are kept verbatim, so
    dotted forms such as ``oyun-havası.halay`` survive unchanged.

    Returns:
        ``{family: [term, ...]}`` in document order, omitting families with
        no terms. ``{}`` when the taxonomy file is missing or the
        Dimension 2 section (bounded by a Dimension 3 heading) is not
        found.
    """
    p = _taxonomy_path()
    if not p.exists():
        return {}
    # The taxonomy holds non-ASCII terms, so decode as UTF-8 explicitly
    # instead of using the platform's locale encoding.
    raw = p.read_text(encoding="utf-8")
    dim2 = re.search(
        r"###\s+Dimension\s+2[^\n]*\n(.+?)(?=###\s+Dimension\s+3)",
        raw,
        re.DOTALL,
    )
    if not dim2:
        return {}
    body = dim2.group(1)
    # A group ends at the next ``####`` heading, at a line starting with a
    # bold callout (``**...``), or at the end of the section — otherwise the
    # last family would swallow whatever list follows it.
    section_re = re.compile(
        r"####\s+Under\s+`([^`]+)`[^\n]*\n(.+?)(?=####|^\*\*|\Z)",
        re.DOTALL | re.MULTILINE,
    )
    # Compiled once rather than re-resolved for every bullet line.
    term_re = re.compile(r"^\s*-\s*`([^`]+)`")
    out: dict[str, list[str]] = {}
    for m in section_re.finditer(body):
        family = m.group(1).strip()
        terms = [
            hit.group(1).strip()
            for hit in map(term_re.match, m.group(2).splitlines())
            if hit
        ]
        if terms:
            out[family] = terms
    return out
def save_gt_edit(
    track_id: str,
    field: str,
    new_value: Any,
    *,
    old_value: Any = None,
    confidence: str = "high",
    cascade_family: str | None = None,
) -> bool:
    """Apply one inline GT edit from the review sheet to the per-track JSON.

    Steps, in order:

    1. Local mode only: copy the GT JSON to ``<track>.json.bak.<timestamp>``
       unless a ``.bak`` for this track already exists.
    2. Set ``record[field]`` (an empty string is stored as ``None``); when
       ``<field>_confidence`` is already present, set it to ``confidence``.
    3. When ``cascade_family`` is truthy, also set ``genre_family`` (and
       ``genre_family_confidence`` when present).
    4. Validate with ``GroundTruthRecord.model_validate`` before anything is
       written.
    5. Write the record to a ``.json.tmp`` file, append one line to the
       audit log (``_audit_hf.jsonl`` in hf mode, ``_audit.jsonl``
       otherwise), then rename the temp file over the GT JSON. The temp
       file is written first so a failed write cannot leave an audit line
       for an edit that never landed.
    6. hf mode only: push the GT JSON and the audit log via
       ``_hf_writeback``. Its result does not affect the return value —
       the local edit has already succeeded and ``_hf_writeback`` reports
       its own failures in the UI.
    7. Clear the ``load_groundtruth`` cache.

    Args:
        track_id: stem of the GT JSON file.
        field: record key to overwrite. (The name shadows
            ``dataclasses.field`` inside this function; kept for interface
            compatibility.)
        new_value: value to store; ``""`` is normalised to ``None``.
        old_value: previous value, recorded in the audit entry only.
        confidence: value written to the matching ``*_confidence`` key(s).
        cascade_family: optional new ``genre_family``.

    Returns:
        ``True`` when the record was written. ``False`` (with a Streamlit
        error) when the GT JSON is missing or unreadable, the edit fails
        schema validation, or a filesystem/serialisation error occurs.
    """
    in_hf = _data_source() == "hf"
    gt_dir = _groundtruth_dir()
    gt_path = gt_dir / f"{track_id}.json"
    if not gt_path.exists():
        st.error(f"GT JSON not found for {track_id}")
        return False

    try:
        # First-edit-per-track backup (local only).
        if not in_hf and not any(gt_dir.glob(f"{track_id}.json.bak.*")):
            ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
            bak = gt_dir / f"{track_id}.json.bak.{ts}"
            bak.write_bytes(gt_path.read_bytes())
        # Files are written as UTF-8 below; read them back the same way.
        record = json.loads(gt_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as e:
        st.error(f"Could not read GT JSON for {track_id}: {e}")
        return False
    if not isinstance(record, dict):
        st.error(f"GT JSON for {track_id} is not a JSON object")
        return False

    stored_value = new_value if new_value != "" else None
    record[field] = stored_value
    conf_field = f"{field}_confidence"
    if conf_field in record:
        record[conf_field] = confidence
    if cascade_family:
        record["genre_family"] = cascade_family
        if "genre_family_confidence" in record:
            record["genre_family_confidence"] = confidence

    # Validate before persisting so a bad edit never lands on disk.
    from sync_pilot.groundtruth.schema import GroundTruthRecord

    try:
        GroundTruthRecord.model_validate(record)
    except Exception as e:  # noqa: BLE001
        st.error(f"GT edit rejected by schema for {track_id}.{field}: {e}")
        return False

    # Separate audit streams for local and Space-side edits.
    audit_name = "_audit_hf.jsonl" if in_hf else "_audit.jsonl"
    audit_path = gt_dir / audit_name
    audit_entry = {
        "track_id": track_id,
        "field": field,
        "old_value": old_value,
        "new_value": stored_value,
        "cascade_family": cascade_family,
        "confidence_set_to": confidence,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "source": "gt-review-sheet" + ("-hf" if in_hf else ""),
    }

    tmp = gt_path.with_suffix(".json.tmp")
    try:
        # Serialise both payloads before touching the disk.
        record_text = json.dumps(record, ensure_ascii=False, indent=2)
        audit_line = json.dumps(audit_entry, ensure_ascii=False) + "\n"
        tmp.write_text(record_text, encoding="utf-8")
        audit_path.parent.mkdir(parents=True, exist_ok=True)
        with audit_path.open("a", encoding="utf-8") as f:
            f.write(audit_line)
        tmp.replace(gt_path)
    except (OSError, TypeError, ValueError) as e:
        try:
            tmp.unlink(missing_ok=True)
        except OSError:
            pass  # best-effort cleanup; the original error is reported below
        st.error(f"GT edit could not be written for {track_id}.{field}: {e}")
        return False

    if in_hf:
        rel = f"sync_pilot/groundtruth/median/{track_id}.json"
        rel_audit = f"sync_pilot/groundtruth/median/{audit_name}"
        _hf_writeback(
            [(gt_path, rel), (audit_path, rel_audit)],
            commit_message=f"gt-review: {track_id}.{field}",
        )
    load_groundtruth.clear()
    return True
def save_description(
    track_id: str,
    new_value: str,
    *,
    old_value: str | None = None,
) -> bool:
    """Persist a human-edited description to the track JSON.

    The description lives on the track record in the outputs directory,
    not in the GT JSON that ``save_gt_edit`` edits. The edit is validated
    with ``TrackRecord.model_validate`` before anything is written. The
    record is then written to a ``.json.tmp`` file, one line is appended to
    the audit log in the ground-truth directory (``_audit_hf.jsonl`` in hf
    mode, ``_audit.jsonl`` otherwise, with a ``gt-review-sheet-desc``
    source tag), and the temp file is renamed over the track JSON. No
    ``.bak`` snapshot is taken; the audit entry records ``old_value``.

    In hf mode the track JSON and audit log are pushed via
    ``_hf_writeback``; its result does not affect the return value.
    ``load_tracks`` is cleared afterwards.

    Args:
        track_id: stem of the track JSON file.
        new_value: new description; a falsy value is stored as ``None``.
        old_value: previous description, recorded in the audit entry only.

    Returns:
        ``True`` when the record was written. ``False`` (with a Streamlit
        error) when the track JSON is missing or unreadable, the edit fails
        schema validation, or a filesystem/serialisation error occurs.
    """
    in_hf = _data_source() == "hf"
    track_path = _outputs_dir() / f"{track_id}.json"
    if not track_path.exists():
        st.error(f"Track JSON not found for {track_id}")
        return False

    try:
        # Files are written as UTF-8 below; read them back the same way.
        record = json.loads(track_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as e:
        st.error(f"Could not read track JSON for {track_id}: {e}")
        return False
    if not isinstance(record, dict):
        st.error(f"Track JSON for {track_id} is not a JSON object")
        return False

    stored_value = new_value if new_value else None
    record["description"] = stored_value

    # ``TrackRecord`` is the module-level import; no local re-import needed.
    try:
        TrackRecord.model_validate(record)
    except Exception as e:  # noqa: BLE001
        st.error(f"Description edit rejected by schema for {track_id}: {e}")
        return False

    audit_name = "_audit_hf.jsonl" if in_hf else "_audit.jsonl"
    audit_path = _groundtruth_dir() / audit_name
    audit_entry = {
        "track_id": track_id,
        "field": "description",
        "old_value": old_value,
        "new_value": stored_value,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "source": "gt-review-sheet-desc" + ("-hf" if in_hf else ""),
    }

    tmp = track_path.with_suffix(".json.tmp")
    try:
        # Serialise first, then write the temp file, then audit, then
        # rename — a failed write cannot leave an audit line behind.
        record_text = json.dumps(record, ensure_ascii=False, indent=2)
        audit_line = json.dumps(audit_entry, ensure_ascii=False) + "\n"
        tmp.write_text(record_text, encoding="utf-8")
        audit_path.parent.mkdir(parents=True, exist_ok=True)
        with audit_path.open("a", encoding="utf-8") as f:
            f.write(audit_line)
        tmp.replace(track_path)
    except (OSError, TypeError, ValueError) as e:
        try:
            tmp.unlink(missing_ok=True)
        except OSError:
            pass  # best-effort cleanup; the original error is reported below
        st.error(f"Description edit could not be written for {track_id}: {e}")
        return False

    if in_hf:
        rel = f"sync_pilot/outputs/median/{track_id}.json"
        rel_audit = f"sync_pilot/groundtruth/median/{audit_name}"
        _hf_writeback(
            [(track_path, rel), (audit_path, rel_audit)],
            commit_message=f"gt-review desc: {track_id}",
        )
    load_tracks.clear()
    return True
def save_expansion_gt_edit(
    track_id: str,
    field: str,
    new_value: Any,
    *,
    old_value: Any = None,
    confidence: str = "high",
    cascade_family: str | None = None,
) -> bool:
    """Apply one GT edit to the expansion ground-truth set.

    Same procedure as ``save_gt_edit`` but rooted at the expansion
    ground-truth directory:

    * local mode only — first-edit ``.bak`` snapshot of the GT JSON;
    * set ``record[field]`` (``""`` becomes ``None``), bump the matching
      ``*_confidence`` key when present, optionally cascade
      ``genre_family``;
    * validate with ``GroundTruthRecord.model_validate`` before writing;
    * write a ``.json.tmp`` file, append the audit line
      (``_audit_hf.jsonl`` in hf mode, ``_audit.jsonl`` otherwise), then
      rename the temp file into place;
    * hf mode only — push GT JSON + audit log via ``_hf_writeback`` (its
      result does not affect the return value);
    * clear the ``load_expansion_groundtruth`` cache.

    Args:
        track_id: stem of the expansion GT JSON file.
        field: record key to overwrite (shadows ``dataclasses.field``
            locally; kept for interface compatibility).
        new_value: value to store; ``""`` is normalised to ``None``.
        old_value: previous value, recorded in the audit entry only.
        confidence: value written to the matching ``*_confidence`` key(s).
        cascade_family: optional new ``genre_family``.

    Returns:
        ``True`` when the record was written. ``False`` (with a Streamlit
        error) when the GT JSON is missing or unreadable, the edit fails
        schema validation, or a filesystem/serialisation error occurs.
    """
    in_hf = _data_source() == "hf"
    gt_dir = _expansion_groundtruth_dir()
    gt_path = gt_dir / f"{track_id}.json"
    if not gt_path.exists():
        st.error(f"Expansion GT JSON not found for {track_id}")
        return False

    try:
        # First-edit-per-track backup (local only).
        if not in_hf and not any(gt_dir.glob(f"{track_id}.json.bak.*")):
            ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
            bak = gt_dir / f"{track_id}.json.bak.{ts}"
            bak.write_bytes(gt_path.read_bytes())
        # Files are written as UTF-8 below; read them back the same way.
        record = json.loads(gt_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as e:
        st.error(f"Could not read expansion GT JSON for {track_id}: {e}")
        return False
    if not isinstance(record, dict):
        st.error(f"Expansion GT JSON for {track_id} is not a JSON object")
        return False

    stored_value = new_value if new_value != "" else None
    record[field] = stored_value
    conf_field = f"{field}_confidence"
    if conf_field in record:
        record[conf_field] = confidence
    if cascade_family:
        record["genre_family"] = cascade_family
        if "genre_family_confidence" in record:
            record["genre_family_confidence"] = confidence

    from sync_pilot.groundtruth.schema import GroundTruthRecord

    try:
        GroundTruthRecord.model_validate(record)
    except Exception as e:  # noqa: BLE001
        st.error(f"Expansion GT edit rejected by schema for {track_id}.{field}: {e}")
        return False

    audit_name = "_audit_hf.jsonl" if in_hf else "_audit.jsonl"
    audit_path = gt_dir / audit_name
    audit_entry = {
        "track_id": track_id,
        "field": field,
        "old_value": old_value,
        "new_value": stored_value,
        "cascade_family": cascade_family,
        "confidence_set_to": confidence,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "source": "gt-review-sheet-ext" + ("-hf" if in_hf else ""),
    }

    tmp = gt_path.with_suffix(".json.tmp")
    try:
        # Serialise first, then temp file, then audit, then rename — a
        # failed write cannot leave an audit line for an edit that never
        # landed.
        record_text = json.dumps(record, ensure_ascii=False, indent=2)
        audit_line = json.dumps(audit_entry, ensure_ascii=False) + "\n"
        tmp.write_text(record_text, encoding="utf-8")
        audit_path.parent.mkdir(parents=True, exist_ok=True)
        with audit_path.open("a", encoding="utf-8") as f:
            f.write(audit_line)
        tmp.replace(gt_path)
    except (OSError, TypeError, ValueError) as e:
        try:
            tmp.unlink(missing_ok=True)
        except OSError:
            pass  # best-effort cleanup; the original error is reported below
        st.error(
            f"Expansion GT edit could not be written for {track_id}.{field}: {e}"
        )
        return False

    if in_hf:
        rel = f"sync_pilot/gt_expansion/median_adjacent_combined_500/groundtruth/{track_id}.json"
        rel_audit = f"sync_pilot/gt_expansion/median_adjacent_combined_500/groundtruth/{audit_name}"
        _hf_writeback(
            [(gt_path, rel), (audit_path, rel_audit)],
            commit_message=f"gt-review ext: {track_id}.{field}",
        )
    load_expansion_groundtruth.clear()
    return True
def save_expansion_description(
    track_id: str,
    new_value: str,
    *,
    old_value: str | None = None,
) -> bool:
    """Persist an edited description to an expansion track JSON.

    Same procedure as ``save_description`` but rooted at the expansion
    directories: validate with ``TrackRecord.model_validate``, write a
    ``.json.tmp`` file, append the audit line in the expansion
    ground-truth directory (``_audit_hf.jsonl`` in hf mode,
    ``_audit.jsonl`` otherwise), then rename the temp file into place. In
    hf mode the track JSON and audit log are pushed via ``_hf_writeback``;
    its result does not affect the return value. ``load_expansion_tracks``
    is cleared afterwards.

    Args:
        track_id: stem of the expansion track JSON file.
        new_value: new description; a falsy value is stored as ``None``.
        old_value: previous description, recorded in the audit entry only.

    Returns:
        ``True`` when the record was written. ``False`` (with a Streamlit
        error) when the track JSON is missing or unreadable, the edit fails
        schema validation, or a filesystem/serialisation error occurs.
    """
    in_hf = _data_source() == "hf"
    track_path = _expansion_outputs_dir() / f"{track_id}.json"
    if not track_path.exists():
        st.error(f"Expansion track JSON not found for {track_id}")
        return False

    try:
        # Files are written as UTF-8 below; read them back the same way.
        record = json.loads(track_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as e:
        st.error(f"Could not read expansion track JSON for {track_id}: {e}")
        return False
    if not isinstance(record, dict):
        st.error(f"Expansion track JSON for {track_id} is not a JSON object")
        return False

    stored_value = new_value if new_value else None
    record["description"] = stored_value
    try:
        TrackRecord.model_validate(record)
    except Exception as e:  # noqa: BLE001
        st.error(f"Expansion description edit rejected for {track_id}: {e}")
        return False

    audit_name = "_audit_hf.jsonl" if in_hf else "_audit.jsonl"
    audit_path = _expansion_groundtruth_dir() / audit_name
    audit_entry = {
        "track_id": track_id,
        "field": "description",
        "old_value": old_value,
        "new_value": stored_value,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "source": "gt-review-sheet-desc-ext" + ("-hf" if in_hf else ""),
    }

    tmp = track_path.with_suffix(".json.tmp")
    try:
        # Serialise first, then temp file, then audit, then rename — a
        # failed write cannot leave an audit line behind.
        record_text = json.dumps(record, ensure_ascii=False, indent=2)
        audit_line = json.dumps(audit_entry, ensure_ascii=False) + "\n"
        tmp.write_text(record_text, encoding="utf-8")
        audit_path.parent.mkdir(parents=True, exist_ok=True)
        with audit_path.open("a", encoding="utf-8") as f:
            f.write(audit_line)
        tmp.replace(track_path)
    except (OSError, TypeError, ValueError) as e:
        try:
            tmp.unlink(missing_ok=True)
        except OSError:
            pass  # best-effort cleanup; the original error is reported below
        st.error(
            f"Expansion description edit could not be written for {track_id}: {e}"
        )
        return False

    if in_hf:
        rel = f"sync_pilot/outputs/gt_expansion/median_adjacent_combined_500/{track_id}.json"
        rel_audit = f"sync_pilot/gt_expansion/median_adjacent_combined_500/groundtruth/{audit_name}"
        _hf_writeback(
            [(track_path, rel), (audit_path, rel_audit)],
            commit_message=f"gt-review ext desc: {track_id}",
        )
    load_expansion_tracks.clear()
    return True
def load_review() -> dict[str, dict[str, Any]]:
    """Load the GT-review sidecar (``_review.json``) from the read path.

    Per the original author's note the payload is shaped
    ``{track_id: {"categories": {<key>: bool}, "updated_at": iso8601}}``;
    only the top-level type is checked here.

    Returns:
        The parsed object. ``{}`` when the file does not exist, cannot be
        read/parsed, or is not a JSON object (the latter two with a
        Streamlit warning), so callers can use ``state.get(track_id, {})``
        without further guards.
    """
    p = _review_read_path()
    if not p.exists():
        return {}
    try:
        # save_review writes UTF-8 with ensure_ascii=False; read it back
        # with the same encoding rather than the platform's locale default.
        out = json.loads(p.read_text(encoding="utf-8"))
    except Exception as e:  # noqa: BLE001
        st.warning(f"Could not parse {p.name}: {e}")
        return {}
    if not isinstance(out, dict):
        st.warning(f"{p.name} is not a JSON object — ignoring")
        return {}
    return out
def load_expansion_review() -> dict[str, dict[str, Any]]:
    """Load the expansion-only GT-review sidecar.

    Returns:
        The parsed JSON object. ``{}`` when the file does not exist, cannot
        be read/parsed, or is not a JSON object (the latter two with a
        Streamlit warning).
    """
    p = _expansion_review_path()
    if not p.exists():
        return {}
    try:
        # save_expansion_review writes UTF-8 with ensure_ascii=False; read
        # it back the same way rather than with the locale default.
        out = json.loads(p.read_text(encoding="utf-8"))
    except Exception as e:  # noqa: BLE001
        st.warning(f"Could not parse expansion {p.name}: {e}")
        return {}
    if not isinstance(out, dict):
        st.warning(f"Expansion {p.name} is not a JSON object — ignoring")
        return {}
    return out
def save_review(state: dict[str, dict[str, Any]]) -> None:
    """Write the complete review sidecar via a temp file and rename.

    Target: ``_review_local_path()`` in local mode, ``_review_read_path()``
    (inside the snapshot directory) in hf mode. In hf mode the written
    file is then handed to ``_hf_writeback``. The ``load_review`` cache is
    cleared at the end regardless of the writeback result, so the next
    read reflects the new state.
    """
    hf_mode = _data_source() == "hf"
    if hf_mode:
        target = _review_read_path()
    else:
        target = _review_local_path()

    target.parent.mkdir(parents=True, exist_ok=True)
    staging = target.with_suffix(".json.tmp")
    payload = json.dumps(state, indent=2, ensure_ascii=False)
    staging.write_text(payload, encoding="utf-8")
    # Rename over the real file so readers never see a half-written sidecar.
    staging.replace(target)

    if hf_mode:
        _hf_writeback(
            [(target, "sync_pilot/groundtruth/median/_review.json")],
            commit_message="gt-review: update _review.json",
        )
    load_review.clear()
def save_expansion_review(state: dict[str, dict[str, Any]]) -> None:
    """Persist the expansion-only review sidecar via write-to-temp-then-rename.

    The write target is always ``_expansion_review_path()``. In ``hf`` mode
    the same file is then passed to ``_hf_writeback`` so the update is pushed
    back to the dataset (mirrors ``save_review``). Finally
    ``load_expansion_review.clear()`` is called so the next read sees the new
    state.

    Args:
        state: Complete expansion review state, serialised as indented UTF-8
            JSON.
    """
    hf_mode = _data_source() == "hf"
    target = _expansion_review_path()
    target.parent.mkdir(parents=True, exist_ok=True)

    # Write beside the target and rename over it so readers never see a
    # partially written file.
    payload = json.dumps(state, indent=2, ensure_ascii=False)
    scratch = target.with_suffix(".json.tmp")
    scratch.write_text(payload, encoding="utf-8")
    scratch.replace(target)

    if hf_mode:
        uploads = [
            (target, "sync_pilot/gt_expansion/median_adjacent_combined_500/_review.json")
        ]
        _hf_writeback(uploads, commit_message="gt-review ext: update _review.json")

    load_expansion_review.clear()
def load_manifest() -> dict[str, dict[str, Any]]:
    """Return ``{track_id: manifest_row}`` for fast lookup of YouTube URLs.

    Reads the JSON-lines file at ``_manifest_path()``. Blank lines, lines that
    are not valid JSON, rows that are not JSON objects, and rows without a
    truthy ``track_id`` are skipped. When a ``track_id`` repeats, the last row
    wins.

    Returns:
        Mapping of track id to its manifest row; empty when the file does not
        exist.
    """
    out: dict[str, dict[str, Any]] = {}
    p = _manifest_path()
    if not p.exists():
        return out
    # Decode explicitly as UTF-8 rather than relying on the platform locale.
    with p.open(encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                row = json.loads(line)
            except json.JSONDecodeError:
                continue
            # A syntactically valid line may still be a list/number/string,
            # which has no ``.get``; treat it like any other malformed row.
            if not isinstance(row, dict):
                continue
            tid = row.get("track_id")
            if tid:
                out[tid] = row
    return out
def load_expansion_manifest() -> dict[str, dict[str, Any]]:
    """Return ``{track_id: manifest_row}`` for expansion-source display.

    Reads the JSON-lines file at ``_expansion_manifest_path()``. Blank lines,
    lines that are not valid JSON, rows that are not JSON objects, and rows
    without a truthy ``track_id`` are skipped. When a ``track_id`` repeats,
    the last row wins.

    Returns:
        Mapping of track id to its manifest row; empty when the file does not
        exist.
    """
    out: dict[str, dict[str, Any]] = {}
    p = _expansion_manifest_path()
    if not p.exists():
        return out
    # Decode explicitly as UTF-8 rather than relying on the platform locale.
    with p.open(encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                row = json.loads(line)
            except json.JSONDecodeError:
                continue
            # A syntactically valid line may still be a list/number/string,
            # which has no ``.get``; treat it like any other malformed row.
            if not isinstance(row, dict):
                continue
            tid = row.get("track_id")
            if tid:
                out[tid] = row
    return out
| # --------------------------------------------------------------------------- | |
| # Lyrics subtitle-leak stripper (display-only) | |
| # --------------------------------------------------------------------------- | |
| # Whisper transcribes the burned-in subtitle credit ("Altyazı M.K." / "Çeviri | |
| # ve Altyazı M.K.") at the start and end of many YouTube rips. The raw lyrics | |
| # string in the JSON deliberately keeps the leak so the on-disk record is | |
| # faithful to what the model produced; we strip it here for display. | |
_SUBTITLE_BOILERPLATE = re.compile(
    r"(?:^|\s)(?:Çeviri\s+ve\s+)?Altyazı(?:yan)?(?:\s+M\.K\.)?\s*",
    re.IGNORECASE,
)

# Broader Whisper boilerplate hallucinations on YouTube/TRT rips: thanks-for-
# watching and the TRT audio-description disclaimer. Explicit char classes (not
# re.IGNORECASE) avoid the Turkish dotted-İ casefold pitfall.
_ASR_HALLUCINATION = re.compile(
    r"\s*(?:"
    r"[İi]zlediğiniz için teşekkür(?:ler|\s+eder\w+)?"
    r"|[Bb]u dizinin betimlemesi[^.]*?yaptırılmıştır"
    r"|[Bb]u dizinin betimlemesi(?:\s+TRT\s+tarafından)?"
    r"|[Ss]esli [Bb]etimleme [Dd]erneği(?:'ne)?"
    r")\.?\s*"
)


def strip_subtitle_leak(text: str | None) -> str:
    """Return *text* with known subtitle-credit / ASR boilerplate removed.

    Every match of ``_SUBTITLE_BOILERPLATE`` and then ``_ASR_HALLUCINATION``
    is replaced by a single space, runs of whitespace are collapsed to one
    space, and the result is trimmed. The input string itself is not
    modified, so stored records keep the raw transcription.

    Args:
        text: Raw lyrics text; ``None`` or an empty string yields ``""``.

    Returns:
        The cleaned, whitespace-normalised string for display.
    """
    if not text:
        return ""
    scrubbed = text
    # Order matters only in that both passes run; each swaps a match for one
    # space so neighbouring words never fuse together.
    for pattern in (_SUBTITLE_BOILERPLATE, _ASR_HALLUCINATION):
        scrubbed = pattern.sub(" ", scrubbed)
    # Squash the padding the substitutions may have introduced.
    return re.sub(r"\s{2,}", " ", scrubbed).strip()
| # --------------------------------------------------------------------------- | |
| # Taxonomy parsing | |
| # --------------------------------------------------------------------------- | |
@dataclass
class TaxonomyDimension:
    """One ``### Dimension N — Title`` block from the taxonomy markdown.

    Instances are built by ``_parse_dimension_block`` with keyword arguments
    and then filled in attribute by attribute, so the class must be a
    (mutable) dataclass: ``number`` and ``title`` are required, everything
    else defaults to an empty value.
    """

    # Dimension number taken from the ``### Dimension N`` header.
    number: int
    # Header text after the em-dash, or ``"Dimension N"`` as a fallback.
    title: str
    # Backtick-quoted labels from the ``**TR / EN:**`` line.
    tr_label: str = ""
    en_label: str = ""
    # Text of the corresponding ``**Definition:**`` / ``**Multiplicity:**`` /
    # ``**Default:**`` / ``**Source authority:**`` fields, when present.
    definition: str = ""
    multiplicity: str = ""
    default: str = ""
    source_authority: str = ""
    # Set when the block carries an expert-review marker.
    expert_review_required: bool = False
    # ``{"term": ..., "definition": ...}`` entries from the vocabulary bullets.
    controlled_vocab: list[dict[str, str]] = field(default_factory=list)
    # ``{"track_id": ..., "note": ...}`` entries from the example bullets.
    example_tracks: list[dict[str, str]] = field(default_factory=list)
    # Text of the ``**Notes / ambiguities:**`` field, when present.
    notes: str = ""
    # Unparsed block body, kept so renderers can fall back to plain markdown.
    raw_markdown: str = ""
@dataclass
class TaxonomyOpenQuestion:
    """One ``Q<N>.`` entry from the open-questions section.

    Built by ``_parse_open_questions`` with keyword arguments, which requires
    the generated dataclass ``__init__``.
    """

    # The ``N`` from the ``**Q<N>.`` header.
    number: int
    # Bold question text from the header.
    question: str
    # Everything after the header up to the next question.
    body: str
    # Names of the reviewers mentioned in ``body``.
    reviewers: list[str] = field(default_factory=list)
@dataclass
class TaxonomySpec:
    """Structured representation of ``taxonomy.md`` for dashboard rendering.

    ``load_taxonomy`` creates it as ``TaxonomySpec(raw=...)`` (or with no
    arguments when the file is missing) and then assigns the remaining
    attributes, so every field has an empty default.
    """

    # Values copied from the metadata comment block at the top of the file.
    version: str = ""
    status: str = ""
    last_updated: str = ""
    catalog: str = ""
    authors: str = ""
    review_targets: str = ""
    # Raw markdown of the "Purpose & scope" and "Schema conventions" sections.
    purpose_md: str = ""
    schema_conventions_md: str = ""
    # Parsed ``### Dimension N`` blocks, sorted by number.
    dimensions: list[TaxonomyDimension] = field(default_factory=list)
    # Parsed ``Q<N>.`` entries, sorted by number.
    open_questions: list[TaxonomyOpenQuestion] = field(default_factory=list)
    # Raw markdown of the "References" section.
    references_md: str = ""
    # Entire file content, for plain-markdown fallback rendering.
    raw: str = ""
_META_PATTERN = re.compile(r"<!--(.*?)-->", re.DOTALL)
_BOLD_FIELD_PATTERN = re.compile(r"\*\*([^*:]+):\*\*\s*(.+)")


def _parse_metadata_block(raw: str) -> dict[str, str]:
    """Extract ``key: value`` pairs from the first HTML comment in *raw*.

    The header uses YAML-ish ``key: value`` lines. Indented ``- item`` lines
    that follow a key (e.g. the entries under ``review_targets:``) are
    appended to that key's value, one per line, so nested lists are preserved
    as a multi-line string. Any amount of leading whitespace marks a nested
    item, so the parser does not depend on a particular indent width.

    Args:
        raw: Full markdown text of the taxonomy file.

    Returns:
        Mapping of metadata keys to their (stripped) string values; empty
        when *raw* contains no ``<!-- ... -->`` block.
    """
    out: dict[str, str] = {}
    m = _META_PATTERN.search(raw)
    if not m:
        return out
    current_key: str | None = None
    for line in m.group(1).splitlines():
        line = line.rstrip()
        if not line.strip():
            continue
        item = line.lstrip()
        # A nested list entry is an *indented* ``- `` bullet; an unindented
        # bullet is left to the generic ``key: value`` handling below.
        is_nested_item = item != line and item.startswith("- ")
        if is_nested_item and current_key:
            out[current_key] += "\n" + item
            continue
        if ":" in line:
            key, _, val = line.partition(":")
            key = key.strip()
            if not key:
                continue
            current_key = key
            out[key] = val.strip()
    return out
def _parse_dimension_block(num: int, body: str) -> TaxonomyDimension:
    """Turn one ``### Dimension N — Title`` block into a ``TaxonomyDimension``.

    Extraction is best-effort: each known field is filled only when its
    pattern is found, and the untouched *body* is always stored in
    ``raw_markdown`` so a renderer can fall back to a plain markdown dump for
    anything that was not decomposed.

    Args:
        num: Dimension number, used for ``number`` and for the fallback title.
        body: Markdown of the block; its first line is expected to be the
            ``### Dimension`` header.

    Returns:
        The populated ``TaxonomyDimension``.
    """
    header_line = body.splitlines()[0]
    if heading := re.match(r"###\s+Dimension\s+\d+\s+—\s+(.+)", header_line):
        title = heading.group(1).strip()
    else:
        title = f"Dimension {num}"
    dim = TaxonomyDimension(number=num, title=title, raw_markdown=body)

    # **TR / EN:** `tr` / `en`
    if labels := re.search(r"\*\*TR\s*/\s*EN:\*\*\s*`([^`]+)`\s*/\s*`([^`]+)`", body):
        dim.tr_label, dim.en_label = labels.groups()

    # Plain text fields: (attribute, pattern, flags). Multi-line fields run up
    # to the next bold heading (or ``---`` for notes); the others are one line.
    text_fields = (
        ("definition", r"\*\*Definition:\*\*\s*(.+?)(?=\n\*\*|\Z)", re.DOTALL),
        ("multiplicity", r"\*\*Multiplicity:\*\*\s*(.+)", 0),
        ("default", r"\*\*Default:\*\*\s*(.+)", 0),
        ("source_authority", r"\*\*Source authority:\*\*\s*(.+?)(?=\n\*\*|\Z)", re.DOTALL),
        ("notes", r"\*\*Notes\s*/\s*ambiguities:\*\*\s*(.+?)(?=\n---|\Z)", re.DOTALL),
    )
    for attr, pattern, flags in text_fields:
        if found := re.search(pattern, body, flags):
            setattr(dim, attr, found.group(1).strip())

    # Either marker in the block flags the dimension for expert review.
    review_markers = ("expert_review_required: true", "⚠ CRITICAL")
    if any(marker in body for marker in review_markers):
        dim.expert_review_required = True

    # Controlled vocabulary: ``- `term` — definition`` bullets between the
    # ``**Controlled vocabulary`` heading and the next ``**`` heading. Grouped
    # sub-headings are not disambiguated; raw markdown covers that depth.
    vocab_section = re.search(
        r"\*\*Controlled vocabulary[^*]*?:?\*\*\s*(.+?)(?=\n\*\*|\Z)",
        body,
        re.DOTALL,
    )
    if vocab_section:
        vocab_bullet = re.compile(r"^\s*-\s*`([^`]+)`\s*(?:—\s*(.+))?$")
        hits = filter(None, map(vocab_bullet.match, vocab_section.group(1).splitlines()))
        dim.controlled_vocab.extend(
            {"term": hit.group(1).strip(), "definition": (hit.group(2) or "").strip()}
            for hit in hits
        )

    # Example tracks: ``- `<track_id>` → <comment>``.
    ex_section = re.search(
        r"\*\*Example tracks[^*]*?:?\*\*\s*(.+?)(?=\n\*\*|\Z)",
        body,
        re.DOTALL,
    )
    if ex_section:
        example_bullet = re.compile(r"^\s*-\s*`([0-9_a-zA-ZşŞçÇğĞıİöÖüÜ]+)`\s*→\s*(.+)$")
        hits = filter(None, map(example_bullet.match, ex_section.group(1).splitlines()))
        dim.example_tracks.extend(
            {"track_id": hit.group(1).strip(), "note": hit.group(2).strip()}
            for hit in hits
        )

    return dim
def _parse_open_questions(section_md: str) -> list[TaxonomyOpenQuestion]:
    """Split the open-questions section body into ``TaxonomyOpenQuestion`` cards.

    A card starts at a line beginning ``**Q<N>.`` and runs until the next such
    line or the end of *section_md*. Text before the first header, and any
    chunk that does not match the header shape, is ignored. Reviewer names
    (``Aran`` / ``Murat`` / ``Emre``) that appear as whole words in a card's
    body are collected into its ``reviewers`` list for badge rendering.

    Args:
        section_md: Markdown of the section, without its ``##`` heading.

    Returns:
        The cards sorted by question number.
    """
    known_reviewers = ("Aran", "Murat", "Emre")
    cards: list[TaxonomyOpenQuestion] = []
    # The lookahead keeps each ``**Q<N>.`` header attached to its own chunk.
    for chunk in re.split(r"\n(?=\*\*Q\d+\.)", section_md):
        header = re.match(r"\*\*Q(\d+)\.\s*(.+?)\*\*\s*(.*)", chunk, re.DOTALL)
        if header is None:
            continue
        raw_number, raw_question, raw_body = header.groups()
        text = raw_body.strip()
        mentioned = [who for who in known_reviewers if re.search(rf"\b{who}\b", text)]
        cards.append(
            TaxonomyOpenQuestion(
                number=int(raw_number),
                question=raw_question.strip().rstrip("*").strip(),
                body=text,
                reviewers=mentioned,
            )
        )
    return sorted(cards, key=lambda card: card.number)
def load_taxonomy() -> TaxonomySpec:
    """Parse ``taxonomy.md`` into a structured ``TaxonomySpec``.

    Any sections we fail to decompose are still available via ``.raw`` so
    page code can fall back to a plain markdown dump. We deliberately keep
    the parser narrow rather than pulling in a real markdown AST — the
    file's structure is stable and the parser is the only consumer.

    Returns:
        The parsed spec; an all-default ``TaxonomySpec`` when the file at
        ``_taxonomy_path()`` does not exist.
    """
    p = _taxonomy_path()
    if not p.exists():
        return TaxonomySpec()
    # The taxonomy contains Turkish text and em-dashes that the section
    # regexes below match literally, so decode as UTF-8 explicitly instead of
    # depending on the platform locale.
    raw = p.read_text(encoding="utf-8")
    spec = TaxonomySpec(raw=raw)

    meta = _parse_metadata_block(raw)
    spec.version = meta.get("taxonomy_version", "")
    spec.status = meta.get("status", "")
    spec.last_updated = meta.get("last_updated", "")
    spec.catalog = meta.get("catalog", "")
    spec.authors = meta.get("authors", "")
    spec.review_targets = meta.get("review_targets", "")

    # The major section headers we slice on. We split conservatively so the
    # parser keeps working if new top-level sections are added.
    purpose_match = re.search(
        r"##\s*1\. Purpose & scope(.+?)(?=##\s*\d+\.)", raw, re.DOTALL
    )
    if purpose_match:
        spec.purpose_md = purpose_match.group(1).strip()

    conv_match = re.search(
        r"##\s*2\. Schema conventions(.+?)(?=##\s*\d+\.)", raw, re.DOTALL
    )
    if conv_match:
        spec.schema_conventions_md = conv_match.group(1).strip()

    dims_match = re.search(
        r"##\s*3\. Dimensions(.+?)(?=##\s*\d+\.\s*Open questions)", raw, re.DOTALL
    )
    if dims_match:
        dims_body = dims_match.group(1)
        # Split on each ``### Dimension N`` header; the lookahead keeps the
        # header inside its own chunk.
        chunks = re.split(r"\n(?=###\s+Dimension\s+\d+\s+—)", dims_body)
        for chunk in chunks:
            m = re.match(r"###\s+Dimension\s+(\d+)", chunk)
            if not m:
                # Preamble before the first dimension header.
                continue
            n = int(m.group(1))
            spec.dimensions.append(_parse_dimension_block(n, chunk.strip()))
        spec.dimensions.sort(key=lambda d: d.number)

    open_q_match = re.search(
        r"##\s*4\. Open questions[^\n]*\n(.+?)(?=##\s*\d+\.)", raw, re.DOTALL
    )
    if open_q_match:
        spec.open_questions = _parse_open_questions(open_q_match.group(1))

    refs_match = re.search(r"##\s*5\. References(.+)", raw, re.DOTALL)
    if refs_match:
        spec.references_md = refs_match.group(1).strip()

    return spec
| # --------------------------------------------------------------------------- | |
| # Convenience derived data | |
| # --------------------------------------------------------------------------- | |
def total_audio_minutes(tracks: list[dict[str, Any]]) -> float:
    """Sum ``duration_sec`` across all tracks; convert to minutes.

    Args:
        tracks: Track dicts; a missing or null/empty ``duration_sec`` counts
            as zero seconds.

    Returns:
        Total duration in minutes (``0.0`` for an empty list).
    """
    # ``or 0.0`` makes an explicit ``None`` (JSON null) behave like a missing
    # key instead of raising ``TypeError`` inside ``float()``.
    return sum(float(t.get("duration_sec") or 0.0) for t in tracks) / 60.0
def parse_iso(ts: str | None) -> datetime | None:
    """Parse an ISO-8601 timestamp, tolerating a ``Z`` UTC designator.

    Args:
        ts: Timestamp text; ``None`` or an empty string is treated as absent.

    Returns:
        The parsed ``datetime``, or ``None`` when *ts* is empty or is not
        accepted by ``datetime.fromisoformat``.
    """
    parsed: datetime | None = None
    if ts:
        # Rewrite ``Z`` as an explicit UTC offset before handing the text to
        # ``fromisoformat``.
        normalized = ts.replace("Z", "+00:00")
        try:
            parsed = datetime.fromisoformat(normalized)
        except ValueError:
            parsed = None
    return parsed