# blt-reasoner-pilot1/code/scripts/hf_auto_push.py
# LauraGG's picture
# Refresh code/ with latest BLT-Reasoner sources (post-campaign)
# bc7101b verified
"""Auto-push new pilot artifacts to HF as they appear.
Designed to run as a long-lived (~8 h) nohup'd daemon on the training box.
Token is read from the BLT_HF_TOKEN environment variable (removed from the
environment once read), falling back to stdin; it is held only in process memory.
Polls every 2 min for:
- new local ckpts (ckpt-step10000, ckpt-step12000): pushes each as
`ckpts/<name>/` under the repo.
- `final/` directory + `final/ablation_n200.json` (the auto-eval result):
pushes both, regenerates the README with the full ablation table.
Exits after final/ is pushed, or after `--max_hours` (default 8) regardless.
"""
from __future__ import annotations
import argparse
import json
import os
import shutil
import sys
import time
from pathlib import Path
def push_folder(api, folder, path_in_repo, repo, msg):
    """Upload a local directory to a model repo in a single commit.

    Args:
        api: Object exposing ``upload_folder`` (an ``HfApi`` in this script).
        folder: Local directory to upload; converted with ``str()``.
        path_in_repo: Destination path inside the repo.
        repo: Target repo id.
        msg: Commit message for the upload.
    """
    upload_kwargs = {
        "folder_path": str(folder),
        "path_in_repo": path_in_repo,
        "repo_id": repo,
        "repo_type": "model",
        "commit_message": msg,
    }
    # The upload result is intentionally not returned to the caller.
    api.upload_folder(**upload_kwargs)
def push_file(api, fpath, path_in_repo, repo, msg):
    """Upload a single local file to a model repo in a single commit.

    Args:
        api: Object exposing ``upload_file`` (an ``HfApi`` in this script).
        fpath: Local file to upload; converted with ``str()``.
        path_in_repo: Destination path inside the repo.
        repo: Target repo id.
        msg: Commit message for the upload.
    """
    upload_kwargs = {
        "path_or_fileobj": str(fpath),
        "path_in_repo": path_in_repo,
        "repo_id": repo,
        "repo_type": "model",
        "commit_message": msg,
    }
    # The upload result is intentionally not returned to the caller.
    api.upload_file(**upload_kwargs)
def ckpt_complete(ckpt_dir: Path) -> bool:
    """Return True iff all 3 expected artifacts are non-empty regular files.

    The artifacts checked, relative to ``ckpt_dir``, are
    ``model/adapter_model.safetensors``, ``projector.pt`` and ``head.pt``.

    Args:
        ckpt_dir: Checkpoint directory to inspect.

    Returns:
        False as soon as one artifact is missing, unreadable, not a regular
        file, or zero bytes long; True otherwise.
    """
    from stat import S_ISREG

    for rel in ("model/adapter_model.safetensors", "projector.pt", "head.pt"):
        try:
            # A single stat() call: no window between an existence check and
            # the size read in which the file could disappear and raise.
            info = (ckpt_dir / rel).stat()
        except OSError:
            return False
        # A directory has a nonzero st_size on most filesystems, so the size
        # test alone would accept one in place of an artifact.
        if not S_ISREG(info.st_mode) or info.st_size == 0:
            return False
    return True
def regen_readme(pilot_dir: Path, code_dir: Path, repo: str) -> str:
    """Build the README text via the project's ``build_readme`` helper.

    The parent of ``code_dir`` is placed at the front of ``sys.path`` and
    ``build_readme`` is then imported from
    ``experiments.blt_reasoner.scripts.hf_upload_pilot``.

    Args:
        pilot_dir: Pilot output directory, passed through to ``build_readme``.
        code_dir: Code directory, passed through to ``build_readme``; its
            parent is the import root added to ``sys.path``.
        repo: Repo id, passed through to ``build_readme``.

    Returns:
        Whatever ``build_readme(pilot_dir, code_dir, repo)`` returns.

    Raises:
        ImportError: If the helper module cannot be imported.
    """
    import_root = str(code_dir.parent)
    # Keep the root first in the search order, but do not stack a duplicate
    # entry on sys.path every time this function is called.
    sys.path[:] = [import_root] + [p for p in sys.path if p != import_root]
    from experiments.blt_reasoner.scripts.hf_upload_pilot import build_readme

    return build_readme(pilot_dir, code_dir, repo)
def _read_hf_token() -> str:
    """Return the token from ``BLT_HF_TOKEN`` or, failing that, from stdin.

    Returns an empty string when neither source yields anything.
    """
    # pop() removes the variable from this process's environment once read.
    token = os.environ.pop("BLT_HF_TOKEN", "").strip()
    if token:
        return token
    try:
        return sys.stdin.read().strip()
    except Exception:
        return ""


def _pid_alive(pid: int) -> bool:
    """Probe ``pid`` with signal 0; only ProcessLookupError counts as gone."""
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # The process exists but we are not allowed to signal it.
        return True
    return True


