import json import html import os import re import urllib.error import urllib.parse import urllib.request from pathlib import Path from typing import Any DEFAULT_SPACE_ID = "nvidia/AlpasimE2EClosedLoopChallenge2026" if os.environ.get("SPACE_HOST"): os.environ.setdefault("SYSTEM", "spaces") os.environ.setdefault("SPACE_ID", DEFAULT_SPACE_ID) import gradio as gr from fastapi import FastAPI from fastapi.responses import HTMLResponse from fastapi.staticfiles import StaticFiles API_BASE_URL = os.environ.get( "ALPASIM_API_BASE_URL", "", ).rstrip("/") REGISTRATION_SECRET = os.environ.get("ALPASIM_REGISTRATION_SECRET", "") ACTIVE_REGISTRATION_STATUSES = {"pending", "approved", "registered"} REVIEW_STATUS_CHOICES = ["pending", "approved", "rejected", "all"] LEADERBOARD_HEADERS = [ "Rank", "Team", "Team ID", "Policy Capability Score", "Avg Distance Between At-Fault Incidents", "Image Tag", "Submission ID", "Submitted At", ] LEADERBOARD_SORT_CHOICES = LEADERBOARD_HEADERS MOCKED_HF_USER_IDS = {"11111111111111111111111"} NVIDIA_GREEN = "#76B900" NVIDIA_GREEN_DARK = "#5F9500" BANNER_PATH = "assets/auto-alpasim-end-to-end-closed-loop-driving-1600x400.mp4" BANNER_DIR = Path(__file__).resolve().parent / "assets" BANNER_FILE = Path(__file__).resolve().parent / BANNER_PATH gr.set_static_paths(paths=[BANNER_DIR]) APP_THEME = gr.themes.Default(primary_hue="lime").set( button_primary_background_fill=NVIDIA_GREEN, button_primary_background_fill_hover=NVIDIA_GREEN_DARK, button_primary_text_color="white", loader_color=NVIDIA_GREEN, slider_color=NVIDIA_GREEN, ) BANNER_HTML = f"""
AlpaSim. E2E Closed Loop Challenge
2026 · NVIDIA AV Research · KE:SAI

AlpaSim E2E Closed Loop Challenge

A closed-loop benchmark for evaluating autonomous driving policies.

""" CHALLENGE_OVERVIEW_HTML = """

01 Overview

Welcome to the AlpaSim End to End Closed Loop Challenge. This competition invites teams to build autonomous driving policies and compare them head-to-head in realistic closed-loop simulation, where each policy's decisions shape the future scene it must handle.

Autonomous driving research has made major progress, but it remains hard to compare policies across labs and companies in a realistic, reproducible way. Open-loop evaluation is useful, but it misses the compounding effects that make driving hard: a small planning error can change future observations, interactions, and risk.

AlpaSim provides a shared simulator, public development data, starter tools, baseline policies, and a common containerized submission interface. Organizer-managed evaluation workers run submissions on private held-out scenarios and publish leaderboard results with both a policy capability score and a safety metric.

Across both tracks, the goal is not just to crown a winner. We want to learn which policy families are robust under distribution shift, where they fail, and how the community can make AV evaluation more trustworthy.

02 Challenge Tracks

The competition has two complementary tracks, covering both large-scale geographically diverse driving data and a lower-barrier entry point for teams working with established nuPlan-style workflows.

Track 1 - PAI-AV

Physical AI AV Track

The larger-scale setting for testing whether promising policies hold up as scenario diversity and long-tail coverage increase.

Track 2 - nuPlan

nuPlan Track

A lower-barrier track for teams building on the widely used nuPlan ecosystem or NAVSIM-style development workflows.

03 Timeline

  1. 2026-06-15 Competition goes live Registration, public data, starter tools, and submission instructions open.
  2. 2026-08-15 Mid-competition checkpoint FAQ updates, leaderboard-health notes, and non-breaking clarifications.
  3. 2026-09-15 Rules and submission format freeze Final rules, metric implementation, Docker base image, and public submission format freeze except for critical fixes.
  4. 2026-10-31 Public leaderboard closes Teams select one final valid container per track. Technical reports are due.
  5. 2026-11-15 Final results released Final results and award decisions are released to participants.
  6. NeurIPS 2026 Competition track workshop Winners and selected participants present results; organizers share final analysis and lessons learned.

04 Prizes

Each track will award two NVIDIA DGX Spark prizes: one for first place and one for an innovative solution.

DGX

PAI-AV Track

Two NVIDIA DGX Spark prizes awarded.

DGX

nuPlan Track

Two NVIDIA DGX Spark prizes awarded.

05 Documentation Links

06 Organizers

This competition is hosted by NVIDIA's Autonomous Vehicle Research Group and KE:SAI.

""" APP_EMBED_HTML = """

07 Leaderboard

""" BANNER_CSS = """ html { scroll-behavior: smooth; } body { margin: 0; background: var(--bg); color: var(--text); font-family: Inter, ui-sans-serif, system-ui, -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif; } :root, .gradio-container { --bg: #ffffff; --panel: #f8f9fb; --panel-2: #f1f3f7; --border: #e4e7ec; --text: #1a1d23; --muted: #6b7280; --button-primary-background-fill: #76B900; --button-primary-background-fill-hover: #5F9500; --button-primary-border-color: #76B900; --button-primary-text-color: white; --color-accent: #76B900; --link-text-color: #1f6feb; --loader-color: #76B900; --slider-color: #76B900; color: var(--text); } .gradio-container { background: var(--bg); max-width: none !important; padding-top: 0 !important; } .contain { max-width: none !important; } .topline { position: sticky; top: 0; z-index: 50; display: flex; align-items: center; justify-content: space-between; gap: 20px; padding: 12px 24px; margin: 0; background: rgba(255, 255, 255, 0.92); backdrop-filter: blur(8px); border-bottom: 1px solid var(--border); } .brand { font-weight: 700; letter-spacing: 0.3px; } .brand span { color: #76B900; } .navlinks { display: flex; gap: 18px; flex-wrap: wrap; } .navlinks a { color: var(--muted); text-decoration: none; } .navlinks a:hover { color: var(--text); text-decoration: none; } .alpasim-hero { position: relative; overflow: hidden; margin: 0; background: #000000; border-bottom: 1px solid var(--border); } .alpasim-hero video { display: block; width: 100%; height: auto; } .hero-overlay { position: absolute; inset: 0; display: flex; align-items: flex-end; background: linear-gradient(180deg, rgba(0, 0, 0, 0.10) 0%, rgba(0, 0, 0, 0.55) 100%); pointer-events: none; } .hero-overlay-inner { width: 100%; max-width: 1100px; margin: 0 auto; padding: 40px 24px; color: #ffffff; } .hero-eyebrow { color: #d6f3a3; font-size: 0.85em; font-weight: 600; letter-spacing: 0.14em; text-transform: uppercase; } .hero-title { margin: 8px 0 10px; color: #ffffff; font-size: clamp(28px, 4vw, 46px); line-height: 1.1; } .hero-lead { max-width: 760px; margin: 0; color: #e7eaef; font-size: 1.05em; } .challenge-info { max-width: 1100px; margin: 0 auto; padding: 0 24px; } .challenge-info section { padding: 48px 0; border-bottom: 1px solid var(--border); scroll-margin-top: 72px; } .challenge-info section:last-child { border-bottom: 0; } .challenge-info h2 { display: flex; align-items: baseline; gap: 12px; margin: 0 0 10px; font-size: 28px; line-height: 1.2; color: var(--text); } .challenge-info h2 span { color: #76B900; font-family: ui-monospace, SFMono-Regular, Menlo, monospace; font-size: 0.7em; letter-spacing: 0.08em; } .challenge-info .lead, .info-card p, .prize p { color: #3c4250; line-height: 1.6; } .challenge-info .lead { max-width: 860px; margin: 0 0 14px; } .track-grid, .prize-grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(260px, 1fr)); gap: 18px; margin-top: 18px; } .info-card { background: var(--panel); border: 1px solid var(--border); border-radius: 10px; padding: 22px; } .info-card h3, .prize h3 { margin: 0 0 8px; font-size: 18px; color: var(--text); } .info-card p, .prize p { margin: 0; } .organizer-card, .doc-card { display: block; color: inherit; text-decoration: none; } .organizer-card:hover, .doc-card:hover { border-color: rgba(118, 185, 0, 0.45); text-decoration: none; } .pill { display: inline-block; margin-bottom: 10px; padding: 3px 9px; border-radius: 999px; background: rgba(118, 185, 0, 0.15); color: #5F9500; border: 1px solid rgba(118, 185, 0, 0.3); font-size: 0.78em; letter-spacing: 0.04em; } .timeline { list-style: none; padding: 0; margin: 18px 0 0; } .timeline li { position: relative; padding: 12px 14px 12px 38px; border-left: 2px solid var(--border); margin-left: 8px; } .timeline li::before { content: ""; position: absolute; left: -7px; top: 18px; width: 12px; height: 12px; border-radius: 50%; background: #76B900; } .timeline .date { color: #5F9500; font-family: ui-monospace, SFMono-Regular, Menlo, monospace; font-size: 0.85em; letter-spacing: 0.03em; } .timeline b, .timeline .desc { display: block; } .timeline .desc { color: var(--muted); font-size: 0.92em; } .prize { display: flex; align-items: center; gap: 18px; padding: 24px; background: linear-gradient(135deg, rgba(118, 185, 0, 0.10), rgba(118, 185, 0, 0.04)); border: 1px solid rgba(118, 185, 0, 0.35); border-radius: 12px; } .prize .icon { width: 64px; height: 64px; flex: 0 0 64px; display: flex; align-items: center; justify-content: center; border-radius: 14px; background: #76B900; color: #ffffff; font-weight: 700; } .identity-strip { max-width: 1100px; margin: 0 auto; padding: 0 24px 18px; } .identity-strip p { margin: 0; color: var(--muted); } .tabs { max-width: 1100px; margin: 0 auto; } .challenge-app { max-width: 1100px; margin: 0 auto; padding: 48px 24px 72px; scroll-margin-top: 72px; } .challenge-app h2 { display: flex; align-items: baseline; gap: 12px; margin: 0 0 18px; font-size: 28px; } .challenge-app h2 span { color: #76B900; font-family: ui-monospace, SFMono-Regular, Menlo, monospace; font-size: 0.7em; letter-spacing: 0.08em; } .gradio-frame { display: block; width: 100%; min-height: 940px; border: 1px solid var(--border); border-radius: 8px; background: #ffffff; } @media (max-width: 720px) { .topline { align-items: flex-start; flex-direction: column; gap: 8px; } .navlinks { gap: 12px; } .hero-overlay-inner { padding: 12px 18px; } .hero-eyebrow { font-size: 0.62em; letter-spacing: 0.08em; } .hero-title { margin: 4px 0 0; font-size: clamp(16px, 5vw, 22px); line-height: 1.05; } .hero-lead { display: none; } .challenge-info { padding: 0 18px; } .challenge-info section, .challenge-app { scroll-margin-top: 112px; } .challenge-app { padding: 48px 18px 64px; } .gradio-frame { min-height: 1040px; } .identity-strip { padding: 0 18px 18px; } } """ GRADIO_CSS = """ :root, .gradio-container { --bg: #ffffff; --panel: #f8f9fb; --panel-2: #f1f3f7; --border: #e4e7ec; --text: #1a1d23; --muted: #6b7280; --button-primary-background-fill: #76B900; --button-primary-background-fill-hover: #5F9500; --button-primary-border-color: #76B900; --button-primary-text-color: white; --color-accent: #76B900; --link-text-color: #1f6feb; --loader-color: #76B900; --slider-color: #76B900; color: var(--text); } .gradio-container { background: var(--bg); max-width: none !important; padding-top: 0 !important; } .identity-strip { max-width: 1100px; margin: 0 auto; padding: 0 24px 18px; } .identity-strip p { margin: 0; color: var(--muted); } .tabs { max-width: 1100px; margin: 0 auto; } .leaderboard-controls { margin: 0 0 14px; max-width: 560px; } .leaderboard-controls .form { border: 1px solid var(--border); border-radius: 8px; background: #fbfcfd; padding: 12px; } .leaderboard-controls button { width: 100%; } .leaderboard-table-wrap { width: 100%; overflow-x: auto; border: 1px solid var(--border); border-radius: 8px; background: #ffffff; } .leaderboard-table { width: 100%; border-collapse: collapse; table-layout: fixed; font-size: 12px; } .leaderboard-table th, .leaderboard-table td { padding: 10px 12px; border-bottom: 1px solid #edf0f4; vertical-align: top; color: var(--text); } .leaderboard-table th { position: sticky; top: 0; z-index: 1; background: #f7f9fb; color: #3c4250; font-size: 12px; font-weight: 700; text-align: left; white-space: normal; } .leaderboard-table td { overflow-wrap: anywhere; line-height: 1.35; } .leaderboard-table tbody tr:nth-child(even) { background: #fafbfc; } .leaderboard-table tbody tr:hover { background: #f3f8ea; } .leaderboard-table .rank { width: 6%; } .leaderboard-table .team { width: 18%; } .leaderboard-table .score { width: 14%; } .leaderboard-table .incident-distance { width: 21%; } .leaderboard-table .image-tag { width: 25%; } .leaderboard-table .submitted-at { width: 15%; } .leaderboard-table .number { text-align: right; font-variant-numeric: tabular-nums; } .leaderboard-main, .leaderboard-sub { display: block; } .leaderboard-sub { margin-top: 3px; color: var(--muted); font-size: 11px; line-height: 1.25; } .leaderboard-empty { padding: 24px; color: var(--muted); } @media (max-width: 720px) { .identity-strip { padding: 0 18px 18px; } .leaderboard-controls { max-width: none; } } """ LANDING_HTML = f""" AlpaSim E2E Closed Loop Challenge 2026 {BANNER_HTML} {CHALLENGE_OVERVIEW_HTML} {APP_EMBED_HTML} """ def split_multivalue(value: str) -> list[str]: items: list[str] = [] seen: set[str] = set() for item in re.split(r"[\n,]", value or ""): cleaned = item.strip() if not cleaned or cleaned in seen: continue items.append(cleaned) seen.add(cleaned) return items def profile_value(profile: gr.OAuthProfile, *keys: str) -> str: for key in keys: value = profile.get(key) if value: text = str(value) if key == "sub" and text in MOCKED_HF_USER_IDS: return "" return text return "" def current_identity(profile: gr.OAuthProfile | None) -> str: if profile is None: return ( "To register a team, connect this Space to your Hugging Face account. " "If you are already signed in on huggingface.co, this just authorizes " "the Space to read your Hugging Face profile. " "[Connect Hugging Face account](/login/huggingface?_target_url=/gradio/)" ) username = profile_value(profile, "preferred_username", "name") user_id = profile_value(profile, "sub") email = profile_value(profile, "email") parts = [f"Connected to this Space as Hugging Face user `{username}`."] if user_id: parts.append(f"Stable HF user ID: `{user_id}`.") if email: parts.append(f"HF email: `{email}`.") parts.append("[Disconnect](/logout?_target_url=/gradio/).") return " ".join(parts) def empty_review_state() -> tuple[list[list[str]], list[dict[str, Any]], object, str, str]: return ( [], [], gr.update(choices=[], value=None), "No registration selected.", "", ) def load_initial_review_state( profile: gr.OAuthProfile | None, ) -> tuple[object, list[list[str]], list[dict[str, Any]], object, str, str]: if profile is None or not REGISTRATION_SECRET: rows, registrations, selected, details, result = empty_review_state() return gr.update(visible=False), rows, registrations, selected, details, result try: body = review_api_json( "GET", "/review/registrations", profile, query={"status": "pending"}, ) except (urllib.error.HTTPError, urllib.error.URLError, gr.Error): rows, registrations, selected, details, result = empty_review_state() return gr.update(visible=False), rows, registrations, selected, details, result registrations = body.get("registrations") or [] choices = registration_choices(registrations) selected = choices[0] if choices else None item = find_registration(registrations, selected_team_id(selected)) result = f"Loaded {body.get('count', len(registrations))} pending registration(s)." return ( gr.update(visible=True), registration_rows(registrations), registrations, gr.update(choices=choices, value=selected), review_details(item), result, ) def session_state( profile: gr.OAuthProfile | None, ) -> tuple[ str, object, list[list[str]], list[dict[str, Any]], object, str, str, ]: review_state = load_initial_review_state(profile) return (current_identity(profile), *review_state) def api_json(method: str, path: str, payload: dict | None = None) -> dict: if not API_BASE_URL: raise gr.Error("Space is missing ALPASIM_API_BASE_URL.") data = None if payload is None else json.dumps(payload).encode("utf-8") request = urllib.request.Request( f"{API_BASE_URL}{path}", data=data, method=method, headers={ "Content-Type": "application/json", "Accept": "application/json", "X-AlpaSim-Registration-Secret": REGISTRATION_SECRET, }, ) with urllib.request.urlopen(request, timeout=15) as response: return json.loads(response.read().decode("utf-8")) def api_error_summary(action: str, exc: Exception) -> str: if isinstance(exc, urllib.error.HTTPError): details = exc.read().decode("utf-8", errors="replace") return f"{action} failed with HTTP {exc.code}. {details}".strip() if isinstance(exc, urllib.error.URLError): return f"{action} failed: {exc.reason}" return f"{action} failed: {exc}" def has_active_registered_team(user: dict[str, Any] | None) -> bool: if not user: return False team_id = str(user.get("team_id") or "") status = str(user.get("registration_status") or "").strip().lower() return bool(team_id) and status != "disabled" def registered_team_status_detail( profile: gr.OAuthProfile | None, ) -> tuple[str, str, bool]: if profile is None: return "Sign in with Hugging Face to view team access.", "", False if not REGISTRATION_SECRET: return "Registration Space is missing ALPASIM_REGISTRATION_SECRET.", "", False hf_user_id = profile_value(profile, "sub") hf_username = profile_value(profile, "preferred_username", "name") if not hf_user_id or not hf_username: return "Could not read your Hugging Face identity.", "", False query = urllib.parse.urlencode( { "hf_user_id": hf_user_id, "hf_username": hf_username, } ) try: body = api_json("GET", f"/registrations/submitter-invitations?{query}") except urllib.error.HTTPError as exc: details = exc.read().decode("utf-8", errors="replace") return f"Could not load team access status. HTTP {exc.code}.", details, False except urllib.error.URLError as exc: return f"Could not load team access status: {exc.reason}", "", False registered_user = body.get("registered_user") if not has_active_registered_team(registered_user): return ( "No approved team access found for your Hugging Face account.", json.dumps(body, indent=2), False, ) team_id = registered_user.get("team_id") role = registered_user.get("role") or "member" status = registered_user.get("registration_status") or "unknown" return ( f"You already have `{status}` access to team `{team_id}` as `{role}`.", json.dumps(body, indent=2), True, ) def registration_status_detail(profile: gr.OAuthProfile | None) -> tuple[str, str, bool]: if profile is None: return "Sign in with Hugging Face to view registration status.", "", False if not REGISTRATION_SECRET: return "Registration Space is missing ALPASIM_REGISTRATION_SECRET.", "", False captain_hf_user_id = profile_value(profile, "sub") if not captain_hf_user_id: return "Could not read your stable Hugging Face user ID.", "", False query = urllib.parse.urlencode({"captain_hf_user_id": captain_hf_user_id}) try: body = api_json("GET", f"/registrations/me?{query}") except urllib.error.HTTPError as exc: details = exc.read().decode("utf-8", errors="replace") return f"Could not load registration status. HTTP {exc.code}.", details, False except urllib.error.URLError as exc: return f"Could not load registration status: {exc.reason}", "", False registrations = body.get("registrations") or [] if not registrations: return ( "No team registration found for your Hugging Face account.", json.dumps(body, indent=2), False, ) latest = registrations[0] status = str(latest.get("registration_status") or "unknown").strip().lower() team = latest.get("team_id") or "" display_name = latest.get("team_display_name") or team if status == "pending": summary = f"Registration pending review: {display_name} ({team})." elif status == "approved": summary = f"Team approved: {display_name} ({team})." elif status == "registered": summary = f"Team registered: {display_name} ({team})." elif status == "rejected": reason = latest.get("rejection_reason") or "No reason provided." summary = f"Registration rejected: {display_name} ({team}). Reason: {reason}" else: summary = f"Registration status for {display_name} ({team}): {status}." return summary, json.dumps(body, indent=2), status in ACTIVE_REGISTRATION_STATUSES def registration_panel_status(profile: gr.OAuthProfile | None): summary, raw_json, has_active_registration = registration_status_detail(profile) needs_connection = profile is None if not needs_connection and not has_active_registration: member_summary, member_raw_json, has_active_team = registered_team_status_detail( profile ) if has_active_team: summary = member_summary raw_json = member_raw_json has_active_registration = True return ( summary, raw_json, gr.update(visible=not needs_connection and not has_active_registration), ) def submitter_invitation_status( profile: gr.OAuthProfile | None, ) -> tuple[str, list[dict[str, Any]], object, object]: if profile is None: return ( "Connect your Hugging Face account to check submitter invitations.", [], gr.update(choices=[], value=None), gr.update(visible=False), ) if not REGISTRATION_SECRET: return ( "Registration Space is missing ALPASIM_REGISTRATION_SECRET.", [], gr.update(choices=[], value=None), gr.update(visible=False), ) hf_user_id = profile_value(profile, "sub") hf_username = profile_value(profile, "preferred_username", "name") if not hf_user_id or not hf_username: return ( "Could not read your Hugging Face identity.", [], gr.update(choices=[], value=None), gr.update(visible=False), ) query = urllib.parse.urlencode( { "hf_user_id": hf_user_id, "hf_username": hf_username, } ) try: body = api_json("GET", f"/registrations/submitter-invitations?{query}") except (urllib.error.HTTPError, urllib.error.URLError) as exc: return api_error_summary("Loading submitter invitations", exc), [], gr.update(choices=[], value=None), gr.update(visible=False) registered_user = body.get("registered_user") or {} invitations = body.get("invitations") or [] choices = registration_choices(invitations) if invitations: summary = ( f"You are listed as an approved submitter for {len(invitations)} team(s). " "Select a team and join it to enable challenge CLI, ECR login, and submissions." ) elif has_active_registered_team(registered_user): summary = "" else: summary = "No approved submitter invitations found for your Hugging Face username." return ( summary, invitations, gr.update(choices=choices, value=choices[0] if choices else None), gr.update(visible=bool(choices)), ) def claim_submitter_invitation( choice: str, profile: gr.OAuthProfile | None, ) -> str: if profile is None: raise gr.Error("Connect your Hugging Face account first.") team_id = selected_team_id(choice) if not team_id: raise gr.Error("Select a team invitation first.") payload = { "team_id": team_id, "hf_user_id": profile_value(profile, "sub"), "hf_username": profile_value(profile, "preferred_username", "name"), "email": profile_value(profile, "email"), } try: body = api_json("POST", "/registrations/claim-submitter", payload) except (urllib.error.HTTPError, urllib.error.URLError) as exc: raise gr.Error(api_error_summary(f"Joining {team_id}", exc)) from exc return str(body.get("message") or f"Joined {team_id}.") def leaderboard_html(rows: list[list[Any]]) -> str: if not rows: return '
No successful submissions yet.
' visible_headers = [ "Rank", "Team", "Policy Capability Score", "Avg Distance Between At-Fault Incidents", "Image Tag / Submission ID", "Submitted At", ] column_classes = [ "rank number", "team", "score number", "incident-distance number", "image-tag", "submitted-at", ] header_cells = "".join( f'{html.escape(header)}' for header, css_class in zip(visible_headers, column_classes) ) body_rows = [] for row in rows: rank, team, team_id, score, incident_distance, image_tag, submission_id, submitted_at = row def escaped_text(value: Any) -> str: return html.escape("" if value is None else str(value)) team_title = html.escape( " / ".join(str(value) for value in (team, team_id) if value not in ("", None)) ) image_title = html.escape( " / ".join(str(value) for value in (image_tag, submission_id) if value not in ("", None)) ) cells = [ f'{escaped_text(rank)}', ( f'' f'{escaped_text(team)}' f'{escaped_text(team_id)}' "" ), f'{escaped_text(score)}', ( '{escaped_text(incident_distance)}' ), ( f'' f'{escaped_text(image_tag)}' f'{escaped_text(submission_id)}' "" ), ( f'' f"{escaped_text(submitted_at)}" ), ] body_rows.append(f"{''.join(cells)}") return ( '
' '' f"{header_cells}" f"{''.join(body_rows)}" "
" "
" ) def leaderboard( show_all_successful: bool = False, sort_by: str = "Rank", sort_ascending: bool = True, track: str = "pai", ) -> tuple[str, str, str]: try: body = api_json("GET", f"/leaderboard?{urllib.parse.urlencode({'track': track})}") except urllib.error.HTTPError as exc: details = exc.read().decode("utf-8", errors="replace") return f"Could not load leaderboard. HTTP {exc.code}.", leaderboard_html([]), details except urllib.error.URLError as exc: return f"Could not load leaderboard: {exc.reason}", leaderboard_html([]), "" entries = body.get("leaderboard") or [] if not show_all_successful: score_order = str(body.get("score_order") or "desc").lower() best_by_team: dict[str, dict[str, Any]] = {} for index, entry in enumerate(entries): team_id = str( entry.get("team_id") or entry.get("team_display_name") or entry.get("submission_id") or f"row-{index}" ) policy_score = entry.get("policy_capability_score") if policy_score is None: policy_score = entry.get("best_score") try: numeric_score = float(policy_score) except (TypeError, ValueError): numeric_score = None current = best_by_team.get(team_id) current_score = current.get("_numeric_policy_score") if current else None if current is None: best_entry = dict(entry) best_entry["_numeric_policy_score"] = numeric_score best_entry["_leaderboard_source_index"] = index best_by_team[team_id] = best_entry elif numeric_score is not None and ( current_score is None or (score_order == "asc" and numeric_score < current_score) or (score_order != "asc" and numeric_score > current_score) ): best_entry = dict(entry) best_entry["_numeric_policy_score"] = numeric_score best_entry["_leaderboard_source_index"] = index best_by_team[team_id] = best_entry entries = sorted( best_by_team.values(), key=lambda entry: entry.get("_leaderboard_source_index", 0), ) def metric_value(value: Any) -> float | str: if value is None: return "" try: return round(float(value), 3) except (TypeError, ValueError): return str(value) rows: list[list[Any]] = [] for row_rank, entry in enumerate(entries, start=1): displayed_rank = (entry.get("rank") or row_rank) if show_all_successful else row_rank policy_score = entry.get("policy_capability_score") if policy_score is None: policy_score = entry.get("best_score") incident_distance = entry.get("avg_dist_between_incidents_at_fault") if incident_distance is None: incident_distance = entry.get("legacy_score") rows.append( [ displayed_rank, entry.get("team_display_name") or entry.get("team_id") or "", entry.get("team_id") or "", metric_value(policy_score), metric_value(incident_distance), entry.get("image_tag") or "", entry.get("submission_id") or "", entry.get("submitted_at") or "", ] ) if sort_by in LEADERBOARD_HEADERS: sort_index = LEADERBOARD_HEADERS.index(sort_by) def sort_key(row: list[Any]) -> Any: value = row[sort_index] if isinstance(value, (int, float)): return value return str(value).lower() populated_rows = [row for row in rows if row[sort_index] not in ("", None)] empty_rows = [row for row in rows if row[sort_index] in ("", None)] rows = sorted(populated_rows, key=sort_key, reverse=not sort_ascending) + empty_rows summary = ( f"{track.upper()} leaderboard entries: {len(entries)} shown" f" out of {body.get('count', 0)}. " f"Score order: {body.get('score_order', 'desc')}." ) return summary, leaderboard_html(rows), json.dumps(body, indent=2) def submit_registration( team_display_name: str, team_id: str, captain_name: str, contact_email: str, additional_submitters: str, credited_contributors: str, organization: str, country_region: str, notes: str, roster_ack: bool, limit_ack: bool, rules_ack: bool, profile: gr.OAuthProfile | None, ) -> tuple[str, str, str, object]: if profile is None: return "Connect your Hugging Face account first.", "", "", gr.update(visible=False) if not REGISTRATION_SECRET: return "Registration Space is missing ALPASIM_REGISTRATION_SECRET.", "", "", gr.update(visible=True) if not roster_ack or not limit_ack or not rules_ack: return "Please accept all required acknowledgments.", "", "", gr.update(visible=True) status_summary, raw_json, form_update = registration_panel_status(profile) if isinstance(form_update, dict) and form_update.get("visible") is False: return ( "You already have an active team registration or team access.", raw_json, status_summary, form_update, ) captain_hf_user_id = profile_value(profile, "sub") captain_hf_username = profile_value(profile, "preferred_username", "name") captain_email = contact_email.strip() or profile_value(profile, "email") payload = { "team_display_name": team_display_name.strip(), "team_id": team_id.strip(), "captain_name": captain_name.strip(), "captain_email": captain_email, "captain_hf_user_id": captain_hf_user_id, "captain_hf_username": captain_hf_username, "submitter_hf_usernames": split_multivalue(additional_submitters), "credited_contributors": split_multivalue(credited_contributors), "organization": organization.strip(), "country_region": country_region.strip(), "notes": notes.strip(), } try: body = api_json("POST", "/registrations", payload) except urllib.error.HTTPError as exc: body = exc.read().decode("utf-8", errors="replace") status_summary, _, form_update = registration_panel_status(profile) return f"Registration failed with HTTP {exc.code}.", body, status_summary, form_update except urllib.error.URLError as exc: status_summary, _, form_update = registration_panel_status(profile) return f"Registration request failed: {exc.reason}", "", status_summary, form_update status_summary, _, form_update = registration_panel_status(profile) return "Registration submitted for admin review.", json.dumps(body, indent=2), status_summary, form_update def reviewer_identity(profile: gr.OAuthProfile | None) -> dict[str, str]: if profile is None: raise gr.Error("Sign in with Hugging Face before reviewing registrations.") reviewer_hf_user_id = profile_value(profile, "sub") reviewer_hf_username = profile_value(profile, "preferred_username", "name") if not reviewer_hf_user_id: raise gr.Error("Could not read your stable Hugging Face user ID.") return { "reviewer_hf_user_id": reviewer_hf_user_id, "reviewer_hf_username": reviewer_hf_username, } def review_api_json( method: str, path: str, profile: gr.OAuthProfile | None, payload: dict | None = None, query: dict | None = None, ) -> dict: reviewer = reviewer_identity(profile) query_with_reviewer = {**reviewer, **(query or {})} encoded_query = urllib.parse.urlencode(query_with_reviewer) path_with_query = f"{path}?{encoded_query}" if encoded_query else path payload_with_reviewer = {**reviewer, **(payload or {})} if payload is not None else None return api_json(method, path_with_query, payload_with_reviewer) def registration_rows(registrations: list[dict[str, Any]]) -> list[list[str]]: return [ [ str(item.get("team_id") or ""), str(item.get("team_display_name") or ""), str(item.get("registration_status") or ""), str(item.get("captain_hf_username") or ""), str(item.get("organization") or ""), str(item.get("updated_at") or item.get("created_at") or ""), ] for item in registrations ] def registration_choices(registrations: list[dict[str, Any]]) -> list[str]: choices = [] for item in registrations: team_id = str(item.get("team_id") or "") display_name = str(item.get("team_display_name") or team_id) if team_id: choices.append(f"{team_id} | {display_name}") return choices def selected_team_id(choice: str | None) -> str: return (choice or "").split("|", 1)[0].strip() def find_registration( registrations: list[dict[str, Any]], team_id: str, ) -> dict[str, Any] | None: for item in registrations: if str(item.get("team_id") or "") == team_id: return item return None def format_list(value: Any) -> str: if isinstance(value, list): return ", ".join(str(part) for part in value if str(part)) or "-" return str(value) if value not in (None, "") else "-" def review_details(item: dict[str, Any] | None) -> str: if not item: return "No registration selected." def value(key: str) -> str: return format_list(item.get(key)) lines = [ f"### {value('team_display_name')} (`{value('team_id')}`)", f"Status: `{value('registration_status')}`", f"Source: `{value('registration_source')}`", "", f"Captain: {value('captain_name')} / `{value('captain_hf_username')}`", f"Captain email: {value('captain_email')}", f"Captain HF user ID: `{value('captain_hf_user_id')}`", "", f"Submitters: {value('submitter_hf_usernames')}", f"Credited contributors: {value('credited_contributors')}", f"Organization: {value('organization')}", f"Country/region: {value('country_region')}", "", f"Notes: {value('notes')}", "", f"Created: {value('created_at')}", f"Updated: {value('updated_at')}", f"Approved: {value('approved_at')}", ] if item.get("rejection_reason"): lines.append(f"Rejection reason: {value('rejection_reason')}") if item.get("reviewed_by_hf_username") or item.get("reviewed_by_hf_user_id"): lines.append( "Reviewed by: " f"{value('reviewed_by_hf_username')} (`{value('reviewed_by_hf_user_id')}`)" ) return "\n".join(lines) def refresh_review_registrations( status_filter: str, profile: gr.OAuthProfile | None, ) -> tuple[list[list[str]], list[dict[str, Any]], object, str, str]: if not REGISTRATION_SECRET: raise gr.Error("Registration Space is missing ALPASIM_REGISTRATION_SECRET.") try: body = review_api_json( "GET", "/review/registrations", profile, query={"status": status_filter or "pending"}, ) except (urllib.error.HTTPError, urllib.error.URLError) as exc: raise gr.Error(api_error_summary("Loading review registrations", exc)) from exc registrations = body.get("registrations") or [] choices = registration_choices(registrations) selected = choices[0] if choices else None item = find_registration(registrations, selected_team_id(selected)) result = f"Loaded {body.get('count', len(registrations))} registration(s)." return ( registration_rows(registrations), registrations, gr.update(choices=choices, value=selected), review_details(item), result, ) def refresh_review_registrations_without_result( status_filter: str, profile: gr.OAuthProfile | None, ) -> tuple[list[list[str]], list[dict[str, Any]], object, str]: rows, registrations, selected, details, _ = refresh_review_registrations( status_filter, profile, ) return rows, registrations, selected, details def select_review_registration( choice: str, registrations: list[dict[str, Any]], ) -> str: item = find_registration(registrations or [], selected_team_id(choice)) return review_details(item) def approve_review_registration( choice: str, profile: gr.OAuthProfile | None, ) -> str: team_id = selected_team_id(choice) if not team_id: raise gr.Error("Select a registration first.") try: body = review_api_json( "POST", "/review/registrations/approve", profile, {"team_id": team_id}, ) except (urllib.error.HTTPError, urllib.error.URLError) as exc: raise gr.Error(api_error_summary(f"Approving {team_id}", exc)) from exc return str(body.get("message") or f"Approved {team_id}.") def reject_review_registration( choice: str, reason: str, profile: gr.OAuthProfile | None, ) -> str: team_id = selected_team_id(choice) if not team_id: raise gr.Error("Select a registration first.") reason = reason.strip() if not reason: raise gr.Error("Enter a rejection reason.") try: body = review_api_json( "POST", "/review/registrations/reject", profile, {"team_id": team_id, "reason": reason}, ) except (urllib.error.HTTPError, urllib.error.URLError) as exc: raise gr.Error(api_error_summary(f"Rejecting {team_id}", exc)) from exc return str(body.get("message") or f"Rejected {team_id}.") def add_oauth_route_marker() -> None: if os.environ.get("SYSTEM") != "spaces" and not os.environ.get("OAUTH_CLIENT_ID"): return original_activate = gr.LoginButton.activate gr.LoginButton.activate = lambda self: None try: gr.LoginButton(visible=False) finally: gr.LoginButton.activate = original_activate with gr.Blocks( title="AlpaSim E2E Closed Loop Challenge 2026", theme=APP_THEME, css=GRADIO_CSS, ) as demo: add_oauth_route_marker() with gr.Group(elem_classes=["identity-strip"]): identity = gr.Markdown() with gr.Group(visible=False) as reviewer_app: with gr.Tabs(selected="registration"): with gr.Tab("Registration", id="registration"): registration_summary = gr.Markdown() submitter_invitation_summary = gr.Markdown() submitter_invitation_state = gr.State([]) with gr.Group(visible=False) as registration_form: gr.Markdown( "If the captain's Hugging Face account uses a personal email address, " "enter an organization email below. Before accepting the application, " "the AlpaSim team will email that organization address for confirmation." ) with gr.Row(): team_display_name = gr.Textbox( label="Team Display Name", placeholder="Team Alpha", max_lines=1, ) team_id = gr.Textbox( label="Requested Team ID", placeholder="team-alpha", max_lines=1, info="Lowercase letters, digits, hyphens. 3-40 characters.", ) with gr.Row(): captain_name = gr.Textbox(label="Captain Full Name", max_lines=1) contact_email = gr.Textbox( label="Captain Contact / Organization Email", max_lines=1, info=( "Use an organization email if your Hugging Face account " "email is personal. Organizers may email this address " "before accepting the application." ), ) additional_submitters = gr.Textbox( label="Additional Submitter Hugging Face Usernames", lines=4, placeholder="one username per line, or comma-separated", ) credited_contributors = gr.Textbox( label="Credited Contributors", lines=4, placeholder="one name per line, or comma-separated", ) with gr.Row(): organization = gr.Textbox(label="Organization / Affiliation", max_lines=1) country_region = gr.Textbox(label="Country / Region", max_lines=1) notes = gr.Textbox(label="Notes To Organizers", lines=3) roster_ack = gr.Checkbox( label="I understand that the submitter roster is frozen after approval except by administrator exception." ) limit_ack = gr.Checkbox( label="I understand that submissions are limited at the team level and should only be used for images we want evaluated." ) rules_ack = gr.Checkbox( label="I agree to follow the AlpaSim Challenge rules and organizer instructions." ) submit = gr.Button("Submit Registration", variant="primary") status = gr.Textbox(label="Status", interactive=False) response = gr.Code(label="Response", language="json", visible=False) with gr.Group(visible=False) as submitter_invitation_panel: submitter_invitation_choice = gr.Dropdown([], label="Submitter Invitation") claim_submitter = gr.Button("Join Team", variant="primary") claim_submitter_result = gr.Textbox(label="Result", interactive=False) with gr.Tab("PAI Leaderboard", id="leaderboard"): leaderboard_summary = gr.Markdown() with gr.Group(elem_classes=["leaderboard-controls"]): leaderboard_sort_by = gr.Dropdown( LEADERBOARD_SORT_CHOICES, value="Rank", label="Sort by", ) with gr.Row(): show_all_successful = gr.Checkbox( label="All successful submissions", value=False, scale=1, ) leaderboard_sort_ascending = gr.Checkbox( label="Ascending sort", value=True, scale=1, ) refresh_leaderboard = gr.Button("Refresh", variant="primary") leaderboard_table = gr.HTML() leaderboard_response = gr.Code( label="Leaderboard JSON", language="json", visible=False, ) with gr.Tab("nuPlan Leaderboard", id="nuplan-leaderboard"): nuplan_leaderboard_summary = gr.Markdown() with gr.Group(elem_classes=["leaderboard-controls"]): nuplan_leaderboard_sort_by = gr.Dropdown( LEADERBOARD_SORT_CHOICES, value="Rank", label="Sort by", ) with gr.Row(): nuplan_show_all_successful = gr.Checkbox( label="All successful submissions", value=False, scale=1, ) nuplan_leaderboard_sort_ascending = gr.Checkbox( label="Ascending sort", value=True, scale=1, ) refresh_nuplan_leaderboard = gr.Button("Refresh", variant="primary") nuplan_leaderboard_table = gr.HTML() nuplan_leaderboard_response = gr.Code( label="Leaderboard JSON", language="json", visible=False, ) with gr.Tab("Review", id="review"): gr.Markdown("Reviewer access is limited to active AlpaSim reviewers.") review_state = gr.State([]) with gr.Row(): review_status_filter = gr.Dropdown( REVIEW_STATUS_CHOICES, value="pending", label="Status", scale=1, ) refresh_review = gr.Button("Refresh", variant="primary", scale=1) review_table = gr.Dataframe( headers=[ "team_id", "display_name", "status", "captain_hf_username", "organization", "updated", ], datatype=["str", "str", "str", "str", "str", "str"], interactive=False, wrap=True, ) selected_review_registration = gr.Dropdown([], label="Selected Registration") review_registration_details = gr.Markdown("No registration selected.") approve_review = gr.Button("Approve", variant="primary") with gr.Accordion("Reject", open=False): review_rejection_reason = gr.Textbox(label="Reason", lines=3) reject_review = gr.Button("Reject", variant="stop") review_result = gr.Textbox(label="Result", interactive=False) demo.load( session_state, inputs=None, outputs=[ identity, reviewer_app, review_table, review_state, selected_review_registration, review_registration_details, review_result, ], ) demo.load( registration_panel_status, inputs=None, outputs=[ registration_summary, response, registration_form, ], ) demo.load( submitter_invitation_status, inputs=None, outputs=[ submitter_invitation_summary, submitter_invitation_state, submitter_invitation_choice, submitter_invitation_panel, ], ) demo.load( lambda show_all, sort_by, ascending: leaderboard(show_all, sort_by, ascending, "pai"), inputs=[show_all_successful, leaderboard_sort_by, leaderboard_sort_ascending], outputs=[leaderboard_summary, leaderboard_table, leaderboard_response], ) refresh_leaderboard.click( lambda show_all, sort_by, ascending: leaderboard(show_all, sort_by, ascending, "pai"), inputs=[show_all_successful, leaderboard_sort_by, leaderboard_sort_ascending], outputs=[leaderboard_summary, leaderboard_table, leaderboard_response], ) show_all_successful.change( lambda show_all, sort_by, ascending: leaderboard(show_all, sort_by, ascending, "pai"), inputs=[show_all_successful, leaderboard_sort_by, leaderboard_sort_ascending], outputs=[leaderboard_summary, leaderboard_table, leaderboard_response], ) leaderboard_sort_by.change( lambda show_all, sort_by, ascending: leaderboard(show_all, sort_by, ascending, "pai"), inputs=[show_all_successful, leaderboard_sort_by, leaderboard_sort_ascending], outputs=[leaderboard_summary, leaderboard_table, leaderboard_response], ) leaderboard_sort_ascending.change( lambda show_all, sort_by, ascending: leaderboard(show_all, sort_by, ascending, "pai"), inputs=[show_all_successful, leaderboard_sort_by, leaderboard_sort_ascending], outputs=[leaderboard_summary, leaderboard_table, leaderboard_response], ) demo.load( lambda show_all, sort_by, ascending: leaderboard(show_all, sort_by, ascending, "nuplan"), inputs=[ nuplan_show_all_successful, nuplan_leaderboard_sort_by, nuplan_leaderboard_sort_ascending, ], outputs=[ nuplan_leaderboard_summary, nuplan_leaderboard_table, nuplan_leaderboard_response, ], ) refresh_nuplan_leaderboard.click( lambda show_all, sort_by, ascending: leaderboard(show_all, sort_by, ascending, "nuplan"), inputs=[ nuplan_show_all_successful, nuplan_leaderboard_sort_by, nuplan_leaderboard_sort_ascending, ], outputs=[ nuplan_leaderboard_summary, nuplan_leaderboard_table, nuplan_leaderboard_response, ], ) nuplan_show_all_successful.change( lambda show_all, sort_by, ascending: leaderboard(show_all, sort_by, ascending, "nuplan"), inputs=[ nuplan_show_all_successful, nuplan_leaderboard_sort_by, nuplan_leaderboard_sort_ascending, ], outputs=[ nuplan_leaderboard_summary, nuplan_leaderboard_table, nuplan_leaderboard_response, ], ) nuplan_leaderboard_sort_by.change( lambda show_all, sort_by, ascending: leaderboard(show_all, sort_by, ascending, "nuplan"), inputs=[ nuplan_show_all_successful, nuplan_leaderboard_sort_by, nuplan_leaderboard_sort_ascending, ], outputs=[ nuplan_leaderboard_summary, nuplan_leaderboard_table, nuplan_leaderboard_response, ], ) nuplan_leaderboard_sort_ascending.change( lambda show_all, sort_by, ascending: leaderboard(show_all, sort_by, ascending, "nuplan"), inputs=[ nuplan_show_all_successful, nuplan_leaderboard_sort_by, nuplan_leaderboard_sort_ascending, ], outputs=[ nuplan_leaderboard_summary, nuplan_leaderboard_table, nuplan_leaderboard_response, ], ) submit.click( submit_registration, inputs=[ team_display_name, team_id, captain_name, contact_email, additional_submitters, credited_contributors, organization, country_region, notes, roster_ack, limit_ack, rules_ack, ], outputs=[ status, response, registration_summary, registration_form, ], ) claim_submitter.click( claim_submitter_invitation, inputs=[submitter_invitation_choice], outputs=[claim_submitter_result], ).then( submitter_invitation_status, inputs=None, outputs=[ submitter_invitation_summary, submitter_invitation_state, submitter_invitation_choice, submitter_invitation_panel, ], ).then( registration_panel_status, inputs=None, outputs=[ registration_summary, response, registration_form, ], ) refresh_review.click( refresh_review_registrations, inputs=[review_status_filter], outputs=[ review_table, review_state, selected_review_registration, review_registration_details, review_result, ], ) review_status_filter.change( refresh_review_registrations, inputs=[review_status_filter], outputs=[ review_table, review_state, selected_review_registration, review_registration_details, review_result, ], ) selected_review_registration.change( select_review_registration, inputs=[selected_review_registration, review_state], outputs=[review_registration_details], ) approve_review.click( approve_review_registration, inputs=[ selected_review_registration, ], outputs=[review_result], ).then( refresh_review_registrations_without_result, inputs=[review_status_filter], outputs=[ review_table, review_state, selected_review_registration, review_registration_details, ], ) reject_review.click( reject_review_registration, inputs=[selected_review_registration, review_rejection_reason], outputs=[review_result], ).then( refresh_review_registrations_without_result, inputs=[review_status_filter], outputs=[ review_table, review_state, selected_review_registration, review_registration_details, ], ) app = FastAPI(title="AlpaSim E2E Closed Loop Challenge 2026") app.mount("/assets", StaticFiles(directory=str(BANNER_DIR)), name="assets") @app.get("/", response_class=HTMLResponse, include_in_schema=False) def landing_page() -> HTMLResponse: return HTMLResponse(LANDING_HTML) app = gr.mount_gradio_app( app, demo, path="/gradio", allowed_paths=[str(BANNER_FILE)], ) for route in app.routes: if getattr(route, "path", None) == "/gradio" and hasattr(route, "app"): # Gradio Spaces OAuth redirects to /login/callback even when the app is # mounted under /gradio. Mount the same ASGI app at root after the exact # landing-page route so those OAuth endpoints still resolve. app.mount("/", route.app) break else: raise RuntimeError("Unable to find mounted Gradio app for OAuth routes.") if __name__ == "__main__": import uvicorn uvicorn.run( app, host="0.0.0.0", port=int(os.environ.get("PORT", "7860")), )