"""
SQLite-backed curation index.

Why SQLite:
- fast ad-hoc queries over large local datasets
- incremental updates (skip unchanged bundles)
- no extra dependencies
"""

from __future__ import annotations

import json
import os
import sqlite3
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterable, List, Literal, Optional, Tuple

from .indexer import CurationIndexConfig, _bundle_row, _discover_bundles

# Number of processed bundles between intermediate COMMITs; keeps the WAL
# bounded during large indexing runs.
_COMMIT_EVERY = 200

# (bundle_dir, manifest_mtime_ns, manifest_size, teacher_depth_count,
#  teacher_depth_mtime_ns) for one bundle that needs (re)indexing.
_PendingBundle = Tuple[Path, int, int, int, int]


@dataclass(frozen=True)
class SQLiteIndexConfig:
    """Options for ``build_curation_index_sqlite``."""

    # Size of the thread pool that computes bundle rows (clamped to >= 1).
    workers: int = 8
    # Skip bundles whose manifest / teacher-depth stats match the stored ones.
    incremental: bool = True
    # Forwarded to ``_bundle_row(include_depth_stream_summary=...)``.
    include_depth_stream_summary: bool = True
    # Forwarded to ``_discover_bundles(mode=...)``.
    discover: Literal["children", "recursive"] = "children"


def _now_s() -> float:
    """Return the current wall-clock time in seconds since the epoch."""
    return float(time.time())


def _mtime_ns(st: os.stat_result) -> int:
    """Return the modification time of a ``stat`` result in nanoseconds."""
    return int(st.st_mtime_ns)


def _opt_str(value: Any) -> Optional[str]:
    """Return ``str(value)``, or None when *value* is None."""
    return None if value is None else str(value)


def _connect(db_path: Path) -> sqlite3.Connection:
    """Open the index database, creating its parent directory if needed.

    ``foreign_keys=ON`` is set because SQLite does not enforce foreign keys
    (and therefore the ``devices`` ON DELETE CASCADE) unless asked to.
    """
    p = Path(db_path)
    p.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(p))
    conn.execute("PRAGMA journal_mode=WAL;")
    conn.execute("PRAGMA synchronous=NORMAL;")
    conn.execute("PRAGMA temp_store=MEMORY;")
    conn.execute("PRAGMA foreign_keys=ON;")
    return conn


def _ensure_column(conn: sqlite3.Connection, *, table: str, column: str, decl: str) -> None:
    """
    Best-effort migration helper: add a column if it doesn't exist.

    ``table``, ``column`` and ``decl`` are interpolated into SQL verbatim, so
    they must be trusted identifiers (all callers in this module pass literals).
    """
    cols = {r[1] for r in conn.execute(f"PRAGMA table_info({table});").fetchall()}
    if column not in cols:
        conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {decl};")


def _ensure_schema(conn: sqlite3.Connection) -> None:
    """Create the ``bundles`` / ``devices`` tables, migrate old DBs, add indexes.

    Migrations run *before* index creation: an index over a column that a
    legacy DB does not have yet (e.g. ``teacher_depth_count``) would otherwise
    fail with "no such column".
    """
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS bundles (
            bundle_dir TEXT PRIMARY KEY,
            capture_id TEXT,
            created_at TEXT,
            scene_type TEXT,
            operating_regime TEXT,
            source_format TEXT,
            has_packed_depth INTEGER,
            num_devices INTEGER,
            default_device_id TEXT,
            teacher_depth_count INTEGER,
            teacher_depth_mtime_ns INTEGER,
            manifest_mtime_ns INTEGER,
            manifest_size INTEGER,
            indexed_at_unix_s REAL,
            row_json TEXT
        );
        """
    )
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS devices (
            bundle_dir TEXT NOT NULL,
            device_id TEXT NOT NULL,
            video_path TEXT,
            video_size_bytes INTEGER,
            intrinsics_path TEXT,
            timestamps_path TEXT,
            packed_depth_index_path TEXT,
            packed_depth_frames_count INTEGER,
            packed_depth_gaps INTEGER,
            PRIMARY KEY (bundle_dir, device_id),
            FOREIGN KEY (bundle_dir) REFERENCES bundles(bundle_dir) ON DELETE CASCADE
        );
        """
    )

    # Migrations for older DBs that predate these columns (must precede the
    # indexes below, which may reference them).
    _ensure_column(conn, table="bundles", column="default_device_id", decl="TEXT")
    _ensure_column(conn, table="bundles", column="teacher_depth_count", decl="INTEGER")
    _ensure_column(conn, table="bundles", column="teacher_depth_mtime_ns", decl="INTEGER")

    conn.execute("CREATE INDEX IF NOT EXISTS idx_bundles_scene ON bundles(scene_type);")
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_bundles_packed_depth ON bundles(has_packed_depth);"
    )
    conn.execute("CREATE INDEX IF NOT EXISTS idx_bundles_source ON bundles(source_format);")
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_bundles_teacher_depth ON bundles(teacher_depth_count);"
    )


