# pmc-worker / app.py
# Nottybro's picture
# Update app.py
# 3c5cdcf verified
# Raw
# History Blame Contribute Delete
# 17.1 kB
"""
PMC Worker — Final
Key fixes:
1. Per-account push rate limiting (one push per account at a time, 2min between)
2. 429 backoff — wait 5min before retrying that account
3. No external API — everything in this one space
"""
import os,json,time,random,threading,shutil,tempfile,subprocess,requests
from pathlib import Path
from datetime import datetime,timezone
import gradio as gr
# ── PASTE YOUR ACCOUNTS ───────────────────────────────────────
# SECURITY: API keys written in this file are readable by anyone who can read
# the file. Prefer supplying them through the KAGGLE_ACCOUNTS_JSON environment
# variable (a JSON list of {"username": ..., "key": ...} objects) and rotate
# any key that has ever been committed. The inline list below is kept only as
# a backward-compatible fallback.
def _load_accounts(default, env=None):
    """Return the account list from KAGGLE_ACCOUNTS_JSON, else `default`.

    Args:
        default: list of {"username", "key"} dicts used when the variable is
            unset or blank.
        env: mapping to read the variable from; defaults to ``os.environ``.

    Raises:
        ValueError: the variable is set but is not a non-empty JSON list of
            objects carrying string "username" and "key" fields.
    """
    env = os.environ if env is None else env
    raw = env.get("KAGGLE_ACCOUNTS_JSON", "").strip()
    if not raw:
        return default
    accounts = json.loads(raw)  # json.JSONDecodeError is a ValueError
    well_formed = (
        isinstance(accounts, list)
        and bool(accounts)
        and all(
            isinstance(entry, dict)
            and isinstance(entry.get("username"), str)
            and isinstance(entry.get("key"), str)
            for entry in accounts
        )
    )
    if not well_formed:
        raise ValueError(
            "KAGGLE_ACCOUNTS_JSON must be a non-empty JSON list of "
            'objects with string "username" and "key" fields'
        )
    return accounts


KAGGLE_ACCOUNTS = _load_accounts([
    {
        "username": "nottybro",
        "key": "5afaf2c20f92beea7a171bcd69dca17e"
    },
    {
        "username": "imimpossible",
        "key": "3a82dc27094177b4c75c34f71aabda0d"
    },
    {
        "username": "krushnadere",
        "key": "6cfe3c35e98a63bd735fc19401b16e93"
    }
])
# ─────────────────────────────────────────────────────────────
# ── Tunables ──────────────────────────────────────────────────
TOTAL_SESSIONS = 15
MAX_PER_ACCOUNT = 5
SESSIONS_PER_ACC = MAX_PER_ACCOUNT
PUSH_GAP_MIN = 25  # global gap between any two pushes
PUSH_GAP_MAX = 35
ACC_PUSH_COOLDOWN = 120  # min seconds between pushes TO SAME ACCOUNT
MONITOR_SECS = 90
PUSH_COOLDOWN = 10 * 60  # don't check kernel status for 10min after push
BACKOFF_429 = 5 * 60  # wait 5min after 429 before retrying that account
SCRAPER_DIR = Path("scrapers")

# ── Shared mutable state ──────────────────────────────────────
sessions = {}
# Re-entrant on purpose: log() takes this lock and is itself called from code
# that already holds it, which would self-deadlock with a plain Lock.
state_lock = threading.RLock()
global_push_lock = threading.Lock()  # one push at a time globally
# One lock per account index. NOTE(review): nothing in the visible code
# acquires these; pushes are serialised by global_push_lock instead.
acc_locks = {i: threading.Lock() for i in range(len(KAGGLE_ACCOUNTS))}
last_push_t = 0.0  # global last push timestamp
acc_last_push = {i: 0.0 for i in range(len(KAGGLE_ACCOUNTS))}  # per-account
acc_backoff = {i: 0.0 for i in range(len(KAGGLE_ACCOUNTS))}  # per-account backoff until
log_lines = []
_mon_started = False
def log(msg):
    """Print `msg` with a UTC HH:MM:SS prefix and keep it in `log_lines`.

    The in-memory buffer is trimmed from the front once it grows past 500
    entries.
    """
    stamp = datetime.now(timezone.utc).strftime("%H:%M:%S")
    entry = f"[{stamp}] {msg}"
    print(entry)
    with state_lock:
        log_lines.append(entry)
        if len(log_lines) > 500:
            del log_lines[0]
