| """ |
| Runtime bootstrapper. |
| |
| Goal: |
| - Keep the Docker image stable (only runtime deps + this bootstrapper). |
| - At container startup, download the zip payload from SCRIPT_ZIP_URL (required), extract it, then run the entry script. |
| |
| Security notes: |
| - Running remote code is risky. Prefer pinning with SCRIPT_ZIP_SHA256. |
| - Extraction is protected against ZipSlip path traversal. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import hashlib |
| import os |
| import shutil |
| import subprocess |
| import sys |
| import time |
| import urllib.request |
| import uuid |
| import zipfile |
| from dataclasses import dataclass |
| from pathlib import Path |
|
|
|
|
def _e(name: str, default: str | None = None) -> str | None:
    """Return environment variable *name*, or *default* if it is unset or empty."""
    raw = os.environ.get(name)
    # An empty string is treated exactly like a missing variable.
    return raw if raw else default
|
|
|
|
def _sha256_file(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at *path*.

    The file is read in 1 MiB pieces so large payloads are never held in
    memory all at once.
    """
    piece_size = 1024 * 1024
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while True:
            piece = stream.read(piece_size)
            if not piece:
                break
            digest.update(piece)
    return digest.hexdigest()
|
|
|
|
def _safe_extract_zip(zip_path: Path, dest_dir: Path) -> None:
    """Extract *zip_path* into *dest_dir*, refusing entries that escape it.

    Every archive member is validated first; nothing is written to disk
    unless all members resolve to *dest_dir* itself or a path beneath it.

    Args:
        zip_path: Path of the zip archive to read.
        dest_dir: Directory to extract into; created if missing.

    Raises:
        RuntimeError: if any entry name resolves outside *dest_dir*
            (ZipSlip-style path traversal or an absolute path).
    """
    dest_dir.mkdir(parents=True, exist_ok=True)
    dest_real = dest_dir.resolve()

    with zipfile.ZipFile(zip_path) as zf:
        for info in zf.infolist():
            # Path.resolve() is non-strict by default, so it normalises ".."
            # components even for targets that do not exist yet; no
            # FileNotFoundError fallback is needed.
            target_real = (dest_dir / info.filename).resolve()
            inside = target_real == dest_real or dest_real in target_real.parents
            if not inside:
                raise RuntimeError(f"Unsafe zip entry path: {info.filename}")

        # Only reached when every member passed validation.
        zf.extractall(dest_dir)
|
|
|
|
@dataclass(frozen=True)
class BootstrapConfig:
    """Immutable settings for one bootstrap run."""

    # Where the payload comes from and how it is verified.
    zip_url: str | None  # URL of the zip payload
    zip_sha256: str | None  # expected hex SHA-256 of the payload, if pinned
    allow_unverified: bool  # permit running a payload without a checksum

    # Filesystem layout.
    extract_base: Path  # base directory that holds the extracted payload
    tmp_dir: Path  # scratch directory for the downloaded archive
    entry_relpath: str  # entry script path, relative to the extracted payload

    # Download behaviour.
    download_timeout_s: float  # timeout passed to the HTTP request, in seconds
    bearer_token: str | None  # sent as "Authorization: Bearer <token>" when set
    header_kv: str | None  # one extra request header in "Name: value" form
