"""
Runtime bootstrapper.

Goal:
- Keep the Docker image stable (only runtime deps + this bootstrapper).
- At container startup, download the zip payload from SCRIPT_ZIP_URL, extract it,
  then run the entry script.

Security notes:
- Running remote code is risky. Prefer pinning with SCRIPT_ZIP_SHA256.
- Extraction is protected against ZipSlip path traversal.
"""
from __future__ import annotations
import hashlib
import os
import shutil
import subprocess
import sys
import time
import urllib.request
import uuid
import zipfile
from dataclasses import dataclass
from pathlib import Path
def _e(name: str, default: str | None = None) -> str | None:
    """Return environment variable *name*, or *default* when it is unset or empty.

    An empty string is treated exactly like a missing variable, so callers
    cannot distinguish "set to nothing" from "not set".
    """
    value = os.environ.get(name)
    return value if value else default
def _sha256_file(path: Path) -> str:
    """Return the lowercase hex SHA-256 digest of the file at *path*.

    The file is streamed in 1 MiB reads so large payloads are never held in
    memory all at once.
    """
    read_size = 1024 * 1024
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while True:
            chunk = stream.read(read_size)
            if not chunk:
                break
            digest.update(chunk)
    return digest.hexdigest()
def _safe_extract_zip(zip_path: Path, dest_dir: Path) -> None:
    """Extract *zip_path* into *dest_dir*, refusing archives that escape it.

    Every member name is validated before anything is written, so a rejected
    (or unreadable) archive leaves no destination directory behind.

    Args:
        zip_path: Path of the zip archive to extract.
        dest_dir: Directory to extract into; created (with parents) if missing.

    Raises:
        RuntimeError: If any entry would resolve to a path outside *dest_dir*
            (ZipSlip), e.g. via ".." segments or an absolute member name.
        zipfile.BadZipFile: If *zip_path* is not a valid zip archive.
    """
    dest_real = dest_dir.resolve()
    with zipfile.ZipFile(zip_path) as zf:
        for info in zf.infolist():
            # Path.resolve() is non-strict by default: it normalizes ".."
            # segments even though the target does not exist yet, so no
            # FileNotFoundError fallback is needed here.
            target_real = (dest_dir / info.filename).resolve()
            if target_real != dest_real and dest_real not in target_real.parents:
                raise RuntimeError(f"Unsafe zip entry path: {info.filename}")
        # Create the destination only once the whole archive has been vetted;
        # this also guarantees it exists for an archive with no members.
        dest_dir.mkdir(parents=True, exist_ok=True)
        zf.extractall(dest_dir)
@dataclass(frozen=True)
class BootstrapConfig:
    """Immutable settings that drive one bootstrap run."""

    # Where the payload zip is fetched from; None when not configured.
    zip_url: str | None
    # Expected SHA-256 hex digest of the zip; None when not pinned.
    zip_sha256: str | None
    # Whether a payload may be run without a pinned digest.
    allow_unverified: bool
    # Base directory that holds the extracted payload.
    extract_base: Path
    # Scratch directory for the downloaded archive.
    tmp_dir: Path
    # Entry script path, relative to the extracted payload root.
    entry_relpath: str
    # Timeout, in seconds, passed to the download request.
    download_timeout_s: float
    # Token sent as an "Authorization: Bearer ..." header, if any.
    bearer_token: str | None
    # Extra raw "Key: Value" request header line, if any.
    header_kv: str | None
