# alexgshaw's picture
# Replace snapshot_download with level-by-level listing for agent files.
# e419e83
"""
Terminal-Bench Leaderboard Importer
A HuggingFace Space that:
1. Validates submissions on PR (posts comment with validation results)
2. Imports validated submissions to Supabase on merge
"""
import asyncio
import json
import logging
import os
import re
import tarfile
import tempfile
from collections import defaultdict
from dataclasses import dataclass, field
from decimal import Decimal
from pathlib import Path
from typing import Any, TypedDict, cast
from uuid import UUID
import gradio as gr
import harbor
import yaml
from harbor.models.job.result import JobResult
# NOTE: We intentionally don't use JobConfig or TrialResult for parsing because
# they contain EnvironmentType enum which rejects custom environment types.
# Instead, we parse only the specific fields we need from raw dicts.
from huggingface_hub import (
HfApi,
RepoFile,
RepoFolder,
WebhookPayload,
WebhooksServer,
hf_hub_download,
)
from litellm import model_cost
from pydantic import UUID4, BaseModel, ConfigDict, Field, ValidationError
from supabase._async.client import AsyncClient, create_client as create_async_client
from tenacity import retry, stop_after_attempt, wait_exponential
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Constants
# Hugging Face dataset repository that holds the leaderboard submissions.
DATASET_REPO = "harborframework/terminal-bench-2-leaderboard"
# Path inside the dataset repo under which submission folders are listed.
SUBMISSIONS_PATH = "submissions/terminal-bench/2.0"
# Upper bound on simultaneous file downloads (used as a semaphore size).
MAX_CONCURRENT_DOWNLOADS = 20
# Validation reports an error for any task with fewer trials than this.
MIN_TRIALS_PER_TASK = 5
# Validation reports an error when fewer unique task checksums than this are seen.
EXPECTED_TASK_COUNT = 89
# Upper bound on simultaneous trial uploads to storage (used as a semaphore size).
MAX_UPLOAD_WORKERS = 8
# ============================================================================
# Type Definitions
# ============================================================================
class ImportStats(TypedDict):
    """Counters and error messages collected while importing one submission."""

    # Jobs that were parsed and (unless in dry-run mode) written to the database.
    jobs_imported: int
    # Trials for which insert payloads were built successfully.
    trials_imported: int
    # Human-readable messages for jobs/trials that were skipped or failed.
    errors: list[str]
class TotalStats(TypedDict):
    """Job/trial counters plus error messages.

    NOTE(review): no use of this type is visible in this part of the file;
    presumably it aggregates ``ImportStats`` across submissions - confirm.
    """

    jobs: int
    trials: int
    errors: list[str]
# ============================================================================
# Pydantic Models for DB inserts
# ============================================================================
class AgentInsert(BaseModel):
    """Row payload upserted into the ``agent`` table."""

    # The importer builds one payload per distinct (name, version) pair.
    name: str
    version: str
    created_at: Any | None = None
    description: str | None = None
    # The importer fills these three from the submission metadata.
    display_name: str | None = None
    org_display_name: str | None = None
    url: str | None = None
class JobInsert(BaseModel):
    """Row payload upserted into the ``job`` table."""

    id: UUID4
    # Raw contents of the job's config.json (kept as parsed JSON).
    config: dict | list[dict] | list[Any]
    job_name: str
    # The importer sets this to the number of trial directories found for the job.
    n_trials: int
    username: str
    created_at: Any | None = None
    ended_at: Any | None = None
    git_commit_id: str | None = None
    include_on_leaderboard: bool | None = None
    metrics: dict | list[dict] | list[Any] | None = None
    # The importer sets this to ``harbor.__version__``.
    package_version: str | None = None
    started_at: Any | None = None
    # JSON dump of the job result's stats, when a job result.json was parsed.
    stats: dict | list[dict] | list[Any] | None = None
    verified: bool | None = None
class ModelInsert(BaseModel):
    """Row payload inserted into the ``model`` table (existing rows are left untouched)."""

    # The importer uses (name, provider) as the de-duplication key.
    name: str
    provider: str
    cents_per_million_cache_tokens: Decimal | None = None
    # The importer computes both prices as round(per-token cost * 1e8) from litellm's cost table.
    cents_per_million_input_tokens: int | None = None
    cents_per_million_output_tokens: int | None = None
    created_at: Any | None = None
    description: str | None = None
    display_name: str | None = None
    org_display_name: str | None = None
class TrialInsert(BaseModel):
    """Row payload upserted into the ``trial`` table."""

    id: UUID4
    agent_name: str
    agent_version: str
    # Raw ``config`` object from the trial's result.json.
    config: dict | list[dict] | list[Any]
    task_checksum: str
    trial_name: str
    # The *_started_at / *_ended_at pairs are copied from the matching timing
    # objects in result.json ("started_at" / "finished_at" keys).
    agent_execution_ended_at: Any | None = None
    agent_execution_started_at: Any | None = None
    # Copied from ``agent_result.metadata`` in result.json.
    agent_metadata: dict | list[dict] | list[Any] | None = None
    agent_setup_ended_at: Any | None = None
    agent_setup_started_at: Any | None = None
    created_at: Any | None = None
    ended_at: Any | None = None
    environment_setup_ended_at: Any | None = None
    environment_setup_started_at: Any | None = None
    exception_info: dict | list[dict] | list[Any] | None = None
    job_id: UUID4 | None = None
    # Copied from ``verifier_result.rewards.reward`` when present.
    reward: Decimal | None = None
    started_at: Any | None = None
    # Public storage URL of the uploaded trial archive; unset if the upload failed.
    trial_uri: str | None = None
    verifier_ended_at: Any | None = None
    verifier_started_at: Any | None = None
class TrialModelInsert(BaseModel):
    """Row payload upserted into the ``trial_model`` table (one per trial/model pair)."""

    model_config = ConfigDict(populate_by_name=True)
    # NOTE(review): each alias equals its field name, so it looks like a no-op;
    # presumably it works around pydantic's ``model_`` prefix handling - confirm.
    model_name: str = Field(alias="model_name")
    model_provider: str = Field(alias="model_provider")
    trial_id: UUID4
    created_at: Any | None = None
    # The importer copies token counts from ``agent_result`` in the trial's result.json.
    n_cache_tokens: int | None = None
    n_input_tokens: int | None = None
    n_output_tokens: int | None = None
# ============================================================================
# Metadata Schema
# ============================================================================
class ModelMetadata(BaseModel):
    """One entry of the ``models`` list in a submission's metadata file."""

    # Required: together these identify the model.
    model_name: str
    model_provider: str
    # Optional display strings.
    model_display_name: str | None = None
    model_org_display_name: str | None = None
class SubmissionMetadata(BaseModel):
    """Schema the submission's metadata.yaml / metadata.yml is validated against."""

    # Required fields: validation fails when either is missing.
    agent_url: str
    models: list[ModelMetadata]
    # Optional display strings for the agent.
    agent_display_name: str | None = None
    agent_org_display_name: str | None = None
# ============================================================================
# Helpers
# ============================================================================
def find_metadata_file(directory: Path) -> Path | None:
"""Find metadata file in a directory, supporting both .yaml and .yml extensions."""
for ext in ("yaml", "yml"):
path = directory / f"metadata.{ext}"
if path.exists():
return path
return None
def has_metadata_file(directory: Path) -> bool:
    """Report whether *directory* contains a metadata.yaml or metadata.yml file."""
    located = find_metadata_file(directory)
    return located is not None
def get_job_dirs(submission_dir: Path) -> list[Path]:
    """Return the job directories of a submission, sorted by path.

    A job directory is any direct child directory of *submission_dir* that
    contains a ``config.json``. The result is sorted because ``iterdir()``
    yields entries in arbitrary order; sorting keeps validation output and
    import order stable between runs.
    """
    return sorted(
        entry
        for entry in submission_dir.iterdir()
        if entry.is_dir() and (entry / "config.json").exists()
    )
def get_trial_dirs(job_dir: Path) -> list[Path]:
    """Return the trial directories of a job, sorted by path.

    A trial directory is any direct child directory of *job_dir* that
    contains a ``result.json``. The result is sorted because ``iterdir()``
    order is arbitrary and callers use the first trial as a fallback source
    for the job id, so the choice must be reproducible.
    """
    return sorted(
        entry
        for entry in job_dir.iterdir()
        if entry.is_dir() and (entry / "result.json").exists()
    )