|
|
|
|
def _load_config() -> BootstrapConfig:
    """Build a BootstrapConfig from environment variables.

    Variables read (unset or empty values fall back to the default):
        SCRIPT_ZIP_URL, SCRIPT_ZIP_SHA256: payload location and checksum.
        ALLOW_UNVERIFIED_ZIP: "1" allows a payload without a checksum.
        SCRIPT_EXTRACT_DIR: extraction base, default "/opt/runtime".
        SCRIPT_TMP_DIR: scratch directory, default "<extract base>/.tmp".
        SCRIPT_WORKDIR, SCRIPT_ENTRY: entry script location inside the
            payload, defaults "app" and "openai.py".
        SCRIPT_DOWNLOAD_TIMEOUT: seconds, default "60".
        SCRIPT_ZIP_TOKEN (falling back to HF_TOKEN): bearer token.
        SCRIPT_ZIP_HEADER: one extra request header.
    """
    default_base = "/opt/runtime"
    extract_base = Path(_e("SCRIPT_EXTRACT_DIR", default_base) or default_base)

    default_tmp = str(extract_base / ".tmp")
    tmp_dir = Path(_e("SCRIPT_TMP_DIR", default_tmp) or default_tmp)

    # The entry script lives at <workdir>/<entry> inside the payload.
    workdir = _e("SCRIPT_WORKDIR", "app") or ""
    entry = _e("SCRIPT_ENTRY", "openai.py") or "openai.py"
    if workdir:
        entry_relpath = str(Path(workdir) / entry)
    else:
        entry_relpath = entry

    timeout_text = _e("SCRIPT_DOWNLOAD_TIMEOUT", "60") or "60"

    # A payload-specific token takes precedence over the generic HF token.
    token = _e("SCRIPT_ZIP_TOKEN") or _e("HF_TOKEN")

    return BootstrapConfig(
        zip_url=_e("SCRIPT_ZIP_URL"),
        zip_sha256=_e("SCRIPT_ZIP_SHA256"),
        allow_unverified=(_e("ALLOW_UNVERIFIED_ZIP", "0") == "1"),
        extract_base=extract_base,
        tmp_dir=tmp_dir,
        entry_relpath=entry_relpath,
        download_timeout_s=float(timeout_text),
        bearer_token=token,
        header_kv=_e("SCRIPT_ZIP_HEADER"),
    )
|
|
|
|
def _download_zip(cfg: BootstrapConfig, out_path: Path) -> None:
    """Download ``cfg.zip_url`` to *out_path*.

    Sends ``Authorization: Bearer <token>`` when ``cfg.bearer_token`` is set
    and one extra header when ``cfg.header_kv`` has the form ``Name: value``.
    Parent directories of *out_path* are created as needed.

    Args:
        cfg: Bootstrap settings; ``zip_url`` must be set.
        out_path: File the response body is written to.

    Raises:
        RuntimeError: if ``cfg.zip_url`` is empty, or the response reports
            an HTTP status of 400 or above.
    """
    # Explicit check rather than `assert`, which is stripped under `python -O`.
    if not cfg.zip_url:
        raise RuntimeError("Download failed: SCRIPT_ZIP_URL is not set")

    req = urllib.request.Request(cfg.zip_url, method="GET")
    if cfg.bearer_token:
        req.add_header("Authorization", f"Bearer {cfg.bearer_token}")
    if cfg.header_kv:
        key, sep, value = cfg.header_kv.partition(":")
        key = key.strip()
        if sep and key:
            req.add_header(key, value.strip())
        else:
            # Never echo the value: it may carry a secret.
            print(
                "[bootstrap] ignoring malformed SCRIPT_ZIP_HEADER (expected 'Name: value')",
                flush=True,
            )

    with urllib.request.urlopen(req, timeout=cfg.download_timeout_s) as resp:
        # Some response objects expose only getcode(), not .status.
        status = getattr(resp, "status", None)
        if status is None and hasattr(resp, "getcode"):
            status = resp.getcode()
        if isinstance(status, int) and status >= 400:
            raise RuntimeError(f"Download failed: HTTP {status}")
        out_path.parent.mkdir(parents=True, exist_ok=True)
        with out_path.open("wb") as f:
            shutil.copyfileobj(resp, f)
|
|
|
|
def _atomic_replace_dir(tmp_dir: Path, target_dir: Path) -> None:
    """Move *tmp_dir* into place as *target_dir*, replacing any previous one.

    An existing *target_dir* is first renamed to a hidden sibling, then
    *tmp_dir* is renamed into place, and only afterwards is the old content
    deleted. If the swap fails, the previous *target_dir* is restored, so a
    failed update never leaves the target missing or half-deleted. There is
    still a brief window between the two renames in which *target_dir* does
    not exist.

    Args:
        tmp_dir: Fully prepared directory to publish.
        target_dir: Final location; its parent is created if missing.

    Raises:
        OSError: if either rename fails (for example *tmp_dir* is missing).
    """
    target_dir.parent.mkdir(parents=True, exist_ok=True)

    backup = None
    # is_symlink() also catches a dangling link, which exists() reports as absent.
    if target_dir.exists() or target_dir.is_symlink():
        backup = target_dir.with_name(f".{target_dir.name}.old_{uuid.uuid4().hex}")
        target_dir.rename(backup)

    try:
        tmp_dir.rename(target_dir)
    except OSError:
        # Roll back so the previously published directory stays usable.
        if backup is not None:
            backup.rename(target_dir)
        raise

    if backup is None:
        return

    # Best-effort cleanup: the new directory is already live at this point.
    if backup.is_dir() and not backup.is_symlink():
        shutil.rmtree(backup, ignore_errors=True)
    else:
        try:
            backup.unlink()
        except OSError as exc:
            print(f"[bootstrap] warning: could not remove {backup}: {exc}", flush=True)
