# zai / bootstrap.py
# hern0425's picture
# Deploy from App
# f4dff88 verified
"""
Runtime bootstrapper.
Goal:
- Keep the Docker image stable (only runtime deps + this bootstrapper).
- At container startup, optionally download a zip payload, extract it, then run the entry script.
Security notes:
- Running remote code is risky. Prefer pinning with SCRIPT_ZIP_SHA256.
- Extraction is protected against ZipSlip path traversal.
"""
from __future__ import annotations
import hashlib
import os
import shutil
import subprocess
import sys
import time
import urllib.request
import uuid
import zipfile
from dataclasses import dataclass
from pathlib import Path
def _e(name: str, default: str | None = None) -> str | None:
    """Look up environment variable ``name``, treating "unset" and "empty" alike.

    Returns the variable's value when it is set to a non-empty string;
    otherwise returns ``default``.
    """
    raw = os.environ.get(name)
    # Both None (unset) and "" (set but empty) are falsy and fall back.
    return raw if raw else default
def _sha256_file(path: Path) -> str:
    """Return the hex-encoded SHA-256 digest of the file at ``path``.

    The file is consumed in 1 MiB chunks so it never has to fit in memory.
    """
    chunk_size = 1024 * 1024
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while True:
            data = stream.read(chunk_size)
            if not data:
                # read() returns b"" at end of file.
                break
            digest.update(data)
    return digest.hexdigest()
def _safe_extract_zip(zip_path: Path, dest_dir: Path) -> None:
    """Extract ``zip_path`` into ``dest_dir``, rejecting ZipSlip entries.

    ``dest_dir`` is created if needed. Every entry name is validated before
    extraction starts, so an archive containing an unsafe entry has none of
    its entries written.

    Raises:
        RuntimeError: if an entry would resolve to a path outside ``dest_dir``.
    """
    dest_dir.mkdir(parents=True, exist_ok=True)
    dest_real = dest_dir.resolve()
    with zipfile.ZipFile(zip_path) as zf:
        for info in zf.infolist():
            # Path.resolve() is non-strict by default: it collapses ".."
            # components and does not require the target to exist, so no
            # FileNotFoundError fallback is needed here. An absolute entry
            # name replaces dest_dir in the join and is rejected below.
            target_real = (dest_dir / info.filename).resolve()
            inside_dest = target_real == dest_real or dest_real in target_real.parents
            if not inside_dest:
                raise RuntimeError(f"Unsafe zip entry path: {info.filename}")
        zf.extractall(dest_dir)
@dataclass(frozen=True)
class BootstrapConfig:
    """Immutable settings for one bootstrap run."""

    # URL of the zip payload to download; None when not configured.
    zip_url: str | None
    # Expected SHA-256 hex digest of the payload; None when not pinned.
    zip_sha256: str | None
    # Whether a payload without a pinned digest may be run.
    allow_unverified: bool
    # Directory under which the payload is extracted.
    extract_base: Path
    # Directory for temporary download files.
    tmp_dir: Path
    # Entry script path, relative to the extracted payload root.
    entry_relpath: str
    # Timeout in seconds passed to the download request.
    download_timeout_s: float
    # Optional bearer token sent in the Authorization header.
    bearer_token: str | None
    # Optional raw "Key: Value" header line added to the download request.
    header_kv: str | None
