"""GraphTestbed scoring API. Single-file Flask app. Holds ground_truth files locally, scores submissions, returns metrics, appends to leaderboard. No DB, no auth — submitter identity is just whatever string the client sends in `agent`. Deployment unit: the `server` branch (or this `server/` subdir on a deploy host). Ground-truth files live at $GT_DIR (default /var/graphtestbed/gt/), populated separately from git — they MUST NOT be committed. Endpoints: POST /submit form: task=, agent=, file= → 200 { primary, secondary, n_rows, leaderboard_rank, run_id, quota_remaining } → 4xx { error } GET /leaderboard/ → 200 [ { agent, primary, secondary, submitted_at, run_id }, ... ] sorted by primary descending GET /healthz → 200 { status: "ok", tasks: [...], gt_present: [...] } """ from __future__ import annotations import datetime as dt import hashlib import json import os import sqlite3 import time import uuid from pathlib import Path import pandas as pd import yaml from flask import Flask, jsonify, render_template_string, request GT_DIR = Path(os.environ.get("GT_DIR", "/var/graphtestbed/gt")) DB_PATH = Path(os.environ.get("GT_DB", "/var/graphtestbed/leaderboard.db")) ARCHIVE_DIR = ( Path(os.environ["GT_ARCHIVE_DIR"]) if os.environ.get("GT_ARCHIVE_DIR") else None ) MANIFEST_PATH = Path(os.environ.get( "GT_MANIFEST", Path(__file__).resolve().parents[1] / "datasets" / "manifest.yaml", )) QUOTA_PER_DAY = int(os.environ.get("GT_QUOTA", "5")) BYPASS_KEY = os.environ.get("GT_BYPASS_KEY", "").strip() or None # Sentinel for kaggle-backend rows whose score is still being polled. The # submissions table has primary_metric NOT NULL so we can't store NULL — # leaderboard queries filter `primary_metric > -1`. _PENDING_SENTINEL = -1.0 MAX_UPLOAD_BYTES = 50 * 1024 * 1024 # 50 MB hard cap app = Flask(__name__) app.config["MAX_CONTENT_LENGTH"] = MAX_UPLOAD_BYTES def _manifest() -> dict: return yaml.safe_load(MANIFEST_PATH.read_text()) def _db() -> sqlite3.Connection: DB_PATH.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(DB_PATH) conn.execute(""" CREATE TABLE IF NOT EXISTS submissions ( run_id TEXT PRIMARY KEY, task TEXT NOT NULL, agent TEXT NOT NULL, primary_metric REAL NOT NULL, secondary_json TEXT NOT NULL, submission_sha256 TEXT NOT NULL, n_rows INTEGER NOT NULL, submitter_ip TEXT, submitted_at TEXT NOT NULL ) """) return conn def _quota_remaining(task: str, ip: str) -> int: """Count submissions in the last 24h from this IP for this task.""" conn = _db() cutoff = (dt.datetime.now(dt.timezone.utc) - dt.timedelta(days=1)).isoformat() n = conn.execute( "SELECT COUNT(*) FROM submissions " "WHERE task = ? AND submitter_ip = ? AND submitted_at > ?", (task, ip, cutoff), ).fetchone()[0] conn.close() return max(0, QUOTA_PER_DAY - n) def _score(task: str, sub_df: pd.DataFrame, cfg: dict) -> dict: from sklearn.metrics import ( average_precision_score, f1_score, roc_auc_score, ) schema = cfg["submission_schema"] metric = cfg["metric"] gt = pd.read_csv(GT_DIR / f"{task}.csv")[[schema["id_col"], "Label"]] sub_renamed = sub_df.rename(columns={schema["pred_col"]: "_pred"}) merged = gt.merge(sub_renamed, on=schema["id_col"], how="inner") if len(merged) != len(gt): raise ValueError( f"Coverage mismatch: scored {len(merged)} / expected {len(gt)} rows" ) y_true = merged["Label"].astype(int) if schema.get("pred_dtype") == "binary": y_pred = merged["_pred"].astype(int) y_score = y_pred.astype(float) else: y_score = merged["_pred"].astype(float) y_pred = (y_score >= 0.5).astype(int) funcs = { "auc_roc": lambda: roc_auc_score(y_true, y_score), "auc_pr": lambda: average_precision_score(y_true, y_score), "f1": lambda: f1_score(y_true, y_pred), } return { "primary": round(float(funcs[metric["primary"]]()), 3), "secondary": { s: round(float(funcs[s]()), 3) for s in metric["secondary"] }, "n_rows": len(merged), } def _kaggle_submit(competition: str, raw_csv: bytes, run_id: str) -> str: """Synchronously submit a CSV to Kaggle. Returns the description string used to identify the submission; the caller is responsible for polling for the score later via `_kaggle_poll_loop`. Raises on submit failure. """ import subprocess import tempfile description = f"graphtestbed-{run_id}" with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as tmp: tmp.write(raw_csv) tmp_path = tmp.name try: sub = subprocess.run( ["kaggle", "competitions", "submit", "-c", competition, "-f", tmp_path, "-m", description], capture_output=True, text=True, timeout=120, ) if sub.returncode != 0: raise RuntimeError( f"kaggle submit failed (rc={sub.returncode}); " f"stdout={sub.stdout.strip()[-500:]!r}; " f"stderr={sub.stderr.strip()[-500:]!r}" ) finally: Path(tmp_path).unlink(missing_ok=True) return description def _kaggle_poll_loop(competition: str, description: str, run_id: str, poll_interval: int = 15, timeout_s: int = 1800) -> None: """Poll Kaggle for the submission's score and UPDATE the matching DB row. Designed to run in a daemon thread — never raises; failures are logged and written into the row's `secondary` JSON so they're inspectable later. The DB row must already exist (caller inserted it as 'pending' before spawning). """ import csv import io import json as _json import subprocess import time deadline = time.monotonic() + timeout_s final = None # tuple (primary, secondary_dict) or None on timeout/error while time.monotonic() < deadline and final is None: time.sleep(poll_interval) ls = subprocess.run( ["kaggle", "competitions", "submissions", "-c", competition, "--csv"], capture_output=True, text=True, timeout=60, ) if ls.returncode != 0: continue for row in csv.DictReader(io.StringIO(ls.stdout)): if row.get("description") != description: continue # Kaggle prints status as "SubmissionStatus.COMPLETE" (enum repr), # not just "complete" — match the suffix after the last dot. status_raw = (row.get("status") or "") status = status_raw.rsplit(".", 1)[-1].lower() if status == "complete": pub = row.get("publicScore") or "" priv = row.get("privateScore") or "" final = ( round(float(pub), 3) if pub else float("nan"), {"private_score": round(float(priv), 3)} if priv else {}, ) elif status in ("error", "failed"): err = row.get("errorDescription") or "unspecified" final = (float("nan"), {"error": f"kaggle scoring failed: {err}"}) break # found our row; if still pending the inner loop falls through if final is None: final = (-1.0, {"error": f"polled {timeout_s}s without complete"}) primary, secondary = final # On failure leave the sentinel so it stays out of the leaderboard. primary_db = -1.0 if primary != primary else primary # NaN check conn = _db() conn.execute( "UPDATE submissions SET primary_metric = ?, secondary_json = ? " "WHERE run_id = ?", (primary_db, _json.dumps(secondary), run_id), ) conn.commit() def _validate_schema(sub_df: pd.DataFrame, cfg: dict) -> None: s = cfg["submission_schema"] if list(sub_df.columns) != [s["id_col"], s["pred_col"]]: raise ValueError( f"columns must be [{s['id_col']}, {s['pred_col']}], " f"got {list(sub_df.columns)}" ) if s.get("n_rows") not in ("TBD", None) and len(sub_df) != s["n_rows"]: raise ValueError( f"row count {len(sub_df)} != expected {s['n_rows']}" ) if sub_df[s["id_col"]].duplicated().any(): raise ValueError(f"duplicate IDs in {s['id_col']}") dtype = s.get("pred_dtype") if dtype == "float": try: preds = sub_df[s["pred_col"]].astype(float) except (TypeError, ValueError) as e: raise ValueError(f"pred_col not float-castable: {e}") if (preds < 0).any() or (preds > 1).any(): raise ValueError("predictions must lie in [0, 1]") elif dtype == "binary": try: preds = sub_df[s["pred_col"]].astype(float) except (TypeError, ValueError) as e: raise ValueError(f"pred_col not numeric: {e}") bad = ~preds.isin([0.0, 1.0]) if bad.any(): raise ValueError( f"binary submission must contain only 0 or 1 " f"(no probabilities); got {int(bad.sum())} other values" ) @app.post("/submit") def submit(): task = request.form.get("task") agent = request.form.get("agent") file = request.files.get("file") ip = request.headers.get("X-Forwarded-For", request.remote_addr or "unknown") # Bypass: maintainer/CI key skips quota and (optionally with dry=1) the # leaderboard insert. Compared with hmac.compare_digest to avoid timing # leaks against the hex-string secret. sent_key = request.headers.get("X-Bypass-Key", "").strip() bypass = bool(BYPASS_KEY and sent_key and __import__("hmac").compare_digest(sent_key, BYPASS_KEY)) dry = bypass and request.form.get("dry") == "1" if not (task and agent and file): return jsonify({"error": "form fields required: task, agent, file"}), 400 manifest = _manifest() if task not in manifest: return jsonify({"error": f"unknown task '{task}'", "known": sorted(manifest)}), 404 cfg = manifest[task] if bypass: quota = -1 else: quota = _quota_remaining(task, ip) if quota <= 0: return jsonify({ "error": f"quota exceeded ({QUOTA_PER_DAY}/day per IP per task)", "task": task, }), 429 raw = file.read() sub_sha = hashlib.sha256(raw).hexdigest() try: import io sub_df = pd.read_csv(io.BytesIO(raw)) except Exception as e: return jsonify({"error": f"could not parse CSV: {e}"}), 400 try: _validate_schema(sub_df, cfg) except ValueError as e: return jsonify({"error": f"schema check failed: {e}"}), 422 backend = cfg.get("backend", "gt") run_id = uuid.uuid4().hex[:12] now = dt.datetime.now(dt.timezone.utc).isoformat() pending = False try: if backend == "gt": scored = _score(task, sub_df, cfg) elif backend == "kaggle": comp = cfg.get("backend_config", {}).get("competition") if not comp: return jsonify({"error": ( f"task '{task}' has backend=kaggle but no " f"backend_config.competition" )}), 500 # Submit synchronously (fast, ~30s). Polling for the score happens # in a background thread — we insert a 'pending' row immediately so # the client never has to hold open a long-running connection # (HF Space's reverse proxy kills these around the 5-min mark). description = _kaggle_submit(comp, raw, run_id) scored = {"primary": _PENDING_SENTINEL, "secondary": {"status": "pending"}, "n_rows": -1} pending = True else: return jsonify({"error": f"unknown backend '{backend}'"}), 500 except FileNotFoundError: return jsonify({"error": f"ground truth not deployed for task '{task}'"}), 503 except Exception as e: return jsonify({"error": f"{backend}-backend scoring failed: {e}"}), 500 conn = _db() if not dry: conn.execute( "INSERT INTO submissions VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", (run_id, task, agent, scored["primary"], json.dumps(scored["secondary"]), sub_sha, scored["n_rows"], ip, now), ) conn.commit() # Archive the raw CSV when GT_ARCHIVE_DIR is configured, so the deploy # host can later prove what each scored entry was. Filename embeds the # agent + run_id so multiple submissions don't collide. if ARCHIVE_DIR is not None: safe_agent = "".join(c if c.isalnum() or c in "-_." else "_" for c in agent) out = ARCHIVE_DIR / task / f"{safe_agent}-{run_id}.csv" out.parent.mkdir(parents=True, exist_ok=True) out.write_bytes(raw) # For Kaggle backend, kick off the async poll AFTER inserting the row so # the worker has a row to UPDATE. if pending and not dry: import threading threading.Thread( target=_kaggle_poll_loop, args=(comp, description, run_id), daemon=True, ).start() # Rank only meaningful for completed scores. Pending Kaggle entries skip it. if pending: rank = None else: rank = conn.execute(""" SELECT COUNT(*) + 1 FROM ( SELECT agent, MAX(primary_metric) AS best FROM submissions WHERE task = ? GROUP BY agent HAVING best > ? ) """, (task, scored["primary"])).fetchone()[0] conn.close() return jsonify({ "run_id": run_id, "task": task, "agent": agent, "primary": scored["primary"], "secondary": scored["secondary"], "n_rows": scored["n_rows"], "leaderboard_rank": rank, "quota_remaining": "unlimited" if bypass else (quota - 1), "bypass": bypass, "dry": dry, "pending": pending, "submitted_at": now, }) @app.get("/leaderboard/") def leaderboard(task: str): """Per-agent best submission, sorted by primary metric desc.""" conn = _db() rows = conn.execute(""" SELECT agent, MAX(primary_metric) as best, COUNT(*) as n_subs, MIN(submitted_at) as first_seen FROM submissions WHERE task = ? AND primary_metric > -1 GROUP BY agent ORDER BY best DESC """, (task,)).fetchall() conn.close() return jsonify([ {"agent": a, "primary": p, "n_submissions": n, "first_seen": f} for (a, p, n, f) in rows ]) @app.get("/leaderboard") def leaderboard_all(): """Cross-task average per agent. The average is only computed for agents that have a score on every task — an incomplete agent shows '—' and ranks below all complete ones (ties broken by agent name for stability).""" manifest = _manifest() tasks = sorted(manifest) conn = _db() rows = conn.execute(""" SELECT task, agent, MAX(primary_metric) as best FROM submissions WHERE primary_metric > -1 GROUP BY task, agent """).fetchall() conn.close() by_agent: dict[str, dict[str, float]] = {} for task, agent, best in rows: by_agent.setdefault(agent, {})[task] = float(best) out = [] for agent, scores in by_agent.items(): covered = [t for t in tasks if t in scores] if not covered: continue complete = len(covered) == len(tasks) avg = sum(scores[t] for t in covered) / len(covered) if complete else None out.append({ "agent": agent, "average": round(avg, 3) if avg is not None else None, "n_tasks": len(covered), "per_task": {t: scores.get(t) for t in tasks}, }) # Complete agents first (sorted by average desc), then incomplete ones at # the bottom (sorted by # tasks covered desc, then name). out.sort(key=lambda r: ( 0 if r["average"] is not None else 1, -(r["average"] if r["average"] is not None else 0), -r["n_tasks"], r["agent"], )) return jsonify({"tasks": tasks, "rows": out}) @app.post("/admin/delete") def admin_delete(): """Delete leaderboard entries by (task, agent). Bypass-key gated. Body: JSON {"entries": [{"task": "...", "agent": "..."}, ...]} Returns count deleted per pair + total. """ sent_key = request.headers.get("X-Bypass-Key", "").strip() if not (BYPASS_KEY and sent_key and __import__("hmac").compare_digest(sent_key, BYPASS_KEY)): return jsonify({"error": "bypass key required"}), 403 payload = request.get_json(silent=True) or {} entries = payload.get("entries") or [] if not isinstance(entries, list) or not entries: return jsonify({"error": "body must be {entries: [{task, agent}, ...]}"}), 400 conn = _db() deleted = [] for e in entries: t, a = e.get("task"), e.get("agent") if not (t and a): continue cur = conn.execute( "DELETE FROM submissions WHERE task = ? AND agent = ?", (t, a) ) deleted.append({"task": t, "agent": a, "rows": cur.rowcount}) conn.commit() return jsonify({ "deleted": deleted, "total_rows": sum(d["rows"] for d in deleted), }) @app.post("/admin/insert") def admin_insert(): """Insert a leaderboard row directly. Bypass-key gated; intended for maintainer corrections (e.g. backfilling a known score whose CSV is no longer available). For routine scoring, use POST /submit. Body: JSON {"task": "...", "agent": "...", "primary": float, "secondary": {...}, "n_rows": int|null, "sha256": str|null} """ import datetime as _dt import json as _json import uuid as _uuid sent_key = request.headers.get("X-Bypass-Key", "").strip() if not (BYPASS_KEY and sent_key and __import__("hmac").compare_digest(sent_key, BYPASS_KEY)): return jsonify({"error": "bypass key required"}), 403 payload = request.get_json(silent=True) or {} task = payload.get("task") agent = payload.get("agent") primary = payload.get("primary") if not (task and agent and isinstance(primary, (int, float))): return jsonify({"error": "task, agent, primary required"}), 400 secondary = payload.get("secondary") or {} n_rows = int(payload.get("n_rows") or -1) sha = payload.get("sha256") or "manual_insert" run_id = _uuid.uuid4().hex[:12] now = _dt.datetime.now(_dt.timezone.utc).isoformat() conn = _db() conn.execute( "INSERT INTO submissions VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", (run_id, task, agent, float(primary), _json.dumps(secondary), sha, n_rows, "admin", now), ) conn.commit() return jsonify({"run_id": run_id, "task": task, "agent": agent, "primary": primary, "secondary": secondary}) @app.post("/admin/repoll/") def admin_repoll(run_id: str): """Re-trigger the Kaggle poll loop for a stuck/failed pending row, without re-submitting to Kaggle. Useful after fixing a poller bug — the existing Kaggle submission still has its score, we just need to read it. """ sent_key = request.headers.get("X-Bypass-Key", "").strip() if not (BYPASS_KEY and sent_key and __import__("hmac").compare_digest(sent_key, BYPASS_KEY)): return jsonify({"error": "bypass key required"}), 403 conn = _db() row = conn.execute( "SELECT task FROM submissions WHERE run_id = ?", (run_id,) ).fetchone() conn.close() if not row: return jsonify({"error": f"no run '{run_id}'"}), 404 task = row[0] cfg = _manifest().get(task, {}) comp = cfg.get("backend_config", {}).get("competition") if not comp: return jsonify({"error": f"task '{task}' is not a kaggle backend"}), 400 description = f"graphtestbed-{run_id}" import threading threading.Thread( target=_kaggle_poll_loop, args=(comp, description, run_id), daemon=True, ).start() return jsonify({"run_id": run_id, "task": task, "competition": comp, "status": "repolling"}) @app.get("/run/") def run_status(run_id: str): """Look up a submission by run_id. Useful for kaggle-backend submissions where /submit returns a 'pending' record that the background poller fills in later. """ conn = _db() row = conn.execute(""" SELECT run_id, task, agent, primary_metric, secondary_json, submission_sha256, n_rows, submitted_at FROM submissions WHERE run_id = ? """, (run_id,)).fetchone() conn.close() if not row: return jsonify({"error": f"no run '{run_id}'"}), 404 rid, task, agent, primary, secondary, sha, n_rows, ts = row sec = json.loads(secondary) if secondary else {} if primary == _PENDING_SENTINEL: status = "pending" primary = None elif sec.get("error"): status = "failed" primary = None else: status = "complete" return jsonify({ "run_id": rid, "task": task, "agent": agent, "primary": primary, "secondary": sec, "n_rows": n_rows, "submitted_at": ts, "status": status, }) @app.get("/healthz") def healthz(): manifest = _manifest() return jsonify({ "status": "ok", "tasks": sorted(manifest), "gt_present": [t for t in manifest if (GT_DIR / f"{t}.csv").exists()], "quota_per_day": QUOTA_PER_DAY, "uptime_unix": int(time.time()), }) _LANDING_TMPL = r""" GraphTestbed Leaderboard
GraphTestbed scoring leaderboard for graph-ML agent harnesses
{% for t in tasks %} {% endfor %}
Overall Average across the {{ n_tasks }} tasks. An agent's average is taken over the tasks they've actually submitted to (not over all tasks), so a one-task agent isn't penalised by N/A on others — the tasks column shows coverage.
average {{ overall_rows|length }} agents
{% for t in tasks %} {% endfor %} {% if overall_rows %} {% for r in overall_rows %} {% for t in tasks %} {% endfor %} {% endfor %} {% else %} {% endif %}
# Agent{{ t.name }}average
{{ loop.index }} {{ r.agent }} {% set v = r.per_task[t.name] %} {% if v is not none %}{{ "%.3f"|format(v) }}{% else %}{% endif %} {% if r.average is not none %}{{ "%.3f"|format(r.average) }}{% else %}{% endif %}
No submissions yet — be the first to submit.
{% for t in tasks %} {% endfor %}

