File size: 17,988 Bytes
722bda8
 
 
 
 
 
1776bed
722bda8
 
 
 
 
 
 
 
 
 
 
ab558da
722bda8
1776bed
722bda8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1776bed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
722bda8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
89c5ec3
722bda8
 
 
 
 
 
 
89c5ec3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
722bda8
 
 
89c5ec3
 
722bda8
 
 
 
 
 
89c5ec3
722bda8
 
1776bed
 
 
ab558da
1776bed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ab558da
1776bed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
722bda8
 
 
 
ab558da
722bda8
 
 
 
 
1776bed
 
 
 
 
 
 
 
 
 
 
 
 
 
722bda8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1776bed
 
 
 
 
 
 
 
 
 
 
 
 
722bda8
 
1776bed
722bda8
 
1776bed
722bda8
1776bed
722bda8
1776bed
722bda8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1776bed
722bda8
 
1776bed
722bda8
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
#!/usr/bin/env python3
"""Prepare JW Search data for a Hugging Face Space at container startup."""

from __future__ import annotations

import logging
import json
import os
from pathlib import Path
import shutil
import tarfile
from urllib.error import URLError
from urllib.request import urlopen, urlretrieve
import zipfile


# Logger shared by every step of the startup data preparation.
LOGGER = logging.getLogger("prepare-runtime-data")
# Data root used by main() when SEARCH_UI_DATA_ROOT is not set.
DEFAULT_DATA_ROOT = "/data/search-ui"
# Timeout, in seconds, passed to urlopen() for remote requests.
URL_TIMEOUT_SECONDS = 300
# has_runtime_data() treats the data root as populated if ANY of these exist.
READY_MARKERS = ("database.db", "json")
# Written only after an extraction finishes, so complete data can be told
# apart from a partially extracted bundle.
READY_MARKER_FILE = ".search-ui-data-ready"
# Directories that prepare_from_source_dir() symlinks from a mounted source.
DATA_DIRECTORIES = (
    "json",
    "subtitles",
    "videos",
    "publications",
    "transcriptions",
)


def configure_logging() -> None:
    """Install root logging that prints bare messages at INFO level and above."""
    logging.basicConfig(format="%(message)s", level=logging.INFO)


def env_truthy(name: str) -> bool:
    """Report whether environment variable ``name`` holds a true-ish value.

    Accepted values (case-insensitive, surrounding whitespace ignored) are
    "1", "true", "yes" and "on". An unset variable counts as False.
    """
    normalized = os.getenv(name, "").strip().lower()
    return normalized in ("1", "true", "yes", "on")


def env_falsey(name: str) -> bool:
    """Report whether environment variable ``name`` is explicitly set to false.

    Accepted values (case-insensitive, surrounding whitespace ignored) are
    "0", "false", "no" and "off". An unset or unrecognised value gives
    False, so this is not the same as ``not env_truthy(name)``.
    """
    normalized = os.getenv(name, "").strip().lower()
    return normalized in ("0", "false", "no", "off")


def has_runtime_data(data_root: Path) -> bool:
    """Report whether any READY_MARKERS entry exists under ``data_root``.

    This only says that *some* data is present. It does not tell a
    complete extraction apart from a partial one; has_completed_runtime_data()
    does that.
    """
    for marker in READY_MARKERS:
        if (data_root / marker).exists():
            return True
    return False


def has_completed_runtime_data(data_root: Path) -> bool:
    """Report whether the ready-marker file written after a finished extraction exists."""
    marker_path = data_root / READY_MARKER_FILE
    return marker_path.exists()


def mark_runtime_data_ready(data_root: Path) -> None:
    """Write the ready-marker file that flags ``data_root`` as fully prepared."""
    marker_path = data_root / READY_MARKER_FILE
    marker_path.write_text("ready\n", encoding="utf-8")


def clear_runtime_data(data_root: Path) -> None:
    """Delete everything directly under ``data_root`` except ``.bundle-cache``.

    Real directories are removed recursively. Files and symlinks, including
    symlinks that point at directories, are unlinked, so the targets of
    symlinks are never touched.
    """
    for entry in data_root.iterdir():
        if entry.name != ".bundle-cache":
            is_real_directory = entry.is_dir() and not entry.is_symlink()
            if is_real_directory:
                shutil.rmtree(entry)
            else:
                entry.unlink()


