Zhu Jiajun (jz28583) Claude Opus 4.7 (1M context) commited on
Commit
9cb903d
·
1 Parent(s): 00bf799

Async Kaggle scoring: submit + insert pending row + background poll

Browse files

The HF Space reverse proxy kills any request that holds the connection
open past ~5 min, so synchronous Kaggle scoring (which can take 10+ min
end-to-end) was failing even when the underlying submit + scoring both
succeeded — the response just never reached the client.

New flow for kaggle backend:
1. Server submits to Kaggle synchronously (~30s upload).
2. Inserts a 'pending' row into submissions (NULL primary_metric).
3. Spawns a daemon thread that polls Kaggle every 15s for up to 30 min
and UPDATEs the row when complete (or with secondary.error on fail).
4. Returns 200 immediately with pending=true and the run_id.
5. New GET /run/<run_id> lets clients poll for the resolved score.

Leaderboard queries now filter `WHERE primary_metric IS NOT NULL` so
pending rows don't pollute rankings.

Client (gtb submit) recognizes pending response and prints a follow-up
hint instead of trying to print scores it doesn't have yet.

Removed /admin/insert — the only honest path into the leaderboard is
through the scoring backend.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Files changed (2) hide show
  1. graphtestbed/submit.py +6 -0
  2. server/api.py +124 -106
graphtestbed/submit.py CHANGED
@@ -141,6 +141,12 @@ def submit(
141
 
142
  out = resp.json()
143
  print()
 
 
 
 
 
 
144
  print(f"✓ Scored (run_id={out['run_id']})")
145
  print(f" primary ({task_config(task)['metric']['primary']}): "
146
  f"{out['primary']}")
 
141
 
142
  out = resp.json()
143
  print()
144
+ if out.get("pending"):
145
+ print(f"✓ Submitted to Kaggle (run_id={out['run_id']})")
146
+ print(f" Scoring runs async (typically 5–15 min). Check back via:")
147
+ print(f" curl {API_URL}/run/{out['run_id']}")
148
+ print(f" gtb leaderboard {task}")
149
+ return
150
  print(f"✓ Scored (run_id={out['run_id']})")
151
  print(f" primary ({task_config(task)['metric']['primary']}): "
152
  f"{out['primary']}")
server/api.py CHANGED
@@ -130,28 +130,18 @@ def _score(task: str, sub_df: pd.DataFrame, cfg: dict) -> dict:
130
  }
131
 
132
 
133
- def _score_kaggle(competition: str, raw_csv: bytes, run_id: str,
134
- poll_interval: int = 15, timeout_s: int = 600) -> dict:
135
- """Forward to Kaggle's grading API. Returns once Kaggle reports complete.
136
-
137
- Submits via `kaggle competitions submit` with description=graphtestbed-<run_id>
138
- so we can locate the entry in `kaggle competitions submissions`. Polls every
139
- `poll_interval` seconds until the submission's status is `complete` or until
140
- `timeout_s` elapses. Public/private scores both surface (private is what
141
- counts for the historical Kaggle leaderboard).
142
  """
143
- import csv
144
- import io
145
  import subprocess
146
  import tempfile
147
- import time
148
 
149
  description = f"graphtestbed-{run_id}"
150
-
151
  with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as tmp:
152
  tmp.write(raw_csv)
153
  tmp_path = tmp.name
154
-
155
  try:
156
  sub = subprocess.run(
157
  ["kaggle", "competitions", "submit",
@@ -164,42 +154,62 @@ def _score_kaggle(competition: str, raw_csv: bytes, run_id: str,
164
  f"stdout={sub.stdout.strip()[-500:]!r}; "
165
  f"stderr={sub.stderr.strip()[-500:]!r}"
166
  )
167
-
168
- deadline = time.monotonic() + timeout_s
169
- while time.monotonic() < deadline:
170
- time.sleep(poll_interval)
171
- ls = subprocess.run(
172
- ["kaggle", "competitions", "submissions",
173
- "-c", competition, "--csv"],
174
- capture_output=True, text=True, timeout=60,
175
- )
176
- if ls.returncode != 0:
177
- continue
178
- for row in csv.DictReader(io.StringIO(ls.stdout)):
179
- if row.get("description") != description:
180
- continue
181
- status = (row.get("status") or "").lower()
182
- if status == "complete":
183
- pub = row.get("publicScore") or ""
184
- priv = row.get("privateScore") or ""
185
- pub_f = float(pub) if pub else float("nan")
186
- return {
187
- "primary": round(pub_f, 3),
188
- "secondary": (
189
- {"private_score": round(float(priv), 3)} if priv
190
- else {}
191
- ),
192
- "n_rows": -1, # Kaggle doesn't report row count
193
- }
194
- if status in ("error", "failed"):
195
- err = row.get("errorDescription") or "unspecified"
196
- raise RuntimeError(f"kaggle scoring failed: {err}")
197
- break # found our row but pending — keep polling
198
- raise TimeoutError(
199
- f"kaggle scoring on {competition} did not complete within {timeout_s}s"
200
- )
201
  finally:
202
  Path(tmp_path).unlink(missing_ok=True)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
203
 
204
 
205
  def _validate_schema(sub_df: pd.DataFrame, cfg: dict) -> None:
@@ -283,6 +293,10 @@ def submit():
283
  return jsonify({"error": f"schema check failed: {e}"}), 422
284
 
285
  backend = cfg.get("backend", "gt")
 
 
 
 
286
  try:
287
  if backend == "gt":
288
  scored = _score(task, sub_df, cfg)
@@ -293,7 +307,14 @@ def submit():
293
  f"task '{task}' has backend=kaggle but no "
294
  f"backend_config.competition"
295
  )}), 500
