"""PMC Worker — final version.

Key fixes:
1. Push rate limiting: only one push runs at a time across all accounts,
   consecutive pushes are separated by a randomised global gap, and pushes
   to the same account are at least ACC_PUSH_COOLDOWN (2 min) apart.
2. 429 backoff: after a push fails with HTTP 429, that account is not pushed
   to again for BACKOFF_429 (5 min).
3. No external API: everything runs inside this one Space.
"""
import json
import os
import random
import shutil
import subprocess
import tempfile
import threading
import time
from datetime import datetime, timezone
from pathlib import Path

import gradio as gr
import requests

# ── PASTE YOUR ACCOUNTS ───────────────────────────────────────
# NOTE(review): these API keys are stored in plain text in the source. Anyone
# who can read this file can use them. Rotate them and load them from a
# secret / environment variable instead.
KAGGLE_ACCOUNTS = [
    {"username": "nottybro", "key": "5afaf2c20f92beea7a171bcd69dca17e"},
    {"username": "imimpossible", "key": "3a82dc27094177b4c75c34f71aabda0d"},
    {"username": "krushnadere", "key": "6cfe3c35e98a63bd735fc19401b16e93"},
]
# ─────────────────────────────────────────────────────────────

TOTAL_SESSIONS = 15
MAX_PER_ACCOUNT = 5
SESSIONS_PER_ACC = MAX_PER_ACCOUNT
PUSH_GAP_MIN = 25              # global gap (s) between any two pushes: lower bound
PUSH_GAP_MAX = 35              # ... upper bound (actual gap is uniform in [MIN, MAX])
ACC_PUSH_COOLDOWN = 120        # min seconds between pushes TO THE SAME ACCOUNT
MONITOR_SECS = 90              # monitor loop period
PUSH_COOLDOWN = 10 * 60        # don't check kernel status for 10 min after a push
BACKOFF_429 = 5 * 60           # wait 5 min after a 429 before retrying that account
SCRAPER_DIR = Path("scrapers")

sessions = {}
# Re-entrant on purpose: log() takes this lock and is reachable from code that
# may already hold it. A plain Lock deadlocks in that case.
state_lock = threading.RLock()
global_push_lock = threading.Lock()  # one push at a time globally
# NOTE(review): acc_locks is not referenced anywhere else in this module.
acc_locks = {i: threading.Lock() for i in range(len(KAGGLE_ACCOUNTS))}
last_push_t = 0.0                                             # global last successful push
acc_last_push = {i: 0.0 for i in range(len(KAGGLE_ACCOUNTS))}  # per-account last successful push
acc_backoff = {i: 0.0 for i in range(len(KAGGLE_ACCOUNTS))}    # per-account 429 backoff deadline
# One guard per session. The initial-push thread, the monitor loop and
# force-restart threads must never drive the same session at the same time,
# otherwise a session can be pushed twice.
_handle_locks = {sid: threading.Lock() for sid in range(TOTAL_SESSIONS)}
log_lines = []
_mon_started = False


def log(msg):
    """Print *msg* with a UTC timestamp and keep the last 500 lines in log_lines."""
    ts = datetime.now(timezone.utc).strftime("%H:%M:%S")
    ln = f"[{ts}] {msg}"
    print(ln)
    with state_lock:
        log_lines.append(ln)
        if len(log_lines) > 500:
            log_lines.pop(0)