def load_json(path: Path) -> dict:
    """Parse the JSON file at *path* and return the decoded value.

    The file is always read as UTF-8 so the result does not depend on the
    host locale. Raises ``json.JSONDecodeError`` on malformed JSON and
    ``OSError`` if the file cannot be opened.
    """
    with open(path, encoding="utf-8") as f:
        return json.load(f)
def load_yaml(path: Path) -> dict:
    """Parse the YAML file at *path* with ``yaml.safe_load`` and return the result.

    The file is always read as UTF-8 so the result does not depend on the
    host locale. Whatever ``yaml.safe_load`` produces is returned unchanged,
    so callers must validate the shape themselves.
    """
    with open(path, encoding="utf-8") as f:
        return yaml.safe_load(f)
async def upload_trial_to_storage(
    client: AsyncClient, trial_dir: Path, trial_id: str
) -> str | None:
    """Upload trial directory as tar.gz and trajectory.json to Supabase storage.

    The archive is stored as ``<trial_id>.tar.gz`` in the ``trials`` bucket.
    If ``agent/trajectory.json`` exists it is also uploaded as
    ``<trial_id>-traj.json``; a failure of that second upload is only logged.

    Returns:
        Public URL of the trial archive in storage, or None if upload failed.
    """
    bucket_name = "trials"
    # Reserve a temp file path; the handle is closed immediately so that
    # tarfile can reopen the path itself.
    with tempfile.NamedTemporaryFile(suffix=".tar.gz", delete=False) as tmp_file:
        tmp_path = Path(tmp_file.name)

    def _build_archive() -> int:
        """Write the tar.gz archive and return its size in bytes."""
        with tarfile.open(tmp_path, "w:gz") as tar:
            tar.add(trial_dir, arcname=trial_dir.name)
        return tmp_path.stat().st_size

    try:
        # Create tar.gz archive of the trial directory. Compression is
        # blocking work, so run it in a worker thread to keep the event loop
        # (and the other concurrent uploads) responsive.
        logger.info(f"Creating tar.gz archive for trial {trial_id}...")
        archive_size = await asyncio.to_thread(_build_archive)
        logger.info(f"Archive created: {archive_size / (1024 * 1024):.2f} MB")
        # Upload the archive
        with open(tmp_path, "rb") as f:
            await client.storage.from_(bucket_name).upload(
                file=f,
                path=f"{trial_id}.tar.gz",
                file_options={"upsert": "true"},
            )
        logger.info(f"Successfully uploaded {trial_id}.tar.gz")
        # Upload trajectory.json if it exists
        trajectory_path = trial_dir / "agent" / "trajectory.json"
        if trajectory_path.exists():
            logger.info(f"Found trajectory.json for trial {trial_id}, uploading...")
            try:
                with open(trajectory_path, "rb") as f:
                    await client.storage.from_(bucket_name).upload(
                        file=f,
                        path=f"{trial_id}-traj.json",
                        file_options={"upsert": "true"},
                    )
                logger.info(f"Successfully uploaded {trial_id}-traj.json")
            except Exception as e:
                # Best effort: the archive itself is already uploaded.
                logger.warning(
                    f"Failed to upload trajectory.json for trial {trial_id}: {e}"
                )
        # Return the public URL for the archive
        public_url = await client.storage.from_(bucket_name).get_public_url(
            f"{trial_id}.tar.gz"
        )
        return public_url
    except Exception as e:
        logger.error(f"Failed to upload trial archive {trial_id}.tar.gz: {e}")
        return None
    finally:
        # Always remove the temporary archive, whether or not the upload worked.
        tmp_path.unlink(missing_ok=True)
def get_job_id_from_dir(job_dir: Path, trial_dirs: list[Path]) -> str | None:
    """Extract job_id from job result.json or first trial's config.

    This is used when we can't parse the full models (e.g., custom environment types).

    Lookup order:
      1. the ``id`` field of ``<job_dir>/result.json``;
      2. ``config.job_id`` in the first trial's ``result.json``.

    Returns the id as a string, or None when neither source yields one.
    Parse failures are logged at debug level and treated as "not found".
    """
    logger.debug(f"Looking for job_id in {job_dir.name}")
    # First try job result.json
    job_result_path = job_dir / "result.json"
    if job_result_path.exists():
        logger.debug(f"Found job result.json at {job_result_path}")
        try:
            data = load_json(job_result_path)
            if data.get("id"):
                logger.debug(f"Got job_id from job result.json: {data['id']}")
                # Normalise to str so both lookup paths honour the declared
                # return type even if the JSON value is not a string.
                return str(data["id"])
            else:
                logger.debug("job result.json exists but has no 'id' field")
        except Exception as e:
            logger.debug(f"Failed to parse job result.json: {e}")
    else:
        logger.debug("No job result.json found")
    # Fall back to first trial's config.job_id
    if trial_dirs:
        logger.debug("Falling back to first trial's config.job_id")
        try:
            trial_data = load_json(trial_dirs[0] / "result.json")
            config = trial_data.get("config")
            if config and isinstance(config, dict):
                job_id = config.get("job_id")
                if job_id:
                    logger.debug(f"Got job_id from trial config: {job_id}")
                    return str(job_id)
                else:
                    logger.debug("Trial config has no job_id")
            else:
                logger.debug("Trial has no config dict")
        except Exception as e:
            logger.debug(f"Failed to parse trial result.json: {e}")
    else:
        logger.debug("No trial directories to fall back to")
    logger.debug("Could not find job_id")
    return None
# ============================================================================
# Validation
# ============================================================================
@dataclass
class ValidationResult:
errors: list[str] = field(default_factory=list)
warnings: list[str] = field(default_factory=list)
models: list[str] = field(default_factory=list)
job_count: int = 0
trial_count: int = 0
successful_trials: int = 0
@property
def is_valid(self) -> bool:
return len(self.errors) == 0
@property
def accuracy(self) -> float | None:
"""Calculate accuracy as successful trials / total trials."""
if self.trial_count == 0:
return None
return self.successful_trials / self.trial_count
def validate_job_config(config: dict, job_name: str) -> list[str]:
    """Validate a job config dict meets terminal-bench submission requirements.

    Uses raw dict instead of JobConfig model to allow custom environment types
    that aren't in Harbor's EnvironmentType enum.

    Checks, in order:
      * ``timeout_multiplier`` is absent or equal to 1.0;
      * no agent sets ``override_timeout_sec`` / ``max_timeout_sec``;
      * the verifier sets neither of those two keys;
      * the environment sets none of ``override_cpus``,
        ``override_memory_mb``, ``override_storage_mb``.

    Args:
        config: Parsed contents of the job's config.json.
        job_name: Unused; kept so existing callers keep working.

    Returns:
        A list of error messages (empty when the config is acceptable). A
        config that is not a JSON object yields a single error instead of
        raising, since the file comes from an untrusted submission.
    """
    if not isinstance(config, dict):
        return [f"`config.json` must be a JSON object, got {type(config).__name__}"]

    errors: list[str] = []

    # Check timeout_multiplier
    timeout_mult = config.get("timeout_multiplier")
    if timeout_mult is not None and timeout_mult != 1.0:
        errors.append(f"`timeout_multiplier` is {timeout_mult}, expected 1.0")

    timeout_keys = ("override_timeout_sec", "max_timeout_sec")

    # Check agents (anything that is not a list of objects is ignored, the
    # same way non-object verifier/environment sections are ignored below)
    agents = config.get("agents")
    for i, agent in enumerate(agents if isinstance(agents, list) else []):
        if not isinstance(agent, dict):
            continue
        for key in timeout_keys:
            if agent.get(key) is not None:
                errors.append(f"`agents[{i}].{key}` is set to {agent[key]}")

    # Check verifier and environment: each forbidden key must be unset
    forbidden_by_section = (
        ("verifier", timeout_keys),
        (
            "environment",
            ("override_cpus", "override_memory_mb", "override_storage_mb"),
        ),
    )
    for section_name, forbidden_keys in forbidden_by_section:
        section = config.get(section_name)
        if not isinstance(section, dict):
            continue
        for key in forbidden_keys:
            if section.get(key) is not None:
                errors.append(f"`{section_name}.{key}` is set to {section[key]}")

    return errors
