| """ |
| Terminal-Bench Leaderboard Importer |
| |
| A HuggingFace Space that: |
| 1. Validates submissions on PR (posts comment with validation results) |
| 2. Imports validated submissions to Supabase on merge |
| """ |
|
|
| import asyncio |
| import json |
| import logging |
| import os |
| import re |
| import tarfile |
| import tempfile |
| from collections import defaultdict |
| from dataclasses import dataclass, field |
| from decimal import Decimal |
| from pathlib import Path |
| from typing import Any, TypedDict, cast |
| from uuid import UUID |
|
|
| import gradio as gr |
| import harbor |
| import yaml |
| from harbor.models.job.result import JobResult |
|
|
| |
| |
| |
| from huggingface_hub import ( |
| HfApi, |
| RepoFile, |
| RepoFolder, |
| WebhookPayload, |
| WebhooksServer, |
| hf_hub_download, |
| ) |
| from litellm import model_cost |
| from pydantic import UUID4, BaseModel, ConfigDict, Field, ValidationError |
| from supabase._async.client import AsyncClient, create_client as create_async_client |
| from tenacity import retry, stop_after_attempt, wait_exponential |
|
|
| |
# Configure the root logger at INFO and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# Hugging Face dataset repository that holds leaderboard submissions.
DATASET_REPO = "harborframework/terminal-bench-2-leaderboard"
# Folder inside the dataset repo; each submission is a sub-folder of it.
SUBMISSIONS_PATH = "submissions/terminal-bench/2.0"
# NOTE(review): presumably bounds parallel file downloads -- its use is not
# visible in this part of the file; confirm against the download code.
MAX_CONCURRENT_DOWNLOADS = 20
# validate_submission() reports an error for any task checksum with fewer trials.
MIN_TRIALS_PER_TASK = 5
# validate_submission() reports an error when fewer unique task checksums are covered.
EXPECTED_TASK_COUNT = 89
# Size of the semaphore limiting concurrent storage uploads in import_submission().
MAX_UPLOAD_WORKERS = 8
|
|
|
|
| |
| |
| |
|
|
|
|
class ImportStats(TypedDict):
    """Counters and error messages returned by ``import_submission``."""

    # Jobs whose row was written (or, in a dry run, successfully parsed).
    jobs_imported: int
    # Trials that were successfully converted into insert rows.
    trials_imported: int
    # Human-readable messages for jobs/trials that were skipped or failed.
    errors: list[str]
|
|
|
|
class TotalStats(TypedDict):
    """Job/trial counts plus collected error messages.

    NOTE(review): not referenced in this part of the file; presumably the
    totals accumulated over several ``ImportStats`` -- confirm with the caller.
    """

    jobs: int
    trials: int
    errors: list[str]
|
|
|
|
| |
| |
| |
|
|
|
|
class AgentInsert(BaseModel):
    """Row payload upserted into the ``agent`` table by ``import_submission``."""

    # ``name`` and ``version`` come from each trial's ``agent_info``.
    name: str
    version: str
    created_at: Any | None = None
    description: str | None = None
    # Display fields and ``url`` are filled from the submission metadata.
    display_name: str | None = None
    org_display_name: str | None = None
    url: str | None = None
|
|
|
|
class JobInsert(BaseModel):
    """Row payload upserted into the ``job`` table by ``import_submission``."""

    id: UUID4
    # Raw contents of the job's ``config.json``.
    config: dict | list[dict] | list[Any]
    job_name: str
    # Number of trial directories found for the job.
    n_trials: int
    username: str
    created_at: Any | None = None
    ended_at: Any | None = None
    git_commit_id: str | None = None
    include_on_leaderboard: bool | None = None
    metrics: dict | list[dict] | list[Any] | None = None
    package_version: str | None = None
    started_at: Any | None = None
    stats: dict | list[dict] | list[Any] | None = None
    verified: bool | None = None
|
|
|
|
class ModelInsert(BaseModel):
    """Row payload inserted into the ``model`` table by ``import_submission``."""

    name: str
    provider: str
    cents_per_million_cache_tokens: Decimal | None = None
    # The importer derives these from litellm's per-token cost as round(cost * 1e8).
    cents_per_million_input_tokens: int | None = None
    cents_per_million_output_tokens: int | None = None
    created_at: Any | None = None
    description: str | None = None
    display_name: str | None = None
    org_display_name: str | None = None
|
|
|
|
class TrialInsert(BaseModel):
    """Row payload upserted into the ``trial`` table by ``import_submission``."""

    id: UUID4
    agent_name: str
    agent_version: str
    # Raw ``config`` object from the trial's ``result.json``.
    config: dict | list[dict] | list[Any]
    task_checksum: str
    trial_name: str
    # The ``*_started_at`` / ``*_ended_at`` pairs are copied from the
    # ``started_at`` / ``finished_at`` keys of the matching result.json section.
    agent_execution_ended_at: Any | None = None
    agent_execution_started_at: Any | None = None
    agent_metadata: dict | list[dict] | list[Any] | None = None
    agent_setup_ended_at: Any | None = None
    agent_setup_started_at: Any | None = None
    created_at: Any | None = None
    ended_at: Any | None = None
    environment_setup_ended_at: Any | None = None
    environment_setup_started_at: Any | None = None
    exception_info: dict | list[dict] | list[Any] | None = None
    job_id: UUID4 | None = None
    # Taken from ``verifier_result.rewards.reward`` when present.
    reward: Decimal | None = None
    started_at: Any | None = None
    # Public URL of the uploaded trial archive; left unset if the upload failed.
    trial_uri: str | None = None
    verifier_ended_at: Any | None = None
    verifier_started_at: Any | None = None
|
|
|
|
class TrialModelInsert(BaseModel):
    """Row payload upserted into the ``trial_model`` table by ``import_submission``.

    Links one trial to one model and carries the token counts reported by the
    agent for that trial.
    """

    # populate_by_name: allow construction by field name as well as by alias.
    # protected_namespaces=(): pydantic v2 reserves the ``model_`` prefix and
    # older 2.x releases warn about fields such as ``model_name``. These names
    # are deliberate, so the reservation is switched off for this class.
    model_config = ConfigDict(populate_by_name=True, protected_namespaces=())

    model_name: str = Field(alias="model_name")
    model_provider: str = Field(alias="model_provider")
    trial_id: UUID4
    created_at: Any | None = None
    n_cache_tokens: int | None = None
    n_input_tokens: int | None = None
    n_output_tokens: int | None = None
|
|
|
|
| |
| |
| |
|
|
|
|
class ModelMetadata(BaseModel):
    """One entry of the ``models`` list in a submission's metadata file."""

    # Every field starts with ``model_``, which pydantic v2 treats as a
    # protected namespace (older 2.x releases emit a warning for each field).
    # The names mirror the metadata file's keys, so opt out of the reservation.
    model_config = ConfigDict(protected_namespaces=())

    model_name: str
    model_provider: str
    model_display_name: str | None = None
    model_org_display_name: str | None = None
|
|
|
|
class SubmissionMetadata(BaseModel):
    """Schema of a submission's metadata file, as validated by ``validate_submission``."""

    # Stored as the agent's ``url`` on import.
    agent_url: str
    # Models attributed to every trial of the submission on import.
    models: list[ModelMetadata]
    agent_display_name: str | None = None
    agent_org_display_name: str | None = None
|
|
|
|
| |
| |
| |
|
|
|
|
| def find_metadata_file(directory: Path) -> Path | None: |
| """Find metadata file in a directory, supporting both .yaml and .yml extensions.""" |
| for ext in ("yaml", "yml"): |
| path = directory / f"metadata.{ext}" |
| if path.exists(): |
| return path |
| return None |
|
|
|
|
def has_metadata_file(directory: Path) -> bool:
    """Tell whether *directory* contains ``metadata.yaml`` or ``metadata.yml``."""
    located = find_metadata_file(directory)
    return located is not None
|
|
|
|
def get_job_dirs(submission_dir: Path) -> list[Path]:
    """Return the immediate sub-directories of *submission_dir* that hold a ``config.json``.

    Entries are returned in whatever order ``Path.iterdir`` yields them.
    """
    job_dirs: list[Path] = []
    for entry in submission_dir.iterdir():
        if not entry.is_dir():
            continue
        if (entry / "config.json").exists():
            job_dirs.append(entry)
    return job_dirs
|
|
|
|
def get_trial_dirs(job_dir: Path) -> list[Path]:
    """Return the immediate sub-directories of *job_dir* that hold a ``result.json``.

    Entries are returned in whatever order ``Path.iterdir`` yields them.
    """

    def _holds_result(entry: Path) -> bool:
        return entry.is_dir() and (entry / "result.json").exists()

    return list(filter(_holds_result, job_dir.iterdir()))