def _now():
    """Return the current UTC time formatted as YYYY-MM-DDTHH:MM:SSZ."""
    current = datetime.now(timezone.utc)
    return current.strftime("%Y-%m-%dT%H:%M:%SZ")
def _slug(sid, run):
    """Build the kernel slug for session `sid`, run number `run`."""
    session_part = f"s{sid}"
    run_part = f"r{run}"
    return f"pmc-{session_part}-{run_part}"
def _kurl(u, s):
    """Return the kaggle.com code URL for user `u` and kernel slug `s`."""
    base = "https://www.kaggle.com/code"
    return f"{base}/{u}/{s}"
def acc_idx(sid):
    """Map session id `sid` to an index into KAGGLE_ACCOUNTS.

    Sessions are assigned in runs of SESSIONS_PER_ACC; anything past the last
    account is clamped onto the last account.
    """
    last_account = len(KAGGLE_ACCOUNTS) - 1
    bucket = sid // SESSIONS_PER_ACC
    return bucket if bucket < last_account else last_account
def acc(sid):
    """Return the KAGGLE_ACCOUNTS entry that session `sid` is assigned to."""
    owner = acc_idx(sid)
    return KAGGLE_ACCOUNTS[owner]
def chunk_size(sid):
    """Read TOTAL_BUILDS from the first five lines of scraper_s{sid}.py.

    Returns:
        The integer assigned to TOTAL_BUILDS, or 0 when the scraper file is
        missing or unreadable, or no parsable assignment is found.
    """
    f = SCRAPER_DIR / f"scraper_s{sid}.py"
    try:
        # utf-8 to match how push_kernel reads the same file.
        head = f.read_text(encoding="utf-8").splitlines()[:5]
    except (OSError, UnicodeDecodeError):
        return 0
    for line in head:
        if "TOTAL_BUILDS" in line and "=" in line:
            # Take the right-hand side and drop any trailing "# comment".
            value = line.split("=", 1)[1].split("#", 1)[0].strip()
            try:
                return int(value)
            except ValueError:
                continue  # not a plain integer; keep scanning
    return 0
def _blank_session(sid):
    """Return the initial bookkeeping record for session `sid`."""
    return {
        "session_id": sid,
        "username": acc(sid)["username"],
        "current_slug": None,
        "current_url": None,
        "run_number": 0,
        "status": "idle",
        "last_check": None,
        "last_push": None,
        "last_push_ts": 0.0,
        "start_idx": 0,
        "current_idx": 0,
        "total_builds": chunk_size(sid),
        "success": 0,
        "skip": 0,
        "fail": 0,
        "complete": False,
        "error_msg": "",
        "history": [],
    }


# Populate the registry once at import time, one record per session id.
for sid in range(TOTAL_SESSIONS):
    sessions[sid] = _blank_session(sid)