def validate_trial_result(
    trial: dict, job_dir_name: str, trial_dir_name: str
) -> list[str]:
    """Validate a trial result dict has required fields.

    Uses raw dict to allow custom environment types in trial config.

    Required: non-null ``id``, ``trial_name``, ``task_checksum`` and
    ``agent_info``; ``agent_info`` must be an object with truthy ``name``
    and ``version``.

    Args:
        trial: Parsed contents of the trial's result.json.
        job_dir_name: Job directory name, used in the error prefix.
        trial_dir_name: Trial directory name, used in the error prefix.

    Returns:
        A list with at most one error message; empty when the trial is valid.
        A result.json that is not a JSON object yields an error instead of
        raising, since the file comes from an untrusted submission.
    """
    prefix = f"`{job_dir_name}/{trial_dir_name}/result.json`"

    if not isinstance(trial, dict):
        return [f"{prefix}: Expected a JSON object, got {type(trial).__name__}"]

    # Check required top-level fields
    required_fields = ["id", "trial_name", "task_checksum", "agent_info"]
    missing = [name for name in required_fields if trial.get(name) is None]
    if missing:
        return [f"{prefix}: Missing required fields: {missing}"]

    # Check agent_info has required subfields
    agent_info = trial["agent_info"]
    if not isinstance(agent_info, dict):
        return [f"{prefix}: `agent_info` must be an object"]

    missing_agent = [
        f"agent_info.{key}" for key in ("name", "version") if not agent_info.get(key)
    ]
    if missing_agent:
        return [f"{prefix}: Missing required fields: {missing_agent}"]
    return []
# Case-insensitive match for strings naming the benchmark's own infrastructure:
# "harbor framework" / "harbor-framework" / "harborframework",
# "laude institute" / "laude-institute" / "laudeinstitute", and "tbench.ai".
REWARD_HACKING_PATTERN = re.compile(
    r"harbor[-\s]?framework|harborframework|laude[-\s]?institute|tbench\.ai",
    re.IGNORECASE,
)


def check_reward_hacking(trial_dir: Path, job_name: str, trial_name: str) -> list[str]:
    """Scan agent/* files in a trial directory for reward hacking strings.

    Every regular file below ``<trial_dir>/agent`` is read as UTF-8 text
    (undecodable bytes are dropped) and searched with
    ``REWARD_HACKING_PATTERN``. Files are visited in sorted order so the
    returned messages are stable between runs.

    Args:
        trial_dir: Trial directory on disk.
        job_name: Job name used in the error message path.
        trial_name: Trial name used in the error message path.

    Returns:
        One error message per file containing a match (listing the distinct
        matched strings, sorted); empty if there is no ``agent`` directory or
        nothing matched.
    """
    agent_dir = trial_dir / "agent"
    if not agent_dir.is_dir():
        return []

    errors: list[str] = []
    for file_path in sorted(agent_dir.rglob("*")):
        if not file_path.is_file():
            continue
        try:
            # Explicit encoding: the scan must not depend on the host locale.
            text = file_path.read_text(encoding="utf-8", errors="ignore")
        except OSError:
            # Best effort: unreadable files are skipped, as before.
            continue
        matches = set(REWARD_HACKING_PATTERN.findall(text))
        if matches:
            rel = file_path.relative_to(trial_dir)
            errors.append(
                f"`{job_name}/{trial_name}/{rel}`: "
                f"Reward hacking detected: {', '.join(sorted(matches))}"
            )
    return errors
def validate_submission(submission_dir: Path) -> ValidationResult:
    """Validate a complete submission directory.

    Steps:
      1. The metadata file must exist and satisfy ``SubmissionMetadata``.
      2. Every job directory's config.json must pass ``validate_job_config``.
      3. Every trial's result.json must pass ``validate_trial_result``, agree
         with the job's id, and is scanned for reward hacking strings.
      4. The submission must cover ``EXPECTED_TASK_COUNT`` unique task
         checksums with at least ``MIN_TRIALS_PER_TASK`` trials each.

    Problems are collected in ``result.errors`` rather than raised. Missing
    or invalid metadata and a submission without job directories end
    validation early.
    """
    logger.info(f"Validating submission: {submission_dir.name}")
    result = ValidationResult()
    # Check metadata exists and is valid
    metadata_path = find_metadata_file(submission_dir)
    if not metadata_path:
        logger.warning(f"Missing metadata.yaml in {submission_dir.name}")
        result.errors.append(f"Missing `metadata.yaml` in `{submission_dir.name}/`")
        return result
    logger.debug(f"Found metadata at {metadata_path}")
    try:
        metadata = SubmissionMetadata.model_validate(load_yaml(metadata_path))
        result.models = [
            f"{m.model_name} ({m.model_provider})" for m in metadata.models
        ]
        logger.info(f"Metadata valid, models: {result.models}")
    except (ValidationError, yaml.YAMLError) as e:
        # YAMLError covers files that are not parseable YAML at all; without
        # it a malformed metadata file would crash the validator instead of
        # being reported back to the submitter.
        logger.warning(f"Invalid metadata.yaml: {e}")
        result.errors.append(f"Invalid `metadata.yaml`: {e}")
        return result
    # Find and validate job directories
    job_dirs = get_job_dirs(submission_dir)
    if not job_dirs:
        logger.warning(f"No job directories found in {submission_dir.name}")
        result.errors.append(f"No job directories found in `{submission_dir.name}/`")
        return result
    result.job_count = len(job_dirs)
    logger.info(f"Found {len(job_dirs)} job directories")
    # Number of accepted trials per task checksum, across all jobs.
    task_trial_counts: defaultdict[str, int] = defaultdict(int)
    for job_dir in job_dirs:
        logger.debug(f"Validating job directory: {job_dir.name}")
        # Validate job config (using raw dict to allow custom environment types)
        try:
            config_raw = load_json(job_dir / "config.json")
            config_errors = validate_job_config(config_raw, job_dir.name)
            if config_errors:
                logger.warning(f"Job config errors in {job_dir.name}: {config_errors}")
                for error in config_errors:
                    result.errors.append(f"`{job_dir.name}`: {error}")
        except json.JSONDecodeError as e:
            logger.warning(f"Invalid JSON in {job_dir.name}/config.json: {e}")
            result.errors.append(f"`{job_dir.name}/config.json`: Invalid JSON: {e}")
            continue
        # Find and validate trials
        trial_dirs = get_trial_dirs(job_dir)
        if not trial_dirs:
            logger.warning(f"No trial directories in {job_dir.name}")
            result.errors.append(f"`{job_dir.name}`: No trial directories found")
            continue
        logger.debug(f"Found {len(trial_dirs)} trials in {job_dir.name}")
        # Get canonical job_id for consistency check
        canonical_job_id: str | None = None
        job_result_path = job_dir / "result.json"
        if job_result_path.exists():
            logger.debug(f"Found job result.json in {job_dir.name}")
            try:
                job_result = JobResult.model_validate(load_json(job_result_path))
                canonical_job_id = str(job_result.id)
                logger.debug(f"Canonical job_id from result.json: {canonical_job_id}")
            except (ValidationError, json.JSONDecodeError) as e:
                # Not fatal: the id is taken from the first trial that has one.
                logger.debug(f"Could not parse job result.json: {e}")
        for trial_dir in trial_dirs:
            logger.debug(f"Validating trial: {trial_dir.name}")
            try:
                trial_raw = load_json(trial_dir / "result.json")
                # Validate trial structure (using raw dict to allow custom environment types)
                trial_errors = validate_trial_result(
                    trial_raw, job_dir.name, trial_dir.name
                )
                if trial_errors:
                    logger.warning(f"Trial validation errors: {trial_errors}")
                    result.errors.extend(trial_errors)
                    continue
                # Check job_id consistency
                trial_config = trial_raw.get("config") or {}
                if not isinstance(trial_config, dict):
                    # A non-object config cannot carry a job_id; treat it as
                    # absent (same rule as get_job_id_from_dir).
                    trial_config = {}
                trial_job_id = (
                    str(trial_config.get("job_id"))
                    if trial_config.get("job_id")
                    else None
                )
                if canonical_job_id is None and trial_job_id:
                    canonical_job_id = trial_job_id
                    logger.debug(f"Set canonical job_id from trial: {canonical_job_id}")
                elif (
                    canonical_job_id
                    and trial_job_id
                    and trial_job_id != canonical_job_id
                ):
                    logger.warning(
                        f"Job ID mismatch in {trial_dir.name}: "
                        f"{trial_job_id} != {canonical_job_id}"
                    )
                    result.errors.append(
                        f"`{job_dir.name}/{trial_dir.name}/result.json`: "
                        f"`config.job_id` ({trial_job_id}) does not match job's id ({canonical_job_id})"
                    )
                    continue
                # Check for reward hacking in agent files
                rh_errors = check_reward_hacking(
                    trial_dir, job_dir.name, trial_dir.name
                )
                if rh_errors:
                    result.errors.extend(rh_errors)
                # The trial is still counted when reward hacking was flagged;
                # the recorded errors already make the submission invalid.
                result.trial_count += 1
                task_trial_counts[trial_raw["task_checksum"]] += 1
                # Check if trial was successful (reward > 0)
                verifier_result = trial_raw.get("verifier_result") or {}
                rewards = verifier_result.get("rewards") or {}
                reward = rewards.get("reward", 0)
                if reward and float(reward) > 0:
                    result.successful_trials += 1
                    logger.debug(f"Trial {trial_dir.name} successful (reward={reward})")
                else:
                    logger.debug(f"Trial {trial_dir.name} failed (reward={reward})")
            except json.JSONDecodeError as e:
                logger.warning(f"Invalid JSON in {trial_dir.name}/result.json: {e}")
                result.errors.append(
                    f"`{job_dir.name}/{trial_dir.name}/result.json`: Invalid JSON: {e}"
                )
    # Check all tasks are covered
    if len(task_trial_counts) < EXPECTED_TASK_COUNT:
        result.errors.append(
            f"Submission covers {len(task_trial_counts)} unique task(s), "
            f"expected {EXPECTED_TASK_COUNT}"
        )
    # Check minimum trials per task
    for task_checksum, count in sorted(task_trial_counts.items()):
        if count < MIN_TRIALS_PER_TASK:
            result.errors.append(
                f"Task `{task_checksum}` has only {count} trial(s), "
                f"minimum {MIN_TRIALS_PER_TASK} required"
            )
    logger.info(
        f"Validation complete: {result.trial_count} trials, "
        f"{result.successful_trials} successful, {len(result.errors)} errors"
    )
    return result
