""" Runtime bootstrapper. Goal: - Keep the Docker image stable (only runtime deps + this bootstrapper). - At container startup, optionally download a zip payload, extract it, then run the entry script. Security notes: - Running remote code is risky. Prefer pinning with SCRIPT_ZIP_SHA256. - Extraction is protected against ZipSlip path traversal. """ from __future__ import annotations import hashlib import os import shutil import subprocess import sys import time import urllib.request import uuid import zipfile from dataclasses import dataclass from pathlib import Path def _e(name: str, default: str | None = None) -> str | None: v = os.getenv(name) if v is None or v == "": return default return v def _sha256_file(path: Path) -> str: h = hashlib.sha256() with path.open("rb") as f: for chunk in iter(lambda: f.read(1024 * 1024), b""): h.update(chunk) return h.hexdigest() def _safe_extract_zip(zip_path: Path, dest_dir: Path) -> None: dest_dir.mkdir(parents=True, exist_ok=True) dest_real = dest_dir.resolve() with zipfile.ZipFile(zip_path) as zf: for info in zf.infolist(): # Guard against ZipSlip: normalize and ensure within dest. target = dest_dir / info.filename try: target_real = target.resolve() except FileNotFoundError: # Parent may not exist yet; resolve the parent. target_real = (dest_dir / Path(info.filename).as_posix()).resolve() if dest_real not in target_real.parents and target_real != dest_real: raise RuntimeError(f"Unsafe zip entry path: {info.filename}") zf.extractall(dest_dir) @dataclass(frozen=True) class BootstrapConfig: zip_url: str | None zip_sha256: str | None allow_unverified: bool extract_base: Path tmp_dir: Path entry_relpath: str download_timeout_s: float bearer_token: str | None header_kv: str | None def _load_config() -> BootstrapConfig: zip_url = _e("SCRIPT_ZIP_URL") zip_sha256 = _e("SCRIPT_ZIP_SHA256") allow_unverified = _e("ALLOW_UNVERIFIED_ZIP", "0") == "1" extract_base = Path(_e("SCRIPT_EXTRACT_DIR", "/opt/runtime") or "/opt/runtime") tmp_dir = Path(_e("SCRIPT_TMP_DIR", str(extract_base / ".tmp")) or str(extract_base / ".tmp")) # Where to run inside the extracted payload. # Recommended: zip contains folder "app/" with openai.py inside. workdir = _e("SCRIPT_WORKDIR", "app") or "" entry = _e("SCRIPT_ENTRY", "openai.py") or "openai.py" entry_relpath = str(Path(workdir) / entry) if workdir else entry download_timeout_s = float(_e("SCRIPT_DOWNLOAD_TIMEOUT", "60") or "60") # Auth/header options: # - SCRIPT_ZIP_TOKEN: bearer token # - HF_TOKEN: commonly present on Spaces; used as fallback bearer_token = _e("SCRIPT_ZIP_TOKEN") or _e("HF_TOKEN") # - SCRIPT_ZIP_HEADER: raw "Key: Value" header line, optional header_kv = _e("SCRIPT_ZIP_HEADER") return BootstrapConfig( zip_url=zip_url, zip_sha256=zip_sha256, allow_unverified=allow_unverified, extract_base=extract_base, tmp_dir=tmp_dir, entry_relpath=entry_relpath, download_timeout_s=download_timeout_s, bearer_token=bearer_token, header_kv=header_kv, ) def _download_zip(cfg: BootstrapConfig, out_path: Path) -> None: assert cfg.zip_url req = urllib.request.Request(cfg.zip_url, method="GET") if cfg.bearer_token: req.add_header("Authorization", f"Bearer {cfg.bearer_token}") if cfg.header_kv and ":" in cfg.header_kv: k, v = cfg.header_kv.split(":", 1) req.add_header(k.strip(), v.strip()) with urllib.request.urlopen(req, timeout=cfg.download_timeout_s) as resp: status = getattr(resp, "status", None) if status is None and hasattr(resp, "getcode"): status = resp.getcode() if isinstance(status, int) and status >= 400: raise RuntimeError(f"Download failed: HTTP {status}") out_path.parent.mkdir(parents=True, exist_ok=True) with out_path.open("wb") as f: shutil.copyfileobj(resp, f) def _atomic_replace_dir(tmp_dir: Path, target_dir: Path) -> None: if target_dir.exists(): shutil.rmtree(target_dir, ignore_errors=True) target_dir.parent.mkdir(parents=True, exist_ok=True) tmp_dir.rename(target_dir) def main() -> int: cfg = _load_config() # Where the extracted payload will live. current_dir = cfg.extract_base / "current" if cfg.zip_url: if not cfg.zip_sha256 and not cfg.allow_unverified: raise RuntimeError( "SCRIPT_ZIP_URL is set but SCRIPT_ZIP_SHA256 is missing. " "Set SCRIPT_ZIP_SHA256 (recommended) or ALLOW_UNVERIFIED_ZIP=1." ) cfg.tmp_dir.mkdir(parents=True, exist_ok=True) td_path = cfg.tmp_dir / f"bootstrap_{uuid.uuid4().hex}" td_path.mkdir(parents=True, exist_ok=True) try: zip_path = td_path / "payload.zip" t0 = time.time() print(f"[bootstrap] downloading zip from SCRIPT_ZIP_URL to {zip_path}", flush=True) _download_zip(cfg, zip_path) print(f"[bootstrap] download done in {time.time() - t0:.2f}s size={zip_path.stat().st_size}", flush=True) if cfg.zip_sha256: got = _sha256_file(zip_path) want = cfg.zip_sha256.lower().strip() if got != want: raise RuntimeError(f"SHA256 mismatch: got={got} want={want}") print("[bootstrap] sha256 verified", flush=True) else: print("[bootstrap] sha256 not provided; running unverified payload", flush=True) tmp_extract = cfg.extract_base / f".extract_tmp_{uuid.uuid4().hex}" if tmp_extract.exists(): shutil.rmtree(tmp_extract, ignore_errors=True) print(f"[bootstrap] extracting zip to {tmp_extract}", flush=True) _safe_extract_zip(zip_path, tmp_extract) _atomic_replace_dir(tmp_extract, current_dir) print(f"[bootstrap] extracted to {current_dir}", flush=True) finally: shutil.rmtree(td_path, ignore_errors=True) else: raise RuntimeError( "SCRIPT_ZIP_URL is required in this HF deployment. " "Set SCRIPT_ZIP_URL and SCRIPT_ZIP_SHA256 (recommended)." ) entry_path = (current_dir / cfg.entry_relpath).resolve() if not entry_path.exists(): raise RuntimeError(f"Entry script not found: {entry_path} (SCRIPT_WORKDIR/SCRIPT_ENTRY)") print(f"[bootstrap] starting: {sys.executable} {entry_path}", flush=True) proc = subprocess.run([sys.executable, str(entry_path)], cwd=str(entry_path.parent)) return int(proc.returncode) if __name__ == "__main__": raise SystemExit(main())