def _load_config() -> BootstrapConfig:
    """Build the bootstrap configuration from environment variables.

    Variables read (an empty value counts as unset, see ``_e``):
        SCRIPT_ZIP_URL, SCRIPT_ZIP_SHA256, ALLOW_UNVERIFIED_ZIP ("1" enables),
        SCRIPT_EXTRACT_DIR (default "/opt/runtime"),
        SCRIPT_TMP_DIR (default "<extract dir>/.tmp"),
        SCRIPT_WORKDIR (default "app"), SCRIPT_ENTRY (default "openai.py"),
        SCRIPT_DOWNLOAD_TIMEOUT (seconds, default 60),
        SCRIPT_ZIP_TOKEN / HF_TOKEN, SCRIPT_ZIP_HEADER.

    Returns:
        A populated, immutable ``BootstrapConfig``.

    Raises:
        ValueError: If SCRIPT_DOWNLOAD_TIMEOUT is not a positive number.
    """
    zip_url = _e("SCRIPT_ZIP_URL")
    zip_sha256 = _e("SCRIPT_ZIP_SHA256")
    allow_unverified = _e("ALLOW_UNVERIFIED_ZIP", "0") == "1"

    extract_base = Path(_e("SCRIPT_EXTRACT_DIR") or "/opt/runtime")
    tmp_override = _e("SCRIPT_TMP_DIR")
    tmp_dir = Path(tmp_override) if tmp_override else extract_base / ".tmp"

    # Where to run inside the extracted payload.
    # Recommended: zip contains folder "app/" with openai.py inside.
    # NOTE: an empty SCRIPT_WORKDIR is indistinguishable from an unset one and
    # therefore also yields "app"; use "." to run from the payload root.
    workdir = _e("SCRIPT_WORKDIR") or "app"
    entry = _e("SCRIPT_ENTRY") or "openai.py"
    entry_relpath = str(Path(workdir) / entry)

    raw_timeout = _e("SCRIPT_DOWNLOAD_TIMEOUT") or "60"
    try:
        download_timeout_s = float(raw_timeout)
    except ValueError as err:
        raise ValueError(
            f"SCRIPT_DOWNLOAD_TIMEOUT must be a number of seconds, got {raw_timeout!r}"
        ) from err
    # "not > 0" (rather than "<= 0") also rejects NaN. Such values would
    # otherwise only fail later, inside the socket layer, with an error that
    # does not mention the variable.
    if not download_timeout_s > 0:
        raise ValueError(
            f"SCRIPT_DOWNLOAD_TIMEOUT must be a positive number of seconds, got {raw_timeout!r}"
        )

    # Auth/header options:
    # - SCRIPT_ZIP_TOKEN: bearer token
    # - HF_TOKEN: commonly present on Spaces; used as fallback
    bearer_token = _e("SCRIPT_ZIP_TOKEN") or _e("HF_TOKEN")
    # - SCRIPT_ZIP_HEADER: raw "Key: Value" header line, optional
    header_kv = _e("SCRIPT_ZIP_HEADER")

    return BootstrapConfig(
        zip_url=zip_url,
        zip_sha256=zip_sha256,
        allow_unverified=allow_unverified,
        extract_base=extract_base,
        tmp_dir=tmp_dir,
        entry_relpath=entry_relpath,
        download_timeout_s=download_timeout_s,
        bearer_token=bearer_token,
        header_kv=header_kv,
    )
def _download_zip(cfg: BootstrapConfig, out_path: Path) -> None:
    """Download ``cfg.zip_url`` to *out_path*.

    Args:
        cfg: Bootstrap settings; uses ``zip_url``, ``bearer_token``,
            ``header_kv`` and ``download_timeout_s``.
        out_path: File to write the payload to; its parent directory is
            created if missing.

    Raises:
        RuntimeError: If no URL is configured or the response status is >= 400.
        urllib.error.URLError: Propagated from ``urlopen`` on transport or
            HTTP errors.
    """
    # An explicit check instead of ``assert``: assertions vanish under
    # ``python -O`` and would let a None URL through to urllib.
    if not cfg.zip_url:
        raise RuntimeError("SCRIPT_ZIP_URL is not set; nothing to download")

    req = urllib.request.Request(cfg.zip_url, method="GET")
    if cfg.bearer_token:
        req.add_header("Authorization", f"Bearer {cfg.bearer_token}")
    if cfg.header_kv:
        key, sep, value = cfg.header_kv.partition(":")
        key = key.strip()
        if sep and key:
            req.add_header(key, value.strip())
        else:
            # Surface the misconfiguration, but never echo the value itself:
            # it may carry a credential.
            print("[bootstrap] ignoring SCRIPT_ZIP_HEADER: expected 'Key: Value'", flush=True)

    with urllib.request.urlopen(req, timeout=cfg.download_timeout_s) as resp:
        # Some response objects expose only getcode(), others only .status.
        status = getattr(resp, "status", None)
        if status is None and hasattr(resp, "getcode"):
            status = resp.getcode()
        if isinstance(status, int) and status >= 400:
            raise RuntimeError(f"Download failed: HTTP {status}")
        out_path.parent.mkdir(parents=True, exist_ok=True)
        with out_path.open("wb") as f:
            shutil.copyfileobj(resp, f)