def format_validation_comment(submissions: list[tuple[str, ValidationResult]]) -> str:
    """Render validation results as the markdown body of a PR comment.

    When every submission is valid, each one gets a summary section (models,
    job and trial counts, accuracy). Otherwise only the invalid submissions
    are listed, each with up to 20 of its errors plus a count of the rest.
    """
    out: list[str] = ["## Terminal-Bench Submission Validation", ""]
    error_cap = 20

    if not all(res.is_valid for _, res in submissions):
        out += ["**Validation failed**", ""]
        for sub_name, res in submissions:
            if res.is_valid:
                continue
            out.append(f"### `{sub_name}`")
            out.extend(f"- {message}" for message in res.errors[:error_cap])
            overflow = len(res.errors) - error_cap
            if overflow > 0:
                out.append(f"- ... and {overflow} more errors")
            out.append("")
        out.append("Please fix the errors and push again.")
        return "\n".join(out)

    out += ["**All submissions passed validation!**", ""]
    for sub_name, res in submissions:
        out.append(f"### `{sub_name}`")
        if res.models:
            out.append(f"- Models: {', '.join(res.models)}")
        out += [f"- Jobs: {res.job_count}", f"- Trials: {res.trial_count}"]
        acc = res.accuracy
        if acc is not None:
            out.append(f"- Accuracy: {acc:.1%}")
        out.append("")
    out.append("Ready to merge.")
    return "\n".join(out)
# ============================================================================
# Import Logic
# ============================================================================
async def get_supabase_client() -> AsyncClient:
    """Build an async Supabase client from SUPABASE_URL and SUPABASE_SECRET_KEY.

    Raises:
        ValueError: if either environment variable is missing or empty.
    """
    url, key = (
        os.environ.get(var_name) for var_name in ("SUPABASE_URL", "SUPABASE_SECRET_KEY")
    )
    if not (url and key):
        raise ValueError("SUPABASE_URL and SUPABASE_SECRET_KEY must be set")
    supabase = await create_async_client(url, key)
    return supabase
async def job_exists(client: AsyncClient, job_id: str) -> bool:
    """Return True when the ``job`` table already has a row with this id."""
    query = client.table("job").select("id").eq("id", job_id)
    response = await query.execute()
    rows = response.data
    return len(rows) > 0
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
async def upsert_with_retry(client: AsyncClient, table: str, data: dict | list[dict]):
    """Upsert *data* into *table* and return the response.

    The ``@retry`` decorator re-runs the call on failure: up to 3 attempts
    with exponential back-off.
    """
    upsert_query = client.table(table).upsert(data)
    response = await upsert_query.execute()
    return response
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
async def insert_ignore_conflicts(
    client: AsyncClient,
    table: str,
    data: dict | list[dict],
    *,
    on_conflict: str = "name,provider",
):
    """Insert data, ignoring conflicts (existing rows).

    Implemented as an upsert with ``ignore_duplicates=True``, so rows that
    already exist are left untouched. The ``@retry`` decorator re-runs the
    call on failure: up to 3 attempts with exponential back-off.

    Args:
        client: Async Supabase client.
        table: Target table name.
        data: One row or a list of rows.
        on_conflict: Comma-separated column list that identifies an existing
            row. The default matches the ``model`` table, the only caller in
            this file; pass another value to reuse the helper for tables
            with a different unique key.
    """
    return await (
        client.table(table)
        .upsert(data, on_conflict=on_conflict, ignore_duplicates=True)
        .execute()
    )
