# (page-scrape residue) Spaces: Sleeping Sleeping
"""
PMC Worker — Final
Key fixes:
1. Per-account push rate limiting (one push per account at a time, 2min between)
2. 429 backoff — wait 5min before retrying that account
3. No external API — everything in this one space
"""
import json
import os
import random
import re
import shutil
import subprocess
import tempfile
import threading
import time
import warnings
from datetime import datetime, timezone
from pathlib import Path

import gradio as gr
import requests
# ── KAGGLE ACCOUNTS ───────────────────────────────────────────
# SECURITY: API keys must not live in the source file. Provide the accounts
# through the KAGGLE_ACCOUNTS_JSON environment variable (for example as a
# Space secret) holding a JSON list such as
#   [{"username": "alice", "key": "<api key>"}, ...]
# Any key that was ever pasted into this file should be rotated.
def _load_accounts(raw):
    """Parse the JSON account list used to populate ``KAGGLE_ACCOUNTS``.

    Args:
        raw: JSON text (or ``None``/empty string when nothing is configured).

    Returns:
        A list of ``{"username": str, "key": str}`` dicts; empty when *raw*
        is empty. Extra fields in the payload are dropped.

    Raises:
        ValueError: if the payload is not a JSON list of objects that each
            carry a non-empty string ``username`` and ``key``.
    """
    if not raw:
        return []
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"KAGGLE_ACCOUNTS_JSON is not valid JSON: {exc}") from exc
    if not isinstance(payload, list):
        raise ValueError("KAGGLE_ACCOUNTS_JSON must be a JSON list of accounts")

    accounts = []
    for pos, entry in enumerate(payload):
        if not isinstance(entry, dict):
            raise ValueError(f"account #{pos} is not a JSON object")
        user, key = entry.get("username"), entry.get("key")
        if not (isinstance(user, str) and user and isinstance(key, str) and key):
            raise ValueError(f"account #{pos} needs non-empty 'username' and 'key'")
        accounts.append({"username": user, "key": key})
    return accounts


KAGGLE_ACCOUNTS = _load_accounts(os.environ.get("KAGGLE_ACCOUNTS_JSON"))
if not KAGGLE_ACCOUNTS:
    # Nothing below can work without at least one account; say so up front
    # instead of leaving only an IndexError from the first account lookup.
    warnings.warn(
        "KAGGLE_ACCOUNTS_JSON is not set or empty: no Kaggle accounts configured",
        RuntimeWarning,
    )
# ─────────────────────────────────────────────────────────────
# ── Scheduling configuration ─────────────────────────────────
TOTAL_SESSIONS = 15
MAX_PER_ACCOUNT = 5
SESSIONS_PER_ACC = MAX_PER_ACCOUNT  # divisor acc_idx() uses to map session -> account
PUSH_GAP_MIN = 25          # global gap between any two pushes
PUSH_GAP_MAX = 35
ACC_PUSH_COOLDOWN = 120    # min seconds between pushes TO SAME ACCOUNT
MONITOR_SECS = 90          # sleep between monitor passes
PUSH_COOLDOWN = 10 * 60    # don't check kernel status for 10min after push
BACKOFF_429 = 5 * 60       # wait 5min after 429 before retrying that account
SCRAPER_DIR = Path("scrapers")

# ── Shared mutable state ─────────────────────────────────────
sessions = {}                        # session id -> per-session status dict
# Non-reentrant: log() takes this lock too, so never call log() while holding it.
state_lock = threading.Lock()
global_push_lock = threading.Lock()  # one push at a time globally
_account_ids = range(len(KAGGLE_ACCOUNTS))
acc_locks = {i: threading.Lock() for i in _account_ids}
last_push_t = 0.0                                   # global last push timestamp
acc_last_push = dict.fromkeys(_account_ids, 0.0)    # per-account last push timestamp
acc_backoff = dict.fromkeys(_account_ids, 0.0)      # per-account backoff-until timestamp
log_lines = []
_mon_started = False
def log(msg):
    """Print *msg* with a UTC ``HH:MM:SS`` prefix and keep it in ``log_lines``.

    Only the newest 500 lines are retained. This function acquires
    ``state_lock`` (a plain, non-reentrant lock), so callers must not hold
    that lock when calling it.
    """
    stamp = datetime.now(timezone.utc).strftime("%H:%M:%S")
    line = f"[{stamp}] {msg}"
    print(line)
    with state_lock:
        log_lines.append(line)
        # Trim in place (no-op while the list has 500 entries or fewer).
        del log_lines[:-500]