def _now():
    """Current UTC time as an ISO-8601 string with a trailing 'Z'."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def _slug(sid, run):
    """Kernel slug for session *sid*, run number *run*."""
    return f"pmc-s{sid}-r{run}"


def _kurl(u, s):
    """Public Kaggle URL of kernel *s* owned by user *u*."""
    return f"https://www.kaggle.com/code/{u}/{s}"


def acc_idx(sid):
    """Account index for session *sid* (SESSIONS_PER_ACC per account; overflow goes to the last one)."""
    return min(sid // SESSIONS_PER_ACC, len(KAGGLE_ACCOUNTS) - 1)


def acc(sid):
    """Account dict (username/key) that owns session *sid*."""
    return KAGGLE_ACCOUNTS[acc_idx(sid)]


def chunk_size(sid):
    """Return TOTAL_BUILDS declared in the first 5 lines of scraper_s<sid>.py.

    Returns 0 if the file is missing, unreadable, or has no parseable value.
    """
    f = SCRAPER_DIR / f"scraper_s{sid}.py"
    if not f.exists():
        return 0
    try:
        for line in f.read_text().splitlines()[:5]:
            if "TOTAL_BUILDS" in line and "=" in line:
                return int(line.split("=")[1].strip())
    except (OSError, ValueError):
        # Best effort: an unreadable or malformed header just means "unknown size".
        pass
    return 0


for sid in range(TOTAL_SESSIONS):
    sessions[sid] = {
        "session_id": sid,
        "username": acc(sid)["username"],
        "current_slug": None,   # kernel currently in flight; None -> push next run
        "current_url": None,
        "run_number": 0,
        "status": "idle",
        "last_check": None,
        "last_push": None,
        "last_push_ts": 0.0,
        "start_idx": 0,         # index the next/current run starts from
        "current_idx": 0,       # progress shown in the UI (start of current run)
        "total_builds": chunk_size(sid),
        "success": 0,
        "skip": 0,
        "fail": 0,
        "complete": False,
        "error_msg": "",
        "history": [],
    }


def _kenv(a):
    """Write *a*'s kaggle.json to a private config dir and return an env pointing at it."""
    cfg = Path(f"/tmp/kcfg_{a['username']}")
    cfg.mkdir(exist_ok=True)
    cred = cfg / "kaggle.json"
    payload = json.dumps({"username": a["username"], "key": a["key"]})
    # Create the file as 0600 from the start so the key is never world-readable.
    fd = os.open(str(cred), os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as fh:
        fh.write(payload)
    # O_CREAT's mode only applies to new files; enforce it on pre-existing ones too.
    os.chmod(str(cred), 0o600)
    return {**os.environ, "KAGGLE_CONFIG_DIR": str(cfg)}


def kstatus(a, slug):
    """Return the status string of kernel *slug*.

    Returns "not_found" on HTTP 404 and "api_error:<msg>" on any failure.
    """
    from requests.auth import HTTPBasicAuth
    try:
        r = requests.get(
            f"https://www.kaggle.com/api/v1/kernels/{a['username']}/{slug}",
            auth=HTTPBasicAuth(a["username"], a["key"]),
            timeout=20,
        )
        if r.status_code == 404:
            return "not_found"
        r.raise_for_status()
        # NOTE(review): response shape (currentRunningVersion.status) is assumed
        # from the original code; confirm against the Kaggle API.
        return r.json().get("currentRunningVersion", {}).get("status", "unknown")
    except Exception as e:
        return f"api_error:{str(e)[:50]}"


def kdownload_state(a, slug):
    """Download run_state.json from kernel *slug*'s output and return it as a dict.

    Returns None if the file is missing, is not a JSON object, or anything
    fails. The temporary download directory is always removed.
    """
    try:
        with tempfile.TemporaryDirectory() as tmp:
            subprocess.run(
                ["kaggle", "kernels", "output", f"{a['username']}/{slug}",
                 "--file-pattern", "run_state.json", "-p", tmp],
                env=_kenv(a), capture_output=True, text=True, timeout=90)
            found = next(Path(tmp).rglob("run_state.json"), None)
            if found is None:
                return None
            data = json.loads(found.read_text(encoding="utf-8"))
            # Callers use .get() on the result, so only a JSON object is usable.
            return data if isinstance(data, dict) else None
    except Exception as e:
        log(f"State dl fail {slug}: {e}")
        return None


def push_kernel(a, sid, run_num, start_idx):
    """Push run *run_num* of session *sid* (starting at *start_idx*) with rate limiting.

    All pushes are serialised by ``global_push_lock``. Inside the lock:
      * if the account is in a 429 backoff window, give up immediately;
      * otherwise sleep until the randomised global gap since the last
        successful push and the per-account ACC_PUSH_COOLDOWN have elapsed.
    A failure whose output mentions 429 / "Too Many Requests" starts a
    BACKOFF_429 window for the account.

    Returns ``(True, slug, url)`` on success, ``(False, "", error_text)`` otherwise.
    """
    global last_push_t
    aidx = acc_idx(sid)
    sf = SCRAPER_DIR / f"scraper_s{sid}.py"
    if not sf.exists():
        return False, "", f"scraper_s{sid}.py missing"

    with global_push_lock:
        now = time.time()
        backoff_until = acc_backoff.get(aidx, 0.0)
        if now < backoff_until:
            wait = backoff_until - now
            log(f"Account {aidx} in 429 backoff, {int(wait)}s remaining")
            return False, "", f"account_backoff_{int(wait)}s"

        # Global gap between any two pushes (randomised to look less robotic).
        gap = PUSH_GAP_MIN + random.random() * (PUSH_GAP_MAX - PUSH_GAP_MIN)
        global_wait = gap - (now - last_push_t)
        if global_wait > 0:
            log(f"Global push gap {global_wait:.1f}s")
            time.sleep(global_wait)

        # Per-account gap.
        acc_wait = ACC_PUSH_COOLDOWN - (time.time() - acc_last_push.get(aidx, 0.0))
        if acc_wait > 0:
            log(f"Account {aidx} per-account gap {acc_wait:.1f}s")
            time.sleep(acc_wait)

        slug = _slug(sid, run_num)
        url = _kurl(a["username"], slug)
        tmp = tempfile.mkdtemp()
        try:
            script = sf.read_text(encoding="utf-8").replace("__START_IDX__", str(start_idx))
            # The random seed comment makes each pushed source unique.
            script = (f"# PMC s{sid} r{run_num} start={start_idx} "
                      f"seed={random.randint(10000, 99999)}\n" + script)
            (Path(tmp) / "main.py").write_text(script, encoding="utf-8")
            meta = {
                "id": f"{a['username']}/{slug}",
                "title": f"PMC s{sid} r{run_num}",
                "code_file": "main.py",
                "language": "python",
                "kernel_type": "script",
                "is_private": False,
                "enable_gpu": False,
                "enable_internet": True,
                "dataset_sources": [],
                "kernel_sources": [],
            }
            (Path(tmp) / "kernel-metadata.json").write_text(json.dumps(meta))
            result = subprocess.run(
                ["kaggle", "kernels", "push", "-p", tmp],
                env=_kenv(a), capture_output=True, text=True, timeout=120)
            if result.returncode != 0:
                err = (result.stderr[:300] or result.stdout[:300]
                       or f"kaggle exited with code {result.returncode}")
                if "429" in err or "Too Many Requests" in err:
                    acc_backoff[aidx] = time.time() + BACKOFF_429
                    log(f"429 on account {aidx} — backoff {BACKOFF_429//60}min")
                log(f"Push fail {slug}: {err[:200]}")
                return False, "", err[:200]
            last_push_t = time.time()
            acc_last_push[aidx] = time.time()
            log(f"Pushed {slug} (s{sid} r{run_num} start={start_idx}) -> {url}")
            return True, slug, url
        except Exception as e:
            log(f"Push fail {slug}: {str(e)[:200]}")
            return False, "", str(e)[:200]
        finally:
            shutil.rmtree(tmp, ignore_errors=True)


RESTART_ON = {"complete", "error", "cancelAcknowledged", "failed"}


def _push_run(sid, a, start_idx, restart_reason=None):
    """Push the next run of session *sid* from *start_idx* and record the outcome.

    ``run_number`` is bumped before pushing, so failed attempts still consume
    a run number. On success the session points at the new kernel and a
    history entry is appended. On failure the status becomes "push_failed"
    and ``current_slug`` is left as is.
    """
    with state_lock:
        rn = sessions[sid]["run_number"] + 1
        sessions[sid].update({"run_number": rn, "status": "pushing"})
    if restart_reason is not None:
        log(f"s{sid}: {restart_reason} -> restart idx={start_idx} run#{rn}")
    ok, new_slug, url_or_err = push_kernel(a, sid, rn, start_idx)
    with state_lock:
        s = sessions[sid]
        if ok:
            s.update({"current_slug": new_slug, "current_url": url_or_err,
                      "status": "queued", "last_push": _now(),
                      "last_push_ts": time.time(), "error_msg": ""})
            s["history"].append({"run_number": rn, "slug": new_slug,
                                 "url": url_or_err, "pushed_at": _now(),
                                 "start_idx": start_idx})
        else:
            # On failure push_kernel returns the error text in the third slot.
            s.update({"status": "push_failed", "error_msg": url_or_err})


def _harvest(sid, slug, state):
    """Fold finished kernel *slug*'s run_state (may be None) into session *sid*.

    Updates the counters, start/progress index and history, and marks the
    session complete if needed. Clears ``current_slug`` so the same finished
    kernel is never harvested twice. If the follow-up push fails, the next
    cycle re-pushes from the new ``start_idx`` instead.

    Returns ``(next_idx, is_done, reason)``.
    """
    with state_lock:
        s = sessions[sid]
        if state:
            s["success"] += state.get("success", 0)
            s["skip"] += state.get("skip", 0)
            s["fail"] += state.get("fail", 0)
            next_idx = state.get("next_idx")
            if next_idx is None:
                # No resume point reported: skip the item this run started on.
                next_idx = s["start_idx"] + 1
            is_done = bool(state.get("complete", False))
            reason = state.get("stop_reason") or "unknown"
        else:
            # No state at all: assume the first item of this run is poison and skip it.
            next_idx = s["start_idx"] + 1
            is_done = next_idx >= s["total_builds"]
            reason = "no_state"
        for h in s["history"]:
            if h["slug"] == slug:
                h.update({"stopped_at": _now(), "stop_reason": reason})
                break
        s["start_idx"] = next_idx
        s["current_idx"] = next_idx
        s["current_slug"] = None
        if is_done:
            s.update({"complete": True, "status": "done"})
    return next_idx, is_done, reason


def _handle_session(sid):
    """Advance one session by one step (push, poll, or harvest + restart)."""
    with state_lock:
        s = sessions[sid]
        if s["complete"]:
            return
        slug = s["current_slug"]
        start_idx = s["start_idx"]
        since = time.time() - s["last_push_ts"]
    a = acc(sid)

    if slug is None:
        _push_run(sid, a, start_idx)
        return

    if since < PUSH_COOLDOWN:
        return  # silent — no need to spam the log

    ks = kstatus(a, slug)
    with state_lock:
        sessions[sid]["last_check"] = _now()
        sessions[sid]["status"] = ks
    if ks in {"running", "queued", "inQueue"}:
        return
    if ks == "not_found" or ks.startswith("api_error"):
        log(f"s{sid} transient {ks}")
        return
    if ks not in RESTART_ON:
        return

    state = kdownload_state(a, slug)
    next_idx, is_done, reason = _harvest(sid, slug, state)
    # Logged outside state_lock so logging never runs while session state is locked.
    if not state:
        log(f"WARNING: no state for {slug}")
    if is_done:
        log(f"s{sid} COMPLETE!")
        return
    _push_run(sid, a, next_idx, restart_reason=reason)


def handle(sid):
    """Advance session *sid* one step, unless another thread is already handling it.

    Concurrent calls for the same session return immediately instead of
    racing, which previously caused duplicate pushes.
    """
    guard = _handle_locks[sid]
    if not guard.acquire(blocking=False):
        return
    try:
        _handle_session(sid)
    finally:
        guard.release()


def monitor():
    """Kick off the initial pushes, then call handle() on every session every MONITOR_SECS."""
    log(f"Monitor: {TOTAL_SESSIONS} sessions | {MAX_PER_ACCOUNT}/account | "
        f"acc_gap={ACC_PUSH_COOLDOWN}s global_gap={PUSH_GAP_MIN}-{PUSH_GAP_MAX}s")

    # Initial pushes run in their own thread; push_kernel spaces them out.
    def initial_push():
        for sid in range(TOTAL_SESSIONS):
            try:
                handle(sid)
            except Exception as e:
                log(f"Init push err s{sid}: {e}")

    threading.Thread(target=initial_push, daemon=True).start()
    while True:
        time.sleep(MONITOR_SECS)
        for sid in list(sessions):
            try:
                handle(sid)
            except Exception as e:
                log(f"Monitor err s{sid}: {e}")


def start_monitor():
    """Start the monitor daemon thread once per process."""
    global _mon_started
    if _mon_started:
        return
    _mon_started = True
    threading.Thread(target=monitor, daemon=True).start()


start_monitor()

# ── UI ────────────────────────────────────────────────────────
BADGE = {"idle": "⚪", "pushing": "🔄", "queued": "🟡", "inQueue": "🟡",
         "running": "🟢", "complete": "✅", "done": "🏁", "error": "🔴",
         "push_failed": "🔴", "not_found": "⏳", "starting": "⏳"}


def _b(st):
    """Prefix status *st* with the icon of the first BADGE key it contains (❓ if none)."""
    icon = next((v for k, v in BADGE.items() if k in st), "❓")
    return f"{icon} {st}"


def sess_table():
    """Rows for the Sessions dataframe, one per session in id order."""
    rows = []
    with state_lock:
        for _, s in sorted(sessions.items()):
            pct = round(s["current_idx"] / max(s["total_builds"], 1) * 100, 1)
            rows.append([
                s["session_id"],
                s["username"][:14],
                _b(s["status"]),
                s["run_number"],
                s["success"],
                f'{s["current_idx"]}/{s["total_builds"]} ({pct}%)',
                (s["current_url"] or "-")[:55],
                (s["last_check"] or "-")[-8:],
            ])
    return rows


def summary():
    """Markdown summary: done/running/queued counts, overall progress bar, account backoffs."""
    with state_lock:
        vals = list(sessions.values())
        done = sum(1 for s in vals if s["complete"])
        run = sum(1 for s in vals if "running" in s["status"])
        q = sum(1 for s in vals
                if "queue" in s["status"].lower() or "starting" in s["status"])
        ok = sum(s["success"] for s in vals)
        total = sum(s["total_builds"] for s in vals)
        curr = sum(s["current_idx"] for s in vals)
    pct = round(curr / max(total, 1) * 100, 1)
    filled = max(0, min(int(pct / 5), 20))  # clamp so the bar is always 20 chars
    bar = "X" * filled + "." * (20 - filled)

    # Per-account 429 backoff status.
    acc_status = []
    for i, a in enumerate(KAGGLE_ACCOUNTS):
        remaining = max(0, int(acc_backoff.get(i, 0.0) - time.time()))
        if remaining > 0:
            acc_status.append(f"{a['username']}: 🔴backoff {remaining}s")
        else:
            acc_status.append(f"{a['username']}: ✅ready")
    return (f"**{done}/{TOTAL_SESSIONS} done** | 🟢{run} running 🟡{q} queued | "
            f"**{ok:,} built** | [{bar}] {pct}%\n\n"
            f"Accounts: {' | '.join(acc_status)}")


def hist_table():
    """Rows for the History dataframe: the last 3 runs per session, newest first."""
    rows = []
    with state_lock:
        for _, s in sorted(sessions.items()):
            for h in reversed(s["history"][-3:]):
                rows.append([s["session_id"], h["run_number"], h["slug"],
                             h.get("start_idx", "?"),
                             str(h.get("stop_reason", "-"))[:25],
                             h.get("pushed_at", "-")[-8:],
                             h.get("stopped_at", "-")[-8:],
                             h["url"]])
    return rows


def all_links():
    """Newline-separated list of every kernel ever pushed, or a placeholder if none."""
    with state_lock:
        lines = [f"s{s['session_id']} r{h['run_number']} "
                 f"idx={h.get('start_idx', '?')} {h['url']}"
                 for _, s in sorted(sessions.items())
                 for h in s["history"]]
    return "\n".join(lines) or "No kernels pushed yet."
def get_log():
    """Return the last 100 log lines (oldest first) as one newline-joined string."""
    with state_lock:
        return "\n".join(log_lines[-100:])


def force_restart(sid_str):
    """Forget session *sid_str*'s current kernel and trigger handle() in a background thread.

    The running kernel (if any) is not cancelled; handle() pushes a new run
    from the session's current start index. Completed sessions are left alone
    and reported as such, because handle() ignores them.
    """
    try:
        sid = int((sid_str or "").strip())
    except (AttributeError, TypeError, ValueError):
        return "Invalid ID"
    if sid not in sessions:
        return f"s{sid} not found"
    with state_lock:
        if sessions[sid]["complete"]:
            return f"s{sid} is already complete — nothing to restart"
        sessions[sid]["current_slug"] = None
        sessions[sid]["last_push_ts"] = 0.0
    threading.Thread(target=handle, args=(sid,), daemon=True).start()
    return f"Restart triggered for s{sid}"


def _acc_index_hint():
    """Valid account indices in prose form, e.g. "0, 1, or 2" for three accounts."""
    idx = [str(i) for i in range(len(KAGGLE_ACCOUNTS))]
    if len(idx) <= 2:
        return " or ".join(idx)
    return ", ".join(idx[:-1]) + ", or " + idx[-1]


def reset_backoff(acc_str):
    """Clear the 429 backoff deadline of the account whose index is *acc_str*."""
    try:
        aidx = int((acc_str or "").strip())
    except (AttributeError, TypeError, ValueError):
        return f"Enter account index ({_acc_index_hint()})"
    if aidx not in acc_backoff:
        return f"Account {aidx} not found"
    acc_backoff[aidx] = 0.0
    return f"Backoff cleared for account {aidx} ({KAGGLE_ACCOUNTS[aidx]['username']})"


R = 30  # refresh interval (seconds) for the auto-updating components
_ACC_IDS = "/".join(str(i) for i in range(len(KAGGLE_ACCOUNTS)))  # e.g. "0/1/2"

with gr.Blocks(title="PMC Worker") as demo:
    gr.Markdown(
        f"## PMC Worker — {TOTAL_SESSIONS} sessions "
        f"({MAX_PER_ACCOUNT}/account × {len(KAGGLE_ACCOUNTS)} accounts)\n"
        f"*Per-account push gap: {ACC_PUSH_COOLDOWN}s | "
        f"429 backoff: {BACKOFF_429//60}min | "
        f"Kernel cooldown: {PUSH_COOLDOWN//60}min*"
    )
    gr.Markdown(value=summary, every=R)
    with gr.Tabs():
        with gr.Tab("Sessions"):
            gr.Dataframe(
                headers=["Sess", "Account", "Status", "Runs", "Built",
                         "Progress", "Kernel URL", "Last check"],
                value=sess_table, every=R, interactive=False, wrap=True)
            with gr.Row():
                sid_in = gr.Textbox(label="Session ID to restart", scale=1)
                rb = gr.Button("Force restart", scale=1)
                ro = gr.Textbox(label="Result", scale=3, interactive=False)
            rb.click(force_restart, inputs=sid_in, outputs=ro)
            with gr.Row():
                acc_in = gr.Textbox(label=f"Account idx ({_ACC_IDS}) to clear backoff", scale=1)
                ab = gr.Button("Clear 429 backoff", scale=1)
                ao = gr.Textbox(label="Result", scale=3, interactive=False)
            ab.click(reset_backoff, inputs=acc_in, outputs=ao)
        with gr.Tab("History"):
            gr.Dataframe(
                headers=["Sess", "Run", "Slug", "Start idx", "Stop reason",
                         "Pushed", "Stopped", "URL"],
                value=hist_table, every=R, interactive=False, wrap=True)
        with gr.Tab("All links"):
            gr.Textbox(value=all_links, every=R, lines=40, interactive=False,
                       label="Every kernel URL")
        with gr.Tab("Log"):
            gr.Textbox(value=get_log, every=15, lines=30, interactive=False, label="Log")

demo.queue()
demo.launch()