feather-a10g-gt80k-runtime-public / scripts /monitor_feather_cron.py
icarus112's picture
Upload folder using huggingface_hub
422445b verified
#!/usr/bin/env python3
import os
import subprocess
import json
import time
# Namespace passed to every `hf jobs` invocation made by this script.
NAMESPACE = "GAInTech"
# Explicit job to monitor, read from the environment; None when the variable
# is unset, in which case main() looks for an active job instead.
JOB_ID = os.getenv("FEATHER_ACTIVE_JOB_ID")
def get_job_status(job_id):
    """Return the inspect record for ``job_id``, or ``None`` if unavailable.

    Runs ``hf jobs inspect --namespace NAMESPACE <job_id> --format json`` and
    returns the first element of the decoded JSON payload.

    Args:
        job_id: Identifier of the job to inspect.

    Returns:
        The first entry of the decoded JSON output, or ``None`` when the
        command cannot be run or exits non-zero, the output is not valid
        JSON, or the payload is empty or cannot be indexed.
    """
    cmd = [
        "hf", "jobs", "inspect",
        "--namespace", NAMESPACE,
        job_id,
        "--format", "json",
    ]
    try:
        raw = subprocess.check_output(cmd, text=True)
        data = json.loads(raw)
    except (subprocess.SubprocessError, OSError, ValueError, TypeError):
        # Best-effort lookup: a missing CLI (OSError), a non-zero exit
        # (SubprocessError), malformed/undecodable output (ValueError) or a
        # non-string job id (TypeError) all mean "no status available".
        # Deliberately narrower than a bare ``except`` so Ctrl-C, SystemExit
        # and genuine programming errors are not swallowed.
        return None

    if not data:
        return None
    try:
        return data[0]
    except (LookupError, TypeError):
        # Payload was not a non-empty sequence (e.g. a dict or a scalar).
        return None
def get_job_logs(job_id, lines=50):
    """Return the tail of the logs for ``job_id`` as text.

    Runs ``hf jobs logs --namespace NAMESPACE <job_id> --tail <lines>``.

    Args:
        job_id: Identifier of the job whose logs are fetched.
        lines: Value passed to ``--tail``; defaults to 50.

    Returns:
        The command's standard output as a string, or ``""`` when the
        command cannot be run, exits non-zero, or its output cannot be
        decoded as text.
    """
    cmd = [
        "hf", "jobs", "logs",
        "--namespace", NAMESPACE,
        job_id,
        "--tail", str(lines),
    ]
    try:
        return subprocess.check_output(cmd, text=True)
    except (subprocess.SubprocessError, OSError, ValueError, TypeError):
        # Best-effort: callers treat an empty string as "no logs yet".
        # Narrower than a bare ``except`` so KeyboardInterrupt/SystemExit
        # and programming errors still propagate.
        return ""
def _stage_of(job):
    """Return the ``status.stage`` value of a job record, or None if absent."""
    status = job.get("status") if isinstance(job, dict) else None
    return status.get("stage") if isinstance(status, dict) else None


def _find_active_job_id():
    """Return the id of the first active job in NAMESPACE, else print why and return None."""
    cmd = ["hf", "jobs", "ps", "-a", "--namespace", NAMESPACE, "--format", "json"]
    try:
        raw = subprocess.check_output(cmd, text=True)
        jobs = json.loads(raw)
    except (subprocess.SubprocessError, OSError, ValueError) as exc:
        # Match the sibling helpers: a failing CLI call is reported, not fatal.
        print(f"Failed to list jobs in namespace {NAMESPACE}: {exc}")
        return None

    if not jobs:
        print("No jobs found in namespace.")
        return None
    if not isinstance(jobs, list):
        print(f"Unexpected job listing format: {type(jobs).__name__}")
        return None

    # Filter for RUNNING/PENDING/SCHEDULING/INITIALIZING
    active_stages = {"RUNNING", "PENDING", "SCHEDULING", "INITIALIZING"}
    for job in jobs:
        if _stage_of(job) in active_stages and job.get("id"):
            return job["id"]

    latest = jobs[0]
    latest_id = latest.get("id") if isinstance(latest, dict) else None
    print(f"No active jobs found. Latest job: {latest_id} ({_stage_of(latest)})")
    return None


def _parse_metric(line, key):
    """Return the float of the last parseable ``key=<number>`` token in line, or None."""
    prefix = f"{key}="
    value = None
    for token in line.split():
        if not token.startswith(prefix):
            continue
        try:
            # Tolerate separators glued to the number, e.g. "tps=98000,".
            value = float(token[len(prefix):].rstrip(",;|"))
        except ValueError:
            # A malformed token must not hide other, well-formed metrics.
            continue
    return value


def _report_telemetry(logs):
    """Print the latest ``step=`` log line and any TPS/BPB alerts derived from it."""
    last_step_line = ""
    for line in logs.splitlines():
        if "step=" in line:
            last_step_line = line

    if not last_step_line:
        print("No telemetry found in logs yet.")
        return

    print(f"LATEST TELEMETRY: {last_step_line}")
    tps = _parse_metric(last_step_line, "tps")
    bpb = _parse_metric(last_step_line, "bpb")

    # NOTE(review): the alert fires below 100000 while the message cites a
    # 150k target; both are kept as-is -- confirm which figure is intended.
    if tps is not None and 0 < tps < 100000:
        print(f"CRITICAL: TPS is {tps}, which is below 150k target. Checking bottlenecks...")
    if bpb is not None and bpb > 3.5:
        print(f"WARNING: BPB is {bpb}, high divergence risk.")


def main():
    """Report the stage and latest telemetry of the monitored job on stdout.

    The job is taken from ``JOB_ID`` when set; otherwise the first job in
    NAMESPACE whose stage is RUNNING, PENDING, SCHEDULING or INITIALIZING is
    used. The function prints the job's stage, stops with a notice when the
    job cannot be inspected or is in a terminal stage, and otherwise prints
    the most recent log line containing ``step=`` together with alerts for
    low ``tps`` or high ``bpb`` values found on that line.

    Returns:
        None. All results are reported via ``print``.
    """
    job_id = JOB_ID
    if not job_id:
        print("FEATHER_ACTIVE_JOB_ID not set. Checking for running/pending jobs...")
        job_id = _find_active_job_id()
        if not job_id:
            # The helper has already printed the reason.
            return

    status_data = get_job_status(job_id)
    if not status_data:
        print(f"Job {job_id} not found.")
        return

    # ``status`` may be missing or null in the record; fall back to UNKNOWN.
    stage = _stage_of(status_data) or "UNKNOWN"
    print(f"Job: {job_id} | Stage: {stage}")

    if stage in ("ERROR", "FAILED", "CANCELLED", "COMPLETED"):
        print(f"TERMINAL STATE: {stage}. Intervention required.")
        return

    _report_telemetry(get_job_logs(job_id))
# Run the monitor only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()