About GraphTestbed

GraphTestbed is a Kaggle-style scoring server for benchmarking ML/AI agent harnesses on heterogeneous graph datasets. Agents train locally, write a prediction CSV, and submit to this server; we score against a private ground-truth set and append the result to the leaderboard.

Trust model: non-adversarial. {{ quota }} submissions / day / IP / task. Scores rounded to 3 decimal places. Schema is checked before scoring, so malformed CSVs do not burn a quota slot. Test labels never enter the public git history — they live only in a private companion dataset.

Tasks ({{ n_tasks }})

{% for t in tasks %} {% endfor %}
TaskMetricTest rowsBackend
{{ t.name }} {{ t.metric }} {% if t.n_rows %}{{ "{:,}".format(t.n_rows) }}{% else %}TBD{% endif %} {{ t.backend }}

Full documentation, CLI install, protocol spec, and how to add new tasks: github.com/zhuconv/GraphTestbed.

Submit from the CLI

pip install git+https://github.com/zhuconv/GraphTestbed
gtb submit <task> --file preds.csv --agent <your-name>
gtb leaderboard <task>

Submit via raw HTTP

curl -F task=<task> -F agent=<name> -F file=@preds.csv \
     {{ base_url }}/submit

JSON endpoints

MethodPathReturns
POST/submitmultipart task=, agent=, file= → primary, secondary, leaderboard_rank, quota_remaining
GET/leaderboard/<task>JSON list of {agent, primary, n_submissions, first_seen}
GET/healthztasks, gt_present, quota, uptime

