3d_model / ylff /services /curation /sqlite_index.py
Azan
Clean deployment build (Squashed)
7a87926
"""
SQLite-backed curation index.
Why SQLite:
- fast ad-hoc queries over large local datasets
- incremental updates (skip unchanged bundles)
- no extra dependencies
"""
from __future__ import annotations
import json
import os
import sqlite3
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Literal, Optional
from .indexer import CurationIndexConfig, _bundle_row, _discover_bundles
@dataclass(frozen=True)
class SQLiteIndexConfig:
    """
    Immutable options controlling ``build_curation_index_sqlite``.

    All fields have defaults, so ``SQLiteIndexConfig()`` is a valid configuration.
    """

    # Upper bound on the thread pool that computes bundle rows concurrently.
    workers: int = 8
    # When true, bundles whose stored manifest / teacher-depth fingerprint is
    # unchanged are skipped instead of being re-indexed.
    incremental: bool = True
    # Forwarded to the row builder as ``include_depth_stream_summary``.
    include_depth_stream_summary: bool = True
    # Bundle discovery mode handed to ``_discover_bundles``.
    discover: Literal["children", "recursive"] = "children"
def _now_s() -> float:
return float(time.time())
def _connect(db_path: Path) -> sqlite3.Connection:
p = Path(db_path)
p.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(p))
conn.execute("PRAGMA journal_mode=WAL;")
conn.execute("PRAGMA synchronous=NORMAL;")
conn.execute("PRAGMA temp_store=MEMORY;")
conn.execute("PRAGMA foreign_keys=ON;")
return conn
def _ensure_column(conn: sqlite3.Connection, *, table: str, column: str, decl: str) -> None:
"""
Best-effort migration helper: add a column if it doesn't exist.
"""
cols = [r[1] for r in conn.execute(f"PRAGMA table_info({table});").fetchall()]
if str(column) not in cols:
conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {decl};")
def _ensure_schema(conn: sqlite3.Connection) -> None:
    """
    Create the ``bundles`` / ``devices`` tables and their indexes, migrating older DBs.

    Idempotent: tables and indexes use ``IF NOT EXISTS`` and column additions go
    through ``_ensure_column``, which only adds a column that is missing.

    Column migrations run *before* index creation. ``idx_bundles_teacher_depth``
    references ``teacher_depth_count``; on a DB created before that column existed,
    ``CREATE INDEX`` would fail with "no such column" and the migration that adds
    it would never be reached.
    """
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS bundles (
            bundle_dir TEXT PRIMARY KEY,
            capture_id TEXT,
            created_at TEXT,
            scene_type TEXT,
            operating_regime TEXT,
            source_format TEXT,
            has_packed_depth INTEGER,
            num_devices INTEGER,
            default_device_id TEXT,
            teacher_depth_count INTEGER,
            teacher_depth_mtime_ns INTEGER,
            manifest_mtime_ns INTEGER,
            manifest_size INTEGER,
            indexed_at_unix_s REAL,
            row_json TEXT
        );
        """
    )
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS devices (
            bundle_dir TEXT NOT NULL,
            device_id TEXT NOT NULL,
            video_path TEXT,
            video_size_bytes INTEGER,
            intrinsics_path TEXT,
            timestamps_path TEXT,
            packed_depth_index_path TEXT,
            packed_depth_frames_count INTEGER,
            packed_depth_gaps INTEGER,
            PRIMARY KEY (bundle_dir, device_id),
            FOREIGN KEY (bundle_dir) REFERENCES bundles(bundle_dir) ON DELETE CASCADE
        );
        """
    )

    # Migrations for older DBs that predate these columns. Must precede the
    # index creation below, which references teacher_depth_count.
    _ensure_column(conn, table="bundles", column="default_device_id", decl="TEXT")
    _ensure_column(conn, table="bundles", column="teacher_depth_count", decl="INTEGER")
    _ensure_column(conn, table="bundles", column="teacher_depth_mtime_ns", decl="INTEGER")

    conn.execute("CREATE INDEX IF NOT EXISTS idx_bundles_scene ON bundles(scene_type);")
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_bundles_packed_depth ON bundles(has_packed_depth);"
    )
    conn.execute("CREATE INDEX IF NOT EXISTS idx_bundles_source ON bundles(source_format);")
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_bundles_teacher_depth ON bundles(teacher_depth_count);"
    )
