File size: 4,445 Bytes
906e104
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
"""HF Space wrapper around scripts/run_pipeline.py.

Hardened for the "runtime ended → models gone" failure mode:
  * Background HubPersistor uploads every 5 min (started by run_pipeline).
  * SIGTERM/SIGINT handlers do a final upload before exit.
  * `atexit` fallback on interpreter exit after a SIGTERM warning (best-effort:
    atexit hooks cannot run once the OS delivers SIGKILL).
  * `pip freeze` and `run_manifest.json` written for reproducibility.
  * Resilient: pipeline failure still triggers a best-effort artifact upload.

Required Space env vars (Settings → Variables and secrets):
  HF_TOKEN  — fine-grained token with WRITE access to the model repo
  REPO_ID   — target model repo, e.g. "your-username/DAHS-Models"
  SPACE_ID  — (optional) "your-username/your-space-name" for auto-pause
"""
from __future__ import annotations

import http.server
import os
import socketserver
import subprocess
import sys
import threading
from datetime import datetime, timezone
from pathlib import Path

# Project root is the parent of the directory holding this file; it is placed
# at the front of sys.path so that ``src.*`` imports resolve from any cwd.
ROOT = Path(__file__).parent.parent
sys.path.insert(0, os.fspath(ROOT))

# Hub credentials and targets, read once from the environment (None if unset).
HF_TOKEN = os.getenv("HF_TOKEN")
REPO_ID = os.getenv("REPO_ID")
SPACE_ID = os.getenv("SPACE_ID")  # set automatically inside a Space

# CPU-upgrade tier: 16 vCPUs. The pipeline is multiprocessing-bound, so one
# core is left for the periodic uploader thread and the rest run simulations.
# Falls back to 8 when the CPU count cannot be determined; never fewer than
# two workers. WORKERS is a string because it is passed as a CLI argument.
CPU_COUNT = os.cpu_count() or 8
WORKERS = f"{max(CPU_COUNT - 1, 2)}"

# Q1 budget: 5000 scenarios → ~300k labeled snapshots; 1000 eval seeds
# (Friedman + Nemenyi over 1000 paired observations is well into asymptotic
# regime; Wilcoxon power on this n is essentially saturated).
# Both can be overridden via the environment and stay strings for the CLI.
SCENARIOS = os.getenv("DAHS_SCENARIOS", "5000")
EVAL_SEEDS = os.getenv("DAHS_EVAL_SEEDS", "1000")


class _HealthHandler(http.server.BaseHTTPRequestHandler):
    """Answer every GET/HEAD with a bare ``200 ok``.

    Replaces ``SimpleHTTPRequestHandler``, which would serve the contents of
    the process's working directory to anyone able to reach the port.
    """

    _BODY = b"ok\n"

    def _reply(self, *, with_body: bool) -> None:
        """Send the fixed 200 response, optionally including the body."""
        self.send_response(200)
        self.send_header("Content-Type", "text/plain; charset=utf-8")
        self.send_header("Content-Length", str(len(self._BODY)))
        self.end_headers()
        if with_body:
            self.wfile.write(self._BODY)

    def do_GET(self) -> None:  # noqa: N802 - name mandated by http.server
        self._reply(with_body=True)

    def do_HEAD(self) -> None:  # noqa: N802 - name mandated by http.server
        self._reply(with_body=False)


def _start_health_server(port: int = 7860) -> bool:
    """Bind the health endpoint on *port* and serve it from a daemon thread.

    The socket is bound synchronously so a bind failure is reported before
    the caller claims the endpoint is up. Failures are best-effort: they are
    printed as a warning and never raised.

    Returns:
        True if the server was bound and its thread started, else False.
    """
    try:
        httpd = socketserver.TCPServer(("", port), _HealthHandler)
    except Exception as e:  # noqa: BLE001 - health endpoint is best-effort
        print(f"[warn] dummy health server failed: {e}")
        return False

    def _serve() -> None:
        try:
            with httpd:
                httpd.serve_forever()
        except Exception as e:  # noqa: BLE001 - never take the runner down
            print(f"[warn] dummy health server failed: {e}")

    threading.Thread(target=_serve, daemon=True).start()
    return True