296
- scored = _score_kaggle(comp, raw, uuid.uuid4().hex[:12])
 
 
 
 
 
 
 
297
  else:
298
  return jsonify({"error": f"unknown backend '{backend}'"}), 500
299
  except FileNotFoundError:
@@ -301,8 +322,6 @@ def submit():
301
  except Exception as e:
302
  return jsonify({"error": f"{backend}-backend scoring failed: {e}"}), 500
303
 
304
- run_id = uuid.uuid4().hex[:12]
305
- now = dt.datetime.now(dt.timezone.utc).isoformat()
306
  conn = _db()
307
  if not dry:
308
  conn.execute(
@@ -321,19 +340,29 @@ def submit():
321
  out.parent.mkdir(parents=True, exist_ok=True)
322
  out.write_bytes(raw)
323
 
324
- # Rank = how many distinct agents have a strictly better best-score on
325
- # this task. The just-inserted row contributes to that count only if the
326
- # SAME agent had a better prior submission (in which case rank doesn't
327
- # change for them on this submission).
328
- rank = conn.execute("""
329
- SELECT COUNT(*) + 1 FROM (
330
- SELECT agent, MAX(primary_metric) AS best
331
- FROM submissions
332
- WHERE task = ?
333
- GROUP BY agent
334
- HAVING best > ?
335
- )
336
- """, (task, scored["primary"])).fetchone()[0]
 
 
 
 
 
 
 
 
 
 
337
  conn.close()
338
 
339
  return jsonify({
@@ -347,6 +376,7 @@ def submit():
347
  "quota_remaining": "unlimited" if bypass else (quota - 1),
348
  "bypass": bypass,
349
  "dry": dry,
 
350
  "submitted_at": now,
351
  })
352
 
@@ -359,7 +389,7 @@ def leaderboard(task: str):
359
  SELECT agent, MAX(primary_metric) as best, COUNT(*) as n_subs,
360
  MIN(submitted_at) as first_seen
361
  FROM submissions
362
- WHERE task = ?
363
  GROUP BY agent
364
  ORDER BY best DESC
365
  """, (task,)).fetchall()
@@ -381,6 +411,7 @@ def leaderboard_all():
381
  rows = conn.execute("""
382
  SELECT task, agent, MAX(primary_metric) as best
383
  FROM submissions
 
384
  GROUP BY task, agent
385
  """).fetchall()
386
  conn.close()
@@ -435,49 +466,35 @@ def admin_delete():
435
  })
436
 
437
 
438
- @app.post("/admin/insert")
439
- def admin_insert():
440
- """Insert a leaderboard row directly. Bypass-key gated.
441
-
442
- Use for backends we can't proxy server-side (e.g. when Kaggle creds are
443
- only available on the maintainer's machine — they run the submit + poll
444
- locally and POST the resulting score here).
445
-
446
- Body: JSON {"task": "...", "agent": "...", "primary": float,
447
- "secondary": {...}, "n_rows": int|null, "sha256": str|null}
448
  """
449
- import datetime as dt
450
- import json as _json
451
- import uuid as _uuid
452
-
453
- sent_key = request.headers.get("X-Bypass-Key", "").strip()
454
- if not (BYPASS_KEY and sent_key
455
- and __import__("hmac").compare_digest(sent_key, BYPASS_KEY)):
456
- return jsonify({"error": "bypass key required"}), 403
457
- payload = request.get_json(silent=True) or {}
458
- task = payload.get("task")
459
- agent = payload.get("agent")
460
- primary = payload.get("primary")
461
- if not (task and agent and isinstance(primary, (int, float))):
462
- return jsonify({"error": "task, agent, primary required"}), 400
463
- secondary = payload.get("secondary") or {}
464
- n_rows = int(payload.get("n_rows") or -1)
465
- sha = payload.get("sha256") or "manual_insert"
466
- run_id = _uuid.uuid4().hex[:12]
467
- now = dt.datetime.now(dt.timezone.utc).isoformat()
468
  conn = _db()
469
- conn.execute(
470
- "INSERT INTO submissions VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
471
- (run_id, task, agent, float(primary), _json.dumps(secondary),
472
- sha, n_rows, "admin", now),
473
- )
474
- conn.commit()
 
 
 
 
 
 
 
 
 
 
 
475
  return jsonify({
476
- "run_id": run_id,
477
- "task": task,
478
- "agent": agent,
479
- "primary": primary,
480
- "secondary": secondary,
481
  })
482
 
483
 
@@ -1249,7 +1266,8 @@ def landing():
1249
  rows = conn.execute("""