async def import_submission(
    submission_dir: Path,
    metadata: SubmissionMetadata,
    *,
    client: AsyncClient | None = None,
    dry_run: bool = False,
) -> ImportStats:
    """Import a validated submission to the database.

    If dry_run=True, parses all data and validates structure but skips
    database writes and storage uploads.

    Args:
        submission_dir: Local directory holding the submission's job folders.
        metadata: Parsed submission metadata (agent URL, display names, models).
        client: Optional Supabase client to reuse; when omitted (and not in
            dry-run mode) one is created from environment variables.
        dry_run: Parse everything but skip all network writes.

    Returns:
        Counters of imported jobs/trials plus a list of error messages.
        Per-job and per-trial failures are recorded there instead of raised.
    """
    logger.info(
        f"Starting import for submission: {submission_dir.name}"
        + (" (DRY RUN)" if dry_run else "")
    )
    # NOTE: `db` is only bound when not in dry-run mode; every later use of
    # `db` is guarded by `not dry_run`.
    if not dry_run:
        db = client if client is not None else await get_supabase_client()
    stats: ImportStats = {"jobs_imported": 0, "trials_imported": 0, "errors": []}
    for job_dir in get_job_dirs(submission_dir):
        try:
            # Load job config (raw dict to allow custom environment types)
            config_raw = load_json(job_dir / "config.json")
            job_name = config_raw.get("job_name") or job_dir.name
            logger.info(f"Processing job: {job_name}")
            # Load job result if it exists (JobResult doesn't have enum issues)
            job_result: JobResult | None = None
            job_result_path = job_dir / "result.json"
            if job_result_path.exists():
                logger.debug(f"Found job result.json for {job_name}")
                try:
                    job_result = JobResult.model_validate(load_json(job_result_path))
                    logger.debug("Parsed job result successfully")
                except ValidationError as e:
                    # Not fatal: the job id is recovered from raw JSON below.
                    logger.debug(f"Could not parse job result.json: {e}")
            else:
                logger.debug(f"No job result.json for {job_name}")
            trial_dirs = get_trial_dirs(job_dir)
            if not trial_dirs:
                logger.warning(f"No trials in {job_dir.name}, skipping")
                stats["errors"].append(f"No trials in {job_dir.name}")
                continue
            logger.debug(f"Found {len(trial_dirs)} trials")
            # Get job_id from job result or trial
            job_id: str | None = None
            if job_result:
                job_id = str(job_result.id)
                logger.debug(f"Got job_id from job result: {job_id}")
            else:
                logger.debug("Getting job_id from directory")
                job_id = get_job_id_from_dir(job_dir, trial_dirs)
            if not job_id:
                logger.warning(f"Could not find job_id in {job_dir.name}, skipping")
                stats["errors"].append(f"Could not find job_id in {job_dir.name}")
                continue
            # Skip if job already exists
            # (the whole job is skipped, including its trials)
            if not dry_run and await job_exists(db, job_id):
                logger.info(f"Job {job_id} already exists in database, skipping")
                continue
            logger.info(
                f"{'[DRY RUN] Would import' if dry_run else 'Importing new'} job: {job_id}"
            )
            # Insert job
            job_insert = JobInsert(
                id=UUID(job_id),
                config=config_raw,
                job_name=job_name,
                n_trials=len(trial_dirs),
                username="hf-submission",
                package_version=harbor.__version__,
                include_on_leaderboard=True,
                started_at=job_result.started_at if job_result else None,
                ended_at=job_result.finished_at if job_result else None,
                stats=job_result.stats.model_dump(mode="json")
                if job_result and job_result.stats
                else None,
            )
            if not dry_run:
                await upsert_with_retry(
                    db,
                    "job",
                    job_insert.model_dump(mode="json", exclude_none=True),
                )
            logger.info(
                f"{'[DRY RUN] Parsed' if dry_run else 'Inserted'} job: {job_id}"
            )
            # Counted in dry-run mode as well.
            stats["jobs_imported"] += 1
            # Process trials in three phases:
            # Phase 1: Parse trial data (sequential, fast)
            # Phase 2: Upload to storage (parallel, slow)
            # Phase 3: Build insert objects (sequential, fast)
            # Keyed by (name, version) / (name, provider) so each agent and
            # model is written once per job.
            agent_inserts: dict[tuple[str, str], dict] = {}
            model_inserts: dict[tuple[str, str], dict] = {}
            trial_inserts: list[dict] = []
            trial_model_inserts: list[dict] = []
            # Phase 1: Parse all trial JSONs and collect metadata
            parsed_trials: list[tuple[Path, dict]] = []
            for trial_dir in trial_dirs:
                try:
                    trial_raw = load_json(trial_dir / "result.json")
                    # Extract required fields from raw dict
                    agent_info = trial_raw.get("agent_info") or {}
                    agent_name = agent_info.get("name", "unknown")
                    agent_version = agent_info.get("version", "unknown")
                    # Collect agent
                    agent_key = (agent_name, agent_version)
                    if agent_key not in agent_inserts:
                        agent_inserts[agent_key] = AgentInsert(
                            name=agent_name,
                            version=agent_version,
                            display_name=metadata.agent_display_name,
                            url=metadata.agent_url,
                            org_display_name=metadata.agent_org_display_name,
                        ).model_dump(mode="json", exclude_none=True)
                    # Collect models
                    for model in metadata.models:
                        model_key = (model.model_name, model.model_provider)
                        if model_key not in model_inserts:
                            # Look up token costs from litellm
                            # ("provider/name" first, then the bare model name)
                            key = f"{model.model_provider}/{model.model_name}"
                            token_costs = model_cost.get(key) or model_cost.get(
                                model.model_name
                            )
                            input_cost = (
                                token_costs.get("input_cost_per_token")
                                if token_costs
                                else None
                            )
                            output_cost = (
                                token_costs.get("output_cost_per_token")
                                if token_costs
                                else None
                            )
                            # per-token cost * 1e8 -> cents per million tokens
                            # (presumably litellm costs are in USD - confirm).
                            # A cost of 0 is falsy and is stored as None.
                            model_inserts[model_key] = ModelInsert(
                                name=model.model_name,
                                provider=model.model_provider,
                                display_name=model.model_display_name,
                                org_display_name=model.model_org_display_name,
                                cents_per_million_input_tokens=round(input_cost * 1e8)
                                if input_cost
                                else None,
                                cents_per_million_output_tokens=round(output_cost * 1e8)
                                if output_cost
                                else None,
                            ).model_dump(mode="json", exclude_none=True)
                    parsed_trials.append((trial_dir, trial_raw))
                except Exception as e:
                    # A trial that fails to parse is dropped from later phases.
                    logger.error(f"Error parsing trial {trial_dir.name}: {e}")
                    stats["errors"].append(f"Trial {trial_dir.name}: {e}")
            # Phase 2: Upload trials to storage in parallel
            # Maps trial id -> public archive URL (None when the upload failed).
            storage_urls: dict[str, str | None] = {}
            if parsed_trials and not dry_run:
                logger.info(
                    f"Uploading {len(parsed_trials)} trials to storage "
                    f"with {MAX_UPLOAD_WORKERS} workers"
                )
                # Caps the number of uploads in flight at once.
                sem = asyncio.Semaphore(MAX_UPLOAD_WORKERS)

                async def _upload(td: Path, tr: dict) -> None:
                    # Upload one trial and record its URL. Upload errors are
                    # caught here so they do not abort the other uploads.
                    tid = str(tr["id"])
                    async with sem:
                        try:
                            url = await upload_trial_to_storage(db, td, tid)
                            if not url:
                                logger.warning(
                                    f"Storage upload failed for trial {tid}, "
                                    "trial_uri will be null"
                                )
                            storage_urls[tid] = url
                        except Exception as e:
                            logger.error(f"Storage upload error for trial {tid}: {e}")
                            storage_urls[tid] = None

                # The TaskGroup waits for every upload before Phase 3 starts.
                async with asyncio.TaskGroup() as tg:
                    for td, tr in parsed_trials:
                        tg.create_task(_upload(td, tr))
            elif parsed_trials and dry_run:
                logger.info(
                    f"[DRY RUN] Would upload {len(parsed_trials)} trials to storage"
                )
            # Phase 3: Build TrialInsert and TrialModelInsert objects
            for trial_dir, trial_raw in parsed_trials:
                try:
                    agent_info = trial_raw.get("agent_info") or {}
                    agent_name = agent_info.get("name", "unknown")
                    agent_version = agent_info.get("version", "unknown")
                    # Get reward from verifier_result (optional)
                    reward: Decimal | None = None
                    verifier_result = trial_raw.get("verifier_result") or {}
                    rewards = verifier_result.get("rewards") or {}
                    reward_val = rewards.get("reward")
                    if reward_val is not None:
                        # Converted via str() before building the Decimal.
                        reward = Decimal(str(reward_val))
                    # Extract timing info (all optional)
                    agent_execution = trial_raw.get("agent_execution") or {}
                    agent_setup = trial_raw.get("agent_setup") or {}
                    environment_setup = trial_raw.get("environment_setup") or {}
                    verifier_timing = trial_raw.get("verifier") or {}
                    agent_result = trial_raw.get("agent_result") or {}
                    trial_id = str(trial_raw["id"])
                    # None in dry-run mode or when the upload failed.
                    storage_url = storage_urls.get(trial_id)
                    # Collect trial
                    trial_inserts.append(
                        TrialInsert(
                            id=trial_raw["id"],
                            agent_name=agent_name,
                            agent_version=agent_version,
                            config=trial_raw.get("config") or {},
                            task_checksum=trial_raw.get("task_checksum", ""),
                            trial_name=trial_raw.get("trial_name", ""),
                            trial_uri=storage_url,
                            # TimingInfo fields (all optional)
                            agent_execution_started_at=agent_execution.get(
                                "started_at"
                            ),
                            agent_execution_ended_at=agent_execution.get("finished_at"),
                            agent_setup_started_at=agent_setup.get("started_at"),
                            agent_setup_ended_at=agent_setup.get("finished_at"),
                            environment_setup_started_at=environment_setup.get(
                                "started_at"
                            ),
                            environment_setup_ended_at=environment_setup.get(
                                "finished_at"
                            ),
                            verifier_started_at=verifier_timing.get("started_at"),
                            verifier_ended_at=verifier_timing.get("finished_at"),
                            # Other optional fields
                            exception_info=trial_raw.get("exception_info"),
                            job_id=UUID(job_id),
                            reward=reward,
                            started_at=trial_raw.get("started_at"),
                            ended_at=trial_raw.get("finished_at"),
                            agent_metadata=agent_result.get("metadata"),
                        ).model_dump(mode="json", exclude_none=True)
                    )
                    # Collect trial_models
                    # NOTE(review): every model listed in the metadata gets the
                    # same token counts from this trial's agent_result.
                    for model in metadata.models:
                        trial_model_inserts.append(
                            TrialModelInsert(
                                trial_id=trial_raw["id"],
                                model_name=model.model_name,
                                model_provider=model.model_provider,
                                n_input_tokens=agent_result.get("n_input_tokens"),
                                n_output_tokens=agent_result.get("n_output_tokens"),
                                n_cache_tokens=agent_result.get("n_cache_tokens"),
                            ).model_dump(mode="json", by_alias=True, exclude_none=True)
                        )
                    stats["trials_imported"] += 1
                except Exception as e:
                    logger.error(f"Error processing trial {trial_dir.name}: {e}")
                    stats["errors"].append(f"Trial {trial_dir.name}: {e}")
            # Batch insert
            if dry_run:
                logger.info(
                    f"[DRY RUN] Would insert: {len(agent_inserts)} agents, "
                    f"{len(model_inserts)} models, {len(trial_inserts)} trials, "
                    f"{len(trial_model_inserts)} trial_models"
                )
            else:
                # Write order: agents, models, trials, trial_models
                # (presumably so referenced rows exist first - confirm against the schema).
                if agent_inserts:
                    logger.debug(f"Upserting {len(agent_inserts)} agents")
                    await upsert_with_retry(db, "agent", list(agent_inserts.values()))
                if model_inserts:
                    logger.debug(f"Inserting {len(model_inserts)} models")
                    await insert_ignore_conflicts(
                        db, "model", list(model_inserts.values())
                    )
                if trial_inserts:
                    logger.debug(f"Upserting {len(trial_inserts)} trials")
                    await upsert_with_retry(db, "trial", trial_inserts)
                if trial_model_inserts:
                    logger.debug(f"Upserting {len(trial_model_inserts)} trial_models")
                    await upsert_with_retry(db, "trial_model", trial_model_inserts)
            logger.info(
                f"Job {job_dir.name} import complete: "
                f"{len(trial_inserts)} trials, {len(agent_inserts)} agents, {len(model_inserts)} models"
            )
        # NOTE(review): the two handlers below are identical; the second
        # alone would cover ValidationError too.
        except ValidationError as e:
            logger.exception(f"Error importing job {job_dir.name}")
            stats["errors"].append(f"Job {job_dir.name}: {e}")
        except Exception as e:
            logger.exception(f"Error importing job {job_dir.name}")
            stats["errors"].append(f"Job {job_dir.name}: {e}")
    logger.info(f"Submission import complete: {stats}")
    return stats