def _kenv(a):
    """Write account `a`'s kaggle.json and return an env that points at it.

    The credentials go to /tmp/kcfg_<username>/kaggle.json. The file is
    created with mode 0600 from the start, so the key is never readable by
    other users, not even briefly.

    Args:
        a: dict with "username" and "key".

    Returns:
        A copy of os.environ with KAGGLE_CONFIG_DIR set to that directory.
    """
    cfg = Path(f"/tmp/kcfg_{a['username']}")
    cfg.mkdir(mode=0o700, exist_ok=True)
    cred = cfg / "kaggle.json"
    payload = json.dumps({"username": a["username"], "key": a["key"]})
    fd = os.open(str(cred), os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as fh:
        fh.write(payload)
    # The creation mode is ignored for a pre-existing file, so enforce it.
    os.chmod(str(cred), 0o600)
    return {**os.environ, "KAGGLE_CONFIG_DIR": str(cfg)}
def kstatus(a, slug):
    """Ask the Kaggle HTTP API for the status of kernel `slug` owned by `a`.

    Returns:
        "not_found" on HTTP 404; otherwise the "status" field found under
        "currentRunningVersion" in the JSON reply ("unknown" if absent); or
        "api_error:<first 50 chars>" when anything raises.
    """
    from requests.auth import HTTPBasicAuth
    try:
        credentials = HTTPBasicAuth(a["username"], a["key"])
        endpoint = f"https://www.kaggle.com/api/v1/kernels/{a['username']}/{slug}"
        resp = requests.get(endpoint, auth=credentials, timeout=20)
        if resp.status_code == 404:
            return "not_found"
        resp.raise_for_status()
        running_version = resp.json().get("currentRunningVersion", {})
        return running_version.get("status", "unknown")
    except Exception as exc:
        detail = str(exc)[:50]
        return f"api_error:{detail}"
def kdownload_state(a, slug):
    """Download and parse run_state.json from the output of kernel `slug`.

    Runs `kaggle kernels output` into a scratch directory and returns the
    first run_state.json found there, parsed as JSON.

    Returns:
        The parsed state, or None when no such file was produced or any step
        failed (failures are logged).
    """
    tmp = None
    try:
        tmp = tempfile.mkdtemp()
        subprocess.run(
            ["kaggle", "kernels", "output", f"{a['username']}/{slug}",
             "--file-pattern", "run_state.json", "-p", tmp],
            env=_kenv(a), capture_output=True, text=True, timeout=90)
        for f in Path(tmp).rglob("run_state.json"):
            return json.loads(f.read_text(encoding="utf-8"))
        return None
    except Exception as e:
        log(f"State dl fail {slug}: {e}")
        return None
    finally:
        # mkdtemp() never cleans up after itself; without this every status
        # check would leave a directory behind.
        if tmp is not None:
            shutil.rmtree(tmp, ignore_errors=True)
def push_kernel(a, sid, run_num, start_idx):
    """Push with per-account rate limiting and 429 backoff.

    Takes scraper_s{sid}.py, substitutes "__START_IDX__" with `start_idx`,
    prepends a header comment carrying a random seed, and pushes it with
    `kaggle kernels push` as a new kernel named after (sid, run_num).

    Everything after the existence check runs under global_push_lock, so
    pushes are strictly serialised and the gap sleeps below delay every other
    caller too.

    Args:
        a: account dict with "username" and "key".
        sid: session id; selects the scraper file and the account index.
        run_num: run counter used in the slug and title.
        start_idx: value substituted for "__START_IDX__".

    Returns:
        (True, slug, url) on success, otherwise (False, "", reason) where
        reason is a short error string.
    """
    global last_push_t
    aidx = acc_idx(sid)
    sf = SCRAPER_DIR / f"scraper_s{sid}.py"
    if not sf.exists():
        return False, "", f"scraper_s{sid}.py missing"

    with global_push_lock:
        # Check per-account backoff (from previous 429)
        backoff_until = acc_backoff.get(aidx, 0.0)
        now = time.time()
        if now < backoff_until:
            wait = backoff_until - now
            log(f"Account {aidx} in 429 backoff, {int(wait)}s remaining")
            return False, "", f"account_backoff_{int(wait)}s"

        # Global gap between any two pushes (jittered between MIN and MAX)
        global_wait = PUSH_GAP_MIN + random.random() * (PUSH_GAP_MAX - PUSH_GAP_MIN) - (now - last_push_t)
        if global_wait > 0:
            log(f"Global push gap {global_wait:.1f}s")
            time.sleep(global_wait)

        # Per-account gap
        acc_wait = ACC_PUSH_COOLDOWN - (time.time() - acc_last_push.get(aidx, 0.0))
        if acc_wait > 0:
            log(f"Account {aidx} per-account gap {acc_wait:.1f}s")
            time.sleep(acc_wait)

        slug = _slug(sid, run_num)
        url = _kurl(a["username"], slug)
        script = sf.read_text(encoding="utf-8").replace("__START_IDX__", str(start_idx))
        script = f"# PMC s{sid} r{run_num} start={start_idx} seed={random.randint(10000,99999)}\n" + script
        tmp = tempfile.mkdtemp()
        try:
            (Path(tmp) / "main.py").write_text(script, encoding="utf-8")
            meta = {
                "id": f"{a['username']}/{slug}", "title": f"PMC s{sid} r{run_num}",
                "code_file": "main.py", "language": "python", "kernel_type": "script",
                "is_private": False, "enable_gpu": False, "enable_internet": True,
                "dataset_sources": [], "kernel_sources": [],
            }
            (Path(tmp) / "kernel-metadata.json").write_text(json.dumps(meta))
            result = subprocess.run(
                ["kaggle", "kernels", "push", "-p", tmp],
                env=_kenv(a), capture_output=True, text=True, timeout=120)
            if result.returncode != 0:
                err = result.stderr[:300] or result.stdout[:300]
                # 429 — set per-account backoff. Search the complete output:
                # when the CLI prints a traceback the HTTP status sits at the
                # end, beyond the 300 characters kept for the error message.
                full_output = f"{result.stderr or ''}\n{result.stdout or ''}"
                if "429" in full_output or "too many requests" in full_output.lower():
                    acc_backoff[aidx] = time.time() + BACKOFF_429
                    log(f"429 on account {aidx} — backoff {BACKOFF_429//60}min")
                raise Exception(err)
            last_push_t = time.time()
            acc_last_push[aidx] = time.time()
            log(f"Pushed {slug} (s{sid} r{run_num} start={start_idx}) -> {url}")
            return True, slug, url
        except Exception as e:
            log(f"Push fail {slug}: {str(e)[:200]}")
            return False, "", str(e)[:200]
        finally:
            shutil.rmtree(tmp, ignore_errors=True)
RESTART_ON = {"complete","error","cancelAcknowledged","failed"}
# Session ids with a handle() call in flight; guarded by state_lock.
_busy_sessions = set()


def _push_and_record(sid, a, rn, start_idx):
    """Push run `rn` of session `sid` and store the outcome in `sessions`.

    Returns True when the push succeeded.
    """
    ok, new_slug, info = push_kernel(a, sid, rn, start_idx)
    with state_lock:
        sess = sessions[sid]
        if ok:
            # On success the third element is the kernel URL.
            sess.update({"current_slug": new_slug, "current_url": info,
                         "status": "queued", "last_push": _now(),
                         "last_push_ts": time.time(), "error_msg": ""})
            sess["history"].append({"run_number": rn, "slug": new_slug,
                                    "url": info, "pushed_at": _now(),
                                    "start_idx": start_idx})
        else:
            # On failure the third element carries the error text.
            sess.update({"status": "push_failed", "error_msg": info})
    return ok


def _handle_once(sid):
    """Run one state-machine step for session `sid` (see handle)."""
    with state_lock:
        sess = sessions[sid]
        if sess["complete"]:
            return
        slug = sess["current_slug"]
    a = acc(sid)

    # No kernel in flight: push the next run from the stored start index.
    if slug is None:
        with state_lock:
            rn = sessions[sid]["run_number"] + 1
            si = sessions[sid]["start_idx"]
            sessions[sid].update({"run_number": rn, "status": "pushing"})
        _push_and_record(sid, a, rn, si)
        return

    # Cooldown
    with state_lock:
        since = time.time() - sessions[sid]["last_push_ts"]
    if since < PUSH_COOLDOWN:
        return  # silent — no need to spam log

    ks = kstatus(a, slug)
    with state_lock:
        sessions[sid]["last_check"] = _now()
        sessions[sid]["status"] = ks
    if ks in {"running", "queued", "inQueue"}:
        return
    if ks == "not_found" or ks.startswith("api_error"):
        log(f"s{sid} transient {ks}")
        return
    if ks not in RESTART_ON:
        return  # unrecognised status: leave the session alone this cycle

    state = kdownload_state(a, slug)
    # log() takes state_lock itself, so messages produced while the lock is
    # held are queued here and emitted after it is released.
    pending = []
    rn = None
    with state_lock:
        sess = sessions[sid]
        if state:
            sess["success"] += state.get("success", 0)
            sess["skip"] += state.get("skip", 0)
            sess["fail"] += state.get("fail", 0)
            next_idx = state.get("next_idx")
            if next_idx is None:
                # State file carried no resume point; restart from where this
                # run began rather than substituting "None" into the script.
                next_idx = sess["start_idx"]
            is_done = state.get("complete", False)
            reason = state.get("stop_reason", "unknown")
        else:
            next_idx = sess["current_idx"] + 1
            is_done = next_idx >= sess["total_builds"]
            reason = "no_state"
            pending.append(f"WARNING: no state for {slug}")
        for h in sess["history"]:
            if h["slug"] == slug:
                h.update({"stopped_at": _now(), "stop_reason": reason})
                break
        # Keep the progress counter in step with the resume point.
        sess["current_idx"] = next_idx
        if is_done:
            sess.update({"complete": True, "status": "done"})
            pending.append(f"s{sid} COMPLETE!")
        else:
            rn = sess["run_number"] + 1
            # Clearing current_slug marks the finished kernel as consumed: if
            # the push below fails, the next cycle retries the push only and
            # does not download and re-add the same counters again.
            sess.update({"start_idx": next_idx, "run_number": rn,
                         "status": "pushing", "current_slug": None})
            pending.append(f"s{sid}: {reason} -> restart idx={next_idx} run#{rn}")
    for message in pending:
        log(message)
    if is_done:
        return
    _push_and_record(sid, a, rn, next_idx)


def handle(sid):
    """Advance session `sid` by one step: push, poll, or restart its kernel.

    - Completed sessions are ignored.
    - With no current kernel, the next run is pushed.
    - Otherwise, once PUSH_COOLDOWN has elapsed since the last push, the
      kernel status is polled; on a status in RESTART_ON the kernel's
      run_state.json is folded into the session counters and the session is
      either marked complete or restarted from the reported index.

    Only one call per session runs at a time; an overlapping call (monitor
    loop, initial-push thread, manual restart) returns immediately, so the
    same session is never pushed twice concurrently.
    """
    with state_lock:
        if sid in _busy_sessions:
            return
        _busy_sessions.add(sid)
    try:
        _handle_once(sid)
    finally:
        with state_lock:
            _busy_sessions.discard(sid)
def monitor():
    """Run the supervision loop forever.

    Starts a background thread that calls handle() once for every session id,
    then wakes every MONITOR_SECS seconds and calls handle() for each known
    session. Exceptions from handle() are logged and never stop the loop.
    """
    log(f"Monitor: {TOTAL_SESSIONS} sessions | {MAX_PER_ACCOUNT}/account | "
        f"acc_gap={ACC_PUSH_COOLDOWN}s global_gap={PUSH_GAP_MIN}-{PUSH_GAP_MAX}s")

    def guarded(sid, label):
        # Shared wrapper so one failing session cannot abort a sweep.
        try:
            handle(sid)
        except Exception as exc:
            log(f"{label} err s{sid}: {exc}")

    # Stagger initial pushes — one per account at a time
    def initial_push():
        for sid in range(TOTAL_SESSIONS):
            guarded(sid, "Init push")

    bootstrap = threading.Thread(target=initial_push, daemon=True)
    bootstrap.start()

    while True:
        time.sleep(MONITOR_SECS)
        for sid in list(sessions):
            guarded(sid, "Monitor")
def start_monitor():
    """Start the monitor daemon thread the first time this is called."""
    global _mon_started
    if not _mon_started:
        _mon_started = True
        worker = threading.Thread(target=monitor, daemon=True)
        worker.start()


# Kick off supervision as soon as the module is imported.
start_monitor()
# ── UI ────────────────────────────────────────────────────────
# Status keyword -> icon. "failed" and "cancelAcknowledged" are included
# because they are statuses the monitor acts on and would otherwise show ❓.
BADGE = {"idle":"⚪","pushing":"🔄","queued":"🟡","inQueue":"🟡",
         "running":"🟢","complete":"✅","done":"🏁","error":"🔴",
         "push_failed":"🔴","not_found":"⏳","starting":"⏳",
         "failed":"🔴","cancelAcknowledged":"⛔"}


def _b(st):
    """Prefix status string `st` with its badge icon.

    An exact key match wins; otherwise the first BADGE key contained in `st`
    is used (so "api_error:..." picks up the "error" icon). Unrecognised
    statuses get "❓".
    """
    icon = BADGE.get(st)
    if icon is None:
        icon = next((v for k, v in BADGE.items() if k in st), "❓")
    return f"{icon} {st}"
def sess_table():
    """Return one display row per session, ordered by session id.

    Columns: id, username (14 chars), badge + status, run count, success
    count, "current/total (pct%)", kernel URL (55 chars), last-check time.
    """
    def as_row(rec):
        percent = round(rec["current_idx"] / max(rec["total_builds"], 1) * 100, 1)
        progress = f'{rec["current_idx"]}/{rec["total_builds"]} ({percent}%)'
        return [
            rec["session_id"],
            rec["username"][:14],
            _b(rec["status"]),
            rec["run_number"],
            rec["success"],
            progress,
            (rec["current_url"] or "-")[:55],
            (rec["last_check"] or "-")[-8:],
        ]

    with state_lock:
        return [as_row(rec) for _, rec in sorted(sessions.items())]
def summary():
    """Build the Markdown headline: totals, a progress bar, account states."""
    with state_lock:
        records = list(sessions.values())

    finished = 0
    running = 0
    queued = 0
    built = 0
    grand_total = 0
    position = 0
    for rec in records:
        status = rec["status"]
        if rec["complete"]:
            finished += 1
        if "running" in status:
            running += 1
        if "queue" in status.lower() or "starting" in status:
            queued += 1
        built += rec["success"]
        grand_total += rec["total_builds"]
        position += rec["current_idx"]

    pct = round(position / max(grand_total, 1) * 100, 1)
    filled = int(pct / 5)
    bar = "X" * filled + "." * (20 - filled)

    # Show per-account backoff status
    def describe(index, account):
        left = max(0, int(acc_backoff.get(index, 0.0) - time.time()))
        if left > 0:
            return f"{account['username']}: 🔴backoff {left}s"
        return f"{account['username']}: ✅ready"

    acc_status = [describe(i, account) for i, account in enumerate(KAGGLE_ACCOUNTS)]

    return (f"**{finished}/{TOTAL_SESSIONS} done** | 🟢{running} running 🟡{queued} queued | "
            f"**{built:,} built** | [{bar}] {pct}%\n\n"
            f"Accounts: {' | '.join(acc_status)}")
def hist_table():
    """Return display rows for the last three runs of every session.

    Sessions are listed by id; within a session the newest run comes first.
    """
    def as_row(rec, entry):
        return [
            rec["session_id"],
            entry["run_number"],
            entry["slug"],
            entry.get("start_idx", "?"),
            entry.get("stop_reason", "-")[:25],
            entry.get("pushed_at", "-")[-8:],
            entry.get("stopped_at", "-")[-8:],
            entry["url"],
        ]

    with state_lock:
        return [
            as_row(rec, entry)
            for _, rec in sorted(sessions.items())
            for entry in reversed(rec["history"][-3:])
        ]
def all_links():
    """Return every pushed kernel as one text line, ordered by session id."""
    with state_lock:
        entries = [
            f"s{rec['session_id']} r{run['run_number']} "
            f"idx={run.get('start_idx','?')} {run['url']}"
            for _, rec in sorted(sessions.items())
            for run in rec["history"]
        ]
        joined = "\n".join(entries)
        return joined if joined else "No kernels pushed yet."
def get_log():
    """Return the most recent 100 log lines joined by newlines."""
    with state_lock:
        recent = log_lines[-100:]
        return "\n".join(recent)
def force_restart(sid_str):
    """UI callback: drop a session's current kernel and trigger a new push.

    Args:
        sid_str: session id as typed into the textbox.

    Returns:
        A short human-readable result message.
    """
    try:
        sid = int(sid_str.strip())
    except (AttributeError, TypeError, ValueError):
        # None / non-string input or text that is not an integer.
        return "Invalid ID"
    if sid not in sessions:
        return f"s{sid} not found"
    with state_lock:
        # With no current slug, handle() takes its "push next run" path.
        sessions[sid]["current_slug"] = None
        sessions[sid]["last_push_ts"] = 0.0
    threading.Thread(target=handle, args=(sid,), daemon=True).start()
    return f"Restart triggered for s{sid}"
def reset_backoff(acc_str):
    """UI callback: clear the 429 backoff deadline of one account.

    Args:
        acc_str: account index as typed into the textbox.

    Returns:
        A short human-readable result message.
    """
    try:
        aidx = int(acc_str.strip())
    except (AttributeError, TypeError, ValueError):
        # None / non-string input or text that is not an integer.
        return "Enter account index (0, 1, or 2)"
    if aidx not in acc_backoff:
        return f"Account {aidx} not found"
    acc_backoff[aidx] = 0.0
    return f"Backoff cleared for account {aidx} ({KAGGLE_ACCOUNTS[aidx]['username']})"
# Refresh period, in seconds, for the auto-updating widgets.
R = 30

with gr.Blocks(title="PMC Worker") as demo:
    # Static banner describing the configured limits.
    banner = (
        f"## PMC Worker — {TOTAL_SESSIONS} sessions "
        f"({MAX_PER_ACCOUNT}/account × {len(KAGGLE_ACCOUNTS)} accounts)\n"
        f"*Per-account push gap: {ACC_PUSH_COOLDOWN}s | "
        f"429 backoff: {BACKOFF_429//60}min | "
        f"Kernel cooldown: {PUSH_COOLDOWN//60}min*"
    )
    gr.Markdown(banner)
    # Live headline, recomputed every R seconds.
    gr.Markdown(value=summary, every=R)

    with gr.Tabs():
        with gr.Tab("Sessions"):
            session_headers = ["Sess", "Account", "Status", "Runs", "Built",
                               "Progress", "Kernel URL", "Last check"]
            gr.Dataframe(headers=session_headers, value=sess_table,
                         every=R, interactive=False, wrap=True)

            # Manual restart of a single session.
            with gr.Row():
                restart_input = gr.Textbox(label="Session ID to restart", scale=1)
                restart_button = gr.Button("Force restart", scale=1)
                restart_result = gr.Textbox(label="Result", scale=3, interactive=False)
                restart_button.click(force_restart, inputs=restart_input,
                                     outputs=restart_result)

            # Manual reset of an account's 429 backoff.
            with gr.Row():
                backoff_input = gr.Textbox(
                    label="Account idx (0/1/2) to clear backoff", scale=1)
                backoff_button = gr.Button("Clear 429 backoff", scale=1)
                backoff_result = gr.Textbox(label="Result", scale=3, interactive=False)
                backoff_button.click(reset_backoff, inputs=backoff_input,
                                     outputs=backoff_result)

        with gr.Tab("History"):
            history_headers = ["Sess", "Run", "Slug", "Start idx", "Stop reason",
                               "Pushed", "Stopped", "URL"]
            gr.Dataframe(headers=history_headers, value=hist_table,
                         every=R, interactive=False, wrap=True)

        with gr.Tab("All links"):
            gr.Textbox(value=all_links, every=R, lines=40,
                       interactive=False, label="Every kernel URL")

        with gr.Tab("Log"):
            gr.Textbox(value=get_log, every=15, lines=30,
                       interactive=False, label="Log")

demo.queue()
demo.launch()