def _teacher_depth_stats(bundle_dir: Path) -> tuple[int, int]:
"""
Return (teacher_depth_count, teacher_depth_mtime_ns).
"""
depth_dir = Path(bundle_dir) / "teacher_outputs" / "depth"
if not depth_dir.exists() or not depth_dir.is_dir():
return 0, 0
try:
st = depth_dir.stat()
mtime_ns = int(getattr(st, "st_mtime_ns", int(st.st_mtime * 1e9)))
except Exception:
mtime_ns = 0
try:
n = 0
for ent in os.scandir(depth_dir):
if not ent.is_file():
continue
name = ent.name
if name.startswith("frame_") and name.endswith(".npy"):
n += 1
return int(n), int(mtime_ns)
except Exception:
return 0, int(mtime_ns)
def _existing_manifest_stats(conn: sqlite3.Connection) -> Dict[str, Dict[str, int]]:
cur = conn.execute(
"SELECT bundle_dir, manifest_mtime_ns, manifest_size, "
"COALESCE(teacher_depth_count,0), COALESCE(teacher_depth_mtime_ns,0) "
"FROM bundles;"
)
out: Dict[str, Dict[str, int]] = {}
for bdir, mtime_ns, size, td_count, td_mtime_ns in cur.fetchall():
if bdir:
out[str(bdir)] = {
"manifest_mtime_ns": int(mtime_ns or 0),
"manifest_size": int(size or 0),
"teacher_depth_count": int(td_count or 0),
"teacher_depth_mtime_ns": int(td_mtime_ns or 0),
}
return out
def _upsert_row(
conn: sqlite3.Connection,
*,
row: Dict[str, Any],
manifest_mtime_ns: int,
manifest_size: int,
teacher_depth_count: int,
teacher_depth_mtime_ns: int,
) -> None:
bdir = str(row.get("bundle_dir", ""))
capture_id = row.get("capture_id")
created_at = row.get("created_at")
scene_type = row.get("scene_type")
operating_regime = row.get("operating_regime")
ingest = row.get("ingest") if isinstance(row.get("ingest"), dict) else {}
source_format = ingest.get("source_format") if isinstance(ingest, dict) else None
summary = row.get("summary") if isinstance(row.get("summary"), dict) else {}
has_packed_depth = 1 if bool(summary.get("has_packed_depth")) else 0
num_devices = int(summary.get("num_devices") or 0)
row_json = json.dumps(row, default=str)
indexed_at = float(row.get("indexed_at_unix_s") or _now_s())
devs = row.get("devices") if isinstance(row.get("devices"), list) else []
default_device_id = None
try:
if devs and isinstance(devs[0], dict):
default_device_id = str(devs[0].get("device_id") or "") or None
except Exception:
default_device_id = None
conn.execute(
"""
INSERT INTO bundles(
bundle_dir, capture_id, created_at, scene_type, operating_regime,
source_format, has_packed_depth, num_devices, default_device_id,
teacher_depth_count, teacher_depth_mtime_ns,
manifest_mtime_ns, manifest_size, indexed_at_unix_s, row_json
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(bundle_dir) DO UPDATE SET
capture_id=excluded.capture_id,
created_at=excluded.created_at,
scene_type=excluded.scene_type,
operating_regime=excluded.operating_regime,
source_format=excluded.source_format,
has_packed_depth=excluded.has_packed_depth,
num_devices=excluded.num_devices,
default_device_id=excluded.default_device_id,
teacher_depth_count=excluded.teacher_depth_count,
teacher_depth_mtime_ns=excluded.teacher_depth_mtime_ns,
manifest_mtime_ns=excluded.manifest_mtime_ns,
manifest_size=excluded.manifest_size,
indexed_at_unix_s=excluded.indexed_at_unix_s,
row_json=excluded.row_json;
""",
(
bdir,
str(capture_id) if capture_id is not None else None,
str(created_at) if created_at is not None else None,
str(scene_type) if scene_type is not None else None,
str(operating_regime) if operating_regime is not None else None,
str(source_format) if source_format is not None else None,
int(has_packed_depth),
int(num_devices),
str(default_device_id) if default_device_id is not None else None,
int(teacher_depth_count),
int(teacher_depth_mtime_ns),
int(manifest_mtime_ns),
int(manifest_size),
float(indexed_at),
row_json,
),
)
# Devices table: simple upsert per device
for d in devs:
if not isinstance(d, dict):
continue
did = str(d.get("device_id") or "")
if not did:
continue
wsum = d.get("waveform_depth_stream_summary")
idx_path = None
frames_count = None
gaps = None
if isinstance(d.get("waveform_depth_stream"), dict):
idx_path = d["waveform_depth_stream"].get("index_path")
if isinstance(wsum, dict) and bool(wsum.get("ok")):
fr = wsum.get("frames", {}) if isinstance(wsum.get("frames"), dict) else {}
frames_count = fr.get("count")
gaps = fr.get("gaps")
conn.execute(
"""
INSERT INTO devices(
bundle_dir, device_id, video_path, video_size_bytes,
intrinsics_path, timestamps_path,
packed_depth_index_path, packed_depth_frames_count, packed_depth_gaps
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(bundle_dir, device_id) DO UPDATE SET
video_path=excluded.video_path,
video_size_bytes=excluded.video_size_bytes,
intrinsics_path=excluded.intrinsics_path,
timestamps_path=excluded.timestamps_path,
packed_depth_index_path=excluded.packed_depth_index_path,
packed_depth_frames_count=excluded.packed_depth_frames_count,
packed_depth_gaps=excluded.packed_depth_gaps;
""",
(
bdir,
did,
d.get("video_path"),
d.get("video_path_size_bytes"),
d.get("intrinsics_path"),
d.get("timestamps_path"),
str(idx_path) if idx_path else None,
int(frames_count) if frames_count is not None else None,
int(gaps) if gaps is not None else None,
),
)
def _manifest_fingerprint(bundle_dir: Path) -> tuple[int, int]:
    """Return ``(mtime_ns, size)`` of ``<bundle_dir>/manifest.json``, or ``(0, 0)`` if it cannot be stat'ed."""
    try:
        st = (Path(bundle_dir) / "manifest.json").stat()
    except OSError:
        return 0, 0
    return int(st.st_mtime_ns), int(st.st_size)