# ============================================================================
# HF Hub Operations
# ============================================================================
async def _list_submission_folders(
    api: HfApi, revision: str | None = None
) -> list[RepoFolder]:
    """Return the folders directly under SUBMISSIONS_PATH in the dataset repo.

    The listing is non-recursive and runs in a worker thread so the blocking
    Hub call does not stall the event loop.
    """

    def _fetch_entries() -> list:
        return list(
            api.list_repo_tree(
                repo_id=DATASET_REPO,
                path_in_repo=SUBMISSIONS_PATH,
                repo_type="dataset",
                revision=revision,
            )
        )

    entries = await asyncio.to_thread(_fetch_entries)
    return [entry for entry in entries if isinstance(entry, RepoFolder)]
async def _list_job_folders(
    api: HfApi, submission_path: str, revision: str | None = None
) -> list[RepoFolder]:
    """Return the folders directly under *submission_path* in the dataset repo.

    The listing is non-recursive and runs in a worker thread so the blocking
    Hub call does not stall the event loop.
    """

    def _fetch_entries() -> list:
        return list(
            api.list_repo_tree(
                repo_id=DATASET_REPO,
                path_in_repo=submission_path,
                repo_type="dataset",
                revision=revision,
            )
        )

    entries = await asyncio.to_thread(_fetch_entries)
    return [entry for entry in entries if isinstance(entry, RepoFolder)]
def _download_file(
    file_path: str,
    local_dir: str | Path,
    revision: str | None = None,
) -> Path:
    """Fetch one file of the dataset repo into *local_dir* and return its local path."""
    downloaded = hf_hub_download(
        repo_id=DATASET_REPO,
        filename=file_path,
        repo_type="dataset",
        revision=revision,
        local_dir=str(local_dir),
    )
    return Path(downloaded)
async def _download_submission_files(
    api: HfApi,
    submission_name: str,
    local_dir: str | Path,
    revision: str | None = None,
) -> None:
    """Download essential files for a submission: metadata, configs, and results.

    Walks the submission one level at a time (submission -> job folders ->
    trial folders), lists only ``<trial>/agent`` recursively, and then fetches
    every collected path concurrently, at most ``MAX_CONCURRENT_DOWNLOADS`` at
    a time. This avoids a recursive listing of the whole submission.

    Collected per job folder: ``config.json`` and ``result.json``. Collected
    per trial folder: ``result.json`` plus every file under ``agent/``.

    Downloads and the ``agent/`` listing are best-effort: a failure is logged
    at debug level and the remaining files are still processed, so a missing
    file never aborts the submission.

    Args:
        api: Hub client used for the tree listings.
        submission_name: Folder name of the submission under
            ``SUBMISSIONS_PATH``.
        local_dir: Directory the files are downloaded into.
        revision: Revision to read from; ``None`` is forwarded to the Hub
            client unchanged.

    Raises:
        Exception: Whatever the Hub client raises when listing the job
            folders or a job's trial folders; those listings are not guarded.
    """
    sub_path = f"{SUBMISSIONS_PATH}/{submission_name}"

    def _list_tree(path: str, recursive: bool = False) -> list:
        # Blocking helper meant for asyncio.to_thread: the listing is fully
        # consumed here so it is materialised inside the worker thread.
        return list(
            api.list_repo_tree(
                repo_id=DATASET_REPO,
                path_in_repo=path,
                repo_type="dataset",
                revision=revision,
                recursive=recursive,
            )
        )

    # Download the metadata file, trying both extensions and stopping at the
    # first one that succeeds.
    for ext in ("yaml", "yml"):
        metadata_path = f"{sub_path}/metadata.{ext}"
        try:
            await asyncio.to_thread(
                _download_file, metadata_path, local_dir, revision
            )
        except Exception as e:
            # Expected for the extension that is not used, but log it so that
            # network/auth failures are not mistaken for a missing file.
            logger.debug(f"Could not download {metadata_path}: {e}")
        else:
            break

    # Collect all file paths to download, then fetch in parallel
    files_to_download: list[str] = []
    for job_folder in await _list_job_folders(api, sub_path, revision):
        job_path = job_folder.path
        files_to_download.append(f"{job_path}/config.json")
        files_to_download.append(f"{job_path}/result.json")

        # List trial folders and collect their result.json paths
        trial_items = await asyncio.to_thread(_list_tree, job_path)
        for trial_item in trial_items:
            if not isinstance(trial_item, RepoFolder):
                continue
            trial_path = trial_item.path
            files_to_download.append(f"{trial_path}/result.json")

            # List agent/* files for reward hacking checks
            try:
                agent_items = await asyncio.to_thread(
                    _list_tree, f"{trial_path}/agent", True
                )
            except Exception as e:
                # agent/ dir may not exist; keep going but leave a trace.
                logger.debug(f"Could not list {trial_path}/agent: {e}")
                continue
            files_to_download.extend(
                agent_item.path
                for agent_item in agent_items
                if isinstance(agent_item, RepoFile)
            )

    logger.info(f"Downloading {len(files_to_download)} files for {submission_name}")
    sem = asyncio.Semaphore(MAX_CONCURRENT_DOWNLOADS)

    async def _safe_download(file_path: str) -> None:
        # Errors are swallowed on purpose so one failed download cannot cancel
        # the rest of the task group.
        async with sem:
            try:
                await asyncio.to_thread(_download_file, file_path, local_dir, revision)
            except Exception as e:
                logger.debug(f"Could not download {file_path}: {e}")

    async with asyncio.TaskGroup() as tg:
        for f in files_to_download:
            tg.create_task(_safe_download(f))
async def get_changed_submission_names(api: HfApi, pr_revision: str) -> list[str]:
    """Return the names of submission folders that a PR adds or modifies.

    Each submission folder's ``tree_id`` on the PR revision is compared with
    the ``tree_id`` of the same-named folder on ``main``. A folder is reported
    when it is absent from ``main`` or its ``tree_id`` differs. Folders that
    exist only on ``main`` are not reported.

    Args:
        api: Hub client used for the two folder listings.
        pr_revision: Revision of the pull request, e.g. ``refs/pr/12``.

    Returns:
        The changed folder names, sorted.
    """
    logger.info(f"Comparing submission folders: PR ({pr_revision}) vs main")

    async def _tree_ids_by_name(revision: str) -> dict:
        # Map the last path component of each submission folder to its tree_id.
        folders = await _list_submission_folders(api, revision)
        return {
            folder.path.rsplit("/", 1)[-1]: folder.tree_id for folder in folders
        }

    pr_tree_ids = await _tree_ids_by_name(pr_revision)
    main_tree_ids = await _tree_ids_by_name("main")

    changed = sorted(
        name
        for name, tree_id in pr_tree_ids.items()
        if main_tree_ids.get(name) != tree_id
    )
    logger.info(f"Changed submission folders: {changed}")
    return changed