def _push_final(api, pilot: Path, code: Path, repo: str, log) -> None:
    """Push ``final/``, then best-effort push the logs and a fresh README.

    Only a failure of the ``final/`` upload propagates; log and README
    failures are reported through ``log`` as warnings.
    """
    import tempfile

    log("pushing final/ (ckpt + ablation_n200.json)")
    push_folder(api, pilot / "final", "final", repo,
                "Add final ckpt + n=200 pre-registered z-ablation")

    # Also push the auto-eval logs.
    for name in ("auto_eval.log", "run.log", "metrics.jsonl"):
        log_file = pilot / name
        if not log_file.exists():
            continue
        try:
            push_file(api, log_file, f"logs/{name}", repo,
                      f"Refresh logs/{name} at end of pilot")
        except Exception as e:
            log(f" warn: log push {name}: {e!r}")

    # Regenerate README with final ablation table.
    try:
        readme = regen_readme(pilot, code, repo)
        # A private temp directory replaces the fixed /tmp path and is
        # removed even when the upload raises.
        with tempfile.TemporaryDirectory(prefix="blt_readme_") as tmp_dir:
            tmp = Path(tmp_dir) / "README.md"
            tmp.write_text(readme, encoding="utf-8")
            push_file(api, tmp, "README.md", repo,
                      "Regenerate README with final ablation results")
    except Exception as e:
        log(f" warn: README regen: {e!r}")


def main():
    """Run the auto-push daemon until final/ is pushed or the deadline passes.

    Each loop iteration (one every 120 s):
      1. pushes any watched checkpoint that has become complete, and
      2. once the training PID is gone and ``final/ablation_n200.json``
         exists, pushes ``final/``, the logs and a regenerated README.

    Exits with status 2 when no ``hf_``-prefixed token is available.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--repo", required=True)
    parser.add_argument("--pilot_dir", required=True)
    parser.add_argument("--code_dir", required=True)
    parser.add_argument("--train_pid", type=int, required=True)
    parser.add_argument("--max_hours", type=float, default=8.0)
    parser.add_argument("--log", default=None)
    parser.add_argument("--ckpts_to_watch", default="ckpt-step10000,ckpt-step12000")
    args = parser.parse_args()

    log_path = args.log or os.path.join(args.pilot_dir, "auto_push.log")
    # The context manager closes the log on every exit path, including the
    # sys.exit(2) below and unexpected exceptions.
    with open(log_path, "a", buffering=1, encoding="utf-8") as log_f:

        def log(m):
            line = f"[{time.strftime('%H:%M:%S')}] {m}"
            print(line, flush=True)
            log_f.write(line + "\n")

        # Token source: env var BLT_HF_TOKEN (used when launched via
        # `nohup &`, which detaches stdin). Falls back to stdin for
        # interactive launches.
        token = _read_hf_token()
        if not token.startswith("hf_"):
            log("ERROR: no hf_ token in BLT_HF_TOKEN env or stdin; aborting")
            sys.exit(2)

        from huggingface_hub import HfApi
        api = HfApi(token=token)

        pilot = Path(args.pilot_dir)
        code = Path(args.code_dir)
        watchlist = [c.strip() for c in args.ckpts_to_watch.split(",") if c.strip()]
        pushed = set()
        log(f"daemon start: repo={args.repo} pilot={pilot} "
            f"train_pid={args.train_pid} watching={watchlist}")

        # Monotonic clock: the deadline is unaffected by wall-clock changes.
        deadline = time.monotonic() + args.max_hours * 3600
        final_dir = pilot / "final"
        final_abl = final_dir / "ablation_n200.json"
        final_pushed = False

        while time.monotonic() < deadline and not final_pushed:
            # 1. Push any newly-complete ckpts.
            for ckpt_name in watchlist:
                if ckpt_name in pushed:
                    continue
                ckpt = pilot / ckpt_name
                try:
                    # The readiness probe is inside the try so a filesystem
                    # error is logged and retried instead of ending the daemon.
                    if not (ckpt.exists() and ckpt_complete(ckpt)):
                        continue
                    # Wait a bit in case files are still flushing.
                    time.sleep(15)
                    log(f"pushing {ckpt_name}")
                    push_folder(api, ckpt, f"ckpts/{ckpt_name}", args.repo,
                                f"Add {ckpt_name}")
                    pushed.add(ckpt_name)
                    log(f" ok: {ckpt_name}")
                except Exception as e:
                    log(f" ERROR pushing {ckpt_name}: {e!r}; will retry")

            # 2. Check if training has exited and final is ready.
            if not _pid_alive(args.train_pid):
                # Auto-eval poller may still be running. Wait until
                # ablation_n200.json appears.
                if final_dir.exists() and final_abl.exists():
                    # Wait a bit for any final flushes.
                    time.sleep(30)
                    try:
                        _push_final(api, pilot, code, args.repo, log)
                        final_pushed = True
                        log("DONE: final pushed; daemon exiting")
                    except Exception as e:
                        log(f" ERROR pushing final: {e!r}; will retry")
                else:
                    log("train PID gone but final/ablation_n200.json not yet present; waiting")

            if not final_pushed:
                time.sleep(120)

        if not final_pushed:
            log(f"deadline reached or daemon exiting without final push (pushed={pushed})")


if __name__ == "__main__":
    main()