|
|
|
|
def load_json(path: Path) -> dict:
    """Load and parse the JSON file at *path*.

    The file is read as bytes so that decoding does not depend on the
    process locale: ``json`` detects UTF-8/16/32 itself and tolerates a
    UTF-8 byte-order mark.

    Returns:
        The parsed document (whatever top-level JSON value the file holds).

    Raises:
        OSError: the file cannot be opened or read.
        json.JSONDecodeError: the content is not valid JSON.
        UnicodeDecodeError: the bytes are not valid in the detected encoding.
    """
    with open(path, "rb") as f:
        return json.load(f)
|
|
|
|
def load_yaml(path: Path) -> dict:
    """Load and parse the YAML file at *path* with the safe loader.

    The file is opened explicitly as UTF-8 so parsing does not depend on the
    process locale.

    Returns:
        The parsed document. An empty file yields ``None`` (PyYAML's result
        for an empty stream), so callers should validate the value.

    Raises:
        OSError: the file cannot be opened or read.
        UnicodeDecodeError: the file is not valid UTF-8.
        yaml.YAMLError: the content is not valid YAML.
    """
    with open(path, encoding="utf-8") as f:
        return yaml.safe_load(f)
|
|
|
|
async def upload_trial_to_storage(
    client: AsyncClient, trial_dir: Path, trial_id: str
) -> str | None:
    """Archive a trial directory and push it to the ``trials`` storage bucket.

    The directory is packed into a temporary ``.tar.gz`` and uploaded as
    ``<trial_id>.tar.gz``. When ``agent/trajectory.json`` exists it is also
    uploaded as ``<trial_id>-traj.json``; a failure of that second upload is
    only logged. The temporary archive is always removed afterwards.

    Returns:
        Public URL of the trial archive in storage, or None if upload failed.
    """
    bucket_name = "trials"
    archive_name = f"{trial_id}.tar.gz"
    trajectory_name = f"{trial_id}-traj.json"
    upsert_options = {"upsert": "true"}

    # Reserve a closed, uniquely named temp file to write the archive into.
    fd, tmp_name = tempfile.mkstemp(suffix=".tar.gz")
    os.close(fd)
    tmp_path = Path(tmp_name)

    try:
        logger.info(f"Creating tar.gz archive for trial {trial_id}...")
        with tarfile.open(tmp_path, "w:gz") as archive:
            archive.add(trial_dir, arcname=trial_dir.name)

        size_mb = tmp_path.stat().st_size / (1024 * 1024)
        logger.info(f"Archive created: {size_mb:.2f} MB")

        with open(tmp_path, "rb") as archive_file:
            await client.storage.from_(bucket_name).upload(
                file=archive_file,
                path=archive_name,
                file_options=upsert_options,
            )
        logger.info(f"Successfully uploaded {trial_id}.tar.gz")

        # The trajectory is a convenience copy; never fail the trial over it.
        trajectory_path = trial_dir / "agent" / "trajectory.json"
        if trajectory_path.exists():
            logger.info(f"Found trajectory.json for trial {trial_id}, uploading...")
            try:
                with open(trajectory_path, "rb") as trajectory_file:
                    await client.storage.from_(bucket_name).upload(
                        file=trajectory_file,
                        path=trajectory_name,
                        file_options={"upsert": "true"},
                    )
                logger.info(f"Successfully uploaded {trial_id}-traj.json")
            except Exception as e:
                logger.warning(
                    f"Failed to upload trajectory.json for trial {trial_id}: {e}"
                )

        return await client.storage.from_(bucket_name).get_public_url(archive_name)

    except Exception as e:
        logger.error(f"Failed to upload trial archive {trial_id}.tar.gz: {e}")
        return None

    finally:
        tmp_path.unlink(missing_ok=True)
|
|
|
|
def get_job_id_from_dir(job_dir: Path, trial_dirs: list[Path]) -> str | None:
    """Extract job_id from job result.json or first trial's config.

    This is used when we can't parse the full models (e.g., custom environment types).

    Lookup order:
      1. the ``id`` field of ``<job_dir>/result.json``;
      2. ``config.job_id`` in the first trial's ``result.json``.

    Any read/parse failure is logged at debug level and treated as "not found".

    Returns:
        The job id as a string (both sources are normalised with ``str``),
        or ``None`` when neither source provides one.
    """
    logger.debug(f"Looking for job_id in {job_dir.name}")

    # 1. Job-level result.json.
    job_result_path = job_dir / "result.json"
    if job_result_path.exists():
        logger.debug(f"Found job result.json at {job_result_path}")
        try:
            data = load_json(job_result_path)
            if data.get("id"):
                logger.debug(f"Got job_id from job result.json: {data['id']}")
                # Normalise to str, matching the trial fallback below and the
                # declared return type.
                return str(data["id"])
            else:
                logger.debug("job result.json exists but has no 'id' field")
        except Exception as e:
            logger.debug(f"Failed to parse job result.json: {e}")
    else:
        logger.debug("No job result.json found")

    # 2. Fall back to the first trial's config.
    if trial_dirs:
        logger.debug("Falling back to first trial's config.job_id")
        try:
            trial_data = load_json(trial_dirs[0] / "result.json")
            config = trial_data.get("config")
            if config and isinstance(config, dict):
                job_id = config.get("job_id")
                if job_id:
                    logger.debug(f"Got job_id from trial config: {job_id}")
                    return str(job_id)
                else:
                    logger.debug("Trial config has no job_id")
            else:
                logger.debug("Trial has no config dict")
        except Exception as e:
            logger.debug(f"Failed to parse trial result.json: {e}")
    else:
        logger.debug("No trial directories to fall back to")

    logger.debug("Could not find job_id")
    return None
|
|
|
|
| |
| |
| |
|
|
|
|
| @dataclass |
| class ValidationResult: |
| errors: list[str] = field(default_factory=list) |
| warnings: list[str] = field(default_factory=list) |
| models: list[str] = field(default_factory=list) |
| job_count: int = 0 |
| trial_count: int = 0 |
| successful_trials: int = 0 |
|
|
| @property |
| def is_valid(self) -> bool: |
| return len(self.errors) == 0 |
|
|
| @property |
| def accuracy(self) -> float | None: |
| """Calculate accuracy as successful trials / total trials.""" |
| if self.trial_count == 0: |
| return None |
| return self.successful_trials / self.trial_count |
|
|
|
|
# Keys that must stay unset (null/absent) in a leaderboard job config.
_TIMEOUT_OVERRIDE_KEYS = ("override_timeout_sec", "max_timeout_sec")
_RESOURCE_OVERRIDE_KEYS = (
    "override_cpus",
    "override_memory_mb",
    "override_storage_mb",
)


def _forbidden_overrides(
    section: dict, label: str, keys: tuple[str, ...]
) -> list[str]:
    """Return one error per key of *keys* that *section* sets to a non-null value."""
    return [
        f"`{label}.{key}` is set to {section[key]}"
        for key in keys
        if section.get(key) is not None
    ]


def validate_job_config(config: dict, job_name: str) -> list[str]:
    """Validate a job config dict meets terminal-bench submission requirements.

    Uses raw dict instead of JobConfig model to allow custom environment types
    that aren't in Harbor's EnvironmentType enum.

    Checks, in this order:
      * ``timeout_multiplier`` is absent/null or equal to 1.0;
      * no agent sets ``override_timeout_sec`` / ``max_timeout_sec``;
      * the verifier sets neither of those;
      * the environment sets none of the cpu/memory/storage overrides.

    Args:
        config: Parsed ``config.json``. A non-object value (e.g. a JSON
            array) is reported as an error instead of raising.
        job_name: Name of the job; accepted for interface compatibility,
            not used in the messages.

    Returns:
        A list of error messages; empty when the config is acceptable.
    """
    if not isinstance(config, dict):
        return [f"config must be a JSON object, got {type(config).__name__}"]

    errors: list[str] = []

    timeout_mult = config.get("timeout_multiplier")
    if timeout_mult is not None and timeout_mult != 1.0:
        errors.append(f"`timeout_multiplier` is {timeout_mult}, expected 1.0")

    # Non-list `agents` values and non-dict entries carry no overrides to check.
    agents = config.get("agents")
    for i, agent in enumerate(agents if isinstance(agents, list) else []):
        if isinstance(agent, dict):
            errors.extend(
                _forbidden_overrides(agent, f"agents[{i}]", _TIMEOUT_OVERRIDE_KEYS)
            )

    for label, keys in (
        ("verifier", _TIMEOUT_OVERRIDE_KEYS),
        ("environment", _RESOURCE_OVERRIDE_KEYS),
    ):
        section = config.get(label)
        if isinstance(section, dict):
            errors.extend(_forbidden_overrides(section, label, keys))

    return errors