def get_new_job_ids_by_submission(
    probe_dir: Path,
) -> dict[str, list[str]]:
    """Collect the job IDs found in a probe download, grouped by submission.

    Scans ``probe_dir / SUBMISSIONS_PATH`` for ``<submission>/<job>/result.json``
    files and reads the ``"id"`` field of each one. This function does not
    consult the database; it reports every ID it finds.

    Args:
        probe_dir: Root directory the job-level ``result.json`` files were
            downloaded into.

    Returns:
        Mapping of submission folder name to the job IDs (as strings) found in
        it. Submissions without any readable job ID are omitted, and an empty
        dict is returned when the submissions directory does not exist.
    """
    job_ids_by_submission: dict[str, list[str]] = {}

    submissions_dir = probe_dir / SUBMISSIONS_PATH
    if not submissions_dir.exists():
        return job_ids_by_submission

    for sub_dir in submissions_dir.iterdir():
        if not sub_dir.is_dir():
            continue

        found_ids: list[str] = []
        for job_dir in sub_dir.iterdir():
            result_path = job_dir / "result.json"
            if not (job_dir.is_dir() and result_path.exists()):
                continue
            try:
                job_id = load_json(result_path).get("id")
                if job_id:
                    found_ids.append(str(job_id))
            except Exception as e:
                # An unreadable result.json just means this job is not counted.
                logger.debug(f"Failed to parse {result_path}: {e}")

        if found_ids:
            job_ids_by_submission[sub_dir.name] = found_ids

    return job_ids_by_submission
async def batch_check_existing_jobs(
    client: AsyncClient, job_ids: list[str]
) -> set[str]:
    """Return the subset of ``job_ids`` that already has a row in the ``job`` table.

    Args:
        client: Async Supabase client used for the lookups.
        job_ids: Candidate job IDs. An empty list issues no query.

    Returns:
        The ``id`` values returned by the database for the given candidates.
    """
    # Query in fixed-size chunks so each `.in_()` filter stays small.
    chunk_size = 100
    existing: set[str] = set()
    for start in range(0, len(job_ids), chunk_size):
        chunk = job_ids[start : start + chunk_size]
        response = await client.table("job").select("id").in_("id", chunk).execute()
        rows = cast(list[dict[str, Any]], response.data)
        existing.update(row["id"] for row in rows)
    return existing
# ============================================================================
# Webhook Handlers
# ============================================================================
async def handle_pr(pr_num: int, api: HfApi) -> dict:
    """Validate the submissions changed by a pull request and post the outcome.

    Steps: detect which submission folders differ from ``main``, download their
    essential files into a temporary directory, validate every folder that has
    a metadata file, and post a single comment on the PR. If any downloaded
    folder lacks a metadata file, the comment lists those folders and no
    validation is run.

    Args:
        pr_num: Discussion number of the pull request.
        api: Hub client used for listings and for posting the comment.

    Returns:
        ``{"status": "success", "action": "validated"}`` once the comment has
        been posted, or an ``{"status": "error", ...}`` dict when the changed
        folders could not be determined (no comment is posted in that case).
    """
    logger.info(f"Processing PR #{pr_num}")
    pr_revision = f"refs/pr/{pr_num}"
    nothing_to_validate = "No new or changed submissions found in this PR."

    # Detect changed submission folders by comparing file trees (no downloads)
    try:
        changed_names = await get_changed_submission_names(api, pr_revision)
    except Exception as e:
        logger.error(f"Failed to detect changed submissions: {e}")
        return {"status": "error", "message": "Failed to determine changed files"}

    if changed_names:
        # Download only essential files for changed submissions
        logger.info(
            f"Downloading {len(changed_names)} changed submissions: {changed_names}"
        )
        # Validation reads the downloaded files, so it must finish before the
        # temporary directory is removed.
        with tempfile.TemporaryDirectory() as tmpdir:
            local_dir = os.path.join(tmpdir, "repo")
            for name in changed_names:
                await _download_submission_files(
                    api, name, local_dir, revision=pr_revision
                )

            submissions_dir = Path(local_dir) / SUBMISSIONS_PATH
            subs_to_validate: list[Path] = []
            missing_metadata: list[str] = []
            for name in changed_names:
                sub_dir = submissions_dir / name
                if not (sub_dir.exists() and sub_dir.is_dir()):
                    # Nothing was downloaded for this folder; skip it.
                    continue
                if has_metadata_file(sub_dir):
                    subs_to_validate.append(sub_dir)
                else:
                    missing_metadata.append(name)

            logger.info(f"Will validate {len(subs_to_validate)} submissions")

            if missing_metadata:
                logger.warning(f"Submissions missing metadata: {missing_metadata}")
                bullet_list = "\n".join(f"- `{name}/`" for name in missing_metadata)
                comment = (
                    "## Terminal-Bench Submission Validation\n\n"
                    "**Validation failed**\n\n"
                    "The following submission directories are missing `metadata.yaml`:\n"
                    + bullet_list
                    + "\n\nPlease add a `metadata.yaml` file to each submission "
                    "directory, as described in the README."
                )
            elif subs_to_validate:
                logger.info(f"Validating {len(subs_to_validate)} submissions")
                results = [
                    (sub_dir.name, validate_submission(sub_dir))
                    for sub_dir in subs_to_validate
                ]
                all_valid = all(outcome.is_valid for _, outcome in results)
                logger.info(f"Validation complete, all_valid={all_valid}")
                comment = format_validation_comment(results)
            else:
                comment = nothing_to_validate
    else:
        comment = nothing_to_validate

    logger.info(f"Posting comment to PR #{pr_num}")
    await asyncio.to_thread(
        api.comment_discussion,
        repo_id=DATASET_REPO,
        repo_type="dataset",
        discussion_num=pr_num,
        comment=comment,
    )
    logger.info(f"PR #{pr_num} handling complete")
    return {"status": "success", "action": "validated"}
async def handle_merge(*, dry_run: bool = False) -> dict:
    """Import submissions on ``main`` that contain jobs not yet in the database.

    Phase 1 downloads only the job-level ``result.json`` of every job folder
    to learn the job IDs, then asks the database which of them already exist.
    Phase 2 downloads the essential files of each submission that has at least
    one unknown job and imports it.

    Args:
        dry_run: When true, no database client is created, the existence check
            is skipped (every submission with job IDs is treated as new), and
            ``import_submission`` is called with ``dry_run=True``.

    Returns:
        A dict with ``"status": "success"``. It carries a ``"message"`` when
        there was nothing to import, otherwise ``"action": "imported"`` and the
        accumulated ``"stats"`` (job count, trial count, error strings).
    """
    logger.info(f"Processing merge to main{' (DRY RUN)' if dry_run else ''}")
    api = HfApi(token=os.environ.get("HF_TOKEN"))

    with tempfile.TemporaryDirectory() as tmpdir:
        probe_dir = os.path.join(tmpdir, "probe")

        # Phase 1: Download only job-level result.json to discover job IDs
        # Walk the tree level-by-level (non-recursive) to avoid timeouts
        logger.info("Phase 1: Probing for job IDs via result.json files")
        sub_folders = await _list_submission_folders(api)
        logger.info(f"Found {len(sub_folders)} submission folders")

        # Collect all job result.json paths, then download in parallel
        probe_files: list[str] = []
        for sub_folder in sub_folders:
            job_folders = await _list_job_folders(api, sub_folder.path)
            probe_files.extend(
                f"{job_folder.path}/result.json" for job_folder in job_folders
            )
        logger.info(f"Downloading {len(probe_files)} job result.json files")

        download_slots = asyncio.Semaphore(MAX_CONCURRENT_DOWNLOADS)

        async def _probe_download(file_path: str) -> None:
            # Best-effort: a job folder without result.json is simply ignored.
            async with download_slots:
                try:
                    await asyncio.to_thread(_download_file, file_path, probe_dir, None)
                except Exception as e:
                    logger.debug(f"No result.json at {file_path}: {e}")

        async with asyncio.TaskGroup() as tg:
            for probe_path in probe_files:
                tg.create_task(_probe_download(probe_path))

        jobs_by_submission = get_new_job_ids_by_submission(Path(probe_dir))
        all_job_ids: list[str] = []
        for submission_job_ids in jobs_by_submission.values():
            all_job_ids.extend(submission_job_ids)
        logger.info(
            f"Found {len(all_job_ids)} jobs across "
            f"{len(jobs_by_submission)} submissions"
        )
        if not all_job_ids:
            return {"status": "success", "message": "no jobs found"}

        # Create client once for all DB operations (skipped in dry run)
        client: AsyncClient | None
        if dry_run:
            client = None
            # In dry run, treat all submissions as having new jobs
            new_submission_names = sorted(jobs_by_submission)
            logger.info(
                f"[DRY RUN] Skipping DB check, treating all "
                f"{len(new_submission_names)} submissions as new"
            )
        else:
            client = await get_supabase_client()
            assert client is not None
            # Check which jobs already exist in the database
            existing_ids = await batch_check_existing_jobs(client, all_job_ids)
            logger.info(f"{len(existing_ids)} of {len(all_job_ids)} jobs already in DB")

            # Determine which submissions have at least one new job
            new_submission_names = []
            for name, job_ids in jobs_by_submission.items():
                new_ids = [jid for jid in job_ids if jid not in existing_ids]
                if not new_ids:
                    logger.debug(
                        f"Submission {name}: all {len(job_ids)} jobs exist, skipping"
                    )
                    continue
                logger.info(
                    f"Submission {name}: {len(new_ids)} new jobs "
                    f"(of {len(job_ids)} total)"
                )
                new_submission_names.append(name)

        if not new_submission_names:
            logger.info("All jobs already imported, nothing to do")
            return {"status": "success", "message": "all jobs already imported"}

        # Phase 2: Download essential files for submissions with new jobs
        local_dir = os.path.join(tmpdir, "repo")
        logger.info(
            f"Phase 2: Downloading {len(new_submission_names)} submissions "
            f"with new jobs: {new_submission_names}"
        )
        for name in new_submission_names:
            await _download_submission_files(api, name, local_dir)

        submissions_dir = Path(local_dir) / SUBMISSIONS_PATH
        total: TotalStats = {"jobs": 0, "trials": 0, "errors": []}

        for name in new_submission_names:
            sub_dir = submissions_dir / name
            if not (sub_dir.exists() and sub_dir.is_dir()):
                logger.warning(f"Expected folder not found after download: {name}")
                total["errors"].append(f"{name}: folder not found after download")
                continue

            metadata_path = find_metadata_file(sub_dir)
            if not metadata_path:
                logger.debug(f"Skipping {name}: no metadata file")
                continue

            # One failing submission must not stop the remaining imports.
            try:
                logger.info(f"Importing submission: {name}")
                metadata = SubmissionMetadata.model_validate(load_yaml(metadata_path))
                sub_stats = await import_submission(
                    sub_dir, metadata, client=client, dry_run=dry_run
                )
                total["jobs"] += sub_stats["jobs_imported"]
                total["trials"] += sub_stats["trials_imported"]
                total["errors"].extend(sub_stats["errors"])
                logger.info(
                    f"Submission {name} imported: "
                    f"{sub_stats['jobs_imported']} jobs, {sub_stats['trials_imported']} trials"
                )
            except Exception as e:
                logger.exception(f"Error importing {name}")
                total["errors"].append(f"{name}: {e}")

        logger.info(
            f"Merge handling complete: {total['jobs']} jobs, "
            f"{total['trials']} trials, {len(total['errors'])} errors"
        )
        return {"status": "success", "action": "imported", "stats": total}