1250
  SELECT agent, MAX(primary_metric) AS p, COUNT(*) AS n,
1251
  MIN(submitted_at) AS f
1252
- FROM submissions WHERE task = ?
 
1253
  GROUP BY agent ORDER BY p DESC
1254
  """, (name,)).fetchall()
1255
  n_rows_cfg = s.get("n_rows")
 
130
  }
131
 
132
 
133
+ def _kaggle_submit(competition: str, raw_csv: bytes, run_id: str) -> str:
134
+ """Synchronously submit a CSV to Kaggle. Returns the description string used
135
+ to identify the submission; the caller is responsible for polling for the
136
+ score later via `_kaggle_poll_loop`. Raises on submit failure.
 
 
 
 
 
137
  """
 
 
138
  import subprocess
139
  import tempfile
 
140
 
141
  description = f"graphtestbed-{run_id}"
 
142
  with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as tmp:
143
  tmp.write(raw_csv)
144
  tmp_path = tmp.name
 
145
  try:
146
  sub = subprocess.run(
147
  ["kaggle", "competitions", "submit",
 
154
  f"stdout={sub.stdout.strip()[-500:]!r}; "
155
  f"stderr={sub.stderr.strip()[-500:]!r}"
156
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
157
  finally:
158
  Path(tmp_path).unlink(missing_ok=True)
159
+ return description
160
+
161
+
162
+ def _kaggle_poll_loop(competition: str, description: str, run_id: str,
163
+ poll_interval: int = 15, timeout_s: int = 1800) -> None:
164
+ """Poll Kaggle for the submission's score and UPDATE the matching DB row.
165
+
166
+ Designed to run in a daemon thread — never raises; failures are logged and
167
+ written into the row's `secondary` JSON so they're inspectable later. The
168
+ DB row must already exist (caller inserted it as 'pending' before spawning).
169
+ """
170
+ import csv
171
+ import io
172
+ import json as _json
173
+ import subprocess
174
+ import time
175
+
176
+ deadline = time.monotonic() + timeout_s
177
+ final = None # tuple (primary, secondary_dict) or None on timeout/error
178
+ while time.monotonic() < deadline and final is None:
179
+ time.sleep(poll_interval)
180
+ ls = subprocess.run(
181
+ ["kaggle", "competitions", "submissions", "-c", competition, "--csv"],
182
+ capture_output=True, text=True, timeout=60,
183
+ )
184
+ if ls.returncode != 0:
185
+ continue
186
+ for row in csv.DictReader(io.StringIO(ls.stdout)):
187
+ if row.get("description") != description:
188
+ continue
189
+ status = (row.get("status") or "").lower()
190
+ if status == "complete":
191
+ pub = row.get("publicScore") or ""
192
+ priv = row.get("privateScore") or ""
193
+ final = (
194
+ round(float(pub), 3) if pub else float("nan"),
195
+ {"private_score": round(float(priv), 3)} if priv else {},
196
+ )
197
+ elif status in ("error", "failed"):
198
+ err = row.get("errorDescription") or "unspecified"
199
+ final = (float("nan"), {"error": f"kaggle scoring failed: {err}"})
200
+ break # found our row; if still pending the inner loop falls through
201
+ if final is None:
202
+ final = (float("nan"), {"error": f"polled {timeout_s}s without complete"})
203
+ primary, secondary = final
204
+ # NaN can't go through SQLite NUMERIC; persist as NULL when scoring failed.
205
+ primary_db = None if primary != primary else primary # NaN check
206
+ conn = _db()
207
+ conn.execute(
208
+ "UPDATE submissions SET primary_metric = ?, secondary = ? "
209
+ "WHERE run_id = ?",
210
+ (primary_db, _json.dumps(secondary), run_id),
211
+ )
212
+ conn.commit()
213
 
214
 
215
  def _validate_schema(sub_df: pd.DataFrame, cfg: dict) -> None:
 
293
  return jsonify({"error": f"schema check failed: {e}"}), 422
294
 
295
  backend = cfg.get("backend", "gt")
296
+ run_id = uuid.uuid4().hex[:12]
297
+ now = dt.datetime.now(dt.timezone.utc).isoformat()
298
+ pending = False
299
+
300
  try:
301
  if backend == "gt":
302
  scored = _score(task, sub_df, cfg)
 
307
  f"task '{task}' has backend=kaggle but no "
308
  f"backend_config.competition"
309
  )}), 500
310
+ # Submit synchronously (fast, ~30s). Polling for the score happens
311
+ # in a background thread — we insert a 'pending' row immediately so
312
+ # the client never has to hold open a long-running connection
313
+ # (HF Space's reverse proxy kills these around the 5-min mark).
314
+ description = _kaggle_submit(comp, raw, run_id)
315
+ scored = {"primary": None, "secondary": {"status": "pending"},
316
+ "n_rows": -1}
317
+ pending = True
318
  else:
319
  return jsonify({"error": f"unknown backend '{backend}'"}), 500
320
  except FileNotFoundError:
 
322
  except Exception as e:
323
  return jsonify({"error": f"{backend}-backend scoring failed: {e}"}), 500
324
 
 
 
325
  conn = _db()
326
  if not dry:
327
  conn.execute(
 
340
  out.parent.mkdir(parents=True, exist_ok=True)
341
  out.write_bytes(raw)
342
 
343
+ # For Kaggle backend, kick off the async poll AFTER inserting the row so
344
+ # the worker has a row to UPDATE.
345
+ if pending and not dry:
346
+ import threading
347
+ threading.Thread(
348
+ target=_kaggle_poll_loop,
349
+ args=(comp, description, run_id),
350
+ daemon=True,
351
+ ).start()
352
+
353
+ # Rank only meaningful for completed scores. Pending Kaggle entries skip it.
354
+ if pending:
355
+ rank = None
356
+ else:
357
+ rank = conn.execute("""
358
+ SELECT COUNT(*) + 1 FROM (
359
+ SELECT agent, MAX(primary_metric) AS best
360
+ FROM submissions
361
+ WHERE task = ?
362
+ GROUP BY agent
363
+ HAVING best > ?
364
+ )
365
+ """, (task, scored["primary"])).fetchone()[0]
366
  conn.close()
367
 
368
  return jsonify({
 
376
  "quota_remaining": "unlimited" if bypass else (quota - 1),
377
  "bypass": bypass,
378
  "dry": dry,
379
+ "pending": pending,
380
  "submitted_at": now,
381
  })
382
 
 
389
  SELECT agent, MAX(primary_metric) as best, COUNT(*) as n_subs,
390
  MIN(submitted_at) as first_seen
391
  FROM submissions
392
+ WHERE task = ? AND primary_metric IS NOT NULL
393
  GROUP BY agent
394
  ORDER BY best DESC
395
  """, (task,)).fetchall()
 