def _now():
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def _slug(sid, run):
    """Return the kernel slug for run number *run* of session *sid*."""
    return f"pmc-s{sid}-r{run}"
def _kurl(u, s):
    """Return the kaggle.com code-page URL for user *u* and kernel slug *s*."""
    return f"https://www.kaggle.com/code/{u}/{s}"
def acc_idx(sid):
    """Return the index into ``KAGGLE_ACCOUNTS`` that owns session *sid*.

    Sessions are assigned in consecutive groups of ``SESSIONS_PER_ACC``;
    any session beyond the last group is clamped onto the last account.
    """
    return min(sid // SESSIONS_PER_ACC, len(KAGGLE_ACCOUNTS) - 1)
def acc(sid):
    """Return the account dict (``username``/``key``) that owns session *sid*."""
    return KAGGLE_ACCOUNTS[acc_idx(sid)]
def chunk_size(sid):
    """Return the ``TOTAL_BUILDS`` value declared in ``scraper_s{sid}.py``.

    Only the first five lines of the scraper are inspected. Returns 0 when
    the file is missing or unreadable, when none of those lines assigns
    ``TOTAL_BUILDS``, or when the assigned value is not an integer literal.
    """
    script = SCRAPER_DIR / f"scraper_s{sid}.py"
    if not script.exists():
        return 0
    try:
        head = script.read_text(encoding="utf-8").splitlines()[:5]
    except (OSError, UnicodeError):
        return 0
    for line in head:
        if "TOTAL_BUILDS" in line and "=" in line:
            # Take the right-hand side and drop any trailing "# comment".
            value = line.split("=", 1)[1].split("#", 1)[0].strip()
            try:
                return int(value)
            except ValueError:
                return 0
    return 0
# Build the initial per-session record. Every later function reads and
# updates these dicts (under state_lock).
for sid in range(TOTAL_SESSIONS):
    sessions[sid] = {
        "session_id": sid,
        "username": acc(sid)["username"],
        "current_slug": None,
        "current_url": None,
        "run_number": 0,
        "status": "idle",
        "last_check": None,
        "last_push": None,
        "last_push_ts": 0.0,
        "start_idx": 0,
        "current_idx": 0,
        "total_builds": chunk_size(sid),
        "success": 0,
        "skip": 0,
        "fail": 0,
        "complete": False,
        "error_msg": "",
        "history": [],
    }
def _kenv(a):
    """Write account *a*'s ``kaggle.json`` and return an env pointing at it.

    The credential file is stored in ``/tmp/kcfg_<username>/kaggle.json``.
    Returns a copy of ``os.environ`` with ``KAGGLE_CONFIG_DIR`` set to that
    directory, suitable for the ``env=`` argument of ``subprocess.run``.
    """
    cfg = Path(f"/tmp/kcfg_{a['username']}")
    cfg.mkdir(mode=0o700, exist_ok=True)
    cred = cfg / "kaggle.json"
    # Create the file owner-only *before* any secret is written, so the key
    # is never readable by other users, not even briefly.
    fd = os.open(str(cred), os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as fh:
        os.chmod(str(cred), 0o600)  # tighten a pre-existing, looser file too
        fh.write(json.dumps({"username": a["username"], "key": a["key"]}))
    return {**os.environ, "KAGGLE_CONFIG_DIR": str(cfg)}
def kstatus(a, slug):
    """Query the Kaggle HTTP API for the status of kernel *slug* of account *a*.

    Returns:
        ``"not_found"`` on HTTP 404, the ``currentRunningVersion.status``
        field of the JSON response otherwise (``"unknown"`` when that field
        is missing or null), or ``"api_error:<message>"`` (message cut to 50
        characters) on any request/parse failure. The result is always a
        string.
    """
    from requests.auth import HTTPBasicAuth

    # NOTE(review): endpoint and response shape are taken as-is from the
    # original code; confirm against the current Kaggle API.
    try:
        resp = requests.get(
            f"https://www.kaggle.com/api/v1/kernels/{a['username']}/{slug}",
            auth=HTTPBasicAuth(a["username"], a["key"]),
            timeout=20,
        )
        if resp.status_code == 404:
            return "not_found"
        resp.raise_for_status()
        # A null "currentRunningVersion" or "status" must not turn into an
        # AttributeError or a None return value.
        version = resp.json().get("currentRunningVersion") or {}
        return str(version.get("status") or "unknown")
    except Exception as e:
        return f"api_error:{str(e)[:50]}"
def kdownload_state(a, slug):
    """Download and parse ``run_state.json`` from the output of kernel *slug*.

    Runs ``kaggle kernels output`` with account *a*'s credentials into a
    temporary directory that is removed afterwards.

    Returns:
        The parsed JSON of the first ``run_state.json`` found, or ``None``
        when no such file was produced or anything failed (failures are
        logged).
    """
    try:
        with tempfile.TemporaryDirectory() as workdir:
            subprocess.run(
                ["kaggle", "kernels", "output", f"{a['username']}/{slug}",
                 "--file-pattern", "run_state.json", "-p", workdir],
                env=_kenv(a), capture_output=True, text=True, timeout=90)
            found = next(Path(workdir).rglob("run_state.json"), None)
            if found is None:
                return None
            return json.loads(found.read_text())
    except Exception as e:
        log(f"State dl fail {slug}: {e}")
        return None
def push_kernel(a, sid, run_num, start_idx):
    """Push with per-account rate limiting and 429 backoff.

    Builds a kernel from ``scraper_s{sid}.py`` (with ``__START_IDX__``
    replaced by *start_idx*) and pushes it with the ``kaggle`` CLI using
    account *a*. Pushes are serialized by ``global_push_lock``; the global
    and per-account gaps are waited out while that lock is held.

    Returns:
        ``(True, slug, url)`` on success, otherwise ``(False, "", reason)``
        where *reason* is a short error string.
    """
    global last_push_t
    aidx = acc_idx(sid)
    source = SCRAPER_DIR / f"scraper_s{sid}.py"
    if not source.exists():
        return False, "", f"scraper_s{sid}.py missing"

    with global_push_lock:
        # Check per-account backoff (from previous 429)
        now = time.time()
        backoff_left = acc_backoff.get(aidx, 0.0) - now
        if backoff_left > 0:
            log(f"Account {aidx} in 429 backoff, {int(backoff_left)}s remaining")
            return False, "", f"account_backoff_{int(backoff_left)}s"

        # Global gap between any two pushes
        global_wait = random.uniform(PUSH_GAP_MIN, PUSH_GAP_MAX) - (now - last_push_t)
        if global_wait > 0:
            log(f"Global push gap {global_wait:.1f}s")
            time.sleep(global_wait)

        # Per-account gap
        acc_wait = ACC_PUSH_COOLDOWN - (time.time() - acc_last_push.get(aidx, 0.0))
        if acc_wait > 0:
            log(f"Account {aidx} per-account gap {acc_wait:.1f}s")
            time.sleep(acc_wait)

        slug = _slug(sid, run_num)
        url = _kurl(a["username"], slug)
        try:
            script = source.read_text(encoding="utf-8").replace(
                "__START_IDX__", str(start_idx))
            script = (
                f"# PMC s{sid} r{run_num} start={start_idx} "
                f"seed={random.randint(10000,99999)}\n" + script
            )
            meta = {
                "id": f"{a['username']}/{slug}", "title": f"PMC s{sid} r{run_num}",
                "code_file": "main.py", "language": "python", "kernel_type": "script",
                "is_private": False, "enable_gpu": False, "enable_internet": True,
                "dataset_sources": [], "kernel_sources": [],
            }
            with tempfile.TemporaryDirectory() as workdir:
                (Path(workdir) / "main.py").write_text(script, encoding="utf-8")
                (Path(workdir) / "kernel-metadata.json").write_text(json.dumps(meta))
                result = subprocess.run(
                    ["kaggle", "kernels", "push", "-p", workdir],
                    env=_kenv(a), capture_output=True, text=True, timeout=120)
        except Exception as e:
            log(f"Push fail {slug}: {str(e)[:200]}")
            return False, "", str(e)[:200]

        if result.returncode != 0:
            # Look for the rate-limit marker in the *whole* output: it may sit
            # past the first 300 characters or on the other stream.
            output = f"{result.stderr}\n{result.stdout}"
            if re.search(r"\b429\b", output) or "too many requests" in output.lower():
                acc_backoff[aidx] = time.time() + BACKOFF_429
                log(f"429 on account {aidx} — backoff {BACKOFF_429//60}min")
            err = (result.stderr[:300] or result.stdout[:300])[:200]
            log(f"Push fail {slug}: {err}")
            return False, "", err

        last_push_t = time.time()
        acc_last_push[aidx] = time.time()
        log(f"Pushed {slug} (s{sid} r{run_num} start={start_idx}) -> {url}")
        return True, slug, url
RESTART_ON = {"complete", "error", "cancelAcknowledged", "failed"}
_ACTIVE_STATES = {"running", "queued", "inQueue"}
_handle_inflight = set()  # session ids currently inside handle(); guarded by state_lock


def _push_run(sid, a, start_idx, reason=None):
    """Allocate the next run number for *sid*, push it and record the outcome.

    When *reason* is given, the restart is logged before pushing. Returns
    True when the push succeeded.
    """
    with state_lock:
        sess = sessions[sid]
        run_no = sess["run_number"] + 1
        sess.update({"run_number": run_no, "status": "pushing"})
    if reason is not None:
        log(f"s{sid}: {reason} -> restart idx={start_idx} run#{run_no}")

    # On failure push_kernel returns the error text in the third slot.
    ok, new_slug, url = push_kernel(a, sid, run_no, start_idx)
    stamp = _now()
    with state_lock:
        sess = sessions[sid]
        if ok:
            sess.update({"current_slug": new_slug, "current_url": url,
                         "status": "queued", "last_push": stamp,
                         "last_push_ts": time.time(), "error_msg": ""})
            sess["history"].append({"run_number": run_no, "slug": new_slug,
                                    "url": url, "pushed_at": stamp,
                                    "start_idx": start_idx})
        else:
            sess.update({"status": "push_failed", "error_msg": url})
    return ok


def _handle_once(sid):
    """One scheduling step for *sid*: push, poll, or harvest-and-restart."""
    with state_lock:
        sess = sessions[sid]
        if sess["complete"]:
            return
        slug = sess["current_slug"]
        start_idx = sess["start_idx"]
        pushed_ts = sess["last_push_ts"]
    a = acc(sid)

    # Nothing running for this session: (re)push from the saved start index.
    if slug is None:
        _push_run(sid, a, start_idx)
        return

    # Cooldown: silent, no need to spam the log.
    if time.time() - pushed_ts < PUSH_COOLDOWN:
        return

    ks = kstatus(a, slug) or "unknown"
    with state_lock:
        sessions[sid]["last_check"] = _now()
        sessions[sid]["status"] = ks
    if ks in _ACTIVE_STATES:
        return
    if ks == "not_found" or ks.startswith("api_error"):
        log(f"s{sid} transient {ks}")
        return
    if ks not in RESTART_ON:
        return

    state = kdownload_state(a, slug)
    with state_lock:
        sess = sessions[sid]
        fallback_idx = sess["current_idx"] + 1
        if state:
            sess["success"] += state.get("success") or 0
            sess["skip"] += state.get("skip") or 0
            sess["fail"] += state.get("fail") or 0
            next_idx = state.get("next_idx")
            is_done = state.get("complete", False)
            reason = state.get("stop_reason", "unknown")
            if not isinstance(next_idx, int):
                # Never feed a missing/null index into the next run.
                next_idx = fallback_idx
        else:
            next_idx = fallback_idx
            is_done = next_idx >= sess["total_builds"]
            reason = "no_state"
        for entry in sess["history"]:
            if entry["slug"] == slug:
                entry.update({"stopped_at": _now(), "stop_reason": reason})
                break
        sess["current_idx"] = next_idx  # keeps the progress columns meaningful
        # This run's results are now accounted for. Dropping the slug ensures
        # that a failed re-push is retried as a plain push next pass instead
        # of harvesting (and double-counting) the same run again.
        sess["current_slug"] = None
        if is_done:
            sess.update({"complete": True, "status": "done"})
        else:
            sess["start_idx"] = next_idx

    # log() takes state_lock itself, so all logging happens after release.
    if not state:
        log(f"WARNING: no state for {slug}")
    if is_done:
        log(f"s{sid} COMPLETE!")
        return
    _push_run(sid, a, next_idx, reason=reason)


def handle(sid):
    """Advance session *sid* by one step; safe to call from several threads.

    NOTE(review): SOURCE lost its indentation, so the lock scopes here are a
    reconstruction. ``log()`` is never called while ``state_lock`` is held,
    because that lock is not reentrant.

    If another thread is already handling the same session the call returns
    immediately, so one session can never be pushed twice concurrently.
    """
    with state_lock:
        if sid in _handle_inflight:
            return
        _handle_inflight.add(sid)
    try:
        _handle_once(sid)
    finally:
        with state_lock:
            _handle_inflight.discard(sid)
def monitor():
    """Run forever: bootstrap every session, then poll all of them periodically.

    The first pushes run in a separate daemon thread (they are slow because
    of the push gaps). While that thread is alive, the periodic loop skips
    sessions that have no kernel yet, so the two threads cannot both issue a
    first push for the same session.
    """
    log(f"Monitor: {TOTAL_SESSIONS} sessions | {MAX_PER_ACCOUNT}/account | "
        f"acc_gap={ACC_PUSH_COOLDOWN}s global_gap={PUSH_GAP_MIN}-{PUSH_GAP_MAX}s")

    # Stagger initial pushes — one per account at a time
    def initial_push():
        for sid in range(TOTAL_SESSIONS):
            try:
                handle(sid)
            except Exception as e:
                log(f"Init push err s{sid}: {e}")

    bootstrap = threading.Thread(target=initial_push, daemon=True)
    bootstrap.start()

    while True:
        time.sleep(MONITOR_SECS)
        for sid in list(sessions):
            if bootstrap.is_alive():
                with state_lock:
                    not_pushed = sessions[sid]["current_slug"] is None
                if not_pushed:
                    continue  # the bootstrap thread owns pushes for now
            try:
                handle(sid)
            except Exception as e:
                log(f"Monitor err s{sid}: {e}")
def start_monitor():
    """Start the monitor daemon thread; later calls are no-ops."""
    global _mon_started
    if _mon_started:
        return
    _mon_started = True
    threading.Thread(target=monitor, daemon=True).start()


# Started at import time so the worker runs as soon as the app loads.
start_monitor()
# ── UI ────────────────────────────────────────────────────────
BADGE = {"idle": "⚪", "pushing": "🔄", "queued": "🟡", "inQueue": "🟡",
         "running": "🟢", "complete": "✅", "done": "🏁", "error": "🔴",
         "push_failed": "🔴", "not_found": "⏳", "starting": "⏳"}


def _b(st):
    """Return *st* prefixed with its badge emoji.

    An exact key match wins; otherwise the first ``BADGE`` key contained in
    the status is used (so ``"api_error:..."`` gets the ``"error"`` badge).
    Unrecognised statuses get ``❓``.
    """
    text = str(st)
    if text in BADGE:
        return f"{BADGE[text]} {text}"
    for key, icon in BADGE.items():
        if key in text:
            return f"{icon} {text}"
    return f"❓ {text}"
def sess_table():
    """Return one row per session for the "Sessions" dataframe.

    Columns: session id, username (14 chars), badge + status, run number,
    success count, progress text, kernel URL (55 chars), time of last check.
    """
    with state_lock:
        rows = []
        for _, s in sorted(sessions.items()):
            # max(..., 1) avoids dividing by zero when total_builds is 0.
            pct = round(s["current_idx"] / max(s["total_builds"], 1) * 100, 1)
            rows.append([
                s["session_id"],
                s["username"][:14],
                _b(s["status"]),
                s["run_number"],
                s["success"],
                f'{s["current_idx"]}/{s["total_builds"]} ({pct}%)',
                (s["current_url"] or "-")[:55],
                (s["last_check"] or "-")[-8:],
            ])
    return rows
def summary():
    """Return the Markdown headline: totals, progress bar and account status."""
    with state_lock:
        # Snapshot the few fields needed so the lock is held only briefly.
        snap = [(s["complete"], s["status"], s["success"],
                 s["total_builds"], s["current_idx"]) for s in sessions.values()]

    done = sum(1 for complete, *_ in snap if complete)
    run = sum(1 for _, status, *_ in snap if "running" in status)
    q = sum(1 for _, status, *_ in snap
            if "queue" in status.lower() or "starting" in status)
    ok = sum(row[2] for row in snap)
    total = sum(row[3] for row in snap)
    curr = sum(row[4] for row in snap)
    pct = round(curr / max(total, 1) * 100, 1)
    # Clamp so the bar is always exactly 20 cells, even if pct leaves 0-100.
    filled = max(0, min(20, int(pct / 5)))
    bar = "X" * filled + "." * (20 - filled)

    # Show per-account backoff status
    acc_status = []
    for i, a in enumerate(KAGGLE_ACCOUNTS):
        remaining = max(0, int(acc_backoff.get(i, 0.0) - time.time()))
        if remaining > 0:
            acc_status.append(f"{a['username']}: 🔴backoff {remaining}s")
        else:
            acc_status.append(f"{a['username']}: ✅ready")

    return (f"**{done}/{TOTAL_SESSIONS} done** | 🟢{run} running 🟡{q} queued | "
            f"**{ok:,} built** | [{bar}] {pct}%\n\n"
            f"Accounts: {' | '.join(acc_status)}")
def hist_table():
    """Return rows for the "History" dataframe: last three runs per session.

    Within each session the newest run comes first.
    """
    with state_lock:
        rows = []
        for _, s in sorted(sessions.items()):
            for h in reversed(s["history"][-3:]):
                rows.append([
                    s["session_id"],
                    h["run_number"],
                    h["slug"],
                    h.get("start_idx", "?"),
                    # `or "-"` also covers a stop_reason stored as None.
                    str(h.get("stop_reason") or "-")[:25],
                    str(h.get("pushed_at") or "-")[-8:],
                    str(h.get("stopped_at") or "-")[-8:],
                    h["url"],
                ])
    return rows
def all_links():
    """Return one text line per pushed kernel (session, run, start index, URL)."""
    with state_lock:
        lines = [
            f"s{s['session_id']} r{h['run_number']} "
            f"idx={h.get('start_idx','?')} {h['url']}"
            for _, s in sorted(sessions.items())
            for h in s["history"]
        ]
    return "\n".join(lines) or "No kernels pushed yet."
def get_log():
    """Return the newest 100 log lines joined with newlines."""
    with state_lock:
        return "\n".join(log_lines[-100:])
def force_restart(sid_str):
    """UI callback: drop the current kernel of a session and push a new run.

    *sid_str* is the session id as typed by the user. The push itself runs
    in a daemon thread; the returned string is the message shown in the UI.
    """
    try:
        sid = int(sid_str.strip())
    except (ValueError, AttributeError):  # not a number, or no text at all
        return "Invalid ID"
    if sid not in sessions:
        return f"s{sid} not found"
    with state_lock:
        if sessions[sid]["complete"]:
            # handle() ignores completed sessions, so don't claim a restart.
            return f"s{sid} already complete"
        sessions[sid]["current_slug"] = None
        sessions[sid]["last_push_ts"] = 0.0
    threading.Thread(target=handle, args=(sid,), daemon=True).start()
    return f"Restart triggered for s{sid}"
def reset_backoff(acc_str):
    """UI callback: clear the 429 backoff of the account with index *acc_str*."""
    try:
        aidx = int(acc_str.strip())
    except (ValueError, AttributeError):  # not a number, or no text at all
        return "Enter account index (0, 1, or 2)"
    if aidx not in acc_backoff:
        return f"Account {aidx} not found"
    acc_backoff[aidx] = 0.0
    return f"Backoff cleared for account {aidx} ({KAGGLE_ACCOUNTS[aidx]['username']})"
R = 30  # refresh period (seconds) passed as `every=` to the live components

# NOTE(review): the nesting below is reconstructed from the component order,
# because SOURCE lost its indentation.
with gr.Blocks(title="PMC Worker") as demo:
    gr.Markdown(
        f"## PMC Worker — {TOTAL_SESSIONS} sessions "
        f"({MAX_PER_ACCOUNT}/account × {len(KAGGLE_ACCOUNTS)} accounts)\n"
        f"*Per-account push gap: {ACC_PUSH_COOLDOWN}s | "
        f"429 backoff: {BACKOFF_429//60}min | "
        f"Kernel cooldown: {PUSH_COOLDOWN//60}min*"
    )
    gr.Markdown(value=summary, every=R)

    with gr.Tabs():
        with gr.Tab("Sessions"):
            gr.Dataframe(
                headers=["Sess", "Account", "Status", "Runs", "Built",
                         "Progress", "Kernel URL", "Last check"],
                value=sess_table, every=R, interactive=False, wrap=True)

            # Manual restart of a single session.
            with gr.Row():
                sid_in = gr.Textbox(label="Session ID to restart", scale=1)
                rb = gr.Button("Force restart", scale=1)
                ro = gr.Textbox(label="Result", scale=3, interactive=False)
            rb.click(force_restart, inputs=sid_in, outputs=ro)

            # Manual reset of an account's 429 backoff.
            with gr.Row():
                acc_in = gr.Textbox(label="Account idx (0/1/2) to clear backoff", scale=1)
                ab = gr.Button("Clear 429 backoff", scale=1)
                ao = gr.Textbox(label="Result", scale=3, interactive=False)
            ab.click(reset_backoff, inputs=acc_in, outputs=ao)

        with gr.Tab("History"):
            gr.Dataframe(
                headers=["Sess", "Run", "Slug", "Start idx", "Stop reason",
                         "Pushed", "Stopped", "URL"],
                value=hist_table, every=R, interactive=False, wrap=True)

        with gr.Tab("All links"):
            gr.Textbox(value=all_links, every=R, lines=40,
                       interactive=False, label="Every kernel URL")

        with gr.Tab("Log"):
            gr.Textbox(value=get_log, every=15, lines=30,
                       interactive=False, label="Log")

demo.queue()
demo.launch()