"""
Query helpers for the SQLite curation index.
Design goals:
- fast filtering for dataset selection
- safe parameterized SQL (no arbitrary SQL execution by default)
- easy export to paths / jsonl
"""
from __future__ import annotations

import json
import sqlite3
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Sequence
from urllib.parse import quote
@dataclass(frozen=True)
class QueryFilters:
    """
    Immutable filter set for selecting bundles from the curation index.

    Every field defaults to None, meaning "do not filter on this".
    The packed-depth fields below apply per device and force a join
    against the devices table when set.
    """

    # --- bundle-level filters (bundles table) ---
    # Exact match on bundles.source_format.
    source_format: Optional[str] = None
    # Match on bundles.has_packed_depth (bound as 0/1 in SQL).
    has_packed_depth: Optional[bool] = None
    # Exact match on bundles.scene_type.
    scene_type: Optional[str] = None
    # Exact match on bundles.operating_regime.
    operating_regime: Optional[str] = None
    # Keep bundles with bundles.num_devices >= this value.
    min_devices: Optional[int] = None
    # --- device-level packed depth filters (devices table; requires join) ---
    # Keep devices with packed_depth_frames_count >= this value.
    packed_depth_min_frames: Optional[int] = None
    # Keep devices with packed_depth_gaps <= this value.
    packed_depth_max_gaps: Optional[int] = None
def _connect_ro(db_path: Path) -> sqlite3.Connection:
    """
    Open *db_path* as a read-only SQLite connection.

    Uses a ``file:`` URI with ``mode=ro`` so writes are rejected even when
    the database file is shared with other processes.

    Raises:
        sqlite3.OperationalError: if the file does not exist or cannot be
            opened read-only.
    """
    p = Path(db_path).resolve()
    # Percent-encode the path: a '?', '#' or '%' in a directory or file name
    # would otherwise corrupt URI query/fragment parsing and open the wrong
    # file. SQLite decodes %HH escapes in URI filenames, and quote() keeps
    # '/' intact by default.
    uri = f"file:{quote(p.as_posix())}?mode=ro"
    conn = sqlite3.connect(uri, uri=True)
    # sqlite3.Row lets callers access columns by name (r["col"]).
    conn.row_factory = sqlite3.Row
    return conn
def _bundle_filter_sql(f: QueryFilters) -> "tuple[List[str], List[Any], bool]":
    """
    Translate *f* into parameterized SQL pieces.

    Returns:
        (where, params, join_devices): ``where`` is a list of predicates
        using aliases ``b`` (bundles) and ``d`` (devices), ``params`` the
        matching bind values in order, and ``join_devices`` whether the
        devices table must be joined.
    """
    where: List[str] = []
    params: List[Any] = []
    join_devices = False
    if f.source_format is not None:
        where.append("b.source_format = ?")
        params.append(str(f.source_format))
    if f.has_packed_depth is not None:
        where.append("b.has_packed_depth = ?")
        params.append(1 if bool(f.has_packed_depth) else 0)
    if f.scene_type is not None:
        where.append("b.scene_type = ?")
        params.append(str(f.scene_type))
    if f.operating_regime is not None:
        where.append("b.operating_regime = ?")
        params.append(str(f.operating_regime))
    if f.min_devices is not None:
        where.append("b.num_devices >= ?")
        params.append(int(f.min_devices))
    if f.packed_depth_min_frames is not None or f.packed_depth_max_gaps is not None:
        join_devices = True
        # Only devices whose packed-depth index has been summarized can
        # satisfy per-device thresholds.
        where.append("d.packed_depth_index_path IS NOT NULL")
        if f.packed_depth_min_frames is not None:
            where.append("d.packed_depth_frames_count >= ?")
            params.append(int(f.packed_depth_min_frames))
        if f.packed_depth_max_gaps is not None:
            where.append("d.packed_depth_gaps <= ?")
            params.append(int(f.packed_depth_max_gaps))
    return where, params, join_devices


def query_bundle_dirs(
    *,
    db_path: Path,
    filters: Optional[QueryFilters] = None,
    limit: Optional[int] = None,
    order_by: str = "bundle_dir",
) -> List[str]:
    """
    Return a list of distinct bundle_dir strings matching *filters*.

    Args:
        db_path: path to the SQLite curation index (opened read-only).
        filters: optional QueryFilters; None means "match everything".
        limit: maximum number of rows; None or <= 0 means no limit.
        order_by: sort column; anything outside the whitelist silently
            falls back to "bundle_dir" so only known identifiers are ever
            interpolated into the SQL (values themselves are always bound).

    Returns:
        bundle_dir values, ascending by *order_by*.
    """
    f = filters or QueryFilters()
    where, params, join_devices = _bundle_filter_sql(f)
    wsql = ("WHERE " + " AND ".join(where)) if where else ""
    if order_by not in {"bundle_dir", "capture_id", "created_at", "scene_type"}:
        order_by = "bundle_dir"
    sql = (
        "SELECT DISTINCT b.bundle_dir AS bundle_dir "
        "FROM bundles b "
        + ("JOIN devices d ON d.bundle_dir = b.bundle_dir " if join_devices else "")
        + wsql
        + f" ORDER BY b.{order_by} ASC"
    )
    if limit is not None and int(limit) > 0:
        sql += " LIMIT ?"
        params.append(int(limit))
    conn = _connect_ro(Path(db_path))
    try:
        rows = conn.execute(sql, params).fetchall()
        return [str(r["bundle_dir"]) for r in rows]
    finally:
        conn.close()
