""" Terminal-Bench Leaderboard Importer A HuggingFace Space that: 1. Validates submissions on PR (posts comment with validation results) 2. Imports validated submissions to Supabase on merge """ import asyncio import json import logging import os import re import tarfile import tempfile from collections import defaultdict from dataclasses import dataclass, field from decimal import Decimal from pathlib import Path from typing import Any, TypedDict, cast from uuid import UUID import gradio as gr import harbor import yaml from harbor.models.job.result import JobResult # NOTE: We intentionally don't use JobConfig or TrialResult for parsing because # they contain EnvironmentType enum which rejects custom environment types. # Instead, we parse only the specific fields we need from raw dicts. from huggingface_hub import ( HfApi, RepoFile, RepoFolder, WebhookPayload, WebhooksServer, hf_hub_download, ) from litellm import model_cost from pydantic import UUID4, BaseModel, ConfigDict, Field, ValidationError from supabase._async.client import AsyncClient, create_client as create_async_client from tenacity import retry, stop_after_attempt, wait_exponential # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Constants DATASET_REPO = "harborframework/terminal-bench-2-leaderboard" SUBMISSIONS_PATH = "submissions/terminal-bench/2.0" MAX_CONCURRENT_DOWNLOADS = 20 MIN_TRIALS_PER_TASK = 5 EXPECTED_TASK_COUNT = 89 MAX_UPLOAD_WORKERS = 8 # ============================================================================ # Type Definitions # ============================================================================ class ImportStats(TypedDict): jobs_imported: int trials_imported: int errors: list[str] class TotalStats(TypedDict): jobs: int trials: int errors: list[str] # ============================================================================ # Pydantic Models for DB inserts # ============================================================================ class AgentInsert(BaseModel): name: str version: str created_at: Any | None = None description: str | None = None display_name: str | None = None org_display_name: str | None = None url: str | None = None class JobInsert(BaseModel): id: UUID4 config: dict | list[dict] | list[Any] job_name: str n_trials: int username: str created_at: Any | None = None ended_at: Any | None = None git_commit_id: str | None = None include_on_leaderboard: bool | None = None metrics: dict | list[dict] | list[Any] | None = None package_version: str | None = None started_at: Any | None = None stats: dict | list[dict] | list[Any] | None = None verified: bool | None = None class ModelInsert(BaseModel): name: str provider: str cents_per_million_cache_tokens: Decimal | None = None cents_per_million_input_tokens: int | None = None cents_per_million_output_tokens: int | None = None created_at: Any | None = None description: str | None = None display_name: str | None = None org_display_name: str | None = None class TrialInsert(BaseModel): id: UUID4 agent_name: str agent_version: str config: dict | list[dict] | list[Any] task_checksum: str trial_name: str agent_execution_ended_at: Any | None = None agent_execution_started_at: Any | None = None agent_metadata: dict | list[dict] | list[Any] | None = None agent_setup_ended_at: Any | None = None agent_setup_started_at: Any | None = None created_at: Any | None = None ended_at: Any | None = None environment_setup_ended_at: Any | None = None environment_setup_started_at: Any | None = None exception_info: dict | list[dict] | list[Any] | None = None job_id: UUID4 | None = None reward: Decimal | None = None started_at: Any | None = None trial_uri: str | None = None verifier_ended_at: Any | None = None verifier_started_at: Any | None = None class TrialModelInsert(BaseModel): model_config = ConfigDict(populate_by_name=True) model_name: str = Field(alias="model_name") model_provider: str = Field(alias="model_provider") trial_id: UUID4 created_at: Any | None = None n_cache_tokens: int | None = None n_input_tokens: int | None = None n_output_tokens: int | None = None # ============================================================================ # Metadata Schema # ============================================================================ class ModelMetadata(BaseModel): model_name: str model_provider: str model_display_name: str | None = None model_org_display_name: str | None = None class SubmissionMetadata(BaseModel): agent_url: str models: list[ModelMetadata] agent_display_name: str | None = None agent_org_display_name: str | None = None # ============================================================================ # Helpers # ============================================================================ def find_metadata_file(directory: Path) -> Path | None: """Find metadata file in a directory, supporting both .yaml and .yml extensions.""" for ext in ("yaml", "yml"): path = directory / f"metadata.{ext}" if path.exists(): return path return None def has_metadata_file(directory: Path) -> bool: """Check if a directory has a metadata file (.yaml or .yml).""" return find_metadata_file(directory) is not None def get_job_dirs(submission_dir: Path) -> list[Path]: """Get job directories (those with config.json) from a submission directory.""" return [ d for d in submission_dir.iterdir() if d.is_dir() and (d / "config.json").exists() ] def get_trial_dirs(job_dir: Path) -> list[Path]: """Get trial directories (those with result.json) from a job directory.""" return [d for d in job_dir.iterdir() if d.is_dir() and (d / "result.json").exists()] def load_json(path: Path) -> dict: """Load a JSON file.""" with open(path) as f: return json.load(f) def load_yaml(path: Path) -> dict: """Load a YAML file.""" with open(path) as f: return yaml.safe_load(f) async def upload_trial_to_storage( client: AsyncClient, trial_dir: Path, trial_id: str ) -> str | None: """Upload trial directory as tar.gz and trajectory.json to Supabase storage. Returns: Public URL of the trial archive in storage, or None if upload failed. """ bucket_name = "trials" with tempfile.NamedTemporaryFile(suffix=".tar.gz", delete=False) as tmp_file: tmp_path = Path(tmp_file.name) try: # Create tar.gz archive of the trial directory logger.info(f"Creating tar.gz archive for trial {trial_id}...") with tarfile.open(tmp_path, "w:gz") as tar: tar.add(trial_dir, arcname=trial_dir.name) archive_size = tmp_path.stat().st_size logger.info(f"Archive created: {archive_size / (1024 * 1024):.2f} MB") # Upload the archive with open(tmp_path, "rb") as f: await client.storage.from_(bucket_name).upload( file=f, path=f"{trial_id}.tar.gz", file_options={"upsert": "true"}, ) logger.info(f"Successfully uploaded {trial_id}.tar.gz") # Upload trajectory.json if it exists trajectory_path = trial_dir / "agent" / "trajectory.json" if trajectory_path.exists(): logger.info(f"Found trajectory.json for trial {trial_id}, uploading...") try: with open(trajectory_path, "rb") as f: await client.storage.from_(bucket_name).upload( file=f, path=f"{trial_id}-traj.json", file_options={"upsert": "true"}, ) logger.info(f"Successfully uploaded {trial_id}-traj.json") except Exception as e: logger.warning( f"Failed to upload trajectory.json for trial {trial_id}: {e}" ) # Return the public URL for the archive public_url = await client.storage.from_(bucket_name).get_public_url( f"{trial_id}.tar.gz" ) return public_url except Exception as e: logger.error(f"Failed to upload trial archive {trial_id}.tar.gz: {e}") return None finally: if tmp_path.exists(): tmp_path.unlink() def get_job_id_from_dir(job_dir: Path, trial_dirs: list[Path]) -> str | None: """Extract job_id from job result.json or first trial's config. This is used when we can't parse the full models (e.g., custom environment types). """ logger.debug(f"Looking for job_id in {job_dir.name}") # First try job result.json job_result_path = job_dir / "result.json" if job_result_path.exists(): logger.debug(f"Found job result.json at {job_result_path}") try: data = load_json(job_result_path) if data.get("id"): logger.debug(f"Got job_id from job result.json: {data['id']}") return data["id"] else: logger.debug("job result.json exists but has no 'id' field") except Exception as e: logger.debug(f"Failed to parse job result.json: {e}") else: logger.debug("No job result.json found") # Fall back to first trial's config.job_id if trial_dirs: logger.debug("Falling back to first trial's config.job_id") try: trial_data = load_json(trial_dirs[0] / "result.json") config = trial_data.get("config") if config and isinstance(config, dict): job_id = config.get("job_id") if job_id: logger.debug(f"Got job_id from trial config: {job_id}") return str(job_id) else: logger.debug("Trial config has no job_id") else: logger.debug("Trial has no config dict") except Exception as e: logger.debug(f"Failed to parse trial result.json: {e}") else: logger.debug("No trial directories to fall back to") logger.debug("Could not find job_id") return None # ============================================================================ # Validation # ============================================================================ @dataclass class ValidationResult: errors: list[str] = field(default_factory=list) warnings: list[str] = field(default_factory=list) models: list[str] = field(default_factory=list) job_count: int = 0 trial_count: int = 0 successful_trials: int = 0 @property def is_valid(self) -> bool: return len(self.errors) == 0 @property def accuracy(self) -> float | None: """Calculate accuracy as successful trials / total trials.""" if self.trial_count == 0: return None return self.successful_trials / self.trial_count def validate_job_config(config: dict, job_name: str) -> list[str]: """Validate a job config dict meets terminal-bench submission requirements. Uses raw dict instead of JobConfig model to allow custom environment types that aren't in Harbor's EnvironmentType enum. """ errors: list[str] = [] # Check timeout_multiplier timeout_mult = config.get("timeout_multiplier") if timeout_mult is not None and timeout_mult != 1.0: errors.append(f"`timeout_multiplier` is {timeout_mult}, expected 1.0") # Check agents for i, agent in enumerate(config.get("agents") or []): if not isinstance(agent, dict): continue if agent.get("override_timeout_sec") is not None: errors.append( f"`agents[{i}].override_timeout_sec` is set to {agent['override_timeout_sec']}" ) if agent.get("max_timeout_sec") is not None: errors.append( f"`agents[{i}].max_timeout_sec` is set to {agent['max_timeout_sec']}" ) # Check verifier verifier = config.get("verifier") if isinstance(verifier, dict): if verifier.get("override_timeout_sec") is not None: errors.append( f"`verifier.override_timeout_sec` is set to {verifier['override_timeout_sec']}" ) if verifier.get("max_timeout_sec") is not None: errors.append( f"`verifier.max_timeout_sec` is set to {verifier['max_timeout_sec']}" ) # Check environment environment = config.get("environment") if isinstance(environment, dict): if environment.get("override_cpus") is not None: errors.append( f"`environment.override_cpus` is set to {environment['override_cpus']}" ) if environment.get("override_memory_mb") is not None: errors.append( f"`environment.override_memory_mb` is set to {environment['override_memory_mb']}" ) if environment.get("override_storage_mb") is not None: errors.append( f"`environment.override_storage_mb` is set to {environment['override_storage_mb']}" ) return errors def validate_trial_result( trial: dict, job_dir_name: str, trial_dir_name: str ) -> list[str]: """Validate a trial result dict has required fields. Uses raw dict to allow custom environment types in trial config. """ errors: list[str] = [] prefix = f"`{job_dir_name}/{trial_dir_name}/result.json`" # Check required top-level fields required_fields = ["id", "trial_name", "task_checksum", "agent_info"] missing = [f for f in required_fields if f not in trial or trial[f] is None] if missing: errors.append(f"{prefix}: Missing required fields: {missing}") return errors # Check agent_info has required subfields agent_info = trial.get("agent_info") if not isinstance(agent_info, dict): errors.append(f"{prefix}: `agent_info` must be an object") return errors missing_agent = [] if not agent_info.get("name"): missing_agent.append("agent_info.name") if not agent_info.get("version"): missing_agent.append("agent_info.version") if missing_agent: errors.append(f"{prefix}: Missing required fields: {missing_agent}") return errors REWARD_HACKING_PATTERN = re.compile( r"harbor[-\s]?framework|harborframework|laude[-\s]?institute|tbench\.ai", re.IGNORECASE, ) def check_reward_hacking(trial_dir: Path, job_name: str, trial_name: str) -> list[str]: """Scan agent/* files in a trial directory for reward hacking strings.""" agent_dir = trial_dir / "agent" if not agent_dir.is_dir(): return [] errors: list[str] = [] for file_path in agent_dir.rglob("*"): if not file_path.is_file(): continue try: text = file_path.read_text(errors="ignore") except Exception: continue matches = set(REWARD_HACKING_PATTERN.findall(text)) if matches: rel = file_path.relative_to(trial_dir) errors.append( f"`{job_name}/{trial_name}/{rel}`: " f"Reward hacking detected: {', '.join(sorted(matches))}" ) return errors def validate_submission(submission_dir: Path) -> ValidationResult: """Validate a complete submission directory.""" logger.info(f"Validating submission: {submission_dir.name}") result = ValidationResult() # Check metadata exists and is valid metadata_path = find_metadata_file(submission_dir) if not metadata_path: logger.warning(f"Missing metadata.yaml in {submission_dir.name}") result.errors.append(f"Missing `metadata.yaml` in `{submission_dir.name}/`") return result logger.debug(f"Found metadata at {metadata_path}") try: metadata = SubmissionMetadata.model_validate(load_yaml(metadata_path)) result.models = [ f"{m.model_name} ({m.model_provider})" for m in metadata.models ] logger.info(f"Metadata valid, models: {result.models}") except ValidationError as e: logger.warning(f"Invalid metadata.yaml: {e}") result.errors.append(f"Invalid `metadata.yaml`: {e}") return result # Find and validate job directories job_dirs = get_job_dirs(submission_dir) if not job_dirs: logger.warning(f"No job directories found in {submission_dir.name}") result.errors.append(f"No job directories found in `{submission_dir.name}/`") return result result.job_count = len(job_dirs) logger.info(f"Found {len(job_dirs)} job directories") task_trial_counts: defaultdict[str, int] = defaultdict(int) for job_dir in job_dirs: logger.debug(f"Validating job directory: {job_dir.name}") # Validate job config (using raw dict to allow custom environment types) try: config_raw = load_json(job_dir / "config.json") config_errors = validate_job_config(config_raw, job_dir.name) if config_errors: logger.warning(f"Job config errors in {job_dir.name}: {config_errors}") for error in config_errors: result.errors.append(f"`{job_dir.name}`: {error}") except json.JSONDecodeError as e: logger.warning(f"Invalid JSON in {job_dir.name}/config.json: {e}") result.errors.append(f"`{job_dir.name}/config.json`: Invalid JSON: {e}") continue # Find and validate trials trial_dirs = get_trial_dirs(job_dir) if not trial_dirs: logger.warning(f"No trial directories in {job_dir.name}") result.errors.append(f"`{job_dir.name}`: No trial directories found") continue logger.debug(f"Found {len(trial_dirs)} trials in {job_dir.name}") # Get canonical job_id for consistency check canonical_job_id: str | None = None job_result_path = job_dir / "result.json" if job_result_path.exists(): logger.debug(f"Found job result.json in {job_dir.name}") try: job_result = JobResult.model_validate(load_json(job_result_path)) canonical_job_id = str(job_result.id) logger.debug(f"Canonical job_id from result.json: {canonical_job_id}") except (ValidationError, json.JSONDecodeError) as e: logger.debug(f"Could not parse job result.json: {e}") for trial_dir in trial_dirs: logger.debug(f"Validating trial: {trial_dir.name}") try: trial_raw = load_json(trial_dir / "result.json") # Validate trial structure (using raw dict to allow custom environment types) trial_errors = validate_trial_result( trial_raw, job_dir.name, trial_dir.name ) if trial_errors: logger.warning(f"Trial validation errors: {trial_errors}") result.errors.extend(trial_errors) continue # Check job_id consistency trial_config = trial_raw.get("config") or {} trial_job_id = ( str(trial_config.get("job_id")) if trial_config.get("job_id") else None ) if canonical_job_id is None and trial_job_id: canonical_job_id = trial_job_id logger.debug(f"Set canonical job_id from trial: {canonical_job_id}") elif ( canonical_job_id and trial_job_id and trial_job_id != canonical_job_id ): logger.warning( f"Job ID mismatch in {trial_dir.name}: " f"{trial_job_id} != {canonical_job_id}" ) result.errors.append( f"`{job_dir.name}/{trial_dir.name}/result.json`: " f"`config.job_id` ({trial_job_id}) does not match job's id ({canonical_job_id})" ) continue # Check for reward hacking in agent files rh_errors = check_reward_hacking( trial_dir, job_dir.name, trial_dir.name ) if rh_errors: result.errors.extend(rh_errors) result.trial_count += 1 task_trial_counts[trial_raw["task_checksum"]] += 1 # Check if trial was successful (reward > 0) verifier_result = trial_raw.get("verifier_result") or {} rewards = verifier_result.get("rewards") or {} reward = rewards.get("reward", 0) if reward and float(reward) > 0: result.successful_trials += 1 logger.debug(f"Trial {trial_dir.name} successful (reward={reward})") else: logger.debug(f"Trial {trial_dir.name} failed (reward={reward})") except json.JSONDecodeError as e: logger.warning(f"Invalid JSON in {trial_dir.name}/result.json: {e}") result.errors.append( f"`{job_dir.name}/{trial_dir.name}/result.json`: Invalid JSON: {e}" ) # Check all tasks are covered if len(task_trial_counts) < EXPECTED_TASK_COUNT: result.errors.append( f"Submission covers {len(task_trial_counts)} unique task(s), " f"expected {EXPECTED_TASK_COUNT}" ) # Check minimum trials per task for task_checksum, count in sorted(task_trial_counts.items()): if count < MIN_TRIALS_PER_TASK: result.errors.append( f"Task `{task_checksum}` has only {count} trial(s), " f"minimum {MIN_TRIALS_PER_TASK} required" ) logger.info( f"Validation complete: {result.trial_count} trials, " f"{result.successful_trials} successful, {len(result.errors)} errors" ) return result def format_validation_comment(submissions: list[tuple[str, ValidationResult]]) -> str: """Format validation results as a markdown comment.""" lines = ["## Terminal-Bench Submission Validation", ""] all_valid = all(r.is_valid for _, r in submissions) if all_valid: lines.append("**All submissions passed validation!**") lines.append("") for name, result in submissions: lines.append(f"### `{name}`") if result.models: lines.append(f"- Models: {', '.join(result.models)}") lines.append(f"- Jobs: {result.job_count}") lines.append(f"- Trials: {result.trial_count}") if result.accuracy is not None: lines.append(f"- Accuracy: {result.accuracy:.1%}") lines.append("") lines.append("Ready to merge.") else: lines.append("**Validation failed**") lines.append("") for name, result in submissions: if not result.is_valid: lines.append(f"### `{name}`") max_errors = 20 for error in result.errors[:max_errors]: lines.append(f"- {error}") if len(result.errors) > max_errors: lines.append( f"- ... and {len(result.errors) - max_errors} more errors" ) lines.append("") lines.append("Please fix the errors and push again.") return "\n".join(lines) # ============================================================================ # Import Logic # ============================================================================ async def get_supabase_client() -> AsyncClient: """Create async Supabase client from environment variables.""" url = os.environ.get("SUPABASE_URL") key = os.environ.get("SUPABASE_SECRET_KEY") if not url or not key: raise ValueError("SUPABASE_URL and SUPABASE_SECRET_KEY must be set") return await create_async_client(url, key) async def job_exists(client: AsyncClient, job_id: str) -> bool: """Check if a job already exists in the database.""" result = await client.table("job").select("id").eq("id", job_id).execute() return len(result.data) > 0 @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10)) async def upsert_with_retry(client: AsyncClient, table: str, data: dict | list[dict]): """Upsert data with retry logic.""" return await client.table(table).upsert(data).execute() @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10)) async def insert_ignore_conflicts( client: AsyncClient, table: str, data: dict | list[dict] ): """Insert data, ignoring conflicts (existing rows).""" return await ( client.table(table) .upsert(data, on_conflict="name,provider", ignore_duplicates=True) .execute() ) async def import_submission( submission_dir: Path, metadata: SubmissionMetadata, *, client: AsyncClient | None = None, dry_run: bool = False, ) -> ImportStats: """Import a validated submission to the database. If dry_run=True, parses all data and validates structure but skips database writes and storage uploads. """ logger.info( f"Starting import for submission: {submission_dir.name}" + (" (DRY RUN)" if dry_run else "") ) if not dry_run: db = client if client is not None else await get_supabase_client() stats: ImportStats = {"jobs_imported": 0, "trials_imported": 0, "errors": []} for job_dir in get_job_dirs(submission_dir): try: # Load job config (raw dict to allow custom environment types) config_raw = load_json(job_dir / "config.json") job_name = config_raw.get("job_name") or job_dir.name logger.info(f"Processing job: {job_name}") # Load job result if it exists (JobResult doesn't have enum issues) job_result: JobResult | None = None job_result_path = job_dir / "result.json" if job_result_path.exists(): logger.debug(f"Found job result.json for {job_name}") try: job_result = JobResult.model_validate(load_json(job_result_path)) logger.debug("Parsed job result successfully") except ValidationError as e: logger.debug(f"Could not parse job result.json: {e}") else: logger.debug(f"No job result.json for {job_name}") trial_dirs = get_trial_dirs(job_dir) if not trial_dirs: logger.warning(f"No trials in {job_dir.name}, skipping") stats["errors"].append(f"No trials in {job_dir.name}") continue logger.debug(f"Found {len(trial_dirs)} trials") # Get job_id from job result or trial job_id: str | None = None if job_result: job_id = str(job_result.id) logger.debug(f"Got job_id from job result: {job_id}") else: logger.debug("Getting job_id from directory") job_id = get_job_id_from_dir(job_dir, trial_dirs) if not job_id: logger.warning(f"Could not find job_id in {job_dir.name}, skipping") stats["errors"].append(f"Could not find job_id in {job_dir.name}") continue # Skip if job already exists if not dry_run and await job_exists(db, job_id): logger.info(f"Job {job_id} already exists in database, skipping") continue logger.info( f"{'[DRY RUN] Would import' if dry_run else 'Importing new'} job: {job_id}" ) # Insert job job_insert = JobInsert( id=UUID(job_id), config=config_raw, job_name=job_name, n_trials=len(trial_dirs), username="hf-submission", package_version=harbor.__version__, include_on_leaderboard=True, started_at=job_result.started_at if job_result else None, ended_at=job_result.finished_at if job_result else None, stats=job_result.stats.model_dump(mode="json") if job_result and job_result.stats else None, ) if not dry_run: await upsert_with_retry( db, "job", job_insert.model_dump(mode="json", exclude_none=True), ) logger.info( f"{'[DRY RUN] Parsed' if dry_run else 'Inserted'} job: {job_id}" ) stats["jobs_imported"] += 1 # Process trials in three phases: # Phase 1: Parse trial data (sequential, fast) # Phase 2: Upload to storage (parallel, slow) # Phase 3: Build insert objects (sequential, fast) agent_inserts: dict[tuple[str, str], dict] = {} model_inserts: dict[tuple[str, str], dict] = {} trial_inserts: list[dict] = [] trial_model_inserts: list[dict] = [] # Phase 1: Parse all trial JSONs and collect metadata parsed_trials: list[tuple[Path, dict]] = [] for trial_dir in trial_dirs: try: trial_raw = load_json(trial_dir / "result.json") # Extract required fields from raw dict agent_info = trial_raw.get("agent_info") or {} agent_name = agent_info.get("name", "unknown") agent_version = agent_info.get("version", "unknown") # Collect agent agent_key = (agent_name, agent_version) if agent_key not in agent_inserts: agent_inserts[agent_key] = AgentInsert( name=agent_name, version=agent_version, display_name=metadata.agent_display_name, url=metadata.agent_url, org_display_name=metadata.agent_org_display_name, ).model_dump(mode="json", exclude_none=True) # Collect models for model in metadata.models: model_key = (model.model_name, model.model_provider) if model_key not in model_inserts: # Look up token costs from litellm key = f"{model.model_provider}/{model.model_name}" token_costs = model_cost.get(key) or model_cost.get( model.model_name ) input_cost = ( token_costs.get("input_cost_per_token") if token_costs else None ) output_cost = ( token_costs.get("output_cost_per_token") if token_costs else None ) model_inserts[model_key] = ModelInsert( name=model.model_name, provider=model.model_provider, display_name=model.model_display_name, org_display_name=model.model_org_display_name, cents_per_million_input_tokens=round(input_cost * 1e8) if input_cost else None, cents_per_million_output_tokens=round(output_cost * 1e8) if output_cost else None, ).model_dump(mode="json", exclude_none=True) parsed_trials.append((trial_dir, trial_raw)) except Exception as e: logger.error(f"Error parsing trial {trial_dir.name}: {e}") stats["errors"].append(f"Trial {trial_dir.name}: {e}") # Phase 2: Upload trials to storage in parallel storage_urls: dict[str, str | None] = {} if parsed_trials and not dry_run: logger.info( f"Uploading {len(parsed_trials)} trials to storage " f"with {MAX_UPLOAD_WORKERS} workers" ) sem = asyncio.Semaphore(MAX_UPLOAD_WORKERS) async def _upload(td: Path, tr: dict) -> None: tid = str(tr["id"]) async with sem: try: url = await upload_trial_to_storage(db, td, tid) if not url: logger.warning( f"Storage upload failed for trial {tid}, " "trial_uri will be null" ) storage_urls[tid] = url except Exception as e: logger.error(f"Storage upload error for trial {tid}: {e}") storage_urls[tid] = None async with asyncio.TaskGroup() as tg: for td, tr in parsed_trials: tg.create_task(_upload(td, tr)) elif parsed_trials and dry_run: logger.info( f"[DRY RUN] Would upload {len(parsed_trials)} trials to storage" ) # Phase 3: Build TrialInsert and TrialModelInsert objects for trial_dir, trial_raw in parsed_trials: try: agent_info = trial_raw.get("agent_info") or {} agent_name = agent_info.get("name", "unknown") agent_version = agent_info.get("version", "unknown") # Get reward from verifier_result (optional) reward: Decimal | None = None verifier_result = trial_raw.get("verifier_result") or {} rewards = verifier_result.get("rewards") or {} reward_val = rewards.get("reward") if reward_val is not None: reward = Decimal(str(reward_val)) # Extract timing info (all optional) agent_execution = trial_raw.get("agent_execution") or {} agent_setup = trial_raw.get("agent_setup") or {} environment_setup = trial_raw.get("environment_setup") or {} verifier_timing = trial_raw.get("verifier") or {} agent_result = trial_raw.get("agent_result") or {} trial_id = str(trial_raw["id"]) storage_url = storage_urls.get(trial_id) # Collect trial trial_inserts.append( TrialInsert( id=trial_raw["id"], agent_name=agent_name, agent_version=agent_version, config=trial_raw.get("config") or {}, task_checksum=trial_raw.get("task_checksum", ""), trial_name=trial_raw.get("trial_name", ""), trial_uri=storage_url, # TimingInfo fields (all optional) agent_execution_started_at=agent_execution.get( "started_at" ), agent_execution_ended_at=agent_execution.get("finished_at"), agent_setup_started_at=agent_setup.get("started_at"), agent_setup_ended_at=agent_setup.get("finished_at"), environment_setup_started_at=environment_setup.get( "started_at" ), environment_setup_ended_at=environment_setup.get( "finished_at" ), verifier_started_at=verifier_timing.get("started_at"), verifier_ended_at=verifier_timing.get("finished_at"), # Other optional fields exception_info=trial_raw.get("exception_info"), job_id=UUID(job_id), reward=reward, started_at=trial_raw.get("started_at"), ended_at=trial_raw.get("finished_at"), agent_metadata=agent_result.get("metadata"), ).model_dump(mode="json", exclude_none=True) ) # Collect trial_models for model in metadata.models: trial_model_inserts.append( TrialModelInsert( trial_id=trial_raw["id"], model_name=model.model_name, model_provider=model.model_provider, n_input_tokens=agent_result.get("n_input_tokens"), n_output_tokens=agent_result.get("n_output_tokens"), n_cache_tokens=agent_result.get("n_cache_tokens"), ).model_dump(mode="json", by_alias=True, exclude_none=True) ) stats["trials_imported"] += 1 except Exception as e: logger.error(f"Error processing trial {trial_dir.name}: {e}") stats["errors"].append(f"Trial {trial_dir.name}: {e}") # Batch insert if dry_run: logger.info( f"[DRY RUN] Would insert: {len(agent_inserts)} agents, " f"{len(model_inserts)} models, {len(trial_inserts)} trials, " f"{len(trial_model_inserts)} trial_models" ) else: if agent_inserts: logger.debug(f"Upserting {len(agent_inserts)} agents") await upsert_with_retry(db, "agent", list(agent_inserts.values())) if model_inserts: logger.debug(f"Inserting {len(model_inserts)} models") await insert_ignore_conflicts( db, "model", list(model_inserts.values()) ) if trial_inserts: logger.debug(f"Upserting {len(trial_inserts)} trials") await upsert_with_retry(db, "trial", trial_inserts) if trial_model_inserts: logger.debug(f"Upserting {len(trial_model_inserts)} trial_models") await upsert_with_retry(db, "trial_model", trial_model_inserts) logger.info( f"Job {job_dir.name} import complete: " f"{len(trial_inserts)} trials, {len(agent_inserts)} agents, {len(model_inserts)} models" ) except ValidationError as e: logger.exception(f"Error importing job {job_dir.name}") stats["errors"].append(f"Job {job_dir.name}: {e}") except Exception as e: logger.exception(f"Error importing job {job_dir.name}") stats["errors"].append(f"Job {job_dir.name}: {e}") logger.info(f"Submission import complete: {stats}") return stats # ============================================================================ # HF Hub Operations # ============================================================================ async def _list_submission_folders( api: HfApi, revision: str | None = None ) -> list[RepoFolder]: """List submission folders (non-recursive, fast).""" items = await asyncio.to_thread( lambda: list( api.list_repo_tree( repo_id=DATASET_REPO, path_in_repo=SUBMISSIONS_PATH, repo_type="dataset", revision=revision, ) ) ) return [item for item in items if isinstance(item, RepoFolder)] async def _list_job_folders( api: HfApi, submission_path: str, revision: str | None = None ) -> list[RepoFolder]: """List job folders inside a submission (non-recursive, fast).""" items = await asyncio.to_thread( lambda: list( api.list_repo_tree( repo_id=DATASET_REPO, path_in_repo=submission_path, repo_type="dataset", revision=revision, ) ) ) return [item for item in items if isinstance(item, RepoFolder)] def _download_file( file_path: str, local_dir: str | Path, revision: str | None = None, ) -> Path: """Download a single file from the dataset repo.""" return Path( hf_hub_download( repo_id=DATASET_REPO, filename=file_path, repo_type="dataset", revision=revision, local_dir=str(local_dir), ) ) async def _download_submission_files( api: HfApi, submission_name: str, local_dir: str | Path, revision: str | None = None, ) -> None: """Download essential files for a submission: metadata, configs, and results. Uses level-by-level tree listing + parallel file downloads to avoid the recursive tree listing that causes timeouts on large repos. """ sub_path = f"{SUBMISSIONS_PATH}/{submission_name}" # Download metadata.yaml (try both extensions) for ext in ("yaml", "yml"): try: await asyncio.to_thread( _download_file, f"{sub_path}/metadata.{ext}", local_dir, revision ) break except Exception: pass # Collect all file paths to download, then fetch in parallel files_to_download: list[str] = [] for job_folder in await _list_job_folders(api, sub_path, revision): job_path = job_folder.path files_to_download.append(f"{job_path}/config.json") files_to_download.append(f"{job_path}/result.json") # List trial folders and collect their result.json paths trial_items = await asyncio.to_thread( lambda jp=job_path: list( api.list_repo_tree( repo_id=DATASET_REPO, path_in_repo=jp, repo_type="dataset", revision=revision, ) ) ) for trial_item in trial_items: if isinstance(trial_item, RepoFolder): files_to_download.append(f"{trial_item.path}/result.json") # List agent/* files for reward hacking checks try: agent_items = await asyncio.to_thread( lambda tp=trial_item.path: list( api.list_repo_tree( repo_id=DATASET_REPO, path_in_repo=f"{tp}/agent", repo_type="dataset", revision=revision, recursive=True, ) ) ) for agent_item in agent_items: if isinstance(agent_item, RepoFile): files_to_download.append(agent_item.path) except Exception: pass # agent/ dir may not exist logger.info(f"Downloading {len(files_to_download)} files for {submission_name}") sem = asyncio.Semaphore(MAX_CONCURRENT_DOWNLOADS) async def _safe_download(file_path: str) -> None: async with sem: try: await asyncio.to_thread(_download_file, file_path, local_dir, revision) except Exception as e: logger.debug(f"Could not download {file_path}: {e}") async with asyncio.TaskGroup() as tg: for f in files_to_download: tg.create_task(_safe_download(f)) async def get_changed_submission_names(api: HfApi, pr_revision: str) -> list[str]: """Detect which submission folders changed in a PR vs main. Compares submission folder tree_ids (content hashes of the whole subtree) between the PR and main. A different tree_id means something changed inside. """ logger.info(f"Comparing submission folders: PR ({pr_revision}) vs main") pr_folders = { f.path.split("/")[-1]: f.tree_id for f in await _list_submission_folders(api, pr_revision) } main_folders = { f.path.split("/")[-1]: f.tree_id for f in await _list_submission_folders(api, "main") } changed = [ name for name, tree_id in pr_folders.items() if main_folders.get(name) != tree_id ] logger.info(f"Changed submission folders: {sorted(changed)}") return sorted(changed) def get_new_job_ids_by_submission( probe_dir: Path, ) -> dict[str, list[str]]: """From a probe download of result.json files, extract job IDs grouped by submission. Returns a dict mapping submission folder name -> list of job IDs found. """ submissions_dir = probe_dir / SUBMISSIONS_PATH if not submissions_dir.exists(): return {} result: dict[str, list[str]] = {} for sub_dir in submissions_dir.iterdir(): if not sub_dir.is_dir(): continue job_ids: list[str] = [] for job_dir in sub_dir.iterdir(): if not job_dir.is_dir(): continue result_path = job_dir / "result.json" if not result_path.exists(): continue try: data = load_json(result_path) job_id = data.get("id") if job_id: job_ids.append(str(job_id)) except Exception as e: logger.debug(f"Failed to parse {result_path}: {e}") if job_ids: result[sub_dir.name] = job_ids return result async def batch_check_existing_jobs( client: AsyncClient, job_ids: list[str] ) -> set[str]: """Check which job IDs already exist in the database. Returns set of existing IDs.""" if not job_ids: return set() # Supabase .in_() has a limit, so batch in chunks existing: set[str] = set() chunk_size = 100 for i in range(0, len(job_ids), chunk_size): chunk = job_ids[i : i + chunk_size] result = await client.table("job").select("id").in_("id", chunk).execute() for row in cast(list[dict[str, Any]], result.data): existing.add(row["id"]) return existing # ============================================================================ # Webhook Handlers # ============================================================================ async def handle_pr(pr_num: int, api: HfApi) -> dict: """Handle PR validation.""" logger.info(f"Processing PR #{pr_num}") pr_revision = f"refs/pr/{pr_num}" # Detect changed submission folders by comparing file trees (no downloads) try: changed_names = await get_changed_submission_names(api, pr_revision) except Exception as e: logger.error(f"Failed to detect changed submissions: {e}") return {"status": "error", "message": "Failed to determine changed files"} if not changed_names: comment = "No new or changed submissions found in this PR." else: # Download only essential files for changed submissions logger.info( f"Downloading {len(changed_names)} changed submissions: {changed_names}" ) with tempfile.TemporaryDirectory() as tmpdir: local_dir = os.path.join(tmpdir, "repo") for name in changed_names: await _download_submission_files( api, name, local_dir, revision=pr_revision ) submissions_dir = Path(local_dir) / SUBMISSIONS_PATH subs_to_validate: list[Path] = [] missing_metadata: list[str] = [] for name in changed_names: sub_dir = submissions_dir / name if not sub_dir.exists() or not sub_dir.is_dir(): continue if has_metadata_file(sub_dir): subs_to_validate.append(sub_dir) else: missing_metadata.append(name) logger.info(f"Will validate {len(subs_to_validate)} submissions") if missing_metadata: logger.warning(f"Submissions missing metadata: {missing_metadata}") if missing_metadata: comment = ( "## Terminal-Bench Submission Validation\n\n" "**Validation failed**\n\n" "The following submission directories are missing `metadata.yaml`:\n" + "\n".join(f"- `{name}/`" for name in missing_metadata) + "\n\nPlease add a `metadata.yaml` file to each submission " "directory, as described in the README." ) elif not subs_to_validate: comment = "No new or changed submissions found in this PR." else: logger.info(f"Validating {len(subs_to_validate)} submissions") results = [(d.name, validate_submission(d)) for d in subs_to_validate] all_valid = all(r.is_valid for _, r in results) logger.info(f"Validation complete, all_valid={all_valid}") comment = format_validation_comment(results) logger.info(f"Posting comment to PR #{pr_num}") await asyncio.to_thread( api.comment_discussion, repo_id=DATASET_REPO, repo_type="dataset", discussion_num=pr_num, comment=comment, ) logger.info(f"PR #{pr_num} handling complete") return {"status": "success", "action": "validated"} async def handle_merge(*, dry_run: bool = False) -> dict: """Handle merge to main - import only new submissions. If dry_run=True, downloads and parses all data but skips database writes and storage uploads. Useful for testing the full pipeline. """ logger.info(f"Processing merge to main{' (DRY RUN)' if dry_run else ''}") api = HfApi(token=os.environ.get("HF_TOKEN")) with tempfile.TemporaryDirectory() as tmpdir: probe_dir = os.path.join(tmpdir, "probe") # Phase 1: Download only job-level result.json to discover job IDs # Walk the tree level-by-level (non-recursive) to avoid timeouts logger.info("Phase 1: Probing for job IDs via result.json files") sub_folders = await _list_submission_folders(api) logger.info(f"Found {len(sub_folders)} submission folders") # Collect all job result.json paths, then download in parallel probe_files: list[str] = [] for sub_folder in sub_folders: for job_folder in await _list_job_folders(api, sub_folder.path): probe_files.append(f"{job_folder.path}/result.json") logger.info(f"Downloading {len(probe_files)} job result.json files") sem = asyncio.Semaphore(MAX_CONCURRENT_DOWNLOADS) async def _probe_download(file_path: str) -> None: async with sem: try: await asyncio.to_thread(_download_file, file_path, probe_dir, None) except Exception as e: logger.debug(f"No result.json at {file_path}: {e}") async with asyncio.TaskGroup() as tg: for f in probe_files: tg.create_task(_probe_download(f)) jobs_by_submission = get_new_job_ids_by_submission(Path(probe_dir)) all_job_ids = [jid for ids in jobs_by_submission.values() for jid in ids] logger.info( f"Found {len(all_job_ids)} jobs across " f"{len(jobs_by_submission)} submissions" ) if not all_job_ids: return {"status": "success", "message": "no jobs found"} # Create client once for all DB operations (skipped in dry run) client: AsyncClient | None = None if dry_run else await get_supabase_client() if dry_run: # In dry run, treat all submissions as having new jobs new_submission_names = sorted(jobs_by_submission.keys()) logger.info( f"[DRY RUN] Skipping DB check, treating all " f"{len(new_submission_names)} submissions as new" ) else: # Check which jobs already exist in the database assert client is not None existing_ids = await batch_check_existing_jobs(client, all_job_ids) logger.info(f"{len(existing_ids)} of {len(all_job_ids)} jobs already in DB") # Determine which submissions have at least one new job new_submission_names = [] for name, job_ids in jobs_by_submission.items(): new_ids = [jid for jid in job_ids if jid not in existing_ids] if new_ids: logger.info( f"Submission {name}: {len(new_ids)} new jobs " f"(of {len(job_ids)} total)" ) new_submission_names.append(name) else: logger.debug( f"Submission {name}: all {len(job_ids)} jobs exist, skipping" ) if not new_submission_names: logger.info("All jobs already imported, nothing to do") return {"status": "success", "message": "all jobs already imported"} # Phase 2: Download essential files for submissions with new jobs local_dir = os.path.join(tmpdir, "repo") logger.info( f"Phase 2: Downloading {len(new_submission_names)} submissions " f"with new jobs: {new_submission_names}" ) for name in new_submission_names: await _download_submission_files(api, name, local_dir) submissions_dir = Path(local_dir) / SUBMISSIONS_PATH total: TotalStats = {"jobs": 0, "trials": 0, "errors": []} for name in new_submission_names: sub_dir = submissions_dir / name if not sub_dir.exists() or not sub_dir.is_dir(): logger.warning(f"Expected folder not found after download: {name}") total["errors"].append(f"{name}: folder not found after download") continue metadata_path = find_metadata_file(sub_dir) if not metadata_path: logger.debug(f"Skipping {name}: no metadata file") continue try: logger.info(f"Importing submission: {name}") metadata = SubmissionMetadata.model_validate(load_yaml(metadata_path)) stats = await import_submission( sub_dir, metadata, client=client, dry_run=dry_run ) total["jobs"] += stats["jobs_imported"] total["trials"] += stats["trials_imported"] total["errors"].extend(stats["errors"]) logger.info( f"Submission {name} imported: " f"{stats['jobs_imported']} jobs, {stats['trials_imported']} trials" ) except Exception as e: logger.exception(f"Error importing {name}") total["errors"].append(f"{name}: {e}") logger.info( f"Merge handling complete: {total['jobs']} jobs, " f"{total['trials']} trials, {len(total['errors'])} errors" ) return {"status": "success", "action": "imported", "stats": total} # ============================================================================ # Gradio UI & Webhook Server # ============================================================================ with gr.Blocks() as ui: gr.Markdown(""" # Terminal-Bench Leaderboard Importer This Space automatically processes submissions to the Terminal-Bench leaderboard. ## How it works 1. **On Pull Request**: Validates the submission and posts a comment with results 2. **On Merge**: Imports validated submissions to the leaderboard database ## Submission Format Submissions should be placed in `submissions/terminal-bench/2.0/__/`: ``` submissions/terminal-bench/2.0/my-agent__gpt-5/ metadata.yaml job-folder/ config.json trial-1/result.json trial-2/result.json ... ``` See the [leaderboard repo](https://huggingface.co/datasets/alexgshaw/terminal-bench-2-leaderboard) for detailed submission instructions. """) _background_tasks: set[asyncio.Task] = set() def create_app() -> WebhooksServer: """Create and configure the webhook server.""" app = WebhooksServer(ui=ui, webhook_secret=os.environ.get("WEBHOOK_SECRET")) @app.add_webhook() async def handle_submission(payload: WebhookPayload) -> dict: """Handle webhook events from the leaderboard repo.""" logger.info( f"Received webhook: action={payload.event.action}, scope={payload.event.scope}" ) logger.debug( f"Webhook payload repo: type={payload.repo.type}, name={payload.repo.name}" ) # Only process our dataset repo if payload.repo.type != "dataset" or payload.repo.name != DATASET_REPO: logger.info( f"Ignoring webhook: wrong repo " f"(type={payload.repo.type}, name={payload.repo.name})" ) return {"status": "ignored", "reason": "wrong repo"} logger.debug("Webhook is for our dataset repo") # Determine if this is a PR event is_pr = payload.discussion is not None and payload.discussion.isPullRequest pr_num = payload.discussion.num if payload.discussion else None logger.debug(f"Initial PR detection: is_pr={is_pr}, pr_num={pr_num}") # Check updatedRefs for PR refs when discussion is not present if not is_pr and payload.event.scope == "repo.content": logger.debug("Checking updatedRefs for PR refs") for ref_info in getattr(payload, "updatedRefs", None) or []: ref = ( ref_info.get("ref", "") if isinstance(ref_info, dict) else getattr(ref_info, "ref", "") ) logger.debug(f"Checking ref: {ref}") match = re.match(r"refs/pr/(\d+)", ref) if match: is_pr = True pr_num = int(match.group(1)) logger.debug(f"Found PR ref in updatedRefs: PR #{pr_num}") break is_merge = ( payload.event.action == "update" and payload.event.scope == "repo.content" and not is_pr ) logger.info( f"Event classification: is_pr={is_pr}, is_merge={is_merge}, pr_num={pr_num}" ) def _run_in_background(coro): """Fire-and-forget: run coroutine as a background task on the event loop.""" task = asyncio.create_task(coro) _background_tasks.add(task) def _on_done(t): _background_tasks.discard(t) if t.exception(): logger.exception( "Error in background handler", exc_info=t.exception() ) task.add_done_callback(_on_done) if is_pr and pr_num is not None: logger.info(f"Handling as PR event: PR #{pr_num}") api = HfApi(token=os.environ.get("HF_TOKEN")) _run_in_background(handle_pr(pr_num, api)) return {"status": "processing", "action": "validating"} elif is_merge: logger.info("Handling as merge event") _run_in_background(handle_merge()) return {"status": "processing", "action": "importing"} else: logger.info( f"Ignoring webhook: unhandled event type " f"(action={payload.event.action}, scope={payload.event.scope})" ) return {"status": "ignored", "reason": "unhandled event type"} return app if __name__ == "__main__": app = create_app() app.launch(server_name="0.0.0.0", server_port=7860)