def _teacher_depth_stats(bundle_dir: Path) -> tuple[int, int]:
    """
    Return (teacher_depth_count, teacher_depth_mtime_ns).

    ``teacher_depth_count`` is the number of regular files named
    ``frame_*.npy`` directly inside ``<bundle_dir>/teacher_outputs/depth``;
    ``teacher_depth_mtime_ns`` is that directory's mtime. Both are 0 when the
    directory is missing; each falls back to 0 independently on OS errors.
    """
    depth_dir = Path(bundle_dir) / "teacher_outputs" / "depth"
    if not depth_dir.is_dir():
        return 0, 0

    try:
        mtime_ns = _mtime_ns(depth_dir.stat())
    except OSError:
        mtime_ns = 0

    try:
        # Context manager closes the directory handle even if iteration fails.
        with os.scandir(depth_dir) as entries:
            count = sum(
                1
                for ent in entries
                if ent.is_file()
                and ent.name.startswith("frame_")
                and ent.name.endswith(".npy")
            )
    except OSError:
        return 0, mtime_ns
    return count, mtime_ns


def _existing_manifest_stats(conn: sqlite3.Connection) -> Dict[str, Dict[str, int]]:
    """Return the change-detection stats stored per bundle, keyed by ``bundle_dir``.

    Each value has the keys ``manifest_mtime_ns``, ``manifest_size``,
    ``teacher_depth_count`` and ``teacher_depth_mtime_ns`` (NULLs read as 0).
    """
    cur = conn.execute(
        "SELECT bundle_dir, manifest_mtime_ns, manifest_size, "
        "COALESCE(teacher_depth_count,0), COALESCE(teacher_depth_mtime_ns,0) "
        "FROM bundles;"
    )
    out: Dict[str, Dict[str, int]] = {}
    for bdir, mtime_ns, size, td_count, td_mtime_ns in cur.fetchall():
        if bdir:
            out[str(bdir)] = {
                "manifest_mtime_ns": int(mtime_ns or 0),
                "manifest_size": int(size or 0),
                "teacher_depth_count": int(td_count or 0),
                "teacher_depth_mtime_ns": int(td_mtime_ns or 0),
            }
    return out