def iter_database_files(source_dir: Path) -> list[Path]:
    """List the ``*.db`` entries directly inside ``source_dir``, sorted by path.

    The search is not recursive.
    """
    matches = list(source_dir.glob("*.db"))
    matches.sort()
    return matches


def link_or_copy(source: Path, target: Path, *, symlink: bool) -> None:
    """Materialise ``source`` at ``target`` unless something is already there.

    Args:
        source: File or directory to expose.
        target: Destination path. Missing parent directories are created.
        symlink: When True, create a symlink to ``source``. Otherwise copy it
            with ``copytree`` for a directory or ``copy2`` for a file.

    An existing target, including a dangling symlink, is left as is.
    """
    already_present = target.is_symlink() or target.exists()
    if already_present:
        return

    target.parent.mkdir(parents=True, exist_ok=True)
    if symlink:
        target.symlink_to(source)
        return

    copier = shutil.copytree if source.is_dir() else shutil.copy2
    copier(source, target)


def prepare_from_source_dir(source_dir: Path, data_root: Path) -> None:
    """Prepare runtime data from an already-unpacked mounted directory.

    Root-level ``*.db`` files are copied into ``data_root``. Each entry of
    DATA_DIRECTORIES that exists in ``source_dir`` is symlinked into
    ``data_root``. Anything already present at a target path, including a
    dangling symlink, is left alone.

    Args:
        source_dir: Mounted directory that holds the unpacked data.
        data_root: Runtime data directory to populate.

    Raises:
        RuntimeError: ``source_dir`` is missing or is not a directory.
    """
    if not source_dir.exists():
        raise RuntimeError(f"Configured data source does not exist: {source_dir}")
    if not source_dir.is_dir():
        raise RuntimeError(f"Configured data source is not a directory: {source_dir}")

    LOGGER.info("Preparing runtime data from mounted source: %s", source_dir)
    for database_path in iter_database_files(source_dir):
        if not database_path.is_file():
            # A directory that happens to match *.db cannot be copied with copy2.
            LOGGER.warning("Skipping non-file database entry %s", database_path.name)
            continue
        target_path = data_root / database_path.name
        # is_symlink() also catches a dangling link, which exists() reports as
        # absent. Copying onto it would write through to the link target.
        if target_path.exists() or target_path.is_symlink():
            continue
        LOGGER.info("Copying database %s", database_path.name)
        link_or_copy(database_path, target_path, symlink=False)

    for directory_name in DATA_DIRECTORIES:
        source_path = source_dir / directory_name
        if not source_path.exists():
            continue
        target_path = data_root / directory_name
        if target_path.exists() or target_path.is_symlink():
            continue
        LOGGER.info("Linking data directory %s", directory_name)
        link_or_copy(source_path, target_path, symlink=True)


def download_url(url: str, cache_dir: Path) -> Path:
    """Download a data bundle URL into the runtime cache.

    The file is named after the last path segment of ``url`` (query string
    removed), or ``search-ui-data.tar`` if that is empty. A non-empty cached
    copy is reused. Data is first written to a ``.tmp`` sibling and only
    moved into place after a complete download, so a cached file is never
    truncated.

    Args:
        url: Bundle URL.
        cache_dir: Directory for cached bundles. Created if missing.

    Returns:
        Path of the cached bundle.

    Raises:
        RuntimeError: The download failed, timed out, or was truncated.
    """
    # Local import: IncompleteRead (a truncated body) is an HTTPException,
    # not an OSError.
    from http.client import HTTPException

    cache_dir.mkdir(parents=True, exist_ok=True)
    filename = Path(url.split("?", 1)[0]).name or "search-ui-data.tar"
    target = cache_dir / filename
    temp_target = target.with_suffix(target.suffix + ".tmp")

    if target.exists() and target.stat().st_size > 0:
        LOGGER.info("Using cached data bundle: %s", target)
        return target

    LOGGER.info("Downloading data bundle from %s", url)
    try:
        # Use urlopen (not urlretrieve) so a stalled server cannot hang startup.
        with urlopen(url, timeout=URL_TIMEOUT_SECONDS) as response, temp_target.open("wb") as output:
            shutil.copyfileobj(response, output, length=1024 * 1024)
            expected_size = response.headers.get("Content-Length")
        actual_size = temp_target.stat().st_size
        if expected_size and actual_size != int(expected_size):
            raise OSError(f"size mismatch: expected {expected_size} bytes, got {actual_size}")
    except (OSError, URLError, ValueError, HTTPException) as exc:
        # Do not leave a partial download lying around in the cache.
        if temp_target.exists():
            temp_target.unlink()
        raise RuntimeError(f"Unable to download data bundle from {url}: {exc}") from exc

    temp_target.replace(target)
    return target