def _atomic_replace_dir(tmp_dir: Path, target_dir: Path) -> None:
    """Swap *tmp_dir* into place as *target_dir*, keeping the old one on failure.

    Any existing *target_dir* is first renamed aside, then *tmp_dir* is renamed
    into place, and only then is the old content deleted. If the second rename
    fails, the previous *target_dir* is restored, so a failed swap never leaves
    the caller without a payload. The swap is two renames, so there is a brief
    window in which *target_dir* does not exist.

    Args:
        tmp_dir: Fully prepared directory to move into place.
        target_dir: Final location; its parent is created if missing.

    Raises:
        OSError: If either rename fails (e.g. *tmp_dir* is missing or lives on
            a different filesystem than *target_dir*).
    """
    target_dir.parent.mkdir(parents=True, exist_ok=True)

    backup_dir = None
    if target_dir.exists() or target_dir.is_symlink():
        # A sibling name keeps the rename on the same filesystem.
        backup_dir = target_dir.with_name(f".{target_dir.name}.old_{uuid.uuid4().hex}")
        target_dir.rename(backup_dir)

    try:
        tmp_dir.rename(target_dir)
    except OSError:
        if backup_dir is not None:
            backup_dir.rename(target_dir)
        raise

    if backup_dir is not None:
        # Removing the stale payload is best-effort; the new one is already live.
        if backup_dir.is_dir() and not backup_dir.is_symlink():
            shutil.rmtree(backup_dir, ignore_errors=True)
        else:
            try:
                backup_dir.unlink()
            except OSError:
                pass
def main() -> int:
    """Fetch, verify and extract the payload, then run its entry script.

    Steps: load the configuration, download the zip into a scratch directory,
    check its SHA-256 when one is pinned, extract it into a temporary sibling
    of ``<extract_base>/current``, swap that into place, and finally run the
    entry script with the current interpreter from the script's own directory.

    Returns:
        The entry script's exit status.

    Raises:
        RuntimeError: If SCRIPT_ZIP_URL is missing, the digest is neither
            pinned nor explicitly waived, the digest does not match, or the
            entry script is not present after extraction.
    """
    cfg = _load_config()

    if not cfg.zip_url:
        raise RuntimeError(
            "SCRIPT_ZIP_URL is required in this HF deployment. "
            "Set SCRIPT_ZIP_URL and SCRIPT_ZIP_SHA256 (recommended)."
        )
    if not cfg.zip_sha256 and not cfg.allow_unverified:
        raise RuntimeError(
            "SCRIPT_ZIP_URL is set but SCRIPT_ZIP_SHA256 is missing. "
            "Set SCRIPT_ZIP_SHA256 (recommended) or ALLOW_UNVERIFIED_ZIP=1."
        )

    # Where the extracted payload will live.
    current_dir = cfg.extract_base / "current"

    cfg.tmp_dir.mkdir(parents=True, exist_ok=True)
    td_path = cfg.tmp_dir / f"bootstrap_{uuid.uuid4().hex}"
    td_path.mkdir(parents=True, exist_ok=True)
    # Staging area for extraction; named up front so that the cleanup below
    # can remove it whichever step fails.
    tmp_extract = cfg.extract_base / f".extract_tmp_{uuid.uuid4().hex}"
    try:
        zip_path = td_path / "payload.zip"
        # Monotonic clock: elapsed time must not be skewed by wall-clock jumps.
        t0 = time.monotonic()
        print(f"[bootstrap] downloading zip from SCRIPT_ZIP_URL to {zip_path}", flush=True)
        _download_zip(cfg, zip_path)
        print(
            f"[bootstrap] download done in {time.monotonic() - t0:.2f}s size={zip_path.stat().st_size}",
            flush=True,
        )

        if cfg.zip_sha256:
            got = _sha256_file(zip_path)
            want = cfg.zip_sha256.lower().strip()
            if got != want:
                raise RuntimeError(f"SHA256 mismatch: got={got} want={want}")
            print("[bootstrap] sha256 verified", flush=True)
        else:
            print("[bootstrap] sha256 not provided; running unverified payload", flush=True)

        print(f"[bootstrap] extracting zip to {tmp_extract}", flush=True)
        _safe_extract_zip(zip_path, tmp_extract)
        _atomic_replace_dir(tmp_extract, current_dir)
        print(f"[bootstrap] extracted to {current_dir}", flush=True)
    finally:
        shutil.rmtree(td_path, ignore_errors=True)
        # After a successful swap the staging directory has been renamed away
        # and this is a no-op; after a failure it removes the partial
        # extraction instead of leaking it under extract_base.
        shutil.rmtree(tmp_extract, ignore_errors=True)

    entry_path = (current_dir / cfg.entry_relpath).resolve()
    if not entry_path.exists():
        raise RuntimeError(f"Entry script not found: {entry_path} (SCRIPT_WORKDIR/SCRIPT_ENTRY)")

    print(f"[bootstrap] starting: {sys.executable} {entry_path}", flush=True)
    proc = subprocess.run([sys.executable, str(entry_path)], cwd=str(entry_path.parent))
    return int(proc.returncode)
if __name__ == "__main__":
    # Exit with whatever status main() returns.
    sys.exit(main())
|