|
|
|
|
def validate_trial_result(
    trial: dict, job_dir_name: str, trial_dir_name: str
) -> list[str]:
    """Validate a trial result dict has required fields.

    Uses raw dict to allow custom environment types in trial config.

    Args:
        trial: Parsed ``result.json`` of the trial. Anything other than a
            JSON object is reported as an error instead of raising.
        job_dir_name: Job directory name, used in the message prefix.
        trial_dir_name: Trial directory name, used in the message prefix.

    Returns:
        A list with at most one error message; empty when the trial has
        non-null ``id``, ``trial_name``, ``task_checksum`` and an
        ``agent_info`` object with non-empty ``name`` and ``version``.
    """
    prefix = f"`{job_dir_name}/{trial_dir_name}/result.json`"

    if not isinstance(trial, dict):
        return [f"{prefix}: Expected a JSON object, got {type(trial).__name__}"]

    # Top-level fields: present and not null.
    required_fields = ("id", "trial_name", "task_checksum", "agent_info")
    missing = [name for name in required_fields if trial.get(name) is None]
    if missing:
        return [f"{prefix}: Missing required fields: {missing}"]

    agent_info = trial["agent_info"]
    if not isinstance(agent_info, dict):
        return [f"{prefix}: `agent_info` must be an object"]

    # Agent identity: present and non-empty.
    missing_agent = [
        f"agent_info.{key}" for key in ("name", "version") if not agent_info.get(key)
    ]
    if missing_agent:
        return [f"{prefix}: Missing required fields: {missing_agent}"]

    return []
|
|
|
|
# Case-insensitive markers whose presence in agent output is treated as
# reward hacking by check_reward_hacking().
REWARD_HACKING_PATTERN = re.compile(
    r"harbor[-\s]?framework|harborframework|laude[-\s]?institute|tbench\.ai",
    re.IGNORECASE,
)


def check_reward_hacking(trial_dir: Path, job_name: str, trial_name: str) -> list[str]:
    """Scan agent/* files in a trial directory for reward hacking strings.

    Every regular file below ``<trial_dir>/agent`` is read as UTF-8 text
    (undecodable bytes are dropped) and searched with
    ``REWARD_HACKING_PATTERN``. Files are visited in sorted path order so the
    resulting messages are deterministic.

    Args:
        trial_dir: Trial directory to scan.
        job_name: Job name used in the message prefix.
        trial_name: Trial name used in the message prefix.

    Returns:
        One message per offending file, listing the distinct matched strings;
        empty when ``agent/`` is missing or nothing matched.
    """
    agent_dir = trial_dir / "agent"
    if not agent_dir.is_dir():
        return []

    errors: list[str] = []
    for file_path in sorted(agent_dir.rglob("*")):
        if not file_path.is_file():
            continue
        try:
            text = file_path.read_text(encoding="utf-8", errors="ignore")
        except OSError:
            # Best effort: an unreadable file is skipped, not fatal.
            continue
        matches = set(REWARD_HACKING_PATTERN.findall(text))
        if matches:
            rel = file_path.relative_to(trial_dir).as_posix()
            errors.append(
                f"`{job_name}/{trial_name}/{rel}`: "
                f"Reward hacking detected: {', '.join(sorted(matches))}"
            )
    return errors
|
|
|
|
def validate_submission(submission_dir: Path) -> ValidationResult:
    """Validate a complete submission directory.

    Steps:
      1. the metadata file exists and matches ``SubmissionMetadata``;
      2. at least one job directory (with ``config.json``) exists;
      3. each job config passes ``validate_job_config``;
      4. each trial passes ``validate_trial_result``, agrees with the job's
         id, and is scanned for reward-hacking markers;
      5. at least ``EXPECTED_TASK_COUNT`` distinct task checksums are covered,
         each with at least ``MIN_TRIALS_PER_TASK`` trials.

    Malformed input (bad YAML/JSON, non-UTF-8 files, a non-object JSON
    document) is recorded in ``result.errors`` instead of raising; a
    non-numeric reward counts the trial as unsuccessful. The submission
    comes from an untrusted pull request.

    Returns:
        A ``ValidationResult`` with collected errors and summary counts.
    """
    logger.info(f"Validating submission: {submission_dir.name}")
    result = ValidationResult()

    # --- 1. metadata -------------------------------------------------------
    metadata_path = find_metadata_file(submission_dir)
    if not metadata_path:
        logger.warning(f"Missing metadata.yaml in {submission_dir.name}")
        result.errors.append(f"Missing `metadata.yaml` in `{submission_dir.name}/`")
        return result

    logger.debug(f"Found metadata at {metadata_path}")
    try:
        metadata = SubmissionMetadata.model_validate(load_yaml(metadata_path))
    except (ValidationError, yaml.YAMLError, UnicodeDecodeError) as e:
        # Schema violations and unparseable YAML are both submitter errors.
        logger.warning(f"Invalid metadata.yaml: {e}")
        result.errors.append(f"Invalid `metadata.yaml`: {e}")
        return result
    result.models = [f"{m.model_name} ({m.model_provider})" for m in metadata.models]
    logger.info(f"Metadata valid, models: {result.models}")

    # --- 2. job directories ------------------------------------------------
    job_dirs = get_job_dirs(submission_dir)
    if not job_dirs:
        logger.warning(f"No job directories found in {submission_dir.name}")
        result.errors.append(f"No job directories found in `{submission_dir.name}/`")
        return result

    result.job_count = len(job_dirs)
    logger.info(f"Found {len(job_dirs)} job directories")

    # Number of counted trials per task checksum, across all jobs.
    task_trial_counts: defaultdict[str, int] = defaultdict(int)

    for job_dir in job_dirs:
        logger.debug(f"Validating job directory: {job_dir.name}")

        # --- 3. job config -------------------------------------------------
        try:
            config_raw = load_json(job_dir / "config.json")
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            logger.warning(f"Invalid JSON in {job_dir.name}/config.json: {e}")
            result.errors.append(f"`{job_dir.name}/config.json`: Invalid JSON: {e}")
            continue
        if not isinstance(config_raw, dict):
            logger.warning(f"{job_dir.name}/config.json is not a JSON object")
            result.errors.append(
                f"`{job_dir.name}/config.json`: Expected a JSON object"
            )
            continue

        config_errors = validate_job_config(config_raw, job_dir.name)
        if config_errors:
            logger.warning(f"Job config errors in {job_dir.name}: {config_errors}")
            for error in config_errors:
                result.errors.append(f"`{job_dir.name}`: {error}")

        trial_dirs = get_trial_dirs(job_dir)
        if not trial_dirs:
            logger.warning(f"No trial directories in {job_dir.name}")
            result.errors.append(f"`{job_dir.name}`: No trial directories found")
            continue

        logger.debug(f"Found {len(trial_dirs)} trials in {job_dir.name}")

        # The job-level result.json is the preferred source of the job id;
        # if it is absent or unparseable the first trial that carries a
        # config.job_id defines it.
        canonical_job_id: str | None = None
        job_result_path = job_dir / "result.json"
        if job_result_path.exists():
            logger.debug(f"Found job result.json in {job_dir.name}")
            try:
                job_result = JobResult.model_validate(load_json(job_result_path))
                canonical_job_id = str(job_result.id)
                logger.debug(f"Canonical job_id from result.json: {canonical_job_id}")
            except (ValidationError, json.JSONDecodeError, UnicodeDecodeError) as e:
                logger.debug(f"Could not parse job result.json: {e}")

        # --- 4. trials -----------------------------------------------------
        for trial_dir in trial_dirs:
            logger.debug(f"Validating trial: {trial_dir.name}")
            try:
                trial_raw = load_json(trial_dir / "result.json")
            except (json.JSONDecodeError, UnicodeDecodeError) as e:
                logger.warning(f"Invalid JSON in {trial_dir.name}/result.json: {e}")
                result.errors.append(
                    f"`{job_dir.name}/{trial_dir.name}/result.json`: Invalid JSON: {e}"
                )
                continue

            if not isinstance(trial_raw, dict):
                logger.warning(f"{trial_dir.name}/result.json is not a JSON object")
                result.errors.append(
                    f"`{job_dir.name}/{trial_dir.name}/result.json`: "
                    "Expected a JSON object"
                )
                continue

            trial_errors = validate_trial_result(
                trial_raw, job_dir.name, trial_dir.name
            )
            if trial_errors:
                logger.warning(f"Trial validation errors: {trial_errors}")
                result.errors.extend(trial_errors)
                continue

            # Job id consistency; a non-object `config` simply has no job id.
            trial_config = trial_raw.get("config")
            if not isinstance(trial_config, dict):
                trial_config = {}
            trial_job_id = (
                str(trial_config.get("job_id")) if trial_config.get("job_id") else None
            )
            if canonical_job_id is None and trial_job_id:
                canonical_job_id = trial_job_id
                logger.debug(f"Set canonical job_id from trial: {canonical_job_id}")
            elif (
                canonical_job_id
                and trial_job_id
                and trial_job_id != canonical_job_id
            ):
                logger.warning(
                    f"Job ID mismatch in {trial_dir.name}: "
                    f"{trial_job_id} != {canonical_job_id}"
                )
                result.errors.append(
                    f"`{job_dir.name}/{trial_dir.name}/result.json`: "
                    f"`config.job_id` ({trial_job_id}) does not match job's id ({canonical_job_id})"
                )
                continue

            # Reward-hacking findings are errors, but the trial is still counted.
            rh_errors = check_reward_hacking(trial_dir, job_dir.name, trial_dir.name)
            if rh_errors:
                result.errors.extend(rh_errors)

            result.trial_count += 1
            # str() keeps the key hashable and sortable whatever JSON type was used.
            task_trial_counts[str(trial_raw["task_checksum"])] += 1

            # A trial is successful when verifier_result.rewards.reward > 0.
            verifier_result = trial_raw.get("verifier_result")
            rewards = (
                verifier_result.get("rewards")
                if isinstance(verifier_result, dict)
                else None
            )
            reward = rewards.get("reward", 0) if isinstance(rewards, dict) else 0
            try:
                succeeded = bool(reward) and float(reward) > 0
            except (TypeError, ValueError):
                # Non-numeric reward: count as unsuccessful rather than crash.
                succeeded = False
            if succeeded:
                result.successful_trials += 1
                logger.debug(f"Trial {trial_dir.name} successful (reward={reward})")
            else:
                logger.debug(f"Trial {trial_dir.name} failed (reward={reward})")

    # --- 5. coverage ---------------------------------------------------------
    if len(task_trial_counts) < EXPECTED_TASK_COUNT:
        result.errors.append(
            f"Submission covers {len(task_trial_counts)} unique task(s), "
            f"expected {EXPECTED_TASK_COUNT}"
        )

    for task_checksum, count in sorted(task_trial_counts.items()):
        if count < MIN_TRIALS_PER_TASK:
            result.errors.append(
                f"Task `{task_checksum}` has only {count} trial(s), "
                f"minimum {MIN_TRIALS_PER_TASK} required"
            )

    logger.info(
        f"Validation complete: {result.trial_count} trials, "
        f"{result.successful_trials} successful, {len(result.errors)} errors"
    )
    return result