|
|
|
|
def main() -> int:
    """Download, verify and extract the payload, then run its entry script.

    The archive is downloaded into a per-run scratch directory, optionally
    checked against the configured SHA-256, extracted into a temporary
    directory under the extract base, and then moved into place as
    ``<extract base>/current``. The entry script is run with the current
    interpreter, using the script's own directory as working directory.

    Returns:
        The exit code of the entry script process.

    Raises:
        RuntimeError: if SCRIPT_ZIP_URL is unset, if no checksum is given and
            unverified payloads are not allowed, if the checksum does not
            match, or if the entry script is missing after extraction.
    """
    cfg = _load_config()

    if not cfg.zip_url:
        raise RuntimeError(
            "SCRIPT_ZIP_URL is required in this HF deployment. "
            "Set SCRIPT_ZIP_URL and SCRIPT_ZIP_SHA256 (recommended)."
        )
    if not cfg.zip_sha256 and not cfg.allow_unverified:
        raise RuntimeError(
            "SCRIPT_ZIP_URL is set but SCRIPT_ZIP_SHA256 is missing. "
            "Set SCRIPT_ZIP_SHA256 (recommended) or ALLOW_UNVERIFIED_ZIP=1."
        )

    current_dir = cfg.extract_base / "current"

    cfg.tmp_dir.mkdir(parents=True, exist_ok=True)
    td_path = cfg.tmp_dir / f"bootstrap_{uuid.uuid4().hex}"
    td_path.mkdir(parents=True, exist_ok=True)
    # Fresh random name, so no pre-existing directory needs clearing.
    tmp_extract = cfg.extract_base / f".extract_tmp_{uuid.uuid4().hex}"
    try:
        zip_path = td_path / "payload.zip"

        t0 = time.time()
        print(f"[bootstrap] downloading zip from SCRIPT_ZIP_URL to {zip_path}", flush=True)
        _download_zip(cfg, zip_path)
        print(
            f"[bootstrap] download done in {time.time() - t0:.2f}s size={zip_path.stat().st_size}",
            flush=True,
        )

        if cfg.zip_sha256:
            got = _sha256_file(zip_path)
            want = cfg.zip_sha256.lower().strip()
            if got != want:
                raise RuntimeError(f"SHA256 mismatch: got={got} want={want}")
            print("[bootstrap] sha256 verified", flush=True)
        else:
            print("[bootstrap] sha256 not provided; running unverified payload", flush=True)

        print(f"[bootstrap] extracting zip to {tmp_extract}", flush=True)
        _safe_extract_zip(zip_path, tmp_extract)
        _atomic_replace_dir(tmp_extract, current_dir)
        print(f"[bootstrap] extracted to {current_dir}", flush=True)
    finally:
        shutil.rmtree(td_path, ignore_errors=True)
        # After a successful swap this path no longer exists and the call is
        # a no-op; after a failure it removes the half-extracted directory
        # that would otherwise pile up under the extract base.
        shutil.rmtree(tmp_extract, ignore_errors=True)

    entry_path = (current_dir / cfg.entry_relpath).resolve()
    if not entry_path.exists():
        raise RuntimeError(f"Entry script not found: {entry_path} (SCRIPT_WORKDIR/SCRIPT_ENTRY)")

    print(f"[bootstrap] starting: {sys.executable} {entry_path}", flush=True)
    proc = subprocess.run([sys.executable, str(entry_path)], cwd=str(entry_path.parent))
    return int(proc.returncode)
|
|
|
|
if __name__ == "__main__":
    # Exit with whatever status main() returns.
    sys.exit(main())
|
|