feather-a10g-large-runtime / overlay /scripts /monitor_prod9_eval_train.py
icarus112's picture
Update Feather a10g-large training runtime image
c475135 verified
Raw
History Blame Contribute Delete
3.25 kB
#!/usr/bin/env python3
"""Monitor prod9 eval-then-train job and report English eval results.
Polls HF job logs, extracts:
- Eval phase: BPB, PPL, ROUGE, BLEU
- Training phase: step, loss, bpb, tps, val_bpb
"""
from __future__ import annotations
import json, os, re, subprocess, sys
from pathlib import Path
# Hugging Face namespace passed to `hf jobs logs --namespace` when fetching logs.
NAMESPACE = "GAInTech"
# File holding the id of the job to monitor: `.logs/last_job_id.txt` inside the
# directory one level above this script's own directory.
# NOTE(review): presumably written by whatever launches the job -- confirm.
JOB_ID_FILE = Path(__file__).resolve().parents[1] / ".logs" / "last_job_id.txt"
def get_job_id() -> str:
    """Return the job id stored in ``JOB_ID_FILE``.

    Returns:
        The file's contents with surrounding whitespace removed, or an empty
        string when the file does not exist.
    """
    # Guard clause: nothing recorded yet, so there is no job to monitor.
    if not JOB_ID_FILE.exists():
        return ""
    stored = JOB_ID_FILE.read_text()
    return stored.strip()
def fetch_logs(job_id: str, tail: int = 200) -> str:
    """Return the trailing lines of a job's logs using the ``hf`` CLI.

    Runs ``hf jobs logs --namespace NAMESPACE <job_id> --tail <tail>`` with a
    60-second timeout and returns what the command wrote to stdout.

    Args:
        job_id: Identifier of the job whose logs are requested.
        tail: Number of trailing log lines to ask for. Defaults to 200, the
            value that was previously hard-coded.

    Returns:
        The captured stdout. If the command times out, any output captured
        before the timeout is returned. If nothing could be captured (missing
        executable, timeout without output, invalid arguments) a one-line
        ``"[ERROR] <reason>"`` string is returned instead of raising. Every
        failure is also reported on stderr so it is not mistaken for a job
        that simply has not logged anything yet.
    """
    cmd = ["hf", "jobs", "logs", "--namespace", NAMESPACE, job_id, "--tail", str(tail)]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
    except subprocess.TimeoutExpired as exc:
        # Output captured before a timeout is exposed as bytes even when
        # text=True was requested, so decode it before handing it back.
        partial = exc.stdout
        if isinstance(partial, bytes):
            partial = partial.decode("utf-8", errors="replace")
        if partial:
            print("[monitor] log fetch timed out; using partial output", file=sys.stderr)
            return partial
        print(f"[monitor] could not fetch logs: {exc}", file=sys.stderr)
        return f"[ERROR] {exc}"
    except (OSError, ValueError, subprocess.SubprocessError) as exc:
        # OSError: `hf` missing or not executable; ValueError: bad arguments.
        print(f"[monitor] could not fetch logs: {exc}", file=sys.stderr)
        return f"[ERROR] {exc}"

    if result.returncode != 0:
        # Surface CLI failures; stdout (possibly empty) is still returned so
        # the caller's best-effort parsing keeps working.
        detail = result.stderr.strip()
        suffix = f": {detail}" if detail else ""
        print(
            f"[monitor] `hf jobs logs` exited with code {result.returncode}{suffix}",
            file=sys.stderr,
        )
    return result.stdout
# One float as Python prints it: optional sign, digits with optional fraction,
# optional exponent, or the special values nan/inf. Unlike ``[\d\.]+`` it
# cannot swallow a sentence-ending period or produce an unparsable token.
_EVAL_NUMBER = r"([-+]?(?:(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][-+]?\d+)?|nan|inf))"
_BASELINE_RE = re.compile(r"\[BASELINE\] bpb=" + _EVAL_NUMBER + r" ppl=" + _EVAL_NUMBER)
_ENGLISH_EVAL_RE = re.compile(
    r"\[ENGLISH_EVAL\] ROUGE-1=" + _EVAL_NUMBER
    + r" ROUGE-2=" + _EVAL_NUMBER
    + r" ROUGE-L=" + _EVAL_NUMBER
    + r" BLEU=" + _EVAL_NUMBER
)