def _upsert_row(
    conn: sqlite3.Connection,
    *,
    row: Dict[str, Any],
    manifest_mtime_ns: int,
    manifest_size: int,
    teacher_depth_count: int,
    teacher_depth_mtime_ns: int,
) -> None:
    """Insert or update one bundle row and replace that bundle's device rows.

    The whole ``row`` dict is stored as JSON in ``bundles.row_json``; selected
    fields are copied into columns for querying. Device rows for the bundle
    are deleted and rewritten so devices that vanished from ``row`` do not
    linger (an ON CONFLICT update of ``bundles`` never fires the cascade).
    """
    bdir = str(row.get("bundle_dir", ""))

    ingest = row.get("ingest")
    if not isinstance(ingest, dict):
        ingest = {}
    summary = row.get("summary")
    if not isinstance(summary, dict):
        summary = {}
    devs = row.get("devices")
    if not isinstance(devs, list):
        devs = []

    # The first listed device is recorded as the bundle's default device.
    default_device_id: Optional[str] = None
    if devs and isinstance(devs[0], dict):
        default_device_id = str(devs[0].get("device_id") or "") or None

    conn.execute(
        """
        INSERT INTO bundles(
            bundle_dir, capture_id, created_at, scene_type, operating_regime,
            source_format, has_packed_depth, num_devices, default_device_id,
            teacher_depth_count, teacher_depth_mtime_ns,
            manifest_mtime_ns, manifest_size, indexed_at_unix_s, row_json
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(bundle_dir) DO UPDATE SET
            capture_id=excluded.capture_id,
            created_at=excluded.created_at,
            scene_type=excluded.scene_type,
            operating_regime=excluded.operating_regime,
            source_format=excluded.source_format,
            has_packed_depth=excluded.has_packed_depth,
            num_devices=excluded.num_devices,
            default_device_id=excluded.default_device_id,
            teacher_depth_count=excluded.teacher_depth_count,
            teacher_depth_mtime_ns=excluded.teacher_depth_mtime_ns,
            manifest_mtime_ns=excluded.manifest_mtime_ns,
            manifest_size=excluded.manifest_size,
            indexed_at_unix_s=excluded.indexed_at_unix_s,
            row_json=excluded.row_json;
        """,
        (
            bdir,
            _opt_str(row.get("capture_id")),
            _opt_str(row.get("created_at")),
            _opt_str(row.get("scene_type")),
            _opt_str(row.get("operating_regime")),
            _opt_str(ingest.get("source_format")),
            1 if summary.get("has_packed_depth") else 0,
            int(summary.get("num_devices") or 0),
            default_device_id,
            int(teacher_depth_count),
            int(teacher_depth_mtime_ns),
            int(manifest_mtime_ns),
            int(manifest_size),
            float(row.get("indexed_at_unix_s") or _now_s()),
            json.dumps(row, default=str),
        ),
    )

    # Replace, don't merge: drop device rows from any previous indexing pass.
    conn.execute("DELETE FROM devices WHERE bundle_dir = ?;", (bdir,))
    for d in devs:
        if not isinstance(d, dict):
            continue
        did = str(d.get("device_id") or "")
        if not did:
            continue

        stream = d.get("waveform_depth_stream")
        idx_path = stream.get("index_path") if isinstance(stream, dict) else None

        # Frame stats are only trusted when the summary reports ok.
        frames_count = None
        gaps = None
        wsum = d.get("waveform_depth_stream_summary")
        if isinstance(wsum, dict) and wsum.get("ok"):
            frames = wsum.get("frames")
            if isinstance(frames, dict):
                frames_count = frames.get("count")
                gaps = frames.get("gaps")

        # ON CONFLICT keeps "last one wins" if a row lists a device_id twice.
        conn.execute(
            """
            INSERT INTO devices(
                bundle_dir, device_id, video_path, video_size_bytes,
                intrinsics_path, timestamps_path,
                packed_depth_index_path, packed_depth_frames_count, packed_depth_gaps
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(bundle_dir, device_id) DO UPDATE SET
                video_path=excluded.video_path,
                video_size_bytes=excluded.video_size_bytes,
                intrinsics_path=excluded.intrinsics_path,
                timestamps_path=excluded.timestamps_path,
                packed_depth_index_path=excluded.packed_depth_index_path,
                packed_depth_frames_count=excluded.packed_depth_frames_count,
                packed_depth_gaps=excluded.packed_depth_gaps;
            """,
            (
                bdir,
                did,
                d.get("video_path"),
                d.get("video_path_size_bytes"),
                d.get("intrinsics_path"),
                d.get("timestamps_path"),
                str(idx_path) if idx_path else None,
                int(frames_count) if frames_count is not None else None,
                int(gaps) if gaps is not None else None,
            ),
        )


def _pending_bundles(
    bundles: Iterable[Any], existing: Dict[str, Dict[str, int]]
) -> Tuple[List[_PendingBundle], int]:
    """Return (bundles needing (re)indexing with their current stats, skip count).

    A bundle is skipped only when ``existing`` (keyed by ``str(bundle)``) holds
    stats equal to the current manifest mtime/size and teacher-depth
    count/mtime. An unreadable manifest contributes mtime/size 0.
    """
    # NOTE(review): stored keys come from row["bundle_dir"] while lookups use
    # str(bundle); this assumes _bundle_row reports the same path string.
    to_index: List[_PendingBundle] = []
    skipped = 0
    for b in bundles:
        bdir = Path(b)
        try:
            st = (bdir / "manifest.json").stat()
            mtime_ns = _mtime_ns(st)
            size = int(st.st_size)
        except OSError:
            mtime_ns = 0
            size = 0
        td_count, td_mtime_ns = _teacher_depth_stats(bdir)

        prev = existing.get(str(b))
        current = (mtime_ns, size, int(td_count), int(td_mtime_ns))
        if prev is not None and (
            prev["manifest_mtime_ns"],
            prev["manifest_size"],
            prev["teacher_depth_count"],
            prev["teacher_depth_mtime_ns"],
        ) == current:
            skipped += 1
            continue
        to_index.append((bdir, *current))
    return to_index, skipped


