""" Query helpers for the SQLite curation index. Design goals: - fast filtering for dataset selection - safe parameterized SQL (no arbitrary SQL execution by default) - easy export to paths / jsonl """ from __future__ import annotations import json import sqlite3 from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, Iterator, List, Optional, Sequence @dataclass(frozen=True) class QueryFilters: # bundle-level filters source_format: Optional[str] = None has_packed_depth: Optional[bool] = None scene_type: Optional[str] = None operating_regime: Optional[str] = None min_devices: Optional[int] = None # device-level packed depth filters (requires join) packed_depth_min_frames: Optional[int] = None packed_depth_max_gaps: Optional[int] = None def _connect_ro(db_path: Path) -> sqlite3.Connection: # URI mode to open read-only even if file is shared p = Path(db_path).resolve() conn = sqlite3.connect(f"file:{p.as_posix()}?mode=ro", uri=True) conn.row_factory = sqlite3.Row return conn def query_bundle_dirs( *, db_path: Path, filters: Optional[QueryFilters] = None, limit: Optional[int] = None, order_by: str = "bundle_dir", ) -> List[str]: """ Return a list of bundle_dir strings matching filters. """ f = filters or QueryFilters() params: List[Any] = [] where: List[str] = [] join_devices = False if f.source_format is not None: where.append("b.source_format = ?") params.append(str(f.source_format)) if f.has_packed_depth is not None: where.append("b.has_packed_depth = ?") params.append(1 if bool(f.has_packed_depth) else 0) if f.scene_type is not None: where.append("b.scene_type = ?") params.append(str(f.scene_type)) if f.operating_regime is not None: where.append("b.operating_regime = ?") params.append(str(f.operating_regime)) if f.min_devices is not None: where.append("b.num_devices >= ?") params.append(int(f.min_devices)) if f.packed_depth_min_frames is not None or f.packed_depth_max_gaps is not None: join_devices = True # Only consider devices that actually have packed depth index summarized. where.append("d.packed_depth_index_path IS NOT NULL") if f.packed_depth_min_frames is not None: where.append("d.packed_depth_frames_count >= ?") params.append(int(f.packed_depth_min_frames)) if f.packed_depth_max_gaps is not None: where.append("d.packed_depth_gaps <= ?") params.append(int(f.packed_depth_max_gaps)) wsql = "" if where: wsql = "WHERE " + " AND ".join(where) if order_by not in {"bundle_dir", "capture_id", "created_at", "scene_type"}: order_by = "bundle_dir" sql = ( "SELECT DISTINCT b.bundle_dir AS bundle_dir " "FROM bundles b " + ("JOIN devices d ON d.bundle_dir = b.bundle_dir " if join_devices else "") + wsql + f" ORDER BY b.{order_by} ASC" ) if limit is not None and int(limit) > 0: sql += " LIMIT ?" params.append(int(limit)) conn = _connect_ro(Path(db_path)) try: rows = conn.execute(sql, params).fetchall() return [str(r["bundle_dir"]) for r in rows] finally: conn.close() def iter_bundle_shard_seeds( *, db_path: Path, filters: Optional[QueryFilters] = None, limit: Optional[int] = None, order_by: str = "bundle_dir", ) -> Iterator[Dict[str, Any]]: """ Stream bundle "seed" rows needed for sample-index generation. Returns dicts with: - bundle_dir - num_devices - default_device_id - teacher_depth_count """ f = filters or QueryFilters() params: List[Any] = [] where: List[str] = [] join_devices = False if f.source_format is not None: where.append("b.source_format = ?") params.append(str(f.source_format)) if f.has_packed_depth is not None: where.append("b.has_packed_depth = ?") params.append(1 if bool(f.has_packed_depth) else 0) if f.scene_type is not None: where.append("b.scene_type = ?") params.append(str(f.scene_type)) if f.operating_regime is not None: where.append("b.operating_regime = ?") params.append(str(f.operating_regime)) if f.min_devices is not None: where.append("b.num_devices >= ?") params.append(int(f.min_devices)) if f.packed_depth_min_frames is not None or f.packed_depth_max_gaps is not None: join_devices = True where.append("d.packed_depth_index_path IS NOT NULL") if f.packed_depth_min_frames is not None: where.append("d.packed_depth_frames_count >= ?") params.append(int(f.packed_depth_min_frames)) if f.packed_depth_max_gaps is not None: where.append("d.packed_depth_gaps <= ?") params.append(int(f.packed_depth_max_gaps)) wsql = "" if where: wsql = "WHERE " + " AND ".join(where) if order_by not in {"bundle_dir", "capture_id", "created_at", "scene_type"}: order_by = "bundle_dir" sql = ( "SELECT DISTINCT " "b.bundle_dir AS bundle_dir, " "COALESCE(b.num_devices, 0) AS num_devices, " "b.default_device_id AS default_device_id, " "COALESCE(b.teacher_depth_count, 0) AS teacher_depth_count " "FROM bundles b " + ("JOIN devices d ON d.bundle_dir = b.bundle_dir " if join_devices else "") + wsql + f" ORDER BY b.{order_by} ASC" ) if limit is not None and int(limit) > 0: sql += " LIMIT ?" params.append(int(limit)) conn = _connect_ro(Path(db_path)) try: cur = conn.execute(sql, params) for r in cur: yield { "bundle_dir": str(r["bundle_dir"]), "num_devices": int(r["num_devices"] or 0), "default_device_id": r["default_device_id"], "teacher_depth_count": int(r["teacher_depth_count"] or 0), } finally: conn.close() def export_bundle_dirs_txt(bundle_dirs: Sequence[str], output_path: Path) -> None: p = Path(output_path) p.parent.mkdir(parents=True, exist_ok=True) p.write_text("\n".join(str(b) for b in bundle_dirs) + ("\n" if bundle_dirs else "")) def export_rows_jsonl( *, db_path: Path, bundle_dirs: Sequence[str], output_path: Path, ) -> None: """ Export stored row_json for specific bundle dirs as JSONL. """ p = Path(output_path) p.parent.mkdir(parents=True, exist_ok=True) # Chunk to keep SQL param counts reasonable chunk = 500 conn = _connect_ro(Path(db_path)) try: with p.open("w") as f: for i in range(0, len(bundle_dirs), chunk): sub = [str(x) for x in bundle_dirs[i : i + chunk]] if not sub: continue qmarks = ",".join(["?"] * len(sub)) sql = ( "SELECT row_json FROM bundles " f"WHERE bundle_dir IN ({qmarks}) " "ORDER BY bundle_dir ASC" ) rows = conn.execute(sql, sub).fetchall() for r in rows: try: # row_json is already JSON; normalize to single-line JSONL obj = json.loads(r["row_json"]) f.write(json.dumps(obj, default=str) + "\n") except Exception: # fall back to raw blob f.write(str(r["row_json"]) + "\n") finally: conn.close()