def parse_eval_results(logs: str) -> dict | None:
    """Extract English eval metrics from log.

    Looks for the first ``[BASELINE] bpb=<x> ppl=<y>`` entry and, optionally,
    the first ``[ENGLISH_EVAL] ROUGE-1=.. ROUGE-2=.. ROUGE-L=.. BLEU=..``
    entry in ``logs``.

    Args:
        logs: Raw log text to scan.

    Returns:
        ``None`` when no ``[BASELINE]`` entry is present. Otherwise a dict
        with float values under ``bpb`` and ``ppl``, plus ``rouge1``,
        ``rouge2``, ``rougeL`` and ``bleu``, which are ``None`` when the
        ``[ENGLISH_EVAL]`` entry is missing.
    """
    baseline = _BASELINE_RE.search(logs)
    if baseline is None:
        return None
    bpb, ppl = map(float, baseline.groups())

    # The ROUGE/BLEU line is optional; leave those metrics as None without it.
    rouge1 = rouge2 = rougeL = bleu = None
    english = _ENGLISH_EVAL_RE.search(logs)
    if english is not None:
        rouge1, rouge2, rougeL, bleu = map(float, english.groups())

    return {
        "bpb": bpb,
        "ppl": ppl,
        "rouge1": rouge1,
        "rouge2": rouge2,
        "rougeL": rougeL,
        "bleu": bleu,
    }
# One float as Python prints it (optional sign, fraction, exponent, nan/inf).
_TRAIN_NUMBER = r"([-+]?(?:(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][-+]?\d+)?|nan|inf))"
# Lazy ``.*?`` plus a word boundary before ``loss=``/``bpb=`` keeps keys such
# as ``val_bpb=`` or ``val_loss=`` from being mistaken for the training
# values. ``step=`` and ``tps=`` are matched as before.
_TRAIN_LINE_RE = re.compile(
    r"step=(\d+).*?\bloss=" + _TRAIN_NUMBER
    + r".*?\bbpb=" + _TRAIN_NUMBER
    + r".*?tps=(\d+)"
)


def parse_training_metrics(logs: str) -> list[dict]:
    """Extract step/loss/bpb/tps lines from training log.

    A line counts as a training line when it contains ``step=``, ``loss=``,
    ``bpb=`` and ``tps=`` in that order; anything else is ignored.

    Args:
        logs: Raw log text, one entry per line.

    Returns:
        One dict per matching line, in log order, with ``step`` (int),
        ``loss`` (float), ``bpb`` (float) and ``tps`` (int). Empty when no
        line matches.
    """
    hits = (_TRAIN_LINE_RE.search(line) for line in logs.splitlines())
    return [
        {
            "step": int(hit.group(1)),
            "loss": float(hit.group(2)),
            "bpb": float(hit.group(3)),
            "tps": int(hit.group(4)),
        }
        for hit in hits
        if hit is not None
    ]
def main() -> None:
    """Print a one-shot status report for the job named by ``get_job_id()``.

    Fetches the job's log tail once and prints, when present: the baseline
    eval metrics as JSON, the latest training step with the bpb change since
    the previous parsed step, and the step of the latest saved checkpoint.
    Exits with status 1 when no job id is available.
    """
    job_id = get_job_id()
    if not job_id:
        print("[monitor] no job_id found", file=sys.stderr)
        raise SystemExit(1)

    log_text = fetch_logs(job_id)

    # Eval phase: baseline metrics, if visible in the fetched log tail.
    baseline = parse_eval_results(log_text)
    if not baseline:
        print("[monitor] eval phase not yet complete or not found in tail")
    else:
        print("[EVAL_RESULTS] baseline eval found:")
        print(json.dumps(baseline, indent=2))

    # Training phase: newest parsed step and its bpb change vs. the one before.
    history = parse_training_metrics(log_text)
    if history:
        current = history[-1]
        print(f"[TRAIN] latest step={current['step']} loss={current['loss']:.4f} bpb={current['bpb']:.4f} tps={current['tps']}")
        if len(history) > 1:
            previous = history[-2]
            change = current['bpb'] - previous['bpb']
            print(f"[TRAIN] delta bpb={change:+.4f} (lower=better)")

    # Checkpoints: report the step of the last save announced in the logs.
    saved_steps = re.findall(r"\[ckpt\] saved .* \(step=(\d+)\)", log_text)
    if saved_steps:
        newest = saved_steps[-1]
        print(f"[CKPT] latest checkpoint at step={newest}")


if __name__ == "__main__":
    main()