|
|
|
|
| def format_validation_comment(submissions: list[tuple[str, ValidationResult]]) -> str: |
| """Format validation results as a markdown comment.""" |
| lines = ["## Terminal-Bench Submission Validation", ""] |
| all_valid = all(r.is_valid for _, r in submissions) |
|
|
| if all_valid: |
| lines.append("**All submissions passed validation!**") |
| lines.append("") |
| for name, result in submissions: |
| lines.append(f"### `{name}`") |
| if result.models: |
| lines.append(f"- Models: {', '.join(result.models)}") |
| lines.append(f"- Jobs: {result.job_count}") |
| lines.append(f"- Trials: {result.trial_count}") |
| if result.accuracy is not None: |
| lines.append(f"- Accuracy: {result.accuracy:.1%}") |
| lines.append("") |
| lines.append("Ready to merge.") |
| else: |
| lines.append("**Validation failed**") |
| lines.append("") |
| for name, result in submissions: |
| if not result.is_valid: |
| lines.append(f"### `{name}`") |
| max_errors = 20 |
| for error in result.errors[:max_errors]: |
| lines.append(f"- {error}") |
| if len(result.errors) > max_errors: |
| lines.append( |
| f"- ... and {len(result.errors) - max_errors} more errors" |
| ) |
| lines.append("") |
| lines.append("Please fix the errors and push again.") |
|
|
| return "\n".join(lines) |
|
|
|
|
| |
| |
| |
|
|
|
|
async def get_supabase_client() -> AsyncClient:
    """Build an async Supabase client from ``SUPABASE_URL`` / ``SUPABASE_SECRET_KEY``.

    Raises:
        ValueError: if either environment variable is missing or empty.
    """
    supabase_url = os.environ.get("SUPABASE_URL")
    secret_key = os.environ.get("SUPABASE_SECRET_KEY")
    if supabase_url and secret_key:
        return await create_async_client(supabase_url, secret_key)
    raise ValueError("SUPABASE_URL and SUPABASE_SECRET_KEY must be set")
|
|
|
|
async def job_exists(client: AsyncClient, job_id: str) -> bool:
    """Return True when the ``job`` table already has a row with this id."""
    query = client.table("job").select("id").eq("id", job_id)
    response = await query.execute()
    return len(response.data) > 0
|
|
|
|
@retry(wait=wait_exponential(multiplier=1, min=2, max=10), stop=stop_after_attempt(3))
async def upsert_with_retry(client: AsyncClient, table: str, data: dict | list[dict]):
    """Upsert *data* into *table* and return the executed response.

    The call is attempted up to three times with exponential back-off
    between attempts (see the ``retry`` decorator).
    """
    request = client.table(table).upsert(data)
    return await request.execute()
|
|
|
|
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
async def insert_ignore_conflicts(
    client: AsyncClient,
    table: str,
    data: dict | list[dict],
    *,
    on_conflict: str = "name,provider",
):
    """Insert data, ignoring conflicts (existing rows).

    Implemented as an upsert with ``ignore_duplicates=True``, so rows that
    collide on the conflict columns are left untouched. The call is attempted
    up to three times with exponential back-off.

    Args:
        client: Async Supabase client.
        table: Target table name.
        data: One row or a list of rows.
        on_conflict: Comma-separated columns that identify an existing row.
            Defaults to ``"name,provider"`` (the key used for the ``model``
            table); pass another value for tables keyed differently.

    Returns:
        The executed response.
    """
    return await (
        client.table(table)
        .upsert(data, on_conflict=on_conflict, ignore_duplicates=True)
        .execute()
    )