def download_hub_file(cache_dir: Path) -> Path | None:
    """Fetch the data bundle from a Hugging Face Hub repo, if one is configured.

    Reads SEARCH_UI_DATA_BUNDLE_REPO_ID and SEARCH_UI_DATA_BUNDLE_FILENAME,
    plus the optional SEARCH_UI_DATA_BUNDLE_REPO_TYPE (default "dataset") and
    SEARCH_UI_DATA_BUNDLE_REVISION. The file goes under ``cache_dir / "hub"``.

    Returns:
        Local path of the downloaded file, or None if neither required
        variable is set.

    Raises:
        RuntimeError: Only one of the two required variables is set.
    """
    repo_id = os.getenv("SEARCH_UI_DATA_BUNDLE_REPO_ID", "").strip()
    filename = os.getenv("SEARCH_UI_DATA_BUNDLE_FILENAME", "").strip()
    if not (repo_id or filename):
        return None
    if not (repo_id and filename):
        raise RuntimeError(
            "Set both SEARCH_UI_DATA_BUNDLE_REPO_ID and "
            "SEARCH_UI_DATA_BUNDLE_FILENAME, or neither."
        )

    # Imported lazily so huggingface_hub is only needed when this source is used.
    from huggingface_hub import hf_hub_download

    download_kwargs = {
        "repo_id": repo_id,
        "filename": filename,
        "repo_type": os.getenv("SEARCH_UI_DATA_BUNDLE_REPO_TYPE", "dataset").strip() or "dataset",
        "revision": os.getenv("SEARCH_UI_DATA_BUNDLE_REVISION", "").strip() or None,
        "local_dir": str(cache_dir / "hub"),
    }
    LOGGER.info(
        "Downloading data bundle from Hugging Face %s repo %s",
        download_kwargs["repo_type"],
        repo_id,
    )
    local_file = hf_hub_download(**download_kwargs)
    return Path(local_file)


def resolve_bundle(cache_dir: Path) -> Path | None:
    """Locate the configured data bundle, downloading it when necessary.

    Sources are tried in this order:

    1. SEARCH_UI_DATA_BUNDLE_PATH: a local file (``~`` expanded, resolved).
    2. A Hugging Face Hub file (see download_hub_file).
    3. SEARCH_UI_DATA_BUNDLE_URL, downloaded into ``cache_dir``.

    Returns:
        Path of the bundle, or None if no source is configured.

    Raises:
        RuntimeError: The configured local path does not exist, or a
            download helper failed.
    """
    configured_path = os.getenv("SEARCH_UI_DATA_BUNDLE_PATH", "").strip()
    if configured_path:
        candidate = Path(configured_path).expanduser().resolve()
        if candidate.exists():
            return candidate
        raise RuntimeError(f"Configured data bundle does not exist: {candidate}")

    from_hub = download_hub_file(cache_dir)
    if from_hub is not None:
        return from_hub

    bundle_url = os.getenv("SEARCH_UI_DATA_BUNDLE_URL", "").strip()
    return download_url(bundle_url, cache_dir) if bundle_url else None


def validate_member_path(target_dir: Path, member_name: str) -> None:
    """Reject archive members that would write outside ``target_dir``.

    Both paths are resolved first, which collapses ``..`` and follows any
    symlinks already on disk. The member is accepted only if it resolves to
    ``target_dir`` itself or somewhere beneath it. The check compares path
    components, not string prefixes, so ``/data2`` is not taken to be inside
    ``/data``, and a root of ``/`` works correctly.

    Args:
        target_dir: Directory the archive is extracted into.
        member_name: Path of the member as stored in the archive.

    Raises:
        RuntimeError: The member escapes ``target_dir``.
    """
    root = target_dir.resolve()
    destination = (root / member_name).resolve()
    try:
        destination.relative_to(root)
    except ValueError:
        raise RuntimeError(f"Unsafe data bundle path rejected: {member_name}") from None