# ============================================================================
# Gradio UI & Webhook Server
# ============================================================================
# Static landing page shown by the Space; the webhook endpoints are attached
# to it in create_app().
with gr.Blocks() as ui:
    # f-string so the repo link always follows DATASET_REPO, the only repo
    # whose webhooks this Space processes. The text contains no literal braces.
    # The tree shows the job-level result.json because the merge handler reads
    # it to discover job IDs; a job folder without it is never imported.
    gr.Markdown(f"""
# Terminal-Bench Leaderboard Importer
This Space automatically processes submissions to the Terminal-Bench leaderboard.
## How it works
1. **On Pull Request**: Validates the submission and posts a comment with results
2. **On Merge**: Imports validated submissions to the leaderboard database
## Submission Format
Submissions should be placed in `submissions/terminal-bench/2.0/<agent>__<model>/`:
```
submissions/terminal-bench/2.0/my-agent__gpt-5/
  metadata.yaml
  job-folder/
    config.json
    result.json
    trial-1/result.json
    trial-2/result.json
    ...
```
See the [leaderboard repo](https://huggingface.co/datasets/{DATASET_REPO})
for detailed submission instructions.
""")

# Strong references to in-flight background tasks: the webhook handler adds a
# task here when it is scheduled and discards it when it finishes, so a running
# task is never left referenced only by the event loop.
_background_tasks: set[asyncio.Task] = set()
def create_app() -> WebhooksServer:
    """Create and configure the webhook server.

    Registers a single webhook, ``handle_submission``, on top of the Gradio
    ``ui``. The webhook classifies each event and schedules the matching
    handler as a background task, replying immediately instead of waiting for
    the validation or import to finish.

    Returns:
        The configured ``WebhooksServer``; the caller is expected to launch it.
    """
    app = WebhooksServer(ui=ui, webhook_secret=os.environ.get("WEBHOOK_SECRET"))

    def _on_background_done(task: asyncio.Task) -> None:
        """Drop the finished task from the registry and log its failure, if any."""
        _background_tasks.discard(task)
        # Task.exception() raises CancelledError for a cancelled task, so the
        # cancelled case must be handled before asking for the exception.
        if task.cancelled():
            logger.warning("Background handler was cancelled")
            return
        exc = task.exception()
        if exc is not None:
            logger.error("Error in background handler", exc_info=exc)

    def _run_in_background(coro) -> None:
        """Fire-and-forget: run coroutine as a background task on the event loop."""
        task = asyncio.create_task(coro)
        # Keep a reference until the task is done (see _background_tasks).
        _background_tasks.add(task)
        task.add_done_callback(_on_background_done)

    @app.add_webhook()
    async def handle_submission(payload: WebhookPayload) -> dict:
        """Handle webhook events from the leaderboard repo.

        Returns a small status dict: ``ignored`` for events that are not
        handled, ``processing`` when a background handler was scheduled.
        """
        logger.info(
            f"Received webhook: action={payload.event.action}, scope={payload.event.scope}"
        )
        logger.debug(
            f"Webhook payload repo: type={payload.repo.type}, name={payload.repo.name}"
        )

        # Only process our dataset repo
        if payload.repo.type != "dataset" or payload.repo.name != DATASET_REPO:
            logger.info(
                f"Ignoring webhook: wrong repo "
                f"(type={payload.repo.type}, name={payload.repo.name})"
            )
            return {"status": "ignored", "reason": "wrong repo"}
        logger.debug("Webhook is for our dataset repo")

        # Determine if this is a PR event
        # NOTE(review): every event that carries a pull-request discussion takes
        # the PR path, presumably including comment events caused by this
        # Space's own validation comment - confirm the webhook subscription
        # cannot create a feedback loop.
        is_pr = payload.discussion is not None and payload.discussion.isPullRequest
        pr_num = payload.discussion.num if payload.discussion else None
        logger.debug(f"Initial PR detection: is_pr={is_pr}, pr_num={pr_num}")

        # Check updatedRefs for PR refs when discussion is not present
        if not is_pr and payload.event.scope == "repo.content":
            logger.debug("Checking updatedRefs for PR refs")
            for ref_info in getattr(payload, "updatedRefs", None) or []:
                # Entries may be plain dicts or objects exposing a `ref` attribute.
                ref = (
                    ref_info.get("ref", "")
                    if isinstance(ref_info, dict)
                    else getattr(ref_info, "ref", "")
                )
                logger.debug(f"Checking ref: {ref}")
                match = re.match(r"refs/pr/(\d+)", ref)
                if match:
                    is_pr = True
                    pr_num = int(match.group(1))
                    logger.debug(f"Found PR ref in updatedRefs: PR #{pr_num}")
                    break

        # Any content update that is not tied to a PR is treated as a merge.
        is_merge = (
            payload.event.action == "update"
            and payload.event.scope == "repo.content"
            and not is_pr
        )
        logger.info(
            f"Event classification: is_pr={is_pr}, is_merge={is_merge}, pr_num={pr_num}"
        )

        if is_pr and pr_num is not None:
            logger.info(f"Handling as PR event: PR #{pr_num}")
            api = HfApi(token=os.environ.get("HF_TOKEN"))
            _run_in_background(handle_pr(pr_num, api))
            return {"status": "processing", "action": "validating"}

        if is_merge:
            logger.info("Handling as merge event")
            _run_in_background(handle_merge())
            return {"status": "processing", "action": "importing"}

        logger.info(
            f"Ignoring webhook: unhandled event type "
            f"(action={payload.event.action}, scope={payload.event.scope})"
        )
        return {"status": "ignored", "reason": "unhandled event type"}

    return app
if __name__ == "__main__":
    # Script entry point: build the webhook server, then serve it on all
    # network interfaces at port 7860.
    app = create_app()
    launch_options = {"server_name": "0.0.0.0", "server_port": 7860}
    app.launch(**launch_options)