|
|
""" |
|
|
SQLite-backed curation index. |
|
|
|
|
|
Why SQLite: |
|
|
- fast ad-hoc queries over large local datasets |
|
|
- incremental updates (skip unchanged bundles) |
|
|
- no extra dependencies |
|
|
""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import json |
|
|
import os |
|
|
import sqlite3 |
|
|
import time |
|
|
from dataclasses import dataclass |
|
|
from pathlib import Path |
|
|
from typing import Any, Dict, Literal, Optional |
|
|
|
|
|
from .indexer import CurationIndexConfig, _bundle_row, _discover_bundles |
|
|
|
|
|
|
|
|
@dataclass(frozen=True)
class SQLiteIndexConfig:
    """Configuration for building the SQLite curation index."""

    # Thread-pool size used when indexing bundle manifests.
    workers: int = 8
    # When True, bundles whose manifest stats and teacher-depth stats are
    # unchanged since the last run are skipped.
    incremental: bool = True
    # Forwarded to the bundle indexer: compute per-device packed-depth
    # stream summaries.
    include_depth_stream_summary: bool = True
    # Bundle discovery mode: "children" = direct subdirectories only,
    # "recursive" = walk the whole tree.
    discover: Literal["children", "recursive"] = "children"
|
|
|
|
|
|
|
|
def _now_s() -> float: |
|
|
return float(time.time()) |
|
|
|
|
|
|
|
|
def _connect(db_path: Path) -> sqlite3.Connection: |
|
|
p = Path(db_path) |
|
|
p.parent.mkdir(parents=True, exist_ok=True) |
|
|
conn = sqlite3.connect(str(p)) |
|
|
conn.execute("PRAGMA journal_mode=WAL;") |
|
|
conn.execute("PRAGMA synchronous=NORMAL;") |
|
|
conn.execute("PRAGMA temp_store=MEMORY;") |
|
|
conn.execute("PRAGMA foreign_keys=ON;") |
|
|
return conn |
|
|
|
|
|
|
|
|
def _ensure_column(conn: sqlite3.Connection, *, table: str, column: str, decl: str) -> None: |
|
|
""" |
|
|
Best-effort migration helper: add a column if it doesn't exist. |
|
|
""" |
|
|
cols = [r[1] for r in conn.execute(f"PRAGMA table_info({table});").fetchall()] |
|
|
if str(column) not in cols: |
|
|
conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {decl};") |
|
|
|
|
|
|
|
|
def _ensure_schema(conn: sqlite3.Connection) -> None:
    """Create tables/indexes if missing and run best-effort column migrations.

    Idempotent: safe to call on every open.  `bundles` holds one row per
    bundle (promoted query columns plus the full row as JSON in
    `row_json`); `devices` holds one row per (bundle, device) and
    cascade-deletes with its bundle.
    """
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS bundles (
            bundle_dir TEXT PRIMARY KEY,
            capture_id TEXT,
            created_at TEXT,
            scene_type TEXT,
            operating_regime TEXT,
            source_format TEXT,
            has_packed_depth INTEGER,
            num_devices INTEGER,
            default_device_id TEXT,
            teacher_depth_count INTEGER,
            teacher_depth_mtime_ns INTEGER,
            manifest_mtime_ns INTEGER,
            manifest_size INTEGER,
            indexed_at_unix_s REAL,
            row_json TEXT
        );
        """
    )
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS devices (
            bundle_dir TEXT NOT NULL,
            device_id TEXT NOT NULL,
            video_path TEXT,
            video_size_bytes INTEGER,
            intrinsics_path TEXT,
            timestamps_path TEXT,
            packed_depth_index_path TEXT,
            packed_depth_frames_count INTEGER,
            packed_depth_gaps INTEGER,
            PRIMARY KEY (bundle_dir, device_id),
            FOREIGN KEY (bundle_dir) REFERENCES bundles(bundle_dir) ON DELETE CASCADE
        );
        """
    )
    # Secondary indexes for the common curation filters.
    conn.execute("CREATE INDEX IF NOT EXISTS idx_bundles_scene ON bundles(scene_type);")
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_bundles_packed_depth ON bundles(has_packed_depth);"
    )
    conn.execute("CREATE INDEX IF NOT EXISTS idx_bundles_source ON bundles(source_format);")
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_bundles_teacher_depth ON bundles(teacher_depth_count);"
    )

    # Migrations for databases created before these columns existed; the
    # CREATE TABLE above already includes them for fresh databases.
    _ensure_column(conn, table="bundles", column="default_device_id", decl="TEXT")
    _ensure_column(conn, table="bundles", column="teacher_depth_count", decl="INTEGER")
    _ensure_column(conn, table="bundles", column="teacher_depth_mtime_ns", decl="INTEGER")
|
|
|
|
|
|
|
|
def _teacher_depth_stats(bundle_dir: Path) -> tuple[int, int]: |
|
|
""" |
|
|
Return (teacher_depth_count, teacher_depth_mtime_ns). |
|
|
""" |
|
|
depth_dir = Path(bundle_dir) / "teacher_outputs" / "depth" |
|
|
if not depth_dir.exists() or not depth_dir.is_dir(): |
|
|
return 0, 0 |
|
|
try: |
|
|
st = depth_dir.stat() |
|
|
mtime_ns = int(getattr(st, "st_mtime_ns", int(st.st_mtime * 1e9))) |
|
|
except Exception: |
|
|
mtime_ns = 0 |
|
|
try: |
|
|
n = 0 |
|
|
for ent in os.scandir(depth_dir): |
|
|
if not ent.is_file(): |
|
|
continue |
|
|
name = ent.name |
|
|
if name.startswith("frame_") and name.endswith(".npy"): |
|
|
n += 1 |
|
|
return int(n), int(mtime_ns) |
|
|
except Exception: |
|
|
return 0, int(mtime_ns) |
|
|
|
|
|
|
|
|
def _existing_manifest_stats(conn: sqlite3.Connection) -> Dict[str, Dict[str, int]]: |
|
|
cur = conn.execute( |
|
|
"SELECT bundle_dir, manifest_mtime_ns, manifest_size, " |
|
|
"COALESCE(teacher_depth_count,0), COALESCE(teacher_depth_mtime_ns,0) " |
|
|
"FROM bundles;" |
|
|
) |
|
|
out: Dict[str, Dict[str, int]] = {} |
|
|
for bdir, mtime_ns, size, td_count, td_mtime_ns in cur.fetchall(): |
|
|
if bdir: |
|
|
out[str(bdir)] = { |
|
|
"manifest_mtime_ns": int(mtime_ns or 0), |
|
|
"manifest_size": int(size or 0), |
|
|
"teacher_depth_count": int(td_count or 0), |
|
|
"teacher_depth_mtime_ns": int(td_mtime_ns or 0), |
|
|
} |
|
|
return out |
|
|
|
|
|
|
|
|
def _upsert_row(
    conn: sqlite3.Connection,
    *,
    row: Dict[str, Any],
    manifest_mtime_ns: int,
    manifest_size: int,
    teacher_depth_count: int,
    teacher_depth_mtime_ns: int,
) -> None:
    """Insert or update one bundle row and its per-device rows.

    `row` is the dict produced by `_bundle_row` (schema assumed from its
    usage here — TODO confirm against the indexer module).  The manifest
    and teacher-depth freshness stats are stored alongside so incremental
    runs can skip unchanged bundles.  Caller owns transaction boundaries.
    """
    bdir = str(row.get("bundle_dir", ""))
    capture_id = row.get("capture_id")
    created_at = row.get("created_at")
    scene_type = row.get("scene_type")
    operating_regime = row.get("operating_regime")
    # Defensive parsing: tolerate missing or malformed nested payloads.
    ingest = row.get("ingest") if isinstance(row.get("ingest"), dict) else {}
    source_format = ingest.get("source_format") if isinstance(ingest, dict) else None
    summary = row.get("summary") if isinstance(row.get("summary"), dict) else {}
    has_packed_depth = 1 if bool(summary.get("has_packed_depth")) else 0
    num_devices = int(summary.get("num_devices") or 0)

    # Full row kept as JSON so ad-hoc queries can reach fields that were
    # not promoted to dedicated columns. default=str stringifies anything
    # non-serializable (e.g. Path objects).
    row_json = json.dumps(row, default=str)
    indexed_at = float(row.get("indexed_at_unix_s") or _now_s())

    devs = row.get("devices") if isinstance(row.get("devices"), list) else []
    default_device_id = None
    try:
        if devs and isinstance(devs[0], dict):
            # First listed device is treated as the default.
            default_device_id = str(devs[0].get("device_id") or "") or None
    except Exception:
        default_device_id = None

    # Upsert the bundle row; bundle_dir is the primary key.
    conn.execute(
        """
        INSERT INTO bundles(
            bundle_dir, capture_id, created_at, scene_type, operating_regime,
            source_format, has_packed_depth, num_devices, default_device_id,
            teacher_depth_count, teacher_depth_mtime_ns,
            manifest_mtime_ns, manifest_size, indexed_at_unix_s, row_json
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(bundle_dir) DO UPDATE SET
            capture_id=excluded.capture_id,
            created_at=excluded.created_at,
            scene_type=excluded.scene_type,
            operating_regime=excluded.operating_regime,
            source_format=excluded.source_format,
            has_packed_depth=excluded.has_packed_depth,
            num_devices=excluded.num_devices,
            default_device_id=excluded.default_device_id,
            teacher_depth_count=excluded.teacher_depth_count,
            teacher_depth_mtime_ns=excluded.teacher_depth_mtime_ns,
            manifest_mtime_ns=excluded.manifest_mtime_ns,
            manifest_size=excluded.manifest_size,
            indexed_at_unix_s=excluded.indexed_at_unix_s,
            row_json=excluded.row_json;
        """,
        (
            bdir,
            str(capture_id) if capture_id is not None else None,
            str(created_at) if created_at is not None else None,
            str(scene_type) if scene_type is not None else None,
            str(operating_regime) if operating_regime is not None else None,
            str(source_format) if source_format is not None else None,
            int(has_packed_depth),
            int(num_devices),
            str(default_device_id) if default_device_id is not None else None,
            int(teacher_depth_count),
            int(teacher_depth_mtime_ns),
            int(manifest_mtime_ns),
            int(manifest_size),
            float(indexed_at),
            row_json,
        ),
    )

    # Upsert one row per device; entries without a device_id are skipped.
    for d in devs:
        if not isinstance(d, dict):
            continue
        did = str(d.get("device_id") or "")
        if not did:
            continue

        wsum = d.get("waveform_depth_stream_summary")
        idx_path = None
        frames_count = None
        gaps = None
        if isinstance(d.get("waveform_depth_stream"), dict):
            idx_path = d["waveform_depth_stream"].get("index_path")
        # Frame count/gaps are only trusted when the summary reports ok.
        if isinstance(wsum, dict) and bool(wsum.get("ok")):
            fr = wsum.get("frames", {}) if isinstance(wsum.get("frames"), dict) else {}
            frames_count = fr.get("count")
            gaps = fr.get("gaps")

        conn.execute(
            """
            INSERT INTO devices(
                bundle_dir, device_id, video_path, video_size_bytes,
                intrinsics_path, timestamps_path,
                packed_depth_index_path, packed_depth_frames_count, packed_depth_gaps
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(bundle_dir, device_id) DO UPDATE SET
                video_path=excluded.video_path,
                video_size_bytes=excluded.video_size_bytes,
                intrinsics_path=excluded.intrinsics_path,
                timestamps_path=excluded.timestamps_path,
                packed_depth_index_path=excluded.packed_depth_index_path,
                packed_depth_frames_count=excluded.packed_depth_frames_count,
                packed_depth_gaps=excluded.packed_depth_gaps;
            """,
            (
                bdir,
                did,
                d.get("video_path"),
                d.get("video_path_size_bytes"),
                d.get("intrinsics_path"),
                d.get("timestamps_path"),
                str(idx_path) if idx_path else None,
                int(frames_count) if frames_count is not None else None,
                int(gaps) if gaps is not None else None,
            ),
        )
|
|
|
|
|
|
|
|
def build_curation_index_sqlite(
    *,
    captures_root: Path,
    db_path: Path,
    config: Optional[SQLiteIndexConfig] = None,
) -> Dict[str, Any]:
    """Build (or incrementally refresh) the SQLite curation index.

    Discovers bundle directories under `captures_root`, indexes each via
    `_bundle_row` on a thread pool, and upserts results into `db_path`.
    With `config.incremental` (the default), bundles whose manifest
    mtime/size and teacher-depth stats match the stored values are
    skipped entirely.

    Returns a summary dict: bundle counts (found/indexed/skipped),
    ok/failed counts, and elapsed seconds.
    """
    cfg = config or SQLiteIndexConfig()
    captures_root = Path(captures_root)
    db_path = Path(db_path)
    t0 = _now_s()

    conn = _connect(db_path)
    try:
        _ensure_schema(conn)

        existing = _existing_manifest_stats(conn) if bool(cfg.incremental) else {}

        bundles = _discover_bundles(captures_root, mode=str(cfg.discover))
        to_index = []
        skipped = 0
        for b in bundles:
            mp = Path(b) / "manifest.json"
            try:
                st = mp.stat()
                mtime_ns = int(getattr(st, "st_mtime_ns", int(st.st_mtime * 1e9)))
                size = int(st.st_size)
            except Exception:
                # Missing/unreadable manifest: index with zeroed stats.
                mtime_ns = 0
                size = 0

            td_count, td_mtime_ns = _teacher_depth_stats(Path(b))
            prev = existing.get(str(b))
            # Skip only when every freshness stat matches the stored row.
            # Distinct sentinel defaults guarantee a miss on absent keys.
            if (
                prev
                and int(prev.get("manifest_mtime_ns", -1)) == mtime_ns
                and int(prev.get("manifest_size", -2)) == size
                and int(prev.get("teacher_depth_count", -3)) == int(td_count)
                and int(prev.get("teacher_depth_mtime_ns", -4)) == int(td_mtime_ns)
            ):
                skipped += 1
                continue
            to_index.append((Path(b), mtime_ns, size, int(td_count), int(td_mtime_ns)))

        curation_cfg = CurationIndexConfig(
            workers=int(cfg.workers),
            include_depth_stream_summary=bool(cfg.include_depth_stream_summary),
            discover=str(cfg.discover),
        )

        from concurrent.futures import ThreadPoolExecutor, as_completed

        ok = 0
        fail = 0
        conn.execute("BEGIN;")
        with ThreadPoolExecutor(max_workers=max(1, int(cfg.workers))) as ex:
            futs = {
                ex.submit(
                    _bundle_row,
                    bdir,
                    include_depth_stream_summary=bool(curation_cfg.include_depth_stream_summary),
                ): (bdir, mtime_ns, size, td_count, td_mtime_ns)
                for (bdir, mtime_ns, size, td_count, td_mtime_ns) in to_index
            }
            n_written = 0
            for f in as_completed(futs):
                bdir, mtime_ns, size, td_count, td_mtime_ns = futs[f]
                try:
                    # BUGFIX: f.result() re-raises any worker exception; it
                    # must live inside the try so one bad bundle is counted
                    # as a failure instead of aborting the whole build
                    # mid-transaction.
                    row = f.result()
                    _upsert_row(
                        conn,
                        row=row,
                        manifest_mtime_ns=mtime_ns,
                        manifest_size=size,
                        teacher_depth_count=int(td_count),
                        teacher_depth_mtime_ns=int(td_mtime_ns),
                    )
                    if bool(row.get("ok")) and not row.get("error"):
                        ok += 1
                    else:
                        fail += 1
                except Exception:
                    fail += 1
                n_written += 1

                # Commit in batches so a crash loses at most ~200 rows.
                if n_written % 200 == 0:
                    conn.execute("COMMIT;")
                    conn.execute("BEGIN;")
        conn.execute("COMMIT;")

        return {
            "captures_root": str(captures_root),
            "db_path": str(db_path),
            "num_bundles_found": int(len(bundles)),
            "num_indexed": int(len(to_index)),
            "num_skipped": int(skipped),
            "ok": int(ok),
            "failed": int(fail),
            "elapsed_s": float(_now_s() - t0),
        }
    finally:
        try:
            conn.close()
        except Exception:
            pass
|
|
|