def build_curation_index_sqlite(
    *,
    captures_root: Path,
    db_path: Path,
    config: Optional[SQLiteIndexConfig] = None,
) -> Dict[str, Any]:
    """
    Build, or incrementally refresh, the SQLite curation index for bundles under ``captures_root``.

    Bundles are found with ``_discover_bundles`` using ``config.discover``. When
    ``config.incremental`` is true, a bundle is skipped if its manifest mtime/size
    and teacher-depth count/mtime equal the values already stored in the DB.
    Rows for the remaining bundles are computed concurrently by ``_bundle_row`` on
    a thread pool. All DB writes happen on the calling thread.

    Each bundle's writes are wrapped in a SAVEPOINT. If building or writing the row
    raises, that bundle's changes are rolled back and it is counted as failed, and
    the run continues. Because its stored fingerprint is left unchanged, the next
    incremental run retries it.

    Returns:
        A summary dict with ``captures_root``, ``db_path``, ``num_bundles_found``,
        ``num_indexed`` (bundles attempted), ``num_skipped``, ``ok``, ``failed`` and
        ``elapsed_s``.
    """
    cfg = config or SQLiteIndexConfig()
    captures_root = Path(captures_root)
    db_path = Path(db_path)
    t0 = _now_s()
    conn = _connect(db_path)
    try:
        _ensure_schema(conn)
        existing = _existing_manifest_stats(conn) if bool(cfg.incremental) else {}
        bundles = _discover_bundles(captures_root, mode=str(cfg.discover))

        # Decide which bundles need (re)indexing by comparing fingerprints.
        to_index = []
        skipped = 0
        for b in bundles:
            mtime_ns, size = _manifest_fingerprint(Path(b))
            td_count, td_mtime_ns = _teacher_depth_stats(Path(b))
            prev = existing.get(str(b))
            if (
                prev
                and int(prev.get("manifest_mtime_ns", -1)) == mtime_ns
                and int(prev.get("manifest_size", -2)) == size
                and int(prev.get("teacher_depth_count", -3)) == int(td_count)
                and int(prev.get("teacher_depth_mtime_ns", -4)) == int(td_mtime_ns)
            ):
                skipped += 1
                continue
            to_index.append((Path(b), mtime_ns, size, int(td_count), int(td_mtime_ns)))

        # Parallel row computation; single-thread DB writes.
        curation_cfg = CurationIndexConfig(
            workers=int(cfg.workers),
            include_depth_stream_summary=bool(cfg.include_depth_stream_summary),
            discover=str(cfg.discover),
        )
        # Use the same worker pattern as the JSONL indexer via its internal row builder.
        from concurrent.futures import ThreadPoolExecutor, as_completed

        ok = 0
        fail = 0
        n_written = 0
        conn.execute("BEGIN;")
        with ThreadPoolExecutor(max_workers=max(1, int(cfg.workers))) as ex:
            futs = {
                ex.submit(
                    _bundle_row,
                    bdir,
                    include_depth_stream_summary=bool(curation_cfg.include_depth_stream_summary),
                ): (bdir, mtime_ns, size, td_count, td_mtime_ns)
                for (bdir, mtime_ns, size, td_count, td_mtime_ns) in to_index
            }
            for f in as_completed(futs):
                bdir, mtime_ns, size, td_count, td_mtime_ns = futs[f]
                # Isolate this bundle: a failure in the row builder or in any insert
                # must neither abort the whole run nor leave a half-written bundle
                # whose new fingerprint would make later incremental runs skip it.
                conn.execute("SAVEPOINT bundle_row;")
                try:
                    row = f.result()
                    _upsert_row(
                        conn,
                        row=row,
                        manifest_mtime_ns=mtime_ns,
                        manifest_size=size,
                        teacher_depth_count=int(td_count),
                        teacher_depth_mtime_ns=int(td_mtime_ns),
                    )
                    row_ok = bool(row.get("ok")) and not row.get("error")
                except Exception:
                    conn.execute("ROLLBACK TO SAVEPOINT bundle_row;")
                    row_ok = False
                # ROLLBACK TO keeps the savepoint open; RELEASE always closes it.
                conn.execute("RELEASE SAVEPOINT bundle_row;")
                if row_ok:
                    ok += 1
                else:
                    fail += 1
                n_written += 1
                # Periodic commit to keep the WAL bounded.
                if n_written % 200 == 0:
                    conn.execute("COMMIT;")
                    conn.execute("BEGIN;")
        conn.execute("COMMIT;")
        return {
            "captures_root": str(captures_root),
            "db_path": str(db_path),
            "num_bundles_found": int(len(bundles)),
            "num_indexed": int(len(to_index)),
            "num_skipped": int(skipped),
            "ok": int(ok),
            "failed": int(fail),
            "elapsed_s": float(_now_s() - t0),
        }
    finally:
        # Closing without COMMIT discards any still-open transaction.
        try:
            conn.close()
        except sqlite3.Error:
            pass