def is_ignorable_archive_member(member_name: str) -> bool:
    """Report whether an archive entry can be skipped during extraction.

    True when any path component is ``__MACOSX`` or starts with ``._``.
    Empty and ``.`` components are ignored.
    """
    for component in member_name.split("/"):
        if component in ("", "."):
            continue
        if component == "__MACOSX" or component.startswith("._"):
            return True
    return False


def extract_tar_member(archive: tarfile.TarFile, member: tarfile.TarInfo, target_dir: Path) -> None:
    """Write one tar entry under ``target_dir``, refusing anything unsafe.

    Entries matched by is_ignorable_archive_member (``__MACOSX`` / ``._*``)
    are skipped silently. Paths escaping ``target_dir``, symbolic and hard
    links, and any entry that is neither a regular file nor a directory raise
    ``RuntimeError``. File contents are streamed in 1 MiB chunks.
    """
    name = member.name
    if is_ignorable_archive_member(name):
        return

    # Refuse traversal and links before anything touches the filesystem.
    validate_member_path(target_dir, name)
    if member.issym() or member.islnk():
        raise RuntimeError(f"Unsafe tar link rejected: {member.name}")

    destination = (target_dir / name).resolve()
    if member.isdir():
        destination.mkdir(parents=True, exist_ok=True)
        return
    if not member.isfile():
        raise RuntimeError(f"Unsupported tar member rejected: {member.name}")

    payload = archive.extractfile(member)
    if payload is None:
        raise RuntimeError(f"Unable to read tar member: {member.name}")

    destination.parent.mkdir(parents=True, exist_ok=True)
    with payload:
        with destination.open("wb") as sink:
            shutil.copyfileobj(payload, sink, length=1024 * 1024)


def extract_tar(archive_path: Path, target_dir: Path) -> None:
    """Extract a tar archive on disk into ``target_dir``, one checked member at a time.

    ``tarfile.open`` detects the compression. Each member goes through
    extract_tar_member.
    """
    with tarfile.open(archive_path) as tar:
        for tar_member in tar:
            extract_tar_member(tar, tar_member, target_dir)


def extract_tar_stream(fileobj, target_dir: Path) -> None:
    """Extract a tar stream read sequentially from ``fileobj`` into ``target_dir``.

    Mode ``"r|*"`` reads forward only and detects compression, so
    ``fileobj`` only needs a ``read`` method. Each member goes through
    extract_tar_member.
    """
    with tarfile.open(fileobj=fileobj, mode="r|*") as tar_stream:
        for tar_member in tar_stream:
            extract_tar_member(tar_stream, tar_member, target_dir)


def read_json_url(url: str) -> dict:
    """Fetch and parse a JSON document from ``url``.

    The raw bytes are passed to ``json.loads``, which detects UTF-8/16/32
    and accepts a UTF-8 byte-order mark. Decoding to ``str`` first would
    reject a BOM.

    Raises:
        RuntimeError: The request failed, or the body was truncated,
            undecodable, or not valid JSON. The original error is chained.
    """
    # Local import: a truncated body raises http.client.IncompleteRead, which
    # is an HTTPException rather than an OSError.
    from http.client import HTTPException

    try:
        with urlopen(url, timeout=URL_TIMEOUT_SECONDS) as response:
            payload = response.read()
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        return json.loads(payload)
    except (OSError, URLError, ValueError, HTTPException) as exc:
        raise RuntimeError(f"Unable to read data bundle manifest from {url}: {exc}") from exc


def resolve_part_urls(manifest: dict) -> list[str]:
    """Resolve data bundle part URLs from a manifest document.

    Expected shape: ``{"parts": [...]}``. Each entry is either a URL string
    or an object with a string ``"url"`` key. Every URL must be HTTP(S).

    Args:
        manifest: Parsed manifest JSON. Anything other than an object is
            rejected instead of crashing on ``.get``.

    Returns:
        Part URLs in manifest order.

    Raises:
        RuntimeError: The manifest is not an object, ``parts`` is missing or
            empty, an entry is malformed, or a URL is not HTTP(S).
    """
    if not isinstance(manifest, dict):
        raise RuntimeError("Data bundle parts manifest must be a JSON object.")

    raw_parts = manifest.get("parts")
    if not isinstance(raw_parts, list) or not raw_parts:
        raise RuntimeError("Data bundle parts manifest must include a non-empty parts list.")

    urls: list[str] = []
    for index, item in enumerate(raw_parts, start=1):
        if isinstance(item, str):
            url = item
        elif isinstance(item, dict) and isinstance(item.get("url"), str):
            url = item["url"]
        else:
            raise RuntimeError(f"Invalid data bundle part at index {index}.")

        if not url.startswith(("http://", "https://")):
            raise RuntimeError(f"Data bundle part URL must be HTTP(S): {url}")
        urls.append(url)
    return urls


