File size: 7,775 Bytes
7a87926
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
"""
Query helpers for the SQLite curation index.

Design goals:
- fast filtering for dataset selection
- safe parameterized SQL (no arbitrary SQL execution by default)
- easy export to paths / jsonl
"""

from __future__ import annotations

import json
import sqlite3
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Sequence


@dataclass(frozen=True)
class QueryFilters:
    """Immutable filter set for selecting bundles from the curation index.

    Every field defaults to ``None``, meaning "no constraint". The bundle-level
    fields filter columns of the ``bundles`` table directly; the packed-depth
    fields additionally force a join against the ``devices`` table (see
    ``query_bundle_dirs`` / ``iter_bundle_shard_seeds``).
    """

    # bundle-level filters
    source_format: Optional[str] = None
    has_packed_depth: Optional[bool] = None
    scene_type: Optional[str] = None
    operating_regime: Optional[str] = None
    min_devices: Optional[int] = None
    # device-level packed depth filters (requires join)
    packed_depth_min_frames: Optional[int] = None
    packed_depth_max_gaps: Optional[int] = None


def _connect_ro(db_path: Path) -> sqlite3.Connection:
    """Open the SQLite index at *db_path* read-only.

    Uses SQLite URI mode so the file is opened with ``mode=ro`` even while
    other processes hold read-write handles. The path is converted with
    ``Path.as_uri()``, which percent-encodes URI-special characters
    (``?``, ``#``, spaces) — a raw f-string interpolation of the path would
    silently corrupt the URI for such paths.

    Returns:
        A connection whose ``row_factory`` is ``sqlite3.Row`` (column access
        by name).

    Raises:
        sqlite3.OperationalError: if the file does not exist (``mode=ro``
            never creates a database).
    """
    p = Path(db_path).resolve()
    # as_uri() yields "file:///abs/path" with proper percent-encoding.
    conn = sqlite3.connect(f"{p.as_uri()}?mode=ro", uri=True)
    conn.row_factory = sqlite3.Row
    return conn


def query_bundle_dirs(
    *,
    db_path: Path,
    filters: Optional[QueryFilters] = None,
    limit: Optional[int] = None,
    order_by: str = "bundle_dir",
) -> List[str]:
    """
    Return bundle_dir strings for bundles that satisfy *filters*.

    Results are de-duplicated, sorted ascending by *order_by* (restricted to
    a known-safe column set), and optionally capped at *limit* rows. All
    predicates are parameterized — no filter value is interpolated into SQL.
    """
    flt = filters or QueryFilters()
    conditions: List[str] = []
    bindings: List[Any] = []

    def _want(clause: str, value: Any) -> None:
        # Record one parameterized predicate and its bound value.
        conditions.append(clause)
        bindings.append(value)

    if flt.source_format is not None:
        _want("b.source_format = ?", str(flt.source_format))
    if flt.has_packed_depth is not None:
        _want("b.has_packed_depth = ?", 1 if bool(flt.has_packed_depth) else 0)
    if flt.scene_type is not None:
        _want("b.scene_type = ?", str(flt.scene_type))
    if flt.operating_regime is not None:
        _want("b.operating_regime = ?", str(flt.operating_regime))
    if flt.min_devices is not None:
        _want("b.num_devices >= ?", int(flt.min_devices))

    needs_device_join = not (
        flt.packed_depth_min_frames is None and flt.packed_depth_max_gaps is None
    )
    if needs_device_join:
        # Only consider devices that actually have packed depth index summarized.
        conditions.append("d.packed_depth_index_path IS NOT NULL")
        if flt.packed_depth_min_frames is not None:
            _want("d.packed_depth_frames_count >= ?", int(flt.packed_depth_min_frames))
        if flt.packed_depth_max_gaps is not None:
            _want("d.packed_depth_gaps <= ?", int(flt.packed_depth_max_gaps))

    if order_by not in {"bundle_dir", "capture_id", "created_at", "scene_type"}:
        order_by = "bundle_dir"  # fall back to a safe default sort column

    parts = ["SELECT DISTINCT b.bundle_dir AS bundle_dir", "FROM bundles b"]
    if needs_device_join:
        parts.append("JOIN devices d ON d.bundle_dir = b.bundle_dir")
    if conditions:
        parts.append("WHERE " + " AND ".join(conditions))
    parts.append(f"ORDER BY b.{order_by} ASC")
    sql = " ".join(parts)
    if limit is not None and int(limit) > 0:
        sql += " LIMIT ?"
        bindings.append(int(limit))

    conn = _connect_ro(Path(db_path))
    try:
        return [str(row["bundle_dir"]) for row in conn.execute(sql, bindings)]
    finally:
        conn.close()