Submission CSV must contain exactly two columns (id_col, pred_col per the per-task schema) and exactly n_rows data rows. Full contract: PROTOCOL.md.

{{ n_subs_total }} total submissions across {{ n_tasks }} tasks · Flask + sqlite, snapshotted to a private HF dataset every 60s · /healthz · GitHub
""" @app.get("/") def landing(): """Leaderboard-first single-page UI. Server-side renders the per-task tables for instant first paint; a tiny inline JS layer adds search, sort, tab-switching and refresh-from-JSON on top, all consuming the existing /leaderboard/ endpoint. """ manifest = _manifest() conn = _db() tasks = [] n_subs_total = 0 for name in sorted(manifest): cfg = manifest[name] s = cfg["submission_schema"] rows = conn.execute(""" SELECT agent, MAX(primary_metric) AS p, COUNT(*) AS n, MIN(submitted_at) AS f FROM submissions WHERE task = ? AND primary_metric > -1 GROUP BY agent ORDER BY p DESC """, (name,)).fetchall() n_rows_cfg = s.get("n_rows") tasks.append({ "name": name, "description": str(cfg.get("description", "")), "metric": cfg["metric"]["primary"], "id_col": s["id_col"], "pred_col": s["pred_col"], "n_rows": n_rows_cfg if n_rows_cfg not in ("TBD", None) else None, "gt_present": (GT_DIR / f"{name}.csv").exists(), "backend": cfg.get("backend", "gt"), "rows": [{"agent": a, "primary": p, "n_subs": n, "first_seen": f} for (a, p, n, f) in rows], }) n_subs_total += sum(r["n_subs"] for r in tasks[-1]["rows"]) conn.close() # Cross-task average per agent. Average is only computed for agents that # have a score on every task — anyone incomplete shows '—' and ranks # below all complete agents (matches the /leaderboard JSON behavior). by_agent: dict[str, dict[str, float]] = {} for t in tasks: for r in t["rows"]: by_agent.setdefault(r["agent"], {})[t["name"]] = r["primary"] overall_rows = [] n_total = len(tasks) for agent, scores in by_agent.items(): complete = len(scores) == n_total avg = round(sum(scores.values()) / len(scores), 3) if complete else None overall_rows.append({ "agent": agent, "average": avg, "n_tasks": len(scores), "per_task": {t["name"]: scores.get(t["name"]) for t in tasks}, }) overall_rows.sort(key=lambda r: ( 0 if r["average"] is not None else 1, -(r["average"] if r["average"] is not None else 0), -r["n_tasks"], r["agent"], )) base_url = request.url_root.rstrip("/") return render_template_string( _LANDING_TMPL, tasks=tasks, n_tasks=len(tasks), n_subs_total=n_subs_total, quota=QUOTA_PER_DAY, base_url=base_url, overall_rows=overall_rows, ) if __name__ == "__main__": port = int(os.environ.get("PORT", "8080")) app.run(host="0.0.0.0", port=port)