def download_part(
    url: str,
    target_path: Path,
    *,
    index: int,
    total: int,
    attempts: int = 3,
    retry_delay: float = 2.0,
) -> Path:
    """Download one bundle part to ``target_path`` with retries.

    Each attempt downloads into a fresh ``.tmp`` sibling. The file is moved
    to ``target_path`` only after a complete transfer whose size matches any
    Content-Length header.

    Args:
        url: Part URL.
        target_path: Final local path. Its parent is created if missing.
        index: 1-based part number (used for logging).
        total: Total number of parts (used for logging).
        attempts: Maximum number of attempts. Values below 1 are treated as 1.
        retry_delay: Base delay in seconds between attempts. It grows
            linearly with the attempt number. 0 disables waiting.

    Returns:
        ``target_path``.

    Raises:
        RuntimeError: Every attempt failed. The temp file is removed first.
    """
    # Local imports: time for the backoff; HTTPException because a truncated
    # body raises http.client.IncompleteRead, which is not an OSError.
    import time
    from http.client import HTTPException

    target_path.parent.mkdir(parents=True, exist_ok=True)
    temp_path = target_path.with_suffix(target_path.suffix + ".tmp")
    max_attempts = max(1, attempts)
    last_error: Exception | None = None

    for attempt in range(1, max_attempts + 1):
        if temp_path.exists():
            temp_path.unlink()
        try:
            LOGGER.info("Downloading data bundle part %s/%s", index, total)
            with urlopen(url, timeout=URL_TIMEOUT_SECONDS) as response, temp_path.open("wb") as output:
                shutil.copyfileobj(response, output, length=1024 * 1024)
                expected_size = response.headers.get("Content-Length")

            actual_size = temp_path.stat().st_size
            # int() may raise ValueError on a malformed header; that is
            # handled below like any other failed attempt.
            if expected_size and actual_size != int(expected_size):
                raise RuntimeError(
                    f"part size mismatch: expected {expected_size}, got {actual_size}"
                )

            temp_path.replace(target_path)
            return target_path
        except (OSError, URLError, ValueError, HTTPException, RuntimeError) as exc:
            last_error = exc
            LOGGER.warning(
                "Failed to download data bundle part %s/%s on attempt %s: %s",
                index,
                total,
                attempt,
                exc,
            )
            if attempt < max_attempts and retry_delay > 0:
                time.sleep(retry_delay * attempt)

    # Do not leave a partial part on disk after giving up.
    if temp_path.exists():
        temp_path.unlink()
    raise RuntimeError(f"Unable to download data bundle part {index}/{total}: {last_error}")


