# 3d_model/ylff/services/curation/sqlite_query.py
# Author: Azan
# Clean deployment build (squashed) — commit 7a87926
"""
Query helpers for the SQLite curation index.
Design goals:
- fast filtering for dataset selection
- safe parameterized SQL (no arbitrary SQL execution by default)
- easy export to paths / jsonl
"""
from __future__ import annotations
import json
import sqlite3
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Sequence
@dataclass(frozen=True)
class QueryFilters:
    """
    Immutable bag of optional filters for the curation-index queries.

    A field left as ``None`` means "do not constrain this attribute".
    Bundle-level fields apply directly to the ``bundles`` table; the
    packed-depth fields require a join against the ``devices`` table
    (see ``query_bundle_dirs`` / ``iter_bundle_shard_seeds``).
    """
    # bundle-level filters
    source_format: Optional[str] = None        # exact match on b.source_format
    has_packed_depth: Optional[bool] = None    # compared as 0/1 in SQLite
    scene_type: Optional[str] = None           # exact match on b.scene_type
    operating_regime: Optional[str] = None     # exact match on b.operating_regime
    min_devices: Optional[int] = None          # b.num_devices >= min_devices
    # device-level packed depth filters (requires join)
    packed_depth_min_frames: Optional[int] = None  # d.packed_depth_frames_count >= N
    packed_depth_max_gaps: Optional[int] = None    # d.packed_depth_gaps <= N
def _connect_ro(db_path: Path) -> sqlite3.Connection:
# URI mode to open read-only even if file is shared
p = Path(db_path).resolve()
conn = sqlite3.connect(f"file:{p.as_posix()}?mode=ro", uri=True)
conn.row_factory = sqlite3.Row
return conn
def query_bundle_dirs(
    *,
    db_path: Path,
    filters: Optional[QueryFilters] = None,
    limit: Optional[int] = None,
    order_by: str = "bundle_dir",
) -> List[str]:
    """
    Select matching bundles and return their ``bundle_dir`` strings.

    All filter values are bound as SQL parameters; the only string
    interpolated into the statement is ``order_by``, which is restricted
    to a fixed whitelist of column names.  Packed-depth filters force a
    join against ``devices``; DISTINCT keeps each bundle listed once.
    """
    flt = filters or QueryFilters()
    clauses: List[str] = []
    args: List[Any] = []

    def constrain(clause: str, *values: Any) -> None:
        # Record one parameterized predicate plus its bound values.
        clauses.append(clause)
        args.extend(values)

    if flt.source_format is not None:
        constrain("b.source_format = ?", str(flt.source_format))
    if flt.has_packed_depth is not None:
        constrain("b.has_packed_depth = ?", 1 if bool(flt.has_packed_depth) else 0)
    if flt.scene_type is not None:
        constrain("b.scene_type = ?", str(flt.scene_type))
    if flt.operating_regime is not None:
        constrain("b.operating_regime = ?", str(flt.operating_regime))
    if flt.min_devices is not None:
        constrain("b.num_devices >= ?", int(flt.min_devices))

    needs_device_join = (
        flt.packed_depth_min_frames is not None
        or flt.packed_depth_max_gaps is not None
    )
    if needs_device_join:
        # Only consider devices whose packed depth index was summarized.
        clauses.append("d.packed_depth_index_path IS NOT NULL")
        if flt.packed_depth_min_frames is not None:
            constrain("d.packed_depth_frames_count >= ?", int(flt.packed_depth_min_frames))
        if flt.packed_depth_max_gaps is not None:
            constrain("d.packed_depth_gaps <= ?", int(flt.packed_depth_max_gaps))

    where_sql = "WHERE " + " AND ".join(clauses) if clauses else ""
    if order_by not in {"bundle_dir", "capture_id", "created_at", "scene_type"}:
        # Whitelist fallback: never interpolate arbitrary caller input.
        order_by = "bundle_dir"

    join_sql = "JOIN devices d ON d.bundle_dir = b.bundle_dir " if needs_device_join else ""
    sql = (
        "SELECT DISTINCT b.bundle_dir AS bundle_dir "
        "FROM bundles b "
        + join_sql
        + where_sql
        + f" ORDER BY b.{order_by} ASC"
    )
    if limit is not None and int(limit) > 0:
        sql += " LIMIT ?"
        args.append(int(limit))

    conn = _connect_ro(Path(db_path))
    try:
        return [str(row["bundle_dir"]) for row in conn.execute(sql, args).fetchall()]
    finally:
        conn.close()