|
|
|
|
async def import_submission(
    submission_dir: Path,
    metadata: SubmissionMetadata,
    *,
    client: AsyncClient | None = None,
    dry_run: bool = False,
) -> ImportStats:
    """Import a validated submission to the database.

    If dry_run=True, parses all data and validates structure but skips
    database writes and storage uploads.

    For every job directory the function:
      1. resolves the job id (job ``result.json`` first, then the first trial);
      2. skips the job if it is already in the ``job`` table;
      3. upserts the job row;
      4. parses every trial's ``result.json`` and collects agent/model rows;
      5. uploads trial archives to storage (at most ``MAX_UPLOAD_WORKERS``
         at a time);
      6. builds trial and trial_model rows and writes agents, models, trials
         and trial_models, in that order.

    Failures are recorded in ``stats["errors"]`` rather than raised: a bad
    trial is skipped, a failing job does not stop the remaining jobs.

    Args:
        submission_dir: Local directory containing the submission's jobs.
        metadata: Parsed submission metadata (agent and model attribution).
        client: Existing Supabase client to reuse; one is created from the
            environment when omitted (only if ``dry_run`` is False).
        dry_run: Parse and validate only; no database or storage access.

    Returns:
        ``ImportStats`` with imported job/trial counts and error messages.
    """
    logger.info(
        f"Starting import for submission: {submission_dir.name}"
        + (" (DRY RUN)" if dry_run else "")
    )

    # `db` is bound -- and only ever used -- when writes are enabled.
    if not dry_run:
        db = client if client is not None else await get_supabase_client()

    stats: ImportStats = {"jobs_imported": 0, "trials_imported": 0, "errors": []}

    for job_dir in get_job_dirs(submission_dir):
        try:
            config_raw = load_json(job_dir / "config.json")
            job_name = config_raw.get("job_name") or job_dir.name
            logger.info(f"Processing job: {job_name}")

            # The job-level result.json is optional and may not match the
            # JobResult model; in both cases we continue without it.
            job_result: JobResult | None = None
            job_result_path = job_dir / "result.json"
            if job_result_path.exists():
                logger.debug(f"Found job result.json for {job_name}")
                try:
                    job_result = JobResult.model_validate(load_json(job_result_path))
                    logger.debug("Parsed job result successfully")
                except ValidationError as e:
                    logger.debug(f"Could not parse job result.json: {e}")
            else:
                logger.debug(f"No job result.json for {job_name}")

            trial_dirs = get_trial_dirs(job_dir)
            if not trial_dirs:
                logger.warning(f"No trials in {job_dir.name}, skipping")
                stats["errors"].append(f"No trials in {job_dir.name}")
                continue

            logger.debug(f"Found {len(trial_dirs)} trials")

            # Resolve the job id.
            job_id: str | None = None
            if job_result:
                job_id = str(job_result.id)
                logger.debug(f"Got job_id from job result: {job_id}")
            else:
                logger.debug("Getting job_id from directory")
                job_id = get_job_id_from_dir(job_dir, trial_dirs)

            if not job_id:
                logger.warning(f"Could not find job_id in {job_dir.name}, skipping")
                stats["errors"].append(f"Could not find job_id in {job_dir.name}")
                continue

            # Jobs already present in the database are not re-imported.
            if not dry_run and await job_exists(db, job_id):
                logger.info(f"Job {job_id} already exists in database, skipping")
                continue

            logger.info(
                f"{'[DRY RUN] Would import' if dry_run else 'Importing new'} job: {job_id}"
            )

            # The job row is written before the trials that reference it.
            job_insert = JobInsert(
                id=UUID(job_id),
                config=config_raw,
                job_name=job_name,
                n_trials=len(trial_dirs),
                username="hf-submission",
                package_version=harbor.__version__,
                include_on_leaderboard=True,
                started_at=job_result.started_at if job_result else None,
                ended_at=job_result.finished_at if job_result else None,
                stats=job_result.stats.model_dump(mode="json")
                if job_result and job_result.stats
                else None,
            )
            if not dry_run:
                await upsert_with_retry(
                    db,
                    "job",
                    job_insert.model_dump(mode="json", exclude_none=True),
                )
            logger.info(
                f"{'[DRY RUN] Parsed' if dry_run else 'Inserted'} job: {job_id}"
            )
            stats["jobs_imported"] += 1

            # Rows collected for batched writes; agents and models are
            # de-duplicated by their natural key.
            agent_inserts: dict[tuple[str, str], dict] = {}
            model_inserts: dict[tuple[str, str], dict] = {}
            trial_inserts: list[dict] = []
            trial_model_inserts: list[dict] = []

            # Phase 1: parse every trial and collect agent/model rows.
            parsed_trials: list[tuple[Path, dict]] = []
            for trial_dir in trial_dirs:
                try:
                    trial_raw = load_json(trial_dir / "result.json")

                    # Every later phase indexes trial_raw["id"]. Reject
                    # id-less trials here so that one bad trial is skipped
                    # instead of raising inside the upload TaskGroup, which
                    # would cancel the sibling uploads and abort the job.
                    if not isinstance(trial_raw, dict) or trial_raw.get("id") is None:
                        raise ValueError("result.json has no `id`")

                    agent_info = trial_raw.get("agent_info") or {}
                    agent_name = agent_info.get("name", "unknown")
                    agent_version = agent_info.get("version", "unknown")

                    agent_key = (agent_name, agent_version)
                    if agent_key not in agent_inserts:
                        agent_inserts[agent_key] = AgentInsert(
                            name=agent_name,
                            version=agent_version,
                            display_name=metadata.agent_display_name,
                            url=metadata.agent_url,
                            org_display_name=metadata.agent_org_display_name,
                        ).model_dump(mode="json", exclude_none=True)

                    for model in metadata.models:
                        model_key = (model.model_name, model.model_provider)
                        if model_key not in model_inserts:
                            # Look the price up as "provider/name" first,
                            # then by bare model name.
                            key = f"{model.model_provider}/{model.model_name}"
                            token_costs = model_cost.get(key) or model_cost.get(
                                model.model_name
                            )
                            input_cost = (
                                token_costs.get("input_cost_per_token")
                                if token_costs
                                else None
                            )
                            output_cost = (
                                token_costs.get("output_cost_per_token")
                                if token_costs
                                else None
                            )

                            # Per-token cost * 1e8 is stored in the
                            # cents-per-million-tokens columns.
                            model_inserts[model_key] = ModelInsert(
                                name=model.model_name,
                                provider=model.model_provider,
                                display_name=model.model_display_name,
                                org_display_name=model.model_org_display_name,
                                cents_per_million_input_tokens=round(input_cost * 1e8)
                                if input_cost
                                else None,
                                cents_per_million_output_tokens=round(output_cost * 1e8)
                                if output_cost
                                else None,
                            ).model_dump(mode="json", exclude_none=True)

                    parsed_trials.append((trial_dir, trial_raw))

                except Exception as e:
                    logger.error(f"Error parsing trial {trial_dir.name}: {e}")
                    stats["errors"].append(f"Trial {trial_dir.name}: {e}")

            # Phase 2: upload trial archives, bounded by a semaphore.
            storage_urls: dict[str, str | None] = {}
            if parsed_trials and not dry_run:
                logger.info(
                    f"Uploading {len(parsed_trials)} trials to storage "
                    f"with {MAX_UPLOAD_WORKERS} workers"
                )
                sem = asyncio.Semaphore(MAX_UPLOAD_WORKERS)

                async def _upload(td: Path, tr: dict) -> None:
                    tid = str(tr["id"])
                    async with sem:
                        try:
                            url = await upload_trial_to_storage(db, td, tid)
                            if not url:
                                logger.warning(
                                    f"Storage upload failed for trial {tid}, "
                                    "trial_uri will be null"
                                )
                            storage_urls[tid] = url
                        except Exception as e:
                            logger.error(f"Storage upload error for trial {tid}: {e}")
                            storage_urls[tid] = None

                async with asyncio.TaskGroup() as tg:
                    for td, tr in parsed_trials:
                        tg.create_task(_upload(td, tr))
            elif parsed_trials and dry_run:
                logger.info(
                    f"[DRY RUN] Would upload {len(parsed_trials)} trials to storage"
                )

            # Phase 3: build trial and trial_model rows.
            for trial_dir, trial_raw in parsed_trials:
                try:
                    agent_info = trial_raw.get("agent_info") or {}
                    agent_name = agent_info.get("name", "unknown")
                    agent_version = agent_info.get("version", "unknown")

                    # Go through str() so a float reward keeps its literal form.
                    reward: Decimal | None = None
                    verifier_result = trial_raw.get("verifier_result") or {}
                    rewards = verifier_result.get("rewards") or {}
                    reward_val = rewards.get("reward")
                    if reward_val is not None:
                        reward = Decimal(str(reward_val))

                    # Timing sections of result.json; each may be missing.
                    agent_execution = trial_raw.get("agent_execution") or {}
                    agent_setup = trial_raw.get("agent_setup") or {}
                    environment_setup = trial_raw.get("environment_setup") or {}
                    verifier_timing = trial_raw.get("verifier") or {}
                    agent_result = trial_raw.get("agent_result") or {}

                    trial_id = str(trial_raw["id"])
                    storage_url = storage_urls.get(trial_id)

                    trial_inserts.append(
                        TrialInsert(
                            id=trial_raw["id"],
                            agent_name=agent_name,
                            agent_version=agent_version,
                            config=trial_raw.get("config") or {},
                            task_checksum=trial_raw.get("task_checksum", ""),
                            trial_name=trial_raw.get("trial_name", ""),
                            trial_uri=storage_url,
                            agent_execution_started_at=agent_execution.get(
                                "started_at"
                            ),
                            agent_execution_ended_at=agent_execution.get("finished_at"),
                            agent_setup_started_at=agent_setup.get("started_at"),
                            agent_setup_ended_at=agent_setup.get("finished_at"),
                            environment_setup_started_at=environment_setup.get(
                                "started_at"
                            ),
                            environment_setup_ended_at=environment_setup.get(
                                "finished_at"
                            ),
                            verifier_started_at=verifier_timing.get("started_at"),
                            verifier_ended_at=verifier_timing.get("finished_at"),
                            exception_info=trial_raw.get("exception_info"),
                            job_id=UUID(job_id),
                            reward=reward,
                            started_at=trial_raw.get("started_at"),
                            ended_at=trial_raw.get("finished_at"),
                            agent_metadata=agent_result.get("metadata"),
                        ).model_dump(mode="json", exclude_none=True)
                    )

                    # Every model in the metadata is attributed to the trial
                    # with the same token counts from agent_result.
                    for model in metadata.models:
                        trial_model_inserts.append(
                            TrialModelInsert(
                                trial_id=trial_raw["id"],
                                model_name=model.model_name,
                                model_provider=model.model_provider,
                                n_input_tokens=agent_result.get("n_input_tokens"),
                                n_output_tokens=agent_result.get("n_output_tokens"),
                                n_cache_tokens=agent_result.get("n_cache_tokens"),
                            ).model_dump(mode="json", by_alias=True, exclude_none=True)
                        )

                    stats["trials_imported"] += 1

                except Exception as e:
                    logger.error(f"Error processing trial {trial_dir.name}: {e}")
                    stats["errors"].append(f"Trial {trial_dir.name}: {e}")

            # Phase 4: batched writes -- agents, models, trials, trial_models.
            if dry_run:
                logger.info(
                    f"[DRY RUN] Would insert: {len(agent_inserts)} agents, "
                    f"{len(model_inserts)} models, {len(trial_inserts)} trials, "
                    f"{len(trial_model_inserts)} trial_models"
                )
            else:
                if agent_inserts:
                    logger.debug(f"Upserting {len(agent_inserts)} agents")
                    await upsert_with_retry(db, "agent", list(agent_inserts.values()))

                if model_inserts:
                    logger.debug(f"Inserting {len(model_inserts)} models")
                    await insert_ignore_conflicts(
                        db, "model", list(model_inserts.values())
                    )

                if trial_inserts:
                    logger.debug(f"Upserting {len(trial_inserts)} trials")
                    await upsert_with_retry(db, "trial", trial_inserts)

                if trial_model_inserts:
                    logger.debug(f"Upserting {len(trial_model_inserts)} trial_models")
                    await upsert_with_retry(db, "trial_model", trial_model_inserts)

            logger.info(
                f"Job {job_dir.name} import complete: "
                f"{len(trial_inserts)} trials, {len(agent_inserts)} agents, {len(model_inserts)} models"
            )

        except Exception as e:
            # Covers pydantic ValidationError as well; one failing job must
            # not stop the import of the remaining jobs.
            logger.exception(f"Error importing job {job_dir.name}")
            stats["errors"].append(f"Job {job_dir.name}: {e}")

    logger.info(f"Submission import complete: {stats}")
    return stats