def _load_config() -> BootstrapConfig:
    """Assemble a ``BootstrapConfig`` from environment variables.

    Variables read (unset or empty values fall back to the default shown):
      - SCRIPT_ZIP_URL, SCRIPT_ZIP_SHA256: payload location and pinned digest.
      - ALLOW_UNVERIFIED_ZIP: only the exact value "1" enables it.
      - SCRIPT_EXTRACT_DIR ("/opt/runtime"), SCRIPT_TMP_DIR ("<extract dir>/.tmp").
      - SCRIPT_WORKDIR ("app"), SCRIPT_ENTRY ("openai.py"): joined into the
        entry script's path relative to the payload root.
      - SCRIPT_DOWNLOAD_TIMEOUT ("60"): parsed with ``float``; a non-numeric
        value raises ``ValueError``.
      - SCRIPT_ZIP_TOKEN, then HF_TOKEN as fallback: bearer token.
      - SCRIPT_ZIP_HEADER: raw "Key: Value" header line.
    """
    default_base = "/opt/runtime"
    base_dir = Path(_e("SCRIPT_EXTRACT_DIR", default_base) or default_base)
    default_tmp = str(base_dir / ".tmp")
    scratch_dir = Path(_e("SCRIPT_TMP_DIR", default_tmp) or default_tmp)

    # Where to run inside the extracted payload.
    # Recommended: zip contains folder "app/" with openai.py inside.
    workdir = _e("SCRIPT_WORKDIR", "app") or ""
    entry = _e("SCRIPT_ENTRY", "openai.py") or "openai.py"
    if workdir:
        entry_relpath = str(Path(workdir) / entry)
    else:
        entry_relpath = entry

    timeout_text = _e("SCRIPT_DOWNLOAD_TIMEOUT", "60") or "60"

    # Prefer the dedicated token; HF_TOKEN is commonly present on Spaces.
    token = _e("SCRIPT_ZIP_TOKEN")
    if not token:
        token = _e("HF_TOKEN")

    return BootstrapConfig(
        zip_url=_e("SCRIPT_ZIP_URL"),
        zip_sha256=_e("SCRIPT_ZIP_SHA256"),
        allow_unverified=(_e("ALLOW_UNVERIFIED_ZIP", "0") == "1"),
        extract_base=base_dir,
        tmp_dir=scratch_dir,
        entry_relpath=entry_relpath,
        download_timeout_s=float(timeout_text),
        bearer_token=token,
        header_kv=_e("SCRIPT_ZIP_HEADER"),
    )
def _download_zip(cfg: BootstrapConfig, out_path: Path) -> None:
    """Download ``cfg.zip_url`` and write the response body to ``out_path``.

    Adds an ``Authorization: Bearer`` header when ``cfg.bearer_token`` is set,
    and one extra header parsed from ``cfg.header_kv`` when it contains a
    colon. Parent directories of ``out_path`` are created as needed. Errors
    raised by ``urllib`` itself are not caught here.

    Raises:
        RuntimeError: if ``cfg.zip_url`` is unset, or the response reports an
            HTTP status of 400 or above.
    """
    if not cfg.zip_url:
        # Explicit check instead of ``assert`` so it still runs under ``python -O``.
        raise RuntimeError("SCRIPT_ZIP_URL is not set; nothing to download")

    req = urllib.request.Request(cfg.zip_url, method="GET")
    if cfg.bearer_token:
        req.add_header("Authorization", f"Bearer {cfg.bearer_token}")
    if cfg.header_kv and ":" in cfg.header_kv:
        # Split on the first colon only, so the value may itself contain colons.
        key, value = cfg.header_kv.split(":", 1)
        req.add_header(key.strip(), value.strip())

    with urllib.request.urlopen(req, timeout=cfg.download_timeout_s) as resp:
        # Not every response object exposes ``status``; fall back to getcode().
        status = getattr(resp, "status", None)
        if status is None and hasattr(resp, "getcode"):
            status = resp.getcode()
        if isinstance(status, int) and status >= 400:
            raise RuntimeError(f"Download failed: HTTP {status}")
        out_path.parent.mkdir(parents=True, exist_ok=True)
        with out_path.open("wb") as f:
            shutil.copyfileobj(resp, f)
def _atomic_replace_dir(tmp_dir: Path, target_dir: Path) -> None:
    """Replace ``target_dir`` with ``tmp_dir`` without losing the old copy on failure.

    An existing ``target_dir`` is first renamed to a hidden sibling, then
    ``tmp_dir`` is renamed into place. If that rename fails, the previous
    directory is moved back before the error is re-raised; on success the
    backup is deleted on a best-effort basis. ``target_dir`` is briefly absent
    between the two renames, so the swap is not strictly atomic.

    Raises:
        OSError: if any of the renames fails.
    """
    target_dir.parent.mkdir(parents=True, exist_ok=True)

    backup_dir: Path | None = None
    if target_dir.exists():
        # Park the current payload next to the target; the same parent
        # directory keeps the rename on one filesystem.
        backup_dir = target_dir.with_name(f".{target_dir.name}.old_{uuid.uuid4().hex}")
        target_dir.rename(backup_dir)

    try:
        tmp_dir.rename(target_dir)
    except OSError:
        if backup_dir is not None:
            # Roll back so the previous payload stays available.
            backup_dir.rename(target_dir)
        raise

    if backup_dir is not None:
        shutil.rmtree(backup_dir, ignore_errors=True)