411
  rows = conn.execute("""
412
  SELECT task, agent, MAX(primary_metric) as best
413
  FROM submissions
414
+ WHERE primary_metric IS NOT NULL
415
  GROUP BY task, agent
416
  """).fetchall()
417
  conn.close()
 
466
  })
467
 
468
 
469
+ @app.get("/run/<run_id>")
470
+ def run_status(run_id: str):
471
+ """Look up a submission by run_id. Useful for kaggle-backend submissions
472
+ where /submit returns a 'pending' record that the background poller fills
473
+ in later.
 
 
 
 
 
474
  """
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
475
  conn = _db()
476
+ row = conn.execute("""
477
+ SELECT run_id, task, agent, primary_metric, secondary, sha256,
478
+ n_rows, ts
479
+ FROM submissions WHERE run_id = ?
480
+ """, (run_id,)).fetchone()
481
+ conn.close()
482
+ if not row:
483
+ return jsonify({"error": f"no run '{run_id}'"}), 404
484
+ rid, task, agent, primary, secondary, sha, n_rows, ts = row
485
+ sec = json.loads(secondary) if secondary else {}
486
+ if primary is None:
487
+ # Kaggle backend, still polling
488
+ status = "pending"
489
+ elif sec.get("error"):
490
+ status = "failed"
491
+ else:
492
+ status = "complete"
493
  return jsonify({
494
+ "run_id": rid, "task": task, "agent": agent,
495
+ "primary": primary, "secondary": sec,
496
+ "n_rows": n_rows, "submitted_at": ts,
497
+ "status": status,
 
498
  })
499
 
500
 
 
1266
  rows = conn.execute("""
1267
  SELECT agent, MAX(primary_metric) AS p, COUNT(*) AS n,
1268
  MIN(submitted_at) AS f
1269
+ FROM submissions
1270
+ WHERE task = ? AND primary_metric IS NOT NULL
1271
  GROUP BY agent ORDER BY p DESC
1272
  """, (name,)).fetchall()
1273
  n_rows_cfg = s.get("n_rows")