|
|
|
|
| |
| |
| |
|
|
|
|
async def _list_submission_folders(
    api: HfApi, revision: str | None = None
) -> list[RepoFolder]:
    """Return the folders directly under ``SUBMISSIONS_PATH`` in the dataset repo.

    The blocking tree listing runs in a worker thread; only ``RepoFolder``
    entries are kept.
    """

    def _fetch_entries() -> list:
        return list(
            api.list_repo_tree(
                repo_id=DATASET_REPO,
                path_in_repo=SUBMISSIONS_PATH,
                repo_type="dataset",
                revision=revision,
            )
        )

    entries = await asyncio.to_thread(_fetch_entries)
    return [entry for entry in entries if isinstance(entry, RepoFolder)]
|
|
|
|
async def _list_job_folders(
    api: HfApi, submission_path: str, revision: str | None = None
) -> list[RepoFolder]:
    """Return the folders found directly inside ``submission_path``.

    The listing is non-recursive and runs in a worker thread so the blocking
    Hub call does not stall the event loop.

    Args:
        api: Hub client used for the listing.
        submission_path: Repo-relative path whose children are listed.
        revision: Git revision to list; ``None`` is passed through to the
            Hub client unchanged.

    Returns:
        Only the ``RepoFolder`` children; files are ignored.
    """

    def _fetch_entries() -> list:
        # Materialise the iterator inside the worker thread, so any network
        # work it performs happens off the event loop.
        tree = api.list_repo_tree(
            repo_id=DATASET_REPO,
            path_in_repo=submission_path,
            repo_type="dataset",
            revision=revision,
        )
        return list(tree)

    entries = await asyncio.to_thread(_fetch_entries)

    folders: list[RepoFolder] = []
    for entry in entries:
        if isinstance(entry, RepoFolder):
            folders.append(entry)
    return folders
|
|
|
|
def _download_file(
    file_path: str,
    local_dir: str | Path,
    revision: str | None = None,
) -> Path:
    """Fetch one file from the dataset repo into ``local_dir``.

    Args:
        file_path: Repo-relative path of the file to download.
        local_dir: Directory the file is downloaded into.
        revision: Git revision to download from; ``None`` is passed through
            to ``hf_hub_download`` unchanged.

    Returns:
        The local path reported by ``hf_hub_download``, as a ``Path``.
    """
    downloaded_to = hf_hub_download(
        repo_type="dataset",
        repo_id=DATASET_REPO,
        revision=revision,
        filename=file_path,
        local_dir=str(local_dir),
    )
    return Path(downloaded_to)
|
|
|
|
async def _download_submission_files(
    api: HfApi,
    submission_name: str,
    local_dir: str | Path,
    revision: str | None = None,
) -> None:
    """Download essential files for a submission: metadata, configs, and results.

    Uses level-by-level tree listing + parallel file downloads to avoid
    the recursive tree listing that causes timeouts on large repos.

    The download is best-effort: a missing metadata file, a trial without an
    ``agent/`` folder, or an individual file that cannot be fetched is logged
    and skipped rather than raised. Failures while listing the submission's
    job folders or a job's trial folders still propagate.

    Args:
        api: Hub client used for the tree listings.
        submission_name: Folder name of the submission under
            ``SUBMISSIONS_PATH``.
        local_dir: Directory the files are downloaded into.
        revision: Git revision to read from; ``None`` is passed through to
            the Hub client unchanged.
    """
    sub_path = f"{SUBMISSIONS_PATH}/{submission_name}"

    def _list_tree(path: str, recursive: bool = False) -> list:
        # Blocking Hub call; always invoked through asyncio.to_thread below.
        return list(
            api.list_repo_tree(
                repo_id=DATASET_REPO,
                path_in_repo=path,
                repo_type="dataset",
                revision=revision,
                recursive=recursive,
            )
        )

    # Metadata may use either extension; stop at the first one that downloads.
    for ext in ("yaml", "yml"):
        metadata_file = f"{sub_path}/metadata.{ext}"
        try:
            await asyncio.to_thread(_download_file, metadata_file, local_dir, revision)
            break
        except Exception as e:
            # Expected for the extension the submitter did not use.
            logger.debug(f"Could not download {metadata_file}: {e}")
    else:
        logger.warning(f"No metadata file could be downloaded for {submission_name}")

    # Collect every file path first, then download them all in parallel.
    files_to_download: list[str] = []

    for job_folder in await _list_job_folders(api, sub_path, revision):
        job_path = job_folder.path
        files_to_download.append(f"{job_path}/config.json")
        files_to_download.append(f"{job_path}/result.json")

        # One level down: each sub-folder of a job is treated as a trial.
        trial_items = await asyncio.to_thread(_list_tree, job_path)
        for trial_item in trial_items:
            if not isinstance(trial_item, RepoFolder):
                continue
            files_to_download.append(f"{trial_item.path}/result.json")

            # The agent/ subtree is optional; list it recursively when present.
            agent_path = f"{trial_item.path}/agent"
            try:
                agent_items = await asyncio.to_thread(_list_tree, agent_path, True)
            except Exception as e:
                logger.debug(f"Could not list {agent_path}: {e}")
                continue
            files_to_download.extend(
                agent_item.path
                for agent_item in agent_items
                if isinstance(agent_item, RepoFile)
            )

    logger.info(f"Downloading {len(files_to_download)} files for {submission_name}")

    sem = asyncio.Semaphore(MAX_CONCURRENT_DOWNLOADS)

    async def _safe_download(file_path: str) -> None:
        # Bounded concurrency; one failed file must not cancel the TaskGroup.
        async with sem:
            try:
                await asyncio.to_thread(_download_file, file_path, local_dir, revision)
            except Exception as e:
                logger.debug(f"Could not download {file_path}: {e}")

    async with asyncio.TaskGroup() as tg:
        for f in files_to_download:
            tg.create_task(_safe_download(f))
|
|
|
|
async def get_changed_submission_names(api: HfApi, pr_revision: str) -> list[str]:
    """Return the submission folders that differ between a PR and ``main``.

    Each submission folder's ``tree_id`` in the PR revision is compared with
    the ``tree_id`` of the same-named folder on ``main``. Folders that are new
    in the PR, or whose ``tree_id`` differs, count as changed. Folders that
    exist only on ``main`` are not reported.

    Args:
        api: Hub client used for the listings.
        pr_revision: Revision of the pull request (e.g. ``refs/pr/<n>``).

    Returns:
        Alphabetically sorted names of the changed submission folders.
    """
    logger.info(f"Comparing submission folders: PR ({pr_revision}) vs main")

    async def _tree_ids_by_name(revision: str) -> dict[str, str]:
        # Key by the last path component, i.e. the submission folder name.
        folders = await _list_submission_folders(api, revision)
        return {folder.path.split("/")[-1]: folder.tree_id for folder in folders}

    pr_tree_ids = await _tree_ids_by_name(pr_revision)
    main_tree_ids = await _tree_ids_by_name("main")

    changed_names = sorted(
        folder_name
        for folder_name, pr_tree_id in pr_tree_ids.items()
        if main_tree_ids.get(folder_name) != pr_tree_id
    )

    logger.info(f"Changed submission folders: {changed_names}")
    return changed_names