def _write_bundle_row(conn: sqlite3.Connection, *, row: Any, pending: _PendingBundle) -> bool:
    """Upsert one computed row atomically; return whether the row reports success.

    The write runs inside a SAVEPOINT: if it fails part-way (e.g. after the
    ``bundles`` row but before every ``devices`` row), nothing for this bundle
    is kept. Otherwise the fresh stats on the half-written bundles row would
    make the next incremental run skip it. The failure is re-raised.
    """
    bdir, mtime_ns, size, td_count, td_mtime_ns = pending
    if not isinstance(row, dict):
        raise TypeError(f"_bundle_row returned {type(row).__name__}, expected dict")
    if not row.get("bundle_dir"):
        # Without this, every such row would collide on primary key "".
        row = {**row, "bundle_dir": str(bdir)}

    conn.execute("SAVEPOINT bundle_row;")
    try:
        _upsert_row(
            conn,
            row=row,
            manifest_mtime_ns=mtime_ns,
            manifest_size=size,
            teacher_depth_count=int(td_count),
            teacher_depth_mtime_ns=int(td_mtime_ns),
        )
    except Exception:
        conn.execute("ROLLBACK TO SAVEPOINT bundle_row;")
        conn.execute("RELEASE SAVEPOINT bundle_row;")
        raise
    conn.execute("RELEASE SAVEPOINT bundle_row;")
    return bool(row.get("ok")) and not row.get("error")


def build_curation_index_sqlite(
    *,
    captures_root: Path,
    db_path: Path,
    config: Optional[SQLiteIndexConfig] = None,
) -> Dict[str, Any]:
    """Index the bundles discovered under *captures_root* into the DB at *db_path*.

    Bundle rows are computed in a thread pool (``config.workers``); all
    database writes happen on the calling thread. With ``config.incremental``,
    bundles whose manifest and teacher-depth stats are unchanged since the
    last run are skipped. Writes are committed every ``_COMMIT_EVERY``
    processed bundles and once at the end.

    A bundle is counted in ``failed`` when computing its row raises, when its
    DB write fails (nothing is kept for it, so the next run retries it), or
    when its row reports failure (``ok`` falsy or ``error`` set; such rows are
    still stored). The run continues past per-bundle failures.

    Bundles that are no longer discovered are not removed from the DB.

    Returns:
        Summary dict with keys ``captures_root``, ``db_path``,
        ``num_bundles_found``, ``num_indexed``, ``num_skipped``, ``ok``,
        ``failed`` and ``elapsed_s``.
    """
    cfg = config or SQLiteIndexConfig()
    captures_root = Path(captures_root)
    db_path = Path(db_path)
    t0 = _now_s()

    conn = _connect(db_path)
    try:
        _ensure_schema(conn)
        existing = _existing_manifest_stats(conn) if bool(cfg.incremental) else {}
        bundles = _discover_bundles(captures_root, mode=str(cfg.discover))
        to_index, skipped = _pending_bundles(bundles, existing)

        # Mirrors the JSONL indexer's config; only include_depth_stream_summary
        # is forwarded to the shared row builder here.
        curation_cfg = CurationIndexConfig(
            workers=int(cfg.workers),
            include_depth_stream_summary=bool(cfg.include_depth_stream_summary),
            discover=str(cfg.discover),
        )
        include_summary = bool(curation_cfg.include_depth_stream_summary)

        ok = 0
        fail = 0
        n_processed = 0
        conn.execute("BEGIN;")
        with ThreadPoolExecutor(max_workers=max(1, int(cfg.workers))) as ex:
            futs = {
                ex.submit(_bundle_row, pending[0], include_depth_stream_summary=include_summary): pending
                for pending in to_index
            }
            for fut in as_completed(futs):
                try:
                    # fut.result() is inside the try: one crashing worker must
                    # not abort the run and discard the uncommitted batch.
                    if _write_bundle_row(conn, row=fut.result(), pending=futs[fut]):
                        ok += 1
                    else:
                        fail += 1
                except Exception:
                    fail += 1
                n_processed += 1
                if n_processed % _COMMIT_EVERY == 0:
                    conn.execute("COMMIT;")
                    conn.execute("BEGIN;")
        conn.execute("COMMIT;")

        return {
            "captures_root": str(captures_root),
            "db_path": str(db_path),
            "num_bundles_found": int(len(bundles)),
            "num_indexed": int(len(to_index)),
            "num_skipped": int(skipped),
            "ok": int(ok),
            "failed": int(fail),
            "elapsed_s": float(_now_s() - t0),
        }
    finally:
        try:
            conn.close()
        except Exception:
            pass