|
|
""" |
|
|
Query helpers for the SQLite curation index. |
|
|
|
|
|
Design goals: |
|
|
- fast filtering for dataset selection |
|
|
- safe parameterized SQL (no arbitrary SQL execution by default) |
|
|
- easy export to paths / jsonl |
|
|
""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import json |
|
|
import sqlite3 |
|
|
from dataclasses import dataclass |
|
|
from pathlib import Path |
|
|
from typing import Any, Dict, Iterator, List, Optional, Sequence |
|
|
|
|
|
|
|
|
@dataclass(frozen=True)
class QueryFilters:
    """Filter criteria for selecting bundles from the curation index.

    Every field defaults to ``None``, which means "do not filter on this
    field". The first group filters columns on the ``bundles`` table; the
    ``packed_depth_*`` fields filter per-device columns and therefore
    force a join against the ``devices`` table in the query builders.
    """

    # Bundle-level filters (columns on the `bundles` table, alias `b`).
    source_format: Optional[str] = None
    has_packed_depth: Optional[bool] = None
    scene_type: Optional[str] = None
    operating_regime: Optional[str] = None
    min_devices: Optional[int] = None

    # Device-level filters (columns on the `devices` table, alias `d`);
    # either one being set also requires `d.packed_depth_index_path` to
    # be non-NULL.
    packed_depth_min_frames: Optional[int] = None
    packed_depth_max_gaps: Optional[int] = None
|
|
|
|
|
|
|
def _connect_ro(db_path: Path) -> sqlite3.Connection: |
|
|
|
|
|
p = Path(db_path).resolve() |
|
|
conn = sqlite3.connect(f"file:{p.as_posix()}?mode=ro", uri=True) |
|
|
conn.row_factory = sqlite3.Row |
|
|
return conn |
|
|
|
|
|
|
|
|
def _filters_to_sql(f: QueryFilters) -> tuple[List[str], List[Any], bool]:
    """Translate *f* into ``(where_clauses, bind_params, needs_devices_join)``.

    Bundle-level fields produce clauses on the ``bundles`` table (alias
    ``b``); the packed-depth fields produce clauses on the ``devices``
    table (alias ``d``) and flip ``needs_devices_join`` to True. All
    values go through bind parameters — no user text is interpolated
    into the SQL itself.
    """
    where: List[str] = []
    params: List[Any] = []
    join_devices = False

    if f.source_format is not None:
        where.append("b.source_format = ?")
        params.append(str(f.source_format))
    if f.has_packed_depth is not None:
        # Stored as an integer flag in SQLite.
        where.append("b.has_packed_depth = ?")
        params.append(1 if bool(f.has_packed_depth) else 0)
    if f.scene_type is not None:
        where.append("b.scene_type = ?")
        params.append(str(f.scene_type))
    if f.operating_regime is not None:
        where.append("b.operating_regime = ?")
        params.append(str(f.operating_regime))
    if f.min_devices is not None:
        where.append("b.num_devices >= ?")
        params.append(int(f.min_devices))

    if f.packed_depth_min_frames is not None or f.packed_depth_max_gaps is not None:
        join_devices = True
        # Packed-depth thresholds only make sense for devices that
        # actually have a packed-depth index.
        where.append("d.packed_depth_index_path IS NOT NULL")
        if f.packed_depth_min_frames is not None:
            where.append("d.packed_depth_frames_count >= ?")
            params.append(int(f.packed_depth_min_frames))
        if f.packed_depth_max_gaps is not None:
            where.append("d.packed_depth_gaps <= ?")
            params.append(int(f.packed_depth_max_gaps))

    return where, params, join_devices


def query_bundle_dirs(
    *,
    db_path: Path,
    filters: Optional[QueryFilters] = None,
    limit: Optional[int] = None,
    order_by: str = "bundle_dir",
) -> List[str]:
    """Return a list of bundle_dir strings matching filters.

    Args:
        db_path: Path to the SQLite curation index (opened read-only).
        filters: Optional QueryFilters; ``None`` means no filtering.
        order_by: One of {"bundle_dir", "capture_id", "created_at",
            "scene_type"}; anything else silently falls back to
            "bundle_dir".
        limit: Maximum number of rows; ignored unless > 0.

    Returns:
        Sorted (by *order_by*, ascending) list of distinct bundle_dir
        strings.
    """
    f = filters or QueryFilters()
    where, params, join_devices = _filters_to_sql(f)

    wsql = "WHERE " + " AND ".join(where) if where else ""

    # Whitelist order_by: it is interpolated into the SQL text, so only
    # known column names are allowed (bind params cannot be used here).
    if order_by not in {"bundle_dir", "capture_id", "created_at", "scene_type"}:
        order_by = "bundle_dir"

    # DISTINCT guards against duplicate bundle rows when the devices
    # join fans out one bundle into multiple rows.
    sql = (
        "SELECT DISTINCT b.bundle_dir AS bundle_dir "
        "FROM bundles b "
        + ("JOIN devices d ON d.bundle_dir = b.bundle_dir " if join_devices else "")
        + wsql
        + f" ORDER BY b.{order_by} ASC"
    )
    if limit is not None and int(limit) > 0:
        sql += " LIMIT ?"
        params.append(int(limit))

    conn = _connect_ro(Path(db_path))
    try:
        rows = conn.execute(sql, params).fetchall()
        return [str(r["bundle_dir"]) for r in rows]
    finally:
        conn.close()
|
|
|
|
|
|
|
|
def iter_bundle_shard_seeds(
    *,
    db_path: Path,
    filters: Optional[QueryFilters] = None,
    limit: Optional[int] = None,
    order_by: str = "bundle_dir",
) -> Iterator[Dict[str, Any]]:
    """Stream bundle "seed" rows needed for sample-index generation.

    Yields dicts with:

    - bundle_dir
    - num_devices
    - default_device_id
    - teacher_depth_count
    """
    flt = filters or QueryFilters()
    clauses: List[str] = []
    binds: List[Any] = []
    needs_join = False

    # Bundle-level equality / threshold filters on the `bundles` table.
    if flt.source_format is not None:
        clauses.append("b.source_format = ?")
        binds.append(str(flt.source_format))
    if flt.has_packed_depth is not None:
        clauses.append("b.has_packed_depth = ?")
        binds.append(1 if bool(flt.has_packed_depth) else 0)
    if flt.scene_type is not None:
        clauses.append("b.scene_type = ?")
        binds.append(str(flt.scene_type))
    if flt.operating_regime is not None:
        clauses.append("b.operating_regime = ?")
        binds.append(str(flt.operating_regime))
    if flt.min_devices is not None:
        clauses.append("b.num_devices >= ?")
        binds.append(int(flt.min_devices))

    # Device-level packed-depth filters require joining `devices`.
    if flt.packed_depth_min_frames is not None or flt.packed_depth_max_gaps is not None:
        needs_join = True
        clauses.append("d.packed_depth_index_path IS NOT NULL")
        if flt.packed_depth_min_frames is not None:
            clauses.append("d.packed_depth_frames_count >= ?")
            binds.append(int(flt.packed_depth_min_frames))
        if flt.packed_depth_max_gaps is not None:
            clauses.append("d.packed_depth_gaps <= ?")
            binds.append(int(flt.packed_depth_max_gaps))

    where_sql = ("WHERE " + " AND ".join(clauses)) if clauses else ""

    # `order_by` is interpolated into the SQL, so restrict it to a
    # whitelist of known columns; anything else falls back to bundle_dir.
    if order_by not in {"bundle_dir", "capture_id", "created_at", "scene_type"}:
        order_by = "bundle_dir"

    join_sql = "JOIN devices d ON d.bundle_dir = b.bundle_dir " if needs_join else ""
    sql = (
        "SELECT DISTINCT "
        "b.bundle_dir AS bundle_dir, "
        "COALESCE(b.num_devices, 0) AS num_devices, "
        "b.default_device_id AS default_device_id, "
        "COALESCE(b.teacher_depth_count, 0) AS teacher_depth_count "
        "FROM bundles b "
        + join_sql
        + where_sql
        + f" ORDER BY b.{order_by} ASC"
    )
    if limit is not None and int(limit) > 0:
        sql += " LIMIT ?"
        binds.append(int(limit))

    conn = _connect_ro(Path(db_path))
    try:
        # Iterate the cursor directly so rows stream without being
        # materialized all at once.
        for row in conn.execute(sql, binds):
            yield {
                "bundle_dir": str(row["bundle_dir"]),
                "num_devices": int(row["num_devices"] or 0),
                "default_device_id": row["default_device_id"],
                "teacher_depth_count": int(row["teacher_depth_count"] or 0),
            }
    finally:
        conn.close()
|
|
|
|
|
|
|
|
def export_bundle_dirs_txt(bundle_dirs: Sequence[str], output_path: Path) -> None:
    """Write one bundle_dir per line to *output_path*.

    Parent directories are created as needed. A trailing newline is
    emitted only when there is at least one entry, so an empty
    selection produces an empty file.
    """
    p = Path(output_path)
    p.parent.mkdir(parents=True, exist_ok=True)
    text = "\n".join(str(b) for b in bundle_dirs)
    if bundle_dirs:
        text += "\n"
    # Pin the encoding: bare write_text uses the platform/locale default,
    # which makes the output file vary across machines.
    p.write_text(text, encoding="utf-8")
|
|
|
|
|
|
|
|
def export_rows_jsonl(
    *,
    db_path: Path,
    bundle_dirs: Sequence[str],
    output_path: Path,
) -> None:
    """Export stored row_json for specific bundle dirs as JSONL.

    Looks up each bundle_dir's ``row_json`` column in the ``bundles``
    table and writes one JSON document per line to *output_path*
    (parent directories created as needed). Rows whose stored JSON
    fails to parse are written through verbatim — this export is
    deliberately best-effort.
    """
    p = Path(output_path)
    p.parent.mkdir(parents=True, exist_ok=True)

    # Query in fixed-size chunks so the IN (...) placeholder list stays
    # well under SQLite's bound-parameter limit.
    chunk = 500
    conn = _connect_ro(Path(db_path))
    try:
        # Pin the encoding: bare open("w") uses the platform/locale
        # default, which makes the output vary across machines.
        with p.open("w", encoding="utf-8") as f:
            for i in range(0, len(bundle_dirs), chunk):
                sub = [str(x) for x in bundle_dirs[i : i + chunk]]
                if not sub:
                    continue
                qmarks = ",".join(["?"] * len(sub))
                sql = (
                    "SELECT row_json FROM bundles "
                    f"WHERE bundle_dir IN ({qmarks}) "
                    "ORDER BY bundle_dir ASC"
                )
                rows = conn.execute(sql, sub).fetchall()
                for r in rows:
                    try:
                        # Re-serialize so every output line is a single
                        # compact JSON document; default=str covers any
                        # stray non-JSON-native values.
                        obj = json.loads(r["row_json"])
                        f.write(json.dumps(obj, default=str) + "\n")
                    except Exception:
                        # Best-effort fallback: emit the raw stored text
                        # rather than dropping the row.
                        f.write(str(r["row_json"]) + "\n")
    finally:
        conn.close()
|
|
|