|
|
|
|
def get_new_job_ids_by_submission(
    probe_dir: Path,
) -> dict[str, list[str]]:
    """Collect job IDs from probed ``result.json`` files, grouped by submission.

    Walks ``probe_dir / SUBMISSIONS_PATH`` two levels deep (submission, then
    job) and reads the ``id`` field of each job's ``result.json``.

    Args:
        probe_dir: Root directory the probe files were downloaded into.

    Returns:
        Mapping of submission folder name to the job IDs found in it, as
        strings. Submissions without any readable job ID are left out. An
        empty dict is returned when the submissions directory does not exist.
    """
    submissions_root = probe_dir / SUBMISSIONS_PATH
    if not submissions_root.exists():
        return {}

    ids_by_submission: dict[str, list[str]] = {}
    for submission in submissions_root.iterdir():
        if not submission.is_dir():
            continue

        found_ids: list[str] = []
        for job in submission.iterdir():
            result_path = job / "result.json"
            if not (job.is_dir() and result_path.exists()):
                continue
            try:
                job_id = load_json(result_path).get("id")
            except Exception as e:
                # Unreadable or unexpectedly shaped file: skip this job only.
                logger.debug(f"Failed to parse {result_path}: {e}")
                continue
            if job_id:
                found_ids.append(str(job_id))

        if found_ids:
            ids_by_submission[submission.name] = found_ids

    return ids_by_submission
|
|
|
|
async def batch_check_existing_jobs(
    client: AsyncClient, job_ids: list[str], *, chunk_size: int = 100
) -> set[str]:
    """Check which job IDs already exist in the database.

    Queries the ``job`` table in chunks, so a large ID list is never sent as
    one ``in`` filter. Duplicate IDs are removed first (order preserved) so
    the same ID is not queried twice.

    Args:
        client: Async Supabase client used for the queries. It is not touched
            when ``job_ids`` is empty.
        job_ids: Job IDs to look up.
        chunk_size: Maximum number of IDs per query. Defaults to 100.

    Returns:
        The ``id`` values of the rows found; empty when ``job_ids`` is empty.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1.
    """
    if chunk_size < 1:
        # A negative step would make range() empty and silently report that
        # nothing exists; reject it explicitly instead.
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")

    unique_ids = list(dict.fromkeys(job_ids))

    existing: set[str] = set()
    for start in range(0, len(unique_ids), chunk_size):
        chunk = unique_ids[start : start + chunk_size]
        result = await client.table("job").select("id").in_("id", chunk).execute()
        rows = cast(list[dict[str, Any]], result.data or [])
        existing.update(row["id"] for row in rows)
    return existing
|
|
|
|
| |
| |
| |
|
|
|
|
async def handle_pr(pr_num: int, api: HfApi) -> dict:
    """Validate the submissions touched by a pull request and comment on it.

    Steps: find the submission folders that differ from ``main``, download
    their essential files into a temporary directory, validate each one that
    has a metadata file, and post one comment with the outcome on the PR.

    Args:
        pr_num: Number of the pull request / discussion.
        api: Hub client used for listings, downloads and the comment.

    Returns:
        ``{"status": "success", "action": "validated"}`` once the comment is
        posted, or an ``{"status": "error", ...}`` dict when the changed
        submissions could not be determined (no comment is posted then).
    """
    logger.info(f"Processing PR #{pr_num}")

    pr_revision = f"refs/pr/{pr_num}"

    # Step 1: work out which submission folders the PR touches.
    try:
        changed_names = await get_changed_submission_names(api, pr_revision)
    except Exception as e:
        logger.error(f"Failed to detect changed submissions: {e}")
        return {"status": "error", "message": "Failed to determine changed files"}

    if changed_names:
        # Step 2: download only the changed submissions and validate them
        # while the temporary checkout still exists.
        logger.info(
            f"Downloading {len(changed_names)} changed submissions: {changed_names}"
        )
        with tempfile.TemporaryDirectory() as tmpdir:
            local_dir = os.path.join(tmpdir, "repo")
            for changed_name in changed_names:
                await _download_submission_files(
                    api, changed_name, local_dir, revision=pr_revision
                )

            submissions_dir = Path(local_dir) / SUBMISSIONS_PATH

            subs_to_validate: list[Path] = []
            missing_metadata: list[str] = []

            for changed_name in changed_names:
                candidate = submissions_dir / changed_name
                if not (candidate.exists() and candidate.is_dir()):
                    # Nothing was downloaded for this name; skip it.
                    continue
                if has_metadata_file(candidate):
                    subs_to_validate.append(candidate)
                else:
                    missing_metadata.append(changed_name)

            logger.info(f"Will validate {len(subs_to_validate)} submissions")

            if missing_metadata:
                logger.warning(f"Submissions missing metadata: {missing_metadata}")
                # Missing metadata fails the whole PR before any validation.
                bullet_list = "\n".join(f"- `{name}/`" for name in missing_metadata)
                comment = (
                    "## Terminal-Bench Submission Validation\n\n"
                    "**Validation failed**\n\n"
                    "The following submission directories are missing `metadata.yaml`:\n"
                    + bullet_list
                    + "\n\nPlease add a `metadata.yaml` file to each submission "
                    "directory, as described in the README."
                )
            elif subs_to_validate:
                logger.info(f"Validating {len(subs_to_validate)} submissions")
                results = [
                    (sub_dir.name, validate_submission(sub_dir))
                    for sub_dir in subs_to_validate
                ]
                all_valid = all(outcome.is_valid for _, outcome in results)
                logger.info(f"Validation complete, all_valid={all_valid}")
                comment = format_validation_comment(results)
            else:
                comment = "No new or changed submissions found in this PR."
    else:
        comment = "No new or changed submissions found in this PR."

    # Step 3: report back on the PR; the blocking Hub call runs in a thread.
    logger.info(f"Posting comment to PR #{pr_num}")
    await asyncio.to_thread(
        api.comment_discussion,
        repo_id=DATASET_REPO,
        repo_type="dataset",
        discussion_num=pr_num,
        comment=comment,
    )

    logger.info(f"PR #{pr_num} handling complete")
    return {"status": "success", "action": "validated"}
