Spaces:
Sleeping
Sleeping
File size: 4,445 Bytes
906e104 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 | """HF Space wrapper around scripts/run_pipeline.py.
Hardened for the "runtime ended β models gone" failure mode:
* Background HubPersistor uploads every 5 min (started by run_pipeline).
* SIGTERM/SIGINT handlers do a final upload before exit.
* `atexit` fallback if the OS kills us via SIGKILL after a SIGTERM warning.
* `pip freeze` and `run_manifest.json` written for reproducibility.
* Resilient: pipeline failure still triggers a best-effort artifact upload.
Required Space env vars (Settings β Variables and secrets):
HF_TOKEN β fine-grained token with WRITE access to the model repo
REPO_ID β target model repo, e.g. "your-username/DAHS-Models"
SPACE_ID β (optional) "your-username/your-space-name" for auto-pause
"""
from __future__ import annotations
import http.server
import os
import socketserver
import subprocess
import sys
import threading
from datetime import datetime, timezone
from pathlib import Path
ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(ROOT))
HF_TOKEN = os.environ.get("HF_TOKEN")
REPO_ID = os.environ.get("REPO_ID")
SPACE_ID = os.environ.get("SPACE_ID") # set automatically inside a Space
# CPU-upgrade tier: 16 vCPUs. The pipeline is multiprocessing-bound, so we
# leave 1 core for the periodic uploader thread and use the rest for sims.
CPU_COUNT = os.cpu_count() or 8
WORKERS = str(max(2, CPU_COUNT - 1))
# Q1 budget: 5000 scenarios β ~300k labeled snapshots; 1000 eval seeds
# (Friedman + Nemenyi over 1000 paired observations is well into asymptotic
# regime; Wilcoxon power on this n is essentially saturated).
SCENARIOS = os.environ.get("DAHS_SCENARIOS", "5000")
EVAL_SEEDS = os.environ.get("DAHS_EVAL_SEEDS", "1000")
def main() -> int:
print("--- DAHS_2 HF RUNNER STARTING ---")
print(f"Time : {datetime.now(timezone.utc).isoformat()}")
print(f"CPUs : {CPU_COUNT}, workers={WORKERS}")
print(f"Repo : {REPO_ID}")
print(f"Space: {SPACE_ID}")
if not HF_TOKEN or not REPO_ID:
print("[FATAL] HF_TOKEN and REPO_ID env vars are required.")
print(" Settings β Variables and secrets β add both.")
return 1
# Verify Hub access before burning compute.
from src.hf_persistence import HubPersistor
persistor = HubPersistor(repo_id=REPO_ID, token=HF_TOKEN)
persistor.install_signal_handlers()
persistor.install_atexit()
persistor.start_periodic(interval_seconds=300)
# Trick HF Space health check (port 7860 must respond to be "Running").
def _start_dummy_server():
try:
handler = http.server.SimpleHTTPRequestHandler
with socketserver.TCPServer(("", 7860), handler) as httpd:
httpd.serve_forever()
except Exception as e: # noqa: BLE001
print(f"[warn] dummy health server failed: {e}")
threading.Thread(target=_start_dummy_server, daemon=True).start()
print("[ok] dummy health server on :7860")
print(
f"\n--- PIPELINE: {SCENARIOS} scenarios, {EVAL_SEEDS} eval seeds, "
f"{WORKERS} workers ---"
)
cmd = [
sys.executable, "scripts/run_pipeline.py",
"--scenarios", SCENARIOS,
"--eval-seeds", EVAL_SEEDS,
"--workers", WORKERS,
]
rc = 1
try:
result = subprocess.run(cmd, cwd=str(ROOT))
rc = result.returncode
except Exception as e: # noqa: BLE001
print(f"[FATAL] pipeline subprocess raised: {e}")
status = "SUCCESS" if rc == 0 else f"FAILED (exit {rc})"
(ROOT / "results").mkdir(exist_ok=True)
(ROOT / "results" / "run_status.txt").write_text(
f"{status}\n{datetime.now(timezone.utc).isoformat()}\n",
encoding="utf-8",
)
# Always do a final consolidated upload, success or fail.
print("\n--- FINAL UPLOAD ---")
persistor.stop_periodic()
persistor.snapshot(msg=f"runner_final_{status.split()[0]}")
# Pause the Space to stop billing β only after final upload.
target_space = SPACE_ID
if not target_space:
print("[warn] SPACE_ID not set; skipping auto-pause. Pause manually in Settings.")
else:
try:
persistor.api.pause_space(repo_id=target_space)
print(f"[ok] paused {target_space}")
except Exception as e: # noqa: BLE001
print(f"[warn] auto-pause failed: {e} β pause manually to stop billing.")
return rc
if __name__ == "__main__":
sys.exit(main())
|