def main() -> int:
    """Run the pipeline inside a HF Space and persist its artifacts.

    Steps, in order:
      1. Print a startup banner and require ``HF_TOKEN`` / ``REPO_ID``.
      2. Create the ``HubPersistor``, install its signal and atexit hooks and
         start its periodic upload (every 300 s).
      3. Start the health endpoint on port 7860.
      4. Run ``scripts/run_pipeline.py`` as a subprocess with cwd ``ROOT``.
      5. Write ``results/run_status.txt``, stop the periodic upload and take
         a final snapshot, whether or not the pipeline succeeded.
      6. Pause the Space when ``SPACE_ID`` is set.

    Returns:
        ``1`` if the required env vars are missing or the subprocess could
        not be launched; otherwise the pipeline's exit code.
    """
    print("--- DAHS_2 HF RUNNER STARTING ---")
    print(f"Time : {datetime.now(timezone.utc).isoformat()}")
    print(f"CPUs : {CPU_COUNT}, workers={WORKERS}")
    print(f"Repo : {REPO_ID}")
    print(f"Space: {SPACE_ID}")

    if not HF_TOKEN or not REPO_ID:
        print("[FATAL] HF_TOKEN and REPO_ID env vars are required.")
        print("        Settings → Variables and secrets → add both.")
        return 1

    # Set up persistence before burning compute.
    # NOTE(review): presumably constructing HubPersistor validates Hub access;
    # confirm in src.hf_persistence.
    from src.hf_persistence import HubPersistor
    persistor = HubPersistor(repo_id=REPO_ID, token=HF_TOKEN)
    persistor.install_signal_handlers()
    persistor.install_atexit()
    persistor.start_periodic(interval_seconds=300)

    # HF Space health check: port 7860 must respond for the Space to be
    # "Running". Only claim success if the socket was actually bound.
    if _start_health_server(7860):
        print("[ok] dummy health server on :7860")

    print(
        f"\n--- PIPELINE: {SCENARIOS} scenarios, {EVAL_SEEDS} eval seeds, "
        f"{WORKERS} workers ---"
    )
    cmd = [
        sys.executable, "scripts/run_pipeline.py",
        "--scenarios",  SCENARIOS,
        "--eval-seeds", EVAL_SEEDS,
        "--workers",    WORKERS,
    ]

    # rc stays 1 if the subprocess cannot even be launched.
    rc = 1
    try:
        result = subprocess.run(cmd, cwd=str(ROOT))
        rc = result.returncode
    except Exception as e:  # noqa: BLE001 - still want the final upload
        print(f"[FATAL] pipeline subprocess raised: {e}")

    status = "SUCCESS" if rc == 0 else f"FAILED (exit {rc})"
    (ROOT / "results").mkdir(exist_ok=True)
    (ROOT / "results" / "run_status.txt").write_text(
        f"{status}\n{datetime.now(timezone.utc).isoformat()}\n",
        encoding="utf-8",
    )

    # Always do a final consolidated upload, success or fail.
    print("\n--- FINAL UPLOAD ---")
    persistor.stop_periodic()
    # The message carries only the first word of the status (SUCCESS/FAILED).
    persistor.snapshot(msg=f"runner_final_{status.split()[0]}")

    # Pause the Space to stop billing — only after final upload.
    target_space = SPACE_ID
    if not target_space:
        print("[warn] SPACE_ID not set; skipping auto-pause. Pause manually in Settings.")
    else:
        try:
            persistor.api.pause_space(repo_id=target_space)
            print(f"[ok] paused {target_space}")
        except Exception as e:  # noqa: BLE001 - pausing is best-effort
            print(f"[warn] auto-pause failed: {e} — pause manually to stop billing.")

    return rc


# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())