def main() -> int:
    """Download, verify, and extract the payload, then run its entry script.

    Steps: load configuration from the environment; download the zip into a
    per-run temporary directory; check its SHA-256 when one is configured;
    extract it into a staging directory; swap that into ``<extract_base>/current``;
    run the entry script with the current interpreter, using the script's
    directory as working directory.

    Returns:
        The entry script's process return code.

    Raises:
        RuntimeError: if SCRIPT_ZIP_URL is missing, if no digest is configured
            and unverified payloads are not allowed, on a digest mismatch, or
            if the entry script does not exist after extraction.
    """
    cfg = _load_config()
    # Where the extracted payload will live.
    current_dir = cfg.extract_base / "current"

    if not cfg.zip_url:
        raise RuntimeError(
            "SCRIPT_ZIP_URL is required in this HF deployment. "
            "Set SCRIPT_ZIP_URL and SCRIPT_ZIP_SHA256 (recommended)."
        )
    if not cfg.zip_sha256 and not cfg.allow_unverified:
        raise RuntimeError(
            "SCRIPT_ZIP_URL is set but SCRIPT_ZIP_SHA256 is missing. "
            "Set SCRIPT_ZIP_SHA256 (recommended) or ALLOW_UNVERIFIED_ZIP=1."
        )

    cfg.tmp_dir.mkdir(parents=True, exist_ok=True)
    td_path = cfg.tmp_dir / f"bootstrap_{uuid.uuid4().hex}"
    td_path.mkdir(parents=True, exist_ok=True)
    # Staging directory lives under extract_base, next to "current", so the
    # final rename does not cross directories.
    tmp_extract = cfg.extract_base / f".extract_tmp_{uuid.uuid4().hex}"
    try:
        zip_path = td_path / "payload.zip"
        # Monotonic clock: the elapsed time is unaffected by wall-clock changes.
        t0 = time.monotonic()
        print(f"[bootstrap] downloading zip from SCRIPT_ZIP_URL to {zip_path}", flush=True)
        _download_zip(cfg, zip_path)
        print(
            f"[bootstrap] download done in {time.monotonic() - t0:.2f}s size={zip_path.stat().st_size}",
            flush=True,
        )

        if cfg.zip_sha256:
            got = _sha256_file(zip_path)
            want = cfg.zip_sha256.lower().strip()
            if got != want:
                raise RuntimeError(f"SHA256 mismatch: got={got} want={want}")
            print("[bootstrap] sha256 verified", flush=True)
        else:
            print("[bootstrap] sha256 not provided; running unverified payload", flush=True)

        print(f"[bootstrap] extracting zip to {tmp_extract}", flush=True)
        _safe_extract_zip(zip_path, tmp_extract)
        _atomic_replace_dir(tmp_extract, current_dir)
        print(f"[bootstrap] extracted to {current_dir}", flush=True)
    finally:
        shutil.rmtree(td_path, ignore_errors=True)
        # After a successful swap the staging directory has been renamed away
        # and this is a no-op; after a failure it removes the leftover.
        shutil.rmtree(tmp_extract, ignore_errors=True)

    entry_path = (current_dir / cfg.entry_relpath).resolve()
    if not entry_path.exists():
        raise RuntimeError(f"Entry script not found: {entry_path} (SCRIPT_WORKDIR/SCRIPT_ENTRY)")

    print(f"[bootstrap] starting: {sys.executable} {entry_path}", flush=True)
    proc = subprocess.run([sys.executable, str(entry_path)], cwd=str(entry_path.parent))
    return int(proc.returncode)
if __name__ == "__main__":
    # Exit with main()'s return value as the process exit status.
    sys.exit(main())