class SequentialPartReader:
    """Read-only file-like object that returns several bundle parts back to back.

    Parts are fetched lazily, one at a time, with download_part into
    ``cache_dir`` as ``bundle-part-NNNN``. Each local part file is deleted
    when it has been read to the end or when the reader is closed. Only
    ``read`` and ``close`` are provided. This class is used as the
    ``fileobj`` passed to extract_tar_stream.
    """

    def __init__(self, part_urls: list[str], cache_dir: Path):
        self.part_urls = part_urls
        self.cache_dir = cache_dir
        self.total = len(part_urls)
        # 1-based number of the part most recently opened (0 before the first).
        self.index = 0
        self.current_file = None
        self.current_path: Path | None = None

    def close(self) -> None:
        """Close the open part file, if any, and delete it from disk."""
        if self.current_file is not None:
            self.current_file.close()
            self.current_file = None
        path = self.current_path
        if path is not None and path.exists():
            path.unlink()
        self.current_path = None

    def _open_next_part(self) -> bool:
        """Drop the current part and open the next one. Return False when none remain."""
        self.close()
        if self.index >= self.total:
            return False

        self.index += 1
        part_url = self.part_urls[self.index - 1]
        part_path = self.cache_dir / f"bundle-part-{self.index:04d}"
        self.current_path = download_part(
            part_url,
            part_path,
            index=self.index,
            total=self.total,
        )
        self.current_file = self.current_path.open("rb")
        return True

    def read(self, size: int = -1) -> bytes:
        """Return up to ``size`` bytes, crossing part boundaries as needed.

        A negative or None ``size`` reads at most 1 MiB, not to end-of-stream.
        With a positive ``size``, an empty result means every part has been
        consumed.
        """
        wanted = 1024 * 1024 if size is None or size < 0 else size
        pieces: list[bytes] = []
        while wanted > 0:
            if self.current_file is None:
                if not self._open_next_part():
                    break
            data = self.current_file.read(wanted)
            if not data:
                # Current part exhausted: delete it and move on to the next one.
                self.close()
                continue
            pieces.append(data)
            wanted -= len(data)
        return b"".join(pieces)


def stream_extract_url(url: str, target_dir: Path) -> None:
    """Extract a remote tar bundle into ``target_dir`` directly from the HTTP response.

    Nothing is written to disk except the extracted members.

    Raises:
        RuntimeError: The request failed or the tar stream was invalid. The
            original error is chained.
    """
    LOGGER.info("Streaming data bundle from %s", url)
    try:
        with urlopen(url, timeout=URL_TIMEOUT_SECONDS) as remote:
            extract_tar_stream(remote, target_dir)
    except (OSError, URLError, tarfile.TarError) as err:
        raise RuntimeError(f"Unable to stream data bundle from {url}: {err}") from err


def stream_extract_parts_manifest(manifest_url: str, target_dir: Path, cache_dir: Path) -> None:
    """Extract a split tar bundle whose part URLs are listed in a JSON manifest.

    Parts are downloaded one by one into ``cache_dir / "parts"`` and read as
    a single concatenated tar stream. The reader is always closed, which
    removes any part still on disk.

    Raises:
        RuntimeError: The manifest is invalid, a part download failed, or
            the tar stream was invalid.
    """
    LOGGER.info("Streaming split data bundle from manifest %s", manifest_url)
    part_urls = resolve_part_urls(read_json_url(manifest_url))
    reader = SequentialPartReader(part_urls, cache_dir / "parts")
    try:
        extract_tar_stream(reader, target_dir)
    except (OSError, URLError, tarfile.TarError) as err:
        raise RuntimeError(f"Unable to stream split data bundle: {err}") from err
    finally:
        reader.close()


def extract_zip(archive_path: Path, target_dir: Path) -> None:
    """Safely extract a zip archive into ``target_dir``.

    Every member path is validated before anything is written, so a single
    unsafe entry aborts the whole extraction. Members matched by
    is_ignorable_archive_member (``__MACOSX`` / ``._*``) are then skipped.
    This matches what the tar extraction path does.

    Raises:
        RuntimeError: A member would be written outside ``target_dir``.
    """
    with zipfile.ZipFile(archive_path) as archive:
        members = archive.infolist()
        for member in members:
            validate_member_path(target_dir, member.filename)
        for member in members:
            if is_ignorable_archive_member(member.filename):
                continue
            archive.extract(member, target_dir)


def extract_bundle(archive_path: Path, target_dir: Path) -> None:
    """Extract a zip or tar bundle into ``target_dir``.

    The format is detected from the file contents, not its name. Zip is
    checked first.

    Raises:
        RuntimeError: The file is neither a zip nor a tar archive.
    """
    LOGGER.info("Extracting data bundle %s into %s", archive_path, target_dir)
    if zipfile.is_zipfile(archive_path):
        extractor = extract_zip
    elif tarfile.is_tarfile(archive_path):
        extractor = extract_tar
    else:
        raise RuntimeError(f"Unsupported data bundle format: {archive_path}")
    extractor(archive_path, target_dir)