def iter_bundle_shard_seeds(
    *,
    db_path: Path,
    filters: Optional[QueryFilters] = None,
    limit: Optional[int] = None,
    order_by: str = "bundle_dir",
) -> Iterator[Dict[str, Any]]:
    """
    Stream bundle "seed" rows needed for sample-index generation.

    Yields dicts with:
      - bundle_dir
      - num_devices
      - default_device_id
      - teacher_depth_count

    Ordering, filtering, and limit semantics mirror ``query_bundle_dirs``.
    """
    flt = filters or QueryFilters()

    # (clause, bound value) pairs for bundle-level predicates; unzipped below.
    pairs: List[Any] = []
    if flt.source_format is not None:
        pairs.append(("b.source_format = ?", str(flt.source_format)))
    if flt.has_packed_depth is not None:
        pairs.append(("b.has_packed_depth = ?", 1 if bool(flt.has_packed_depth) else 0))
    if flt.scene_type is not None:
        pairs.append(("b.scene_type = ?", str(flt.scene_type)))
    if flt.operating_regime is not None:
        pairs.append(("b.operating_regime = ?", str(flt.operating_regime)))
    if flt.min_devices is not None:
        pairs.append(("b.num_devices >= ?", int(flt.min_devices)))

    clauses: List[str] = [c for c, _ in pairs]
    params: List[Any] = [v for _, v in pairs]

    device_filtered = (
        flt.packed_depth_min_frames is not None
        or flt.packed_depth_max_gaps is not None
    )
    if device_filtered:
        # Only devices with a summarized packed-depth index participate.
        clauses.append("d.packed_depth_index_path IS NOT NULL")
        if flt.packed_depth_min_frames is not None:
            clauses.append("d.packed_depth_frames_count >= ?")
            params.append(int(flt.packed_depth_min_frames))
        if flt.packed_depth_max_gaps is not None:
            clauses.append("d.packed_depth_gaps <= ?")
            params.append(int(flt.packed_depth_max_gaps))

    sort_col = (
        order_by
        if order_by in {"bundle_dir", "capture_id", "created_at", "scene_type"}
        else "bundle_dir"
    )

    select_cols = (
        "b.bundle_dir AS bundle_dir, "
        "COALESCE(b.num_devices, 0) AS num_devices, "
        "b.default_device_id AS default_device_id, "
        "COALESCE(b.teacher_depth_count, 0) AS teacher_depth_count"
    )
    sql = f"SELECT DISTINCT {select_cols} FROM bundles b"
    if device_filtered:
        sql += " JOIN devices d ON d.bundle_dir = b.bundle_dir"
    if clauses:
        sql += " WHERE " + " AND ".join(clauses)
    sql += f" ORDER BY b.{sort_col} ASC"
    if limit is not None and int(limit) > 0:
        sql += " LIMIT ?"
        params.append(int(limit))

    conn = _connect_ro(Path(db_path))
    try:
        for row in conn.execute(sql, params):
            yield {
                "bundle_dir": str(row["bundle_dir"]),
                "num_devices": int(row["num_devices"] or 0),
                "default_device_id": row["default_device_id"],
                "teacher_depth_count": int(row["teacher_depth_count"] or 0),
            }
    finally:
        conn.close()


def export_bundle_dirs_txt(bundle_dirs: Sequence[str], output_path: Path) -> None:
    """Write one bundle_dir per line to *output_path* as UTF-8.

    Parent directories are created as needed. A trailing newline is emitted
    only when the sequence is non-empty; an empty sequence yields an empty
    file.
    """
    p = Path(output_path)
    p.parent.mkdir(parents=True, exist_ok=True)
    text = "\n".join(str(b) for b in bundle_dirs)
    if bundle_dirs:
        text += "\n"
    # Explicit encoding: the default locale encoding is platform-dependent
    # and can fail on non-ASCII paths (e.g. cp1252 on Windows).
    p.write_text(text, encoding="utf-8")


def export_rows_jsonl(
    *,
    db_path: Path,
    bundle_dirs: Sequence[str],
    output_path: Path,
) -> None:
    """
    Export stored row_json for specific bundle dirs as JSONL (UTF-8).

    Each matching bundle row's ``row_json`` column is re-serialized to a
    single line. Rows whose ``row_json`` does not parse as JSON are written
    as their raw text so the export stays best-effort and lossless.

    Parent directories of *output_path* are created as needed.
    """
    p = Path(output_path)
    p.parent.mkdir(parents=True, exist_ok=True)

    # Chunk the IN (...) list to keep SQL param counts reasonable
    # (SQLite caps the number of bound variables per statement).
    chunk = 500
    conn = _connect_ro(Path(db_path))
    try:
        # Explicit encoding: JSONL consumers expect UTF-8 regardless of locale.
        with p.open("w", encoding="utf-8") as f:
            for i in range(0, len(bundle_dirs), chunk):
                sub = [str(x) for x in bundle_dirs[i : i + chunk]]
                if not sub:
                    continue
                qmarks = ",".join(["?"] * len(sub))
                sql = (
                    "SELECT row_json FROM bundles "
                    f"WHERE bundle_dir IN ({qmarks}) "
                    "ORDER BY bundle_dir ASC"
                )
                for r in conn.execute(sql, sub):
                    raw = r["row_json"]
                    try:
                        # row_json is already JSON; normalize to single-line JSONL
                        line = json.dumps(json.loads(raw), default=str)
                    except (TypeError, ValueError):
                        # malformed or NULL row_json: fall back to the raw blob
                        line = str(raw)
                    f.write(line + "\n")
    finally:
        conn.close()