def iter_bundle_shard_seeds(
    *,
    db_path: Path,
    filters: Optional[QueryFilters] = None,
    limit: Optional[int] = None,
    order_by: str = "bundle_dir",
) -> Iterator[Dict[str, Any]]:
    """
    Stream per-bundle "seed" dicts needed for sample-index generation.

    Each yielded dict carries:
      - ``bundle_dir``: str
      - ``num_devices``: int (0 when NULL in the index)
      - ``default_device_id``: stored value (may be None)
      - ``teacher_depth_count``: int (0 when NULL in the index)

    The read-only connection stays open while the generator is consumed
    and is closed when it is exhausted or finalized.
    """
    flt = filters or QueryFilters()
    clauses: List[str] = []
    args: List[Any] = []

    def constrain(clause: str, *values: Any) -> None:
        # Record one parameterized predicate plus its bound values.
        clauses.append(clause)
        args.extend(values)

    if flt.source_format is not None:
        constrain("b.source_format = ?", str(flt.source_format))
    if flt.has_packed_depth is not None:
        constrain("b.has_packed_depth = ?", 1 if bool(flt.has_packed_depth) else 0)
    if flt.scene_type is not None:
        constrain("b.scene_type = ?", str(flt.scene_type))
    if flt.operating_regime is not None:
        constrain("b.operating_regime = ?", str(flt.operating_regime))
    if flt.min_devices is not None:
        constrain("b.num_devices >= ?", int(flt.min_devices))

    needs_device_join = (
        flt.packed_depth_min_frames is not None
        or flt.packed_depth_max_gaps is not None
    )
    if needs_device_join:
        # Only consider devices whose packed depth index was summarized.
        clauses.append("d.packed_depth_index_path IS NOT NULL")
        if flt.packed_depth_min_frames is not None:
            constrain("d.packed_depth_frames_count >= ?", int(flt.packed_depth_min_frames))
        if flt.packed_depth_max_gaps is not None:
            constrain("d.packed_depth_gaps <= ?", int(flt.packed_depth_max_gaps))

    where_sql = "WHERE " + " AND ".join(clauses) if clauses else ""
    if order_by not in {"bundle_dir", "capture_id", "created_at", "scene_type"}:
        # Whitelist fallback: never interpolate arbitrary caller input.
        order_by = "bundle_dir"

    join_sql = "JOIN devices d ON d.bundle_dir = b.bundle_dir " if needs_device_join else ""
    sql = (
        "SELECT DISTINCT "
        "b.bundle_dir AS bundle_dir, "
        "COALESCE(b.num_devices, 0) AS num_devices, "
        "b.default_device_id AS default_device_id, "
        "COALESCE(b.teacher_depth_count, 0) AS teacher_depth_count "
        "FROM bundles b "
        + join_sql
        + where_sql
        + f" ORDER BY b.{order_by} ASC"
    )
    if limit is not None and int(limit) > 0:
        sql += " LIMIT ?"
        args.append(int(limit))

    conn = _connect_ro(Path(db_path))
    try:
        for row in conn.execute(sql, args):
            yield {
                "bundle_dir": str(row["bundle_dir"]),
                "num_devices": int(row["num_devices"] or 0),
                "default_device_id": row["default_device_id"],
                "teacher_depth_count": int(row["teacher_depth_count"] or 0),
            }
    finally:
        conn.close()
def export_bundle_dirs_txt(bundle_dirs: Sequence[str], output_path: Path) -> None:
    """
    Write one bundle_dir per line to *output_path* as UTF-8 text.

    Parent directories are created as needed.  A trailing newline is only
    written for a non-empty sequence, so an empty selection produces an
    empty file.
    """
    p = Path(output_path)
    p.parent.mkdir(parents=True, exist_ok=True)
    text = "\n".join(str(b) for b in bundle_dirs)
    if bundle_dirs:
        text += "\n"
    # Explicit encoding: write_text otherwise falls back to the locale
    # default, which mangles non-ASCII paths on some platforms.
    p.write_text(text, encoding="utf-8")
def export_rows_jsonl(
    *,
    db_path: Path,
    bundle_dirs: Sequence[str],
    output_path: Path,
) -> None:
    """
    Export stored ``row_json`` blobs for specific bundle dirs as JSONL.

    Each stored blob is parsed and re-serialized onto a single line; a blob
    that is not valid JSON (or not a string at all) is written out raw as a
    best-effort fallback rather than aborting the export.  Output is UTF-8.
    """
    p = Path(output_path)
    p.parent.mkdir(parents=True, exist_ok=True)
    # Chunk the IN (...) list to keep SQL parameter counts reasonable.
    chunk = 500
    conn = _connect_ro(Path(db_path))
    try:
        # Explicit encoding: the default is locale-dependent and can fail
        # on non-ASCII content.
        with p.open("w", encoding="utf-8") as f:
            for i in range(0, len(bundle_dirs), chunk):
                sub = [str(x) for x in bundle_dirs[i : i + chunk]]
                if not sub:
                    continue
                qmarks = ",".join(["?"] * len(sub))
                sql = (
                    "SELECT row_json FROM bundles "
                    f"WHERE bundle_dir IN ({qmarks}) "
                    "ORDER BY bundle_dir ASC"
                )
                rows = conn.execute(sql, sub).fetchall()
                for r in rows:
                    try:
                        # row_json is already JSON; normalize to single-line
                        # JSONL.  default=str is deliberate: it stringifies
                        # any non-JSON-native values on re-serialization.
                        obj = json.loads(r["row_json"])
                        f.write(json.dumps(obj, default=str) + "\n")
                    except (TypeError, ValueError):
                        # Malformed/NULL blob (JSONDecodeError is a
                        # ValueError): fall back to the raw value so the
                        # export stays best-effort without masking other
                        # unexpected errors.
                        f.write(str(r["row_json"]) + "\n")
    finally:
        conn.close()