def iter_bundle_shard_seeds(
    *,
    db_path: Path,
    filters: Optional[QueryFilters] = None,
    limit: Optional[int] = None,
    order_by: str = "bundle_dir",
) -> Iterator[Dict[str, Any]]:
    """
    Stream per-bundle "seed" rows for sample-index generation.

    Yields dicts with keys:
        - bundle_dir
        - num_devices (0 when NULL in the index)
        - default_device_id
        - teacher_depth_count (0 when NULL in the index)
    """
    active = filters if filters is not None else QueryFilters()
    clauses: List[str] = []
    binds: List[Any] = []

    # Bundle-level predicates: (sql, value, bind coercion). A None value
    # means "do not filter on this field".
    bundle_predicates = (
        ("b.source_format = ?", active.source_format, str),
        ("b.has_packed_depth = ?", active.has_packed_depth, lambda v: 1 if bool(v) else 0),
        ("b.scene_type = ?", active.scene_type, str),
        ("b.operating_regime = ?", active.operating_regime, str),
        ("b.num_devices >= ?", active.min_devices, int),
    )
    for predicate, value, coerce in bundle_predicates:
        if value is not None:
            clauses.append(predicate)
            binds.append(coerce(value))

    needs_device_join = (
        active.packed_depth_min_frames is not None
        or active.packed_depth_max_gaps is not None
    )
    if needs_device_join:
        # Only devices with a summarized packed-depth index can match.
        clauses.append("d.packed_depth_index_path IS NOT NULL")
        if active.packed_depth_min_frames is not None:
            clauses.append("d.packed_depth_frames_count >= ?")
            binds.append(int(active.packed_depth_min_frames))
        if active.packed_depth_max_gaps is not None:
            clauses.append("d.packed_depth_gaps <= ?")
            binds.append(int(active.packed_depth_max_gaps))

    # Whitelist the ORDER BY identifier; values are always bound, never
    # interpolated.
    if order_by not in {"bundle_dir", "capture_id", "created_at", "scene_type"}:
        order_by = "bundle_dir"

    parts = [
        "SELECT DISTINCT "
        "b.bundle_dir AS bundle_dir, "
        "COALESCE(b.num_devices, 0) AS num_devices, "
        "b.default_device_id AS default_device_id, "
        "COALESCE(b.teacher_depth_count, 0) AS teacher_depth_count "
        "FROM bundles b "
    ]
    if needs_device_join:
        parts.append("JOIN devices d ON d.bundle_dir = b.bundle_dir ")
    if clauses:
        parts.append("WHERE " + " AND ".join(clauses))
    parts.append(f" ORDER BY b.{order_by} ASC")
    sql = "".join(parts)

    if limit is not None and int(limit) > 0:
        sql += " LIMIT ?"
        binds.append(int(limit))

    conn = _connect_ro(Path(db_path))
    try:
        for row in conn.execute(sql, binds):
            yield {
                "bundle_dir": str(row["bundle_dir"]),
                "num_devices": int(row["num_devices"] or 0),
                "default_device_id": row["default_device_id"],
                "teacher_depth_count": int(row["teacher_depth_count"] or 0),
            }
    finally:
        conn.close()
def export_bundle_dirs_txt(bundle_dirs: Sequence[str], output_path: Path) -> None:
    """
    Write one bundle_dir per line to *output_path*, UTF-8 encoded.

    Parent directories are created as needed. An empty sequence produces
    an empty file; otherwise the file ends with a trailing newline.
    """
    p = Path(output_path)
    p.parent.mkdir(parents=True, exist_ok=True)
    body = "\n".join(str(b) for b in bundle_dirs)
    if bundle_dirs:
        body += "\n"
    # Pin the encoding: write_text() otherwise uses the platform default,
    # which mangles non-ASCII paths on some Windows configurations.
    p.write_text(body, encoding="utf-8")
def export_rows_jsonl(
    *,
    db_path: Path,
    bundle_dirs: Sequence[str],
    output_path: Path,
) -> None:
    """
    Export stored row_json for specific bundle dirs as JSONL (UTF-8).

    Bundle dirs are looked up in chunks of 500 so the number of SQL bind
    parameters stays well below SQLite's per-statement limit. Each
    row_json value is re-serialized onto a single line; values that fail
    to parse as JSON are written verbatim rather than dropped, so the
    export stays best-effort and lossless.
    """
    p = Path(output_path)
    p.parent.mkdir(parents=True, exist_ok=True)
    # Chunk to keep SQL param counts reasonable.
    chunk = 500
    conn = _connect_ro(Path(db_path))
    try:
        # Pin the encoding so the output does not depend on the platform
        # default (JSON is conventionally UTF-8).
        with p.open("w", encoding="utf-8") as f:
            for i in range(0, len(bundle_dirs), chunk):
                sub = [str(x) for x in bundle_dirs[i : i + chunk]]
                if not sub:
                    continue
                qmarks = ",".join(["?"] * len(sub))
                sql = (
                    "SELECT row_json FROM bundles "
                    f"WHERE bundle_dir IN ({qmarks}) "
                    "ORDER BY bundle_dir ASC"
                )
                for r in conn.execute(sql, sub):
                    raw = r["row_json"]
                    try:
                        # row_json is already JSON; normalize to a single
                        # JSONL line.
                        obj = json.loads(raw)
                        f.write(json.dumps(obj, default=str) + "\n")
                    except (TypeError, ValueError):
                        # Covers NULL row_json (TypeError) and malformed
                        # JSON (json.JSONDecodeError is a ValueError);
                        # fall back to the raw blob.
                        f.write(str(raw) + "\n")
    finally:
        conn.close()