|
|
|
|
async def handle_merge(*, dry_run: bool = False) -> dict:
    """Import submissions that contain jobs not yet in the database.

    Phase 1 downloads only each job's ``result.json`` to learn the job IDs and
    asks the database which of them already exist. Phase 2 downloads the full
    file set of every submission that has at least one new job and imports it.

    Args:
        dry_run: When true, no database client is created, the existence
            check is skipped (every submission with a job ID is treated as
            new), and ``import_submission`` is called with ``dry_run=True``.

    Returns:
        A dict with ``"status": "success"``. It carries a ``"message"`` when
        there was nothing to import, otherwise ``"action": "imported"`` and
        the accumulated ``"stats"`` (jobs, trials, error strings).
    """
    logger.info(f"Processing merge to main{' (DRY RUN)' if dry_run else ''}")

    api = HfApi(token=os.environ.get("HF_TOKEN"))

    with tempfile.TemporaryDirectory() as tmpdir:
        probe_dir = os.path.join(tmpdir, "probe")

        # ---- Phase 1: cheap probe for job IDs --------------------------------
        logger.info("Phase 1: Probing for job IDs via result.json files")
        sub_folders = await _list_submission_folders(api)
        logger.info(f"Found {len(sub_folders)} submission folders")

        # One result.json candidate per job folder of every submission.
        probe_files: list[str] = []
        for sub_folder in sub_folders:
            job_folders = await _list_job_folders(api, sub_folder.path)
            probe_files.extend(
                f"{job_folder.path}/result.json" for job_folder in job_folders
            )

        logger.info(f"Downloading {len(probe_files)} job result.json files")

        download_slots = asyncio.Semaphore(MAX_CONCURRENT_DOWNLOADS)

        async def _probe_download(file_path: str) -> None:
            # A folder without result.json is not an error; log and move on.
            async with download_slots:
                try:
                    await asyncio.to_thread(_download_file, file_path, probe_dir, None)
                except Exception as e:
                    logger.debug(f"No result.json at {file_path}: {e}")

        async with asyncio.TaskGroup() as tg:
            for probe_file in probe_files:
                tg.create_task(_probe_download(probe_file))

        jobs_by_submission = get_new_job_ids_by_submission(Path(probe_dir))
        all_job_ids = [
            job_id for ids in jobs_by_submission.values() for job_id in ids
        ]
        logger.info(
            f"Found {len(all_job_ids)} jobs across "
            f"{len(jobs_by_submission)} submissions"
        )

        if not all_job_ids:
            return {"status": "success", "message": "no jobs found"}

        # The database is only contacted outside of dry runs.
        client: AsyncClient | None = None if dry_run else await get_supabase_client()

        if not dry_run:
            assert client is not None
            existing_ids = await batch_check_existing_jobs(client, all_job_ids)
            logger.info(f"{len(existing_ids)} of {len(all_job_ids)} jobs already in DB")

            # A submission qualifies as soon as one of its jobs is unknown.
            new_submission_names = []
            for name, job_ids in jobs_by_submission.items():
                unseen_ids = [
                    job_id for job_id in job_ids if job_id not in existing_ids
                ]
                if not unseen_ids:
                    logger.debug(
                        f"Submission {name}: all {len(job_ids)} jobs exist, skipping"
                    )
                    continue
                logger.info(
                    f"Submission {name}: {len(unseen_ids)} new jobs "
                    f"(of {len(job_ids)} total)"
                )
                new_submission_names.append(name)
        else:
            new_submission_names = sorted(jobs_by_submission.keys())
            logger.info(
                f"[DRY RUN] Skipping DB check, treating all "
                f"{len(new_submission_names)} submissions as new"
            )

        if not new_submission_names:
            logger.info("All jobs already imported, nothing to do")
            return {"status": "success", "message": "all jobs already imported"}

        # ---- Phase 2: full download + import of the new submissions ----------
        local_dir = os.path.join(tmpdir, "repo")
        logger.info(
            f"Phase 2: Downloading {len(new_submission_names)} submissions "
            f"with new jobs: {new_submission_names}"
        )
        for name in new_submission_names:
            await _download_submission_files(api, name, local_dir)

        submissions_dir = Path(local_dir) / SUBMISSIONS_PATH

        total: TotalStats = {"jobs": 0, "trials": 0, "errors": []}

        for name in new_submission_names:
            sub_dir = submissions_dir / name
            if not (sub_dir.exists() and sub_dir.is_dir()):
                logger.warning(f"Expected folder not found after download: {name}")
                total["errors"].append(f"{name}: folder not found after download")
                continue

            metadata_path = find_metadata_file(sub_dir)
            if not metadata_path:
                logger.debug(f"Skipping {name}: no metadata file")
                continue

            # One failing submission must not abort the remaining imports.
            try:
                logger.info(f"Importing submission: {name}")
                metadata = SubmissionMetadata.model_validate(load_yaml(metadata_path))
                sub_stats = await import_submission(
                    sub_dir, metadata, client=client, dry_run=dry_run
                )

                total["jobs"] += sub_stats["jobs_imported"]
                total["trials"] += sub_stats["trials_imported"]
                total["errors"].extend(sub_stats["errors"])
                logger.info(
                    f"Submission {name} imported: "
                    f"{sub_stats['jobs_imported']} jobs, {sub_stats['trials_imported']} trials"
                )
            except Exception as e:
                logger.exception(f"Error importing {name}")
                total["errors"].append(f"{name}: {e}")

        logger.info(
            f"Merge handling complete: {total['jobs']} jobs, "
            f"{total['trials']} trials, {len(total['errors'])} errors"
        )
        return {"status": "success", "action": "imported", "stats": total}
|
|
|
|
| |
| |
| |
|
|
# Static landing page shown by the Space. The submission path and the repo
# link are interpolated from SUBMISSIONS_PATH / DATASET_REPO so the help text
# cannot drift from the repo this service actually processes.
with gr.Blocks() as ui:
    gr.Markdown(f"""
    # Terminal-Bench Leaderboard Importer

    This Space automatically processes submissions to the Terminal-Bench leaderboard.

    ## How it works

    1. **On Pull Request**: Validates the submission and posts a comment with results
    2. **On Merge**: Imports validated submissions to the leaderboard database

    ## Submission Format

    Submissions should be placed in `{SUBMISSIONS_PATH}/<agent>__<model>/`:

    ```
    {SUBMISSIONS_PATH}/my-agent__gpt-5/
      metadata.yaml
      job-folder/
        config.json
        result.json
        trial-1/result.json
        trial-2/result.json
        ...
    ```

    See the [leaderboard repo](https://huggingface.co/datasets/{DATASET_REPO})
    for detailed submission instructions.
    """)
|
|
|
|
# References to in-flight background handler tasks. A task is added when it
# is scheduled and discarded again in its done-callback (see create_app).
# NOTE(review): presumably held so the tasks are not garbage-collected while
# still running — confirm against the asyncio.create_task documentation.
_background_tasks: set[asyncio.Task] = set()
|
|
|
|
def create_app() -> WebhooksServer:
    """Create and configure the webhook server.

    Registers one webhook endpoint that classifies each incoming event as a
    pull-request event or a merge to main and schedules the matching handler
    as a background task, so the HTTP response is sent immediately.

    Returns:
        The configured ``WebhooksServer``; the caller is expected to launch it.
    """
    app = WebhooksServer(ui=ui, webhook_secret=os.environ.get("WEBHOOK_SECRET"))

    def _run_in_background(coro) -> None:
        """Fire-and-forget: run coroutine as a background task on the event loop."""
        task = asyncio.create_task(coro)
        _background_tasks.add(task)

        def _on_done(t: asyncio.Task) -> None:
            _background_tasks.discard(t)
            # Task.exception() raises CancelledError on a cancelled task, so
            # cancellation has to be checked before asking for the exception.
            if t.cancelled():
                logger.warning("Background handler was cancelled")
                return
            exc = t.exception()
            if exc is not None:
                logger.error("Error in background handler", exc_info=exc)

        task.add_done_callback(_on_done)

    @app.add_webhook()
    async def handle_submission(payload: WebhookPayload) -> dict:
        """Handle webhook events from the leaderboard repo."""
        logger.info(
            f"Received webhook: action={payload.event.action}, scope={payload.event.scope}"
        )
        logger.debug(
            f"Webhook payload repo: type={payload.repo.type}, name={payload.repo.name}"
        )

        # Only events for the leaderboard dataset repo are processed.
        if payload.repo.type != "dataset" or payload.repo.name != DATASET_REPO:
            logger.info(
                f"Ignoring webhook: wrong repo "
                f"(type={payload.repo.type}, name={payload.repo.name})"
            )
            return {"status": "ignored", "reason": "wrong repo"}

        logger.debug("Webhook is for our dataset repo")

        # First signal: the payload carries a discussion flagged as a PR.
        # NOTE(review): this matches any event on a PR discussion, presumably
        # including comments such as the one handle_pr posts. Confirm the
        # webhook subscription cannot re-trigger validation from that comment,
        # or filter on payload.event.scope here.
        is_pr = payload.discussion is not None and payload.discussion.isPullRequest
        pr_num = payload.discussion.num if payload.discussion else None
        logger.debug(f"Initial PR detection: is_pr={is_pr}, pr_num={pr_num}")

        # Second signal: a content push whose updated refs include refs/pr/<n>.
        if not is_pr and payload.event.scope == "repo.content":
            logger.debug("Checking updatedRefs for PR refs")
            for ref_info in getattr(payload, "updatedRefs", None) or []:
                # Entries may be plain dicts or objects with a `ref` attribute.
                ref = (
                    ref_info.get("ref", "")
                    if isinstance(ref_info, dict)
                    else getattr(ref_info, "ref", "")
                )
                logger.debug(f"Checking ref: {ref}")
                match = re.match(r"refs/pr/(\d+)", ref)
                if match:
                    is_pr = True
                    pr_num = int(match.group(1))
                    logger.debug(f"Found PR ref in updatedRefs: PR #{pr_num}")
                    break

        # A content update that is not tied to a PR is treated as a merge.
        is_merge = (
            payload.event.action == "update"
            and payload.event.scope == "repo.content"
            and not is_pr
        )

        logger.info(
            f"Event classification: is_pr={is_pr}, is_merge={is_merge}, pr_num={pr_num}"
        )

        if is_pr and pr_num is not None:
            logger.info(f"Handling as PR event: PR #{pr_num}")
            api = HfApi(token=os.environ.get("HF_TOKEN"))
            _run_in_background(handle_pr(pr_num, api))
            return {"status": "processing", "action": "validating"}
        elif is_merge:
            logger.info("Handling as merge event")
            _run_in_background(handle_merge())
            return {"status": "processing", "action": "importing"}
        else:
            logger.info(
                f"Ignoring webhook: unhandled event type "
                f"(action={payload.event.action}, scope={payload.event.scope})"
            )
            return {"status": "ignored", "reason": "unhandled event type"}

    return app
|
|
|
|
# Script entry point: build the webhook server and serve it on all network
# interfaces (0.0.0.0), port 7860.
if __name__ == "__main__":
    app = create_app()
    app.launch(server_name="0.0.0.0", server_port=7860)
|
|