def _prepare_with_ready_marker(
    data_root: Path,
    *,
    force: bool,
    description: str,
    extract,
    skip_message: str = "Using existing runtime data in %s",
) -> int:
    """Run one extraction into ``data_root`` that is gated by the ready marker.

    Completed data (ready marker present) is reused unless ``force`` is set.
    Any other existing content is cleared first, including a partial
    extraction left by an interrupted run. The marker is therefore only
    written next to a fully extracted bundle.

    Args:
        data_root: Runtime data directory.
        force: Re-extract even when completed data exists.
        description: Noun phrase for the "Clearing ..." log line.
        extract: Zero-argument callable that populates ``data_root``.
        skip_message: Log format (one ``%s`` for ``data_root``) used when
            completed data is reused.

    Returns:
        Process exit code. This is always 0; failures propagate as exceptions.
    """
    if has_completed_runtime_data(data_root) and not force:
        LOGGER.info(skip_message, data_root)
        return 0
    if force or has_runtime_data(data_root):
        LOGGER.info("Clearing existing runtime data before %s.", description)
        clear_runtime_data(data_root)
    extract()
    mark_runtime_data_ready(data_root)
    LOGGER.info("Runtime data is ready in %s", data_root)
    return 0


def main() -> int:
    """Prepare runtime data and return a process exit code.

    The data root is SEARCH_UI_DATA_ROOT (default DEFAULT_DATA_ROOT). The
    first configured source wins:

    1. SEARCH_UI_DATA_SOURCE_DIR: an unpacked mounted directory.
    2. SEARCH_UI_DATA_BUNDLE_PARTS_MANIFEST_URL: a split tar bundle, streamed.
    3. SEARCH_UI_DATA_BUNDLE_URL: streamed, unless SEARCH_UI_STREAM_DATA_BUNDLE
       is explicitly false.
    4. resolve_bundle(): a local path, a Hub file, or a downloaded URL,
       which is then extracted.

    SEARCH_UI_FORCE_DATA_BUNDLE forces existing data to be cleared and
    prepared again. Errors propagate as exceptions, and the uncaught
    exception makes the process exit non-zero.
    """
    configure_logging()
    data_root = Path(os.getenv("SEARCH_UI_DATA_ROOT", DEFAULT_DATA_ROOT)).resolve()
    cache_dir = data_root / ".bundle-cache"
    force = env_truthy("SEARCH_UI_FORCE_DATA_BUNDLE")

    data_root.mkdir(parents=True, exist_ok=True)
    source_dir = os.getenv("SEARCH_UI_DATA_SOURCE_DIR", "").strip()
    if source_dir:
        if has_runtime_data(data_root) and not force:
            LOGGER.info("Using existing runtime data in %s", data_root)
        else:
            if force:
                LOGGER.info("Clearing existing runtime data before source preparation.")
                clear_runtime_data(data_root)
            prepare_from_source_dir(Path(source_dir).expanduser().resolve(), data_root)
        return 0

    parts_manifest_url = os.getenv("SEARCH_UI_DATA_BUNDLE_PARTS_MANIFEST_URL", "").strip()
    if parts_manifest_url:
        return _prepare_with_ready_marker(
            data_root,
            force=force,
            description="split bundle extraction",
            extract=lambda: stream_extract_parts_manifest(parts_manifest_url, data_root, cache_dir),
        )

    url = os.getenv("SEARCH_UI_DATA_BUNDLE_URL", "").strip()
    if url and not env_falsey("SEARCH_UI_STREAM_DATA_BUNDLE"):
        return _prepare_with_ready_marker(
            data_root,
            force=force,
            description="stream extraction",
            extract=lambda: stream_extract_url(url, data_root),
        )

    bundle_path = resolve_bundle(cache_dir)
    if bundle_path is None:
        if has_runtime_data(data_root):
            LOGGER.info("Using existing runtime data in %s", data_root)
        else:
            LOGGER.warning(
                "No data bundle configured. The app will start, but searches will be empty."
            )
        return 0

    # Use the ready marker here too: without it, a half-extracted bundle from
    # an interrupted run would be treated as valid data forever.
    return _prepare_with_ready_marker(
        data_root,
        force=force,
        description="bundle extraction",
        extract=lambda: extract_bundle(bundle_path, data_root),
        skip_message=(
            "Runtime data already exists in %s. Set SEARCH_UI_FORCE_DATA_BUNDLE=1 "
            "to re-extract the configured bundle."
        ),
    )


if __name__ == "__main__":
    # main() returns the exit code; SystemExit passes it to the interpreter.
    raise SystemExit(main())