"""
Unit tests for the Terminal-Bench Leaderboard Importer.

Run with: pytest test_app.py -v
"""

import json
import tempfile
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
from uuid import uuid4

import pytest
import yaml
from huggingface_hub import RepoFolder

from app import (
    EXPECTED_TASK_COUNT,
    MIN_TRIALS_PER_TASK,
    ModelMetadata,
    SubmissionMetadata,
    ValidationResult,
    check_reward_hacking,
    find_metadata_file,
    format_validation_comment,
    batch_check_existing_jobs,
    get_changed_submission_names,
    get_new_job_ids_by_submission,
    get_job_dirs,
    get_job_id_from_dir,
    get_trial_dirs,
    has_metadata_file,
    import_submission,
    job_exists,
    load_json,
    load_yaml,
    validate_job_config,
    validate_submission,
    validate_trial_result,
)

# ============================================================================
# Fixtures
# ============================================================================


@pytest.fixture
def temp_dir():
    """Create a temporary directory for test files.

    Yields the directory as a ``Path``; the directory is removed when the
    ``TemporaryDirectory`` context exits after the test finishes.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        yield Path(tmpdir)


@pytest.fixture
def valid_metadata():
    """Create valid metadata content.

    Returns a plain dict with an ``agent_url`` and a single-entry ``models``
    list, in the shape that is later dumped to ``metadata.yaml``/``.yml``.
    """
    return {
        "agent_url": "https://example.com/agent",
        "models": [
            {"model_name": "gpt-4", "model_provider": "openai"},
        ],
    }


@pytest.fixture
def valid_job_config():
    """Create a valid job config dict that matches Harbor's JobConfig schema."""
    return {
        "job_name": "test-job",
        "timeout_multiplier": 1.0,
        "agents": [{"name": "test-agent"}],
        "verifier": {},
        "environment": {},
    }


@pytest.fixture
def valid_trial_result():
    """Create a valid trial result dict that matches Harbor's TrialResult schema.

    A fresh random job id is generated per fixture instance and stored both in
    ``config.job_id`` and under the extra ``_job_id`` key.
    """
    job_id = str(uuid4())
    return {
        "id": str(uuid4()),
        "task_name": "test-task",
        "trial_name": "test-trial",
        "trial_uri": "file:///tmp/trial",
        "task_id": {"type": "local", "path": "/tmp/task"},
        "task_checksum": "abc123",
        "config": {
            "task": {"type": "local", "path": "/tmp/task"},
            "job_id": job_id,
        },
        "agent_info": {"name": "test-agent", "version": "1.0.0"},
"verifier_result": {"rewards": {"reward": 1.0}}, "agent_execution": { "started_at": "2024-01-01T00:00:00Z", "finished_at": "2024-01-01T00:01:00Z", }, "agent_setup": { "started_at": "2024-01-01T00:00:00Z", "finished_at": "2024-01-01T00:00:30Z", }, "environment_setup": { "started_at": "2024-01-01T00:00:00Z", "finished_at": "2024-01-01T00:00:15Z", }, "verifier": { "started_at": "2024-01-01T00:01:00Z", "finished_at": "2024-01-01T00:01:30Z", }, "agent_result": {"n_input_tokens": 1000, "n_output_tokens": 500}, "_job_id": job_id, # Store for later reference } @pytest.fixture def valid_job_result(valid_trial_result): """Create a valid job result dict that matches Harbor's JobResult schema.""" return { "id": valid_trial_result["config"]["job_id"], "started_at": "2024-01-01T00:00:00Z", "finished_at": "2024-01-01T01:00:00Z", "n_total_trials": 1, "stats": { "n_succeeded": 1, "n_failed": 0, "n_trials_with_exception": 0, }, } def make_full_trial_set(job_id: str, reward: float = 1.0) -> list[dict]: """Generate a full set of valid trials: EXPECTED_TASK_COUNT tasks * MIN_TRIALS_PER_TASK trials each.""" trials = [] for i in range(EXPECTED_TASK_COUNT): for _ in range(MIN_TRIALS_PER_TASK): trials.append( { "id": str(uuid4()), "task_name": f"task-{i}", "trial_name": "trial", "trial_uri": "file:///tmp/trial", "task_id": {"type": "local", "path": "/tmp/task"}, "task_checksum": f"checksum_{i:03d}", "config": { "task": {"type": "local", "path": "/tmp/task"}, "job_id": job_id, }, "agent_info": {"name": "test-agent", "version": "1.0.0"}, "verifier_result": {"rewards": {"reward": reward}}, } ) return trials def create_submission( temp_dir: Path, metadata: dict | None = None, job_config: dict | None = None, trial_results: list[dict] | None = None, job_result: dict | None = None, metadata_ext: str = "yaml", ) -> Path: """Helper to create a complete submission directory structure.""" submission_dir = temp_dir / "test-submission" submission_dir.mkdir(parents=True) # Create metadata file if 
metadata is not None: metadata_path = submission_dir / f"metadata.{metadata_ext}" with open(metadata_path, "w") as f: yaml.dump(metadata, f) # Create job directory if job_config is not None or trial_results is not None: job_dir = submission_dir / "2024-01-01__00-00-00" job_dir.mkdir() if job_config is not None: with open(job_dir / "config.json", "w") as f: json.dump(job_config, f) if job_result is not None: with open(job_dir / "result.json", "w") as f: json.dump(job_result, f) if trial_results is not None: for i, trial_result in enumerate(trial_results): trial_dir = job_dir / f"trial-{i}" trial_dir.mkdir() with open(trial_dir / "result.json", "w") as f: json.dump(trial_result, f) return submission_dir # ============================================================================ # Tests: Helper Functions # ============================================================================ class TestFindMetadataFile: """Tests for find_metadata_file function.""" def test_yaml_extension(self, temp_dir): (temp_dir / "metadata.yaml").touch() assert find_metadata_file(temp_dir) == temp_dir / "metadata.yaml" def test_yml_extension(self, temp_dir): (temp_dir / "metadata.yml").touch() assert find_metadata_file(temp_dir) == temp_dir / "metadata.yml" def test_prefers_yaml_over_yml(self, temp_dir): """When both exist, .yaml should be preferred.""" (temp_dir / "metadata.yaml").touch() (temp_dir / "metadata.yml").touch() assert find_metadata_file(temp_dir) == temp_dir / "metadata.yaml" def test_no_metadata_file(self, temp_dir): assert find_metadata_file(temp_dir) is None def test_wrong_filename(self, temp_dir): (temp_dir / "meta.yaml").touch() assert find_metadata_file(temp_dir) is None class TestHasMetadataFile: """Tests for has_metadata_file function.""" def test_returns_true_when_exists(self, temp_dir): (temp_dir / "metadata.yaml").touch() assert has_metadata_file(temp_dir) is True def test_returns_false_when_missing(self, temp_dir): assert has_metadata_file(temp_dir) is False class 
TestGetJobDirs: """Tests for get_job_dirs function.""" def test_finds_dirs_with_config_json(self, temp_dir): job1 = temp_dir / "job1" job1.mkdir() (job1 / "config.json").touch() job2 = temp_dir / "job2" job2.mkdir() (job2 / "config.json").touch() result = get_job_dirs(temp_dir) assert len(result) == 2 assert set(d.name for d in result) == {"job1", "job2"} def test_ignores_dirs_without_config_json(self, temp_dir): job1 = temp_dir / "job1" job1.mkdir() (job1 / "config.json").touch() job2 = temp_dir / "job2" job2.mkdir() # No config.json result = get_job_dirs(temp_dir) assert len(result) == 1 assert result[0].name == "job1" def test_ignores_files(self, temp_dir): (temp_dir / "config.json").touch() # File, not dir assert get_job_dirs(temp_dir) == [] def test_empty_directory(self, temp_dir): assert get_job_dirs(temp_dir) == [] class TestGetTrialDirs: """Tests for get_trial_dirs function.""" def test_finds_dirs_with_result_json(self, temp_dir): trial1 = temp_dir / "trial1" trial1.mkdir() (trial1 / "result.json").touch() result = get_trial_dirs(temp_dir) assert len(result) == 1 assert result[0].name == "trial1" def test_ignores_dirs_without_result_json(self, temp_dir): trial1 = temp_dir / "trial1" trial1.mkdir() (trial1 / "result.json").touch() trial2 = temp_dir / "trial2" trial2.mkdir() # No result.json result = get_trial_dirs(temp_dir) assert len(result) == 1 class TestLoadJson: """Tests for load_json function.""" def test_loads_valid_json(self, temp_dir): path = temp_dir / "test.json" with open(path, "w") as f: json.dump({"key": "value"}, f) assert load_json(path) == {"key": "value"} def test_raises_on_invalid_json(self, temp_dir): path = temp_dir / "test.json" with open(path, "w") as f: f.write("not valid json") with pytest.raises(json.JSONDecodeError): load_json(path) def test_raises_on_missing_file(self, temp_dir): with pytest.raises(FileNotFoundError): load_json(temp_dir / "missing.json") class TestLoadYaml: """Tests for load_yaml function.""" def 
test_loads_yaml_file(self, temp_dir): path = temp_dir / "test.yaml" with open(path, "w") as f: yaml.dump({"key": "value"}, f) assert load_yaml(path) == {"key": "value"} class TestGetJobIdFromDir: """Tests for get_job_id_from_dir function.""" def test_gets_id_from_job_result(self, temp_dir): job_dir = temp_dir / "job" job_dir.mkdir() with open(job_dir / "result.json", "w") as f: json.dump({"id": "job-uuid-123"}, f) trial_dir = job_dir / "trial1" trial_dir.mkdir() with open(trial_dir / "result.json", "w") as f: json.dump({"config": {"job_id": "different-uuid"}}, f) result = get_job_id_from_dir(job_dir, [trial_dir]) assert result == "job-uuid-123" def test_falls_back_to_trial_config(self, temp_dir): job_dir = temp_dir / "job" job_dir.mkdir() # No job result.json trial_dir = job_dir / "trial1" trial_dir.mkdir() with open(trial_dir / "result.json", "w") as f: json.dump({"config": {"job_id": "trial-job-uuid"}}, f) result = get_job_id_from_dir(job_dir, [trial_dir]) assert result == "trial-job-uuid" def test_returns_none_when_no_job_id(self, temp_dir): job_dir = temp_dir / "job" job_dir.mkdir() trial_dir = job_dir / "trial1" trial_dir.mkdir() with open(trial_dir / "result.json", "w") as f: json.dump({"config": {}}, f) result = get_job_id_from_dir(job_dir, [trial_dir]) assert result is None def test_handles_empty_trial_dirs(self, temp_dir): job_dir = temp_dir / "job" job_dir.mkdir() result = get_job_id_from_dir(job_dir, []) assert result is None # ============================================================================ # Tests: Validation Functions # ============================================================================ class TestValidateJobConfig: """Tests for validate_job_config function. Note: validate_job_config now accepts raw dicts instead of JobConfig models to support custom environment types that aren't in Harbor's EnvironmentType enum. 
"""

    def test_valid_config_returns_no_errors(self, valid_job_config):
        """The unmodified fixture config produces an empty error list."""
        errors = validate_job_config(valid_job_config, "test-job")
        assert errors == []

    def test_invalid_timeout_multiplier(self, valid_job_config):
        """A ``timeout_multiplier`` of 2.0 is reported as exactly one error."""
        valid_job_config["timeout_multiplier"] = 2.0
        errors = validate_job_config(valid_job_config, "test-job")
        assert len(errors) == 1
        assert "timeout_multiplier" in errors[0]

    def test_agent_override_timeout(self, valid_job_config):
        """An agent-level ``override_timeout_sec`` is reported."""
        valid_job_config["agents"] = [{"override_timeout_sec": 3600}]
        errors = validate_job_config(valid_job_config, "test-job")
        assert len(errors) == 1
        assert "override_timeout_sec" in errors[0]

    def test_agent_max_timeout(self, valid_job_config):
        """An agent-level ``max_timeout_sec`` is reported."""
        valid_job_config["agents"] = [{"max_timeout_sec": 7200}]
        errors = validate_job_config(valid_job_config, "test-job")
        assert len(errors) == 1
        assert "max_timeout_sec" in errors[0]

    def test_verifier_override_timeout(self, valid_job_config):
        """A verifier ``override_timeout_sec`` is reported with its dotted path."""
        valid_job_config["verifier"] = {"override_timeout_sec": 600}
        errors = validate_job_config(valid_job_config, "test-job")
        assert len(errors) == 1
        assert "verifier.override_timeout_sec" in errors[0]

    def test_verifier_max_timeout(self, valid_job_config):
        """A verifier ``max_timeout_sec`` is reported with its dotted path."""
        valid_job_config["verifier"] = {"max_timeout_sec": 1200}
        errors = validate_job_config(valid_job_config, "test-job")
        assert len(errors) == 1
        assert "verifier.max_timeout_sec" in errors[0]

    def test_environment_overrides(self, valid_job_config):
        """Each of the three environment override keys yields its own error."""
        valid_job_config["environment"] = {
            "override_cpus": 4,
            "override_memory_mb": 8192,
            "override_storage_mb": 10240,
        }
        errors = validate_job_config(valid_job_config, "test-job")
        assert len(errors) == 3

    def test_multiple_agents_with_issues(self, valid_job_config):
        """Errors are reported per offending agent, labelled by list index."""
        valid_job_config["agents"] = [
            {"override_timeout_sec": 100},
            {"max_timeout_sec": 200},
            {},  # Valid agent
        ]
        errors = validate_job_config(valid_job_config, "test-job")
        assert len(errors) == 2
        assert "agents[0]" in errors[0]
        assert "agents[1]" in errors[1]


class TestValidateTrialResult:
    """Tests for validate_trial_result function.

    Note: validate_trial_result now accepts raw dicts instead of TrialResult models
    to support custom environment types that aren't in Harbor's EnvironmentType enum.
    """

    def test_valid_trial_returns_no_errors(self, valid_trial_result):
        """The unmodified fixture trial produces an empty error list."""
        errors = validate_trial_result(valid_trial_result, "job1", "trial1")
        assert errors == []

    def test_missing_required_fields(self):
        """Test that missing required fields produce validation errors."""
        # Missing id, trial_name, task_checksum, agent_info
        trial_data = {"task_name": "test"}
        errors = validate_trial_result(trial_data, "job1", "trial1")
        assert len(errors) >= 1
        # At least id and agent_info should be flagged
        # NOTE(review): `"id" in e` is a plain substring test, so it also
        # matches words such as "invalid"; consider tightening once the exact
        # error wording is confirmed.
        assert any("id" in e for e in errors) or any("agent_info" in e for e in errors)

    def test_missing_agent_info(self):
        """Test that missing agent_info produces validation error."""
        trial_data = {
            "id": str(uuid4()),
            "trial_name": "test",
            "task_checksum": "abc",
            # Missing agent_info
        }
        errors = validate_trial_result(trial_data, "job1", "trial1")
        assert any("agent_info" in e for e in errors)

    def test_missing_agent_name_or_version(self):
        """Test that missing agent name or version produces validation error."""
        trial_data = {
            "id": str(uuid4()),
            "trial_name": "test",
            "task_checksum": "abc",
            "agent_info": {"name": "test"},  # Missing version
        }
        errors = validate_trial_result(trial_data, "job1", "trial1")
        assert any("version" in e for e in errors)


class TestCheckRewardHacking:
    """Tests for check_reward_hacking function."""

    def test_clean_agent_files(self, temp_dir):
        """No errors when agent files contain no forbidden strings."""
        trial_dir = temp_dir / "trial-0"
        agent_dir = trial_dir / "agent"
        agent_dir.mkdir(parents=True)
        (agent_dir / "trajectory.json").write_text('{"steps": ["ls", "cat foo.txt"]}')

        errors = check_reward_hacking(trial_dir, "job1", "trial-0")
        assert errors == []

    def test_no_agent_dir(self, temp_dir):
        """No errors when agent directory doesn't exist."""
        trial_dir = temp_dir / "trial-0"
        trial_dir.mkdir()

        errors = check_reward_hacking(trial_dir, "job1", "trial-0")
        assert errors == []

    def test_detects_harbor_framework(self, temp_dir):
        """A file mentioning ``harbor-framework`` yields one "Reward hacking" error."""
        trial_dir = temp_dir / "trial-0"
        agent_dir = trial_dir / "agent"
        agent_dir.mkdir(parents=True)
        (agent_dir / "trajectory.json").write_text("curl harbor-framework.com/api")

        errors = check_reward_hacking(trial_dir, "job1", "trial-0")
        assert len(errors) == 1
        assert "Reward hacking" in errors[0]

    def test_detects_laude_institute(self, temp_dir):
        """A file mentioning ``laude-institute`` yields one error."""
        trial_dir = temp_dir / "trial-0"
        agent_dir = trial_dir / "agent"
        agent_dir.mkdir(parents=True)
        (agent_dir / "trajectory.json").write_text("contact laude-institute for help")

        errors = check_reward_hacking(trial_dir, "job1", "trial-0")
        assert len(errors) == 1

    def test_case_insensitive(self, temp_dir):
        """An upper-case occurrence of a forbidden string is still detected."""
        trial_dir = temp_dir / "trial-0"
        agent_dir = trial_dir / "agent"
        agent_dir.mkdir(parents=True)
        (agent_dir / "trajectory.json").write_text("HARBOR-FRAMEWORK is great")

        errors = check_reward_hacking(trial_dir, "job1", "trial-0")
        assert len(errors) == 1

    def test_binary_files_dont_crash(self, temp_dir):
        """Non-text bytes in an agent file produce no errors and no exception."""
        trial_dir = temp_dir / "trial-0"
        agent_dir = trial_dir / "agent"
        agent_dir.mkdir(parents=True)
        (agent_dir / "data.bin").write_bytes(b"\x00\x01\x02\xff\xfe")

        errors = check_reward_hacking(trial_dir, "job1", "trial-0")
        assert errors == []

    def test_nested_agent_files(self, temp_dir):
        """Detects forbidden strings in nested subdirectories."""
        trial_dir = temp_dir / "trial-0"
        nested_dir = trial_dir / "agent" / "subdir"
        nested_dir.mkdir(parents=True)
        (nested_dir / "log.txt").write_text("harborframework exploit")

        errors = check_reward_hacking(trial_dir, "job1", "trial-0")
        assert len(errors) == 1
        assert "subdir/log.txt" in errors[0]


class TestValidateSubmission:
    """Tests for validate_submission function."""

    def test_valid_submission(
        self,
        temp_dir,
        valid_metadata,
        valid_job_config,
    ):
        """A full trial set with matching job id validates and counts every trial."""
        job_id = str(uuid4())
        job_result = {"id": job_id, "started_at": None, "finished_at": None}
        trials = make_full_trial_set(job_id)
        submission_dir = create_submission(
            temp_dir,
            metadata=valid_metadata,
            job_config=valid_job_config,
            trial_results=trials,
            job_result=job_result,
        )

        result = validate_submission(submission_dir)

        assert result.is_valid
        assert result.job_count == 1
        assert result.trial_count == EXPECTED_TASK_COUNT * MIN_TRIALS_PER_TASK
        assert result.successful_trials == EXPECTED_TASK_COUNT * MIN_TRIALS_PER_TASK

    def test_missing_metadata(self, temp_dir, valid_job_config, valid_trial_result):
        """A submission without a metadata file fails with a "Missing" error first."""
        submission_dir = create_submission(
            temp_dir,
            metadata=None,
            job_config=valid_job_config,
            trial_results=[valid_trial_result],
        )

        result = validate_submission(submission_dir)

        assert not result.is_valid
        assert "Missing `metadata.yaml`" in result.errors[0]

    def test_invalid_metadata(self, temp_dir, valid_job_config, valid_trial_result):
        """Metadata lacking the required keys fails with an "Invalid" error first."""
        submission_dir = create_submission(
            temp_dir,
            metadata={"invalid": "metadata"},  # Missing required fields
            job_config=valid_job_config,
            trial_results=[valid_trial_result],
        )

        result = validate_submission(submission_dir)

        assert not result.is_valid
        assert "Invalid `metadata.yaml`" in result.errors[0]

    def test_no_job_directories(self, temp_dir, valid_metadata):
        """A submission containing only metadata reports missing job directories."""
        submission_dir = create_submission(temp_dir, metadata=valid_metadata)

        result = validate_submission(submission_dir)

        assert not result.is_valid
        assert "No job directories found" in result.errors[0]

    def test_no_trial_directories(self, temp_dir, valid_metadata, valid_job_config):
        """A job directory without trials reports missing trial directories."""
        submission_dir = create_submission(
            temp_dir,
            metadata=valid_metadata,
            job_config=valid_job_config,
            trial_results=None,  # No trials
        )

        result = validate_submission(submission_dir)

        assert not result.is_valid
        assert "No trial directories found" in result.errors[0]

    def test_job_id_mismatch(
        self,
        temp_dir,
        valid_metadata,
        valid_job_config,
        valid_trial_result,
        valid_job_result,
    ):
        """A trial whose ``config.job_id`` differs from the job's id is rejected."""
        # Create two trials with different job_ids (must be valid UUIDs)
        job_id_1 = str(uuid4())
        job_id_2 = str(uuid4())

        # ``.copy()`` is shallow, so the nested config dict is re-created
        # before being modified to keep the two trials independent.
        trial1 = valid_trial_result.copy()
        trial1["config"] = dict(valid_trial_result["config"])
        trial1["config"]["job_id"] = job_id_1

        trial2 = valid_trial_result.copy()
        trial2["id"] = str(uuid4())
        trial2["config"] = dict(valid_trial_result["config"])
        trial2["config"]["job_id"] = job_id_2

        job_result = valid_job_result.copy()
        job_result["id"] = job_id_1

        submission_dir = create_submission(
            temp_dir,
            metadata=valid_metadata,
            job_config=valid_job_config,
            trial_results=[trial1, trial2],
            job_result=job_result,
        )

        result = validate_submission(submission_dir)

        assert not result.is_valid
        assert "does not match job's id" in result.errors[0]

    def test_yml_metadata_extension(
        self,
        temp_dir,
        valid_metadata,
        valid_job_config,
    ):
        """Test that .yml extension works too."""
        job_id = str(uuid4())
        job_result = {"id": job_id, "started_at": None, "finished_at": None}
        trials = make_full_trial_set(job_id)
        submission_dir = create_submission(
            temp_dir,
            metadata=valid_metadata,
            job_config=valid_job_config,
            trial_results=trials,
            job_result=job_result,
            metadata_ext="yml",
        )

        result = validate_submission(submission_dir)

        assert result.is_valid

    def test_trial_with_optional_null_fields(
        self, temp_dir, valid_metadata, valid_job_config
    ):
        """Test trial with optional fields as null."""
        job_id = str(uuid4())
        job_result = {"id": job_id, "started_at": None, "finished_at": None}

        # Build a full trial set, but make the last trial have no optional fields
        trials = make_full_trial_set(job_id)
        # Replace the last trial with one that has no optional fields
        trials[-1] = {
            "id": str(uuid4()),
            "task_name": "test-task",
            "trial_name": "test-trial",
            "trial_uri": "file:///tmp/trial",
            "task_id": {"type": "local", "path": "/tmp/task"},
            # Reuse the replaced trial's checksum so task coverage is unchanged.
            "task_checksum": trials[-1]["task_checksum"],
            "config": {
                "task": {"type": "local", "path": "/tmp/task"},
                "job_id": job_id,
            },
            "agent_info": {"name": "agent", "version": "1.0"},
            # Optional fields not present (which is fine)
        }

        submission_dir = create_submission(
            temp_dir,
            metadata=valid_metadata,
            job_config=valid_job_config,
            trial_results=trials,
job_result=job_result,
        )

        result = validate_submission(submission_dir)

        assert result.is_valid
        # One trial has no verifier_result, so successful_trials should be total - 1
        assert result.successful_trials == EXPECTED_TASK_COUNT * MIN_TRIALS_PER_TASK - 1

    def test_accuracy_calculation(self, temp_dir, valid_metadata, valid_job_config):
        """Test accuracy is calculated correctly."""
        job_id = str(uuid4())
        job_result = {"id": job_id, "started_at": None, "finished_at": None}

        # Start with full trial set (all reward=1.0)
        trials = make_full_trial_set(job_id, reward=1.0)
        total = len(trials)

        # Replace the last 2 trials: one with reward=0 and one with no reward
        # (each keeps the checksum of the trial it replaces, so the per-task
        # trial counts are unchanged).
        trials[-2] = {
            "id": str(uuid4()),
            "task_name": "test",
            "trial_name": "t",
            "trial_uri": "file:///tmp/t",
            "task_id": {"type": "local", "path": "/tmp/task"},
            "task_checksum": trials[-2]["task_checksum"],
            "config": {
                "task": {"type": "local", "path": "/tmp/task"},
                "job_id": job_id,
            },
            "agent_info": {"name": "a", "version": "1"},
            "verifier_result": {"rewards": {"reward": 0.0}},
        }
        trials[-1] = {
            "id": str(uuid4()),
            "task_name": "test",
            "trial_name": "t",
            "trial_uri": "file:///tmp/t",
            "task_id": {"type": "local", "path": "/tmp/task"},
            "task_checksum": trials[-1]["task_checksum"],
            "config": {
                "task": {"type": "local", "path": "/tmp/task"},
                "job_id": job_id,
            },
            "agent_info": {"name": "a", "version": "1"},
            "verifier_result": None,
        }

        submission_dir = create_submission(
            temp_dir,
            metadata=valid_metadata,
            job_config=valid_job_config,
            trial_results=trials,
            job_result=job_result,
        )

        result = validate_submission(submission_dir)

        assert result.is_valid
        assert result.trial_count == total
        # Both the reward=0 trial and the trial without a verifier result are
        # expected not to count as successful.
        assert result.successful_trials == total - 2
        assert result.accuracy == pytest.approx((total - 2) / total)


class TestTaskCoverageValidation:
    """Tests for task coverage and minimum trials per task validation."""

    def _make_trial(self, job_id: str, task_checksum: str) -> dict:
        """Create a valid trial result with a specific task_checksum."""
        return {
            "id":
str(uuid4()), "task_name": "test-task", "trial_name": "test-trial", "trial_uri": "file:///tmp/trial", "task_id": {"type": "local", "path": "/tmp/task"}, "task_checksum": task_checksum, "config": { "task": {"type": "local", "path": "/tmp/task"}, "job_id": job_id, }, "agent_info": {"name": "test-agent", "version": "1.0.0"}, "verifier_result": {"rewards": {"reward": 1.0}}, } def test_minimum_trials_per_task(self, temp_dir, valid_metadata, valid_job_config): """A task with fewer than MIN_TRIALS_PER_TASK trials should fail.""" job_id = str(uuid4()) job_result = {"id": job_id, "started_at": None, "finished_at": None} # Create 89 tasks: 88 with 5 trials, 1 with only 3 trials trials = [] for i in range(88): for _ in range(MIN_TRIALS_PER_TASK): trials.append(self._make_trial(job_id, f"checksum_{i:03d}")) # One task with only 3 trials for _ in range(3): trials.append(self._make_trial(job_id, "checksum_088")) submission_dir = create_submission( temp_dir, metadata=valid_metadata, job_config=valid_job_config, trial_results=trials, job_result=job_result, ) result = validate_submission(submission_dir) assert not result.is_valid assert any("has only 3 trial(s)" in e for e in result.errors) assert any("checksum_088" in e for e in result.errors) def test_minimum_trials_per_task_passes( self, temp_dir, valid_metadata, valid_job_config ): """A submission with exactly MIN_TRIALS_PER_TASK trials per task across all 89 tasks should pass.""" job_id = str(uuid4()) job_result = {"id": job_id, "started_at": None, "finished_at": None} trials = [] for i in range(EXPECTED_TASK_COUNT): for _ in range(MIN_TRIALS_PER_TASK): trials.append(self._make_trial(job_id, f"checksum_{i:03d}")) submission_dir = create_submission( temp_dir, metadata=valid_metadata, job_config=valid_job_config, trial_results=trials, job_result=job_result, ) result = validate_submission(submission_dir) assert result.is_valid assert result.trial_count == EXPECTED_TASK_COUNT * MIN_TRIALS_PER_TASK def 
test_insufficient_unique_tasks( self, temp_dir, valid_metadata, valid_job_config ): """A submission with fewer than EXPECTED_TASK_COUNT unique tasks should fail.""" job_id = str(uuid4()) job_result = {"id": job_id, "started_at": None, "finished_at": None} # Only 10 unique tasks, each with 5 trials trials = [] for i in range(10): for _ in range(MIN_TRIALS_PER_TASK): trials.append(self._make_trial(job_id, f"checksum_{i:03d}")) submission_dir = create_submission( temp_dir, metadata=valid_metadata, job_config=valid_job_config, trial_results=trials, job_result=job_result, ) result = validate_submission(submission_dir) assert not result.is_valid assert any( f"covers 10 unique task(s), expected {EXPECTED_TASK_COUNT}" in e for e in result.errors ) def test_multiple_tasks_mixed(self, temp_dir, valid_metadata, valid_job_config): """Only tasks with fewer than MIN_TRIALS_PER_TASK trials trigger per-task errors.""" job_id = str(uuid4()) job_result = {"id": job_id, "started_at": None, "finished_at": None} # 89 tasks: 88 with 5 trials, 1 with only 2 trials trials = [] for i in range(88): for _ in range(MIN_TRIALS_PER_TASK): trials.append(self._make_trial(job_id, f"checksum_{i:03d}")) for _ in range(2): trials.append(self._make_trial(job_id, "checksum_short")) submission_dir = create_submission( temp_dir, metadata=valid_metadata, job_config=valid_job_config, trial_results=trials, job_result=job_result, ) result = validate_submission(submission_dir) assert not result.is_valid # Only the under-count task should trigger a per-task error per_task_errors = [e for e in result.errors if "has only" in e] assert len(per_task_errors) == 1 assert "checksum_short" in per_task_errors[0] assert "has only 2 trial(s)" in per_task_errors[0] class TestFormatValidationComment: """Tests for format_validation_comment function.""" def test_all_valid(self): result = ValidationResult( models=["gpt-4 (openai)"], job_count=1, trial_count=10, successful_trials=8, ) comment = 
format_validation_comment([("my-agent", result)]) assert "passed validation" in comment assert "Ready to merge" in comment assert "80.0%" in comment assert "gpt-4 (openai)" in comment def test_validation_failed(self): result = ValidationResult(errors=["Error 1", "Error 2"]) comment = format_validation_comment([("my-agent", result)]) assert "Validation failed" in comment assert "Error 1" in comment assert "Error 2" in comment assert "fix the errors" in comment def test_truncates_errors(self): errors = [f"Error {i}" for i in range(30)] result = ValidationResult(errors=errors) comment = format_validation_comment([("my-agent", result)]) assert "... and 10 more errors" in comment def test_multiple_submissions(self): result1 = ValidationResult( models=["gpt-4 (openai)"], job_count=1, trial_count=5 ) result2 = ValidationResult( models=["claude-3 (anthropic)"], job_count=2, trial_count=10 ) comment = format_validation_comment([("agent1", result1), ("agent2", result2)]) assert "agent1" in comment assert "agent2" in comment assert "gpt-4" in comment assert "claude-3" in comment # ============================================================================ # Tests: Import Functions # ============================================================================ class TestJobExists: """Tests for job_exists function.""" @pytest.mark.asyncio async def test_returns_true_when_job_exists(self): mock_client = MagicMock() mock_client.table.return_value.select.return_value.eq.return_value.execute = ( AsyncMock(return_value=MagicMock(data=[{"id": "123"}])) ) assert await job_exists(mock_client, "123") is True @pytest.mark.asyncio async def test_returns_false_when_job_missing(self): mock_client = MagicMock() mock_client.table.return_value.select.return_value.eq.return_value.execute = ( AsyncMock(return_value=MagicMock(data=[])) ) assert await job_exists(mock_client, "123") is False class TestImportSubmission: """Tests for import_submission function.""" @pytest.mark.asyncio 
@patch("app.get_supabase_client", new_callable=AsyncMock) @patch("app.job_exists", new_callable=AsyncMock) @patch("app.upsert_with_retry", new_callable=AsyncMock) @patch("app.insert_ignore_conflicts", new_callable=AsyncMock) async def test_skips_existing_job( self, mock_insert_ignore, mock_upsert, mock_job_exists, mock_get_client, temp_dir, valid_metadata, valid_job_config, valid_trial_result, valid_job_result, ): """Test that existing jobs are skipped.""" mock_client = AsyncMock() mock_get_client.return_value = mock_client mock_job_exists.return_value = True submission_dir = create_submission( temp_dir, metadata=valid_metadata, job_config=valid_job_config, trial_results=[valid_trial_result], job_result=valid_job_result, ) metadata = SubmissionMetadata.model_validate(valid_metadata) stats = await import_submission(submission_dir, metadata) assert stats["jobs_imported"] == 0 assert stats["trials_imported"] == 0 mock_upsert.assert_not_called() @pytest.mark.asyncio @patch("app.get_supabase_client", new_callable=AsyncMock) @patch("app.job_exists", new_callable=AsyncMock) @patch("app.upsert_with_retry", new_callable=AsyncMock) @patch("app.insert_ignore_conflicts", new_callable=AsyncMock) @patch( "app.upload_trial_to_storage", new_callable=AsyncMock, return_value="https://storage.example.com/trial.tar.gz", ) async def test_imports_new_job( self, mock_upload, mock_insert_ignore, mock_upsert, mock_job_exists, mock_get_client, temp_dir, valid_metadata, valid_job_config, valid_trial_result, valid_job_result, ): """Test that new jobs are imported.""" mock_client = AsyncMock() mock_get_client.return_value = mock_client mock_job_exists.return_value = False submission_dir = create_submission( temp_dir, metadata=valid_metadata, job_config=valid_job_config, trial_results=[valid_trial_result], job_result=valid_job_result, ) metadata = SubmissionMetadata.model_validate(valid_metadata) stats = await import_submission(submission_dir, metadata) assert stats["jobs_imported"] == 1 assert 
stats["trials_imported"] == 1 assert mock_upsert.call_count >= 3 # job, trial, trial_model at minimum @pytest.mark.asyncio @patch("app.get_supabase_client", new_callable=AsyncMock) @patch("app.job_exists", new_callable=AsyncMock) @patch("app.upsert_with_retry", new_callable=AsyncMock) @patch("app.insert_ignore_conflicts", new_callable=AsyncMock) @patch( "app.upload_trial_to_storage", new_callable=AsyncMock, return_value="https://storage.example.com/trial.tar.gz", ) async def test_handles_trial_without_optional_fields( self, mock_upload, mock_insert_ignore, mock_upsert, mock_job_exists, mock_get_client, temp_dir, valid_metadata, valid_job_config, valid_job_result, ): """Test that trials without optional fields are handled correctly.""" mock_client = AsyncMock() mock_get_client.return_value = mock_client mock_job_exists.return_value = False job_id = valid_job_result["id"] # Minimal trial with only required fields trial = { "id": str(uuid4()), "task_name": "test", "trial_name": "test", "trial_uri": "file:///tmp/trial", "task_id": {"type": "local", "path": "/tmp/task"}, "task_checksum": "abc", "config": { "task": {"type": "local", "path": "/tmp/task"}, "job_id": job_id, }, "agent_info": {"name": "agent", "version": "1.0"}, # No optional fields } submission_dir = create_submission( temp_dir, metadata=valid_metadata, job_config=valid_job_config, trial_results=[trial], job_result=valid_job_result, ) metadata = SubmissionMetadata.model_validate(valid_metadata) # This should not raise an error stats = await import_submission(submission_dir, metadata) assert stats["jobs_imported"] == 1 assert stats["trials_imported"] == 1 assert len(stats["errors"]) == 0 @pytest.mark.asyncio @patch("app.get_supabase_client", new_callable=AsyncMock) @patch("app.job_exists", new_callable=AsyncMock) @patch("app.upsert_with_retry", new_callable=AsyncMock) @patch("app.insert_ignore_conflicts", new_callable=AsyncMock) async def test_parallel_uploads_get_correct_urls( self, mock_insert_ignore, 
mock_upsert,
        mock_job_exists,
        mock_get_client,
        temp_dir,
        valid_metadata,
        valid_job_config,
        valid_job_result,
    ):
        """Test that multiple trials are uploaded in parallel and each gets the correct storage URL."""
        mock_client = AsyncMock()
        mock_get_client.return_value = mock_client
        mock_job_exists.return_value = False

        job_id = valid_job_result["id"]
        trial_ids = [str(uuid4()) for _ in range(3)]
        trials = []
        for tid in trial_ids:
            trials.append(
                {
                    "id": tid,
                    "task_name": "test",
                    "trial_name": "test",
                    "trial_uri": "file:///tmp/trial",
                    "task_id": {"type": "local", "path": "/tmp/task"},
                    "task_checksum": "abc",
                    "config": {
                        "task": {"type": "local", "path": "/tmp/task"},
                        "job_id": job_id,
                    },
                    "agent_info": {"name": "agent", "version": "1.0"},
                }
            )

        submission_dir = create_submission(
            temp_dir,
            metadata=valid_metadata,
            job_config=valid_job_config,
            trial_results=trials,
            job_result=valid_job_result,
        )

        metadata = SubmissionMetadata.model_validate(valid_metadata)

        # upload_trial_to_storage returns a URL based on the trial_id
        async def fake_upload(client, trial_dir, trial_id):
            return f"https://storage.example.com/{trial_id}.tar.gz"

        with patch(
            "app.upload_trial_to_storage", side_effect=fake_upload
        ) as mock_upload:
            stats = await import_submission(submission_dir, metadata)

        assert stats["jobs_imported"] == 1
        assert stats["trials_imported"] == 3
        assert len(stats["errors"]) == 0

        # Verify upload was called for each trial
        assert mock_upload.call_count == 3
        # The trial id is read as the third positional argument of each call.
        uploaded_ids = {call.args[2] for call in mock_upload.call_args_list}
        assert uploaded_ids == set(trial_ids)

        # Verify each trial insert has the correct URL
        # Find the upsert call for the "trial" table
        trial_upsert_calls = [
            call for call in mock_upsert.call_args_list if call.args[1] == "trial"
        ]
        assert len(trial_upsert_calls) == 1
        trial_data = trial_upsert_calls[0].args[2]
        for t in trial_data:
            expected_url = f"https://storage.example.com/{t['id']}.tar.gz"
            assert t["trial_uri"] == expected_url

    @pytest.mark.asyncio
    @patch("app.get_supabase_client", new_callable=AsyncMock)
    @patch("app.job_exists", new_callable=AsyncMock)
    @patch("app.upsert_with_retry", new_callable=AsyncMock)
    @patch("app.insert_ignore_conflicts", new_callable=AsyncMock)
    async def test_handles_missing_job_id(
        self,
        mock_insert_ignore,
        mock_upsert,
        mock_job_exists,
        mock_get_client,
        temp_dir,
        valid_metadata,
        valid_job_config,
    ):
        """Test that missing job_id is handled as an error."""
        mock_client = AsyncMock()
        mock_get_client.return_value = mock_client

        # Trial without job_id
        trial = {
            "id": str(uuid4()),
            "task_name": "test",
            "trial_name": "test",
            "trial_uri": "file:///tmp/trial",
            "task_id": {"type": "local", "path": "/tmp/task"},
            "task_checksum": "abc",
            "config": {
                "task": {"type": "local", "path": "/tmp/task"},
                # No job_id
            },
            "agent_info": {"name": "agent", "version": "1.0"},
        }

        submission_dir = create_submission(
            temp_dir,
            metadata=valid_metadata,
            job_config=valid_job_config,
            trial_results=[trial],
            # No job_result either
        )

        metadata = SubmissionMetadata.model_validate(valid_metadata)
        stats = await import_submission(submission_dir, metadata)

        assert stats["jobs_imported"] == 0
        assert "Could not find job_id" in stats["errors"][0]


# ============================================================================
# Tests: HF Hub Operations
# ============================================================================


class TestGetChangedSubmissionNames:
    """Tests for get_changed_submission_names function."""

    @pytest.mark.asyncio
    async def test_detects_new_folders(self):
        """Folders present in PR but not in main are detected."""
        mock_api = MagicMock()

        pr_folder = MagicMock(spec=RepoFolder)
        pr_folder.path = "submissions/terminal-bench/2.0/agent1__gpt4"
        pr_folder.tree_id = "abc123"

        # side_effect supplies one listing per successive list_repo_tree call.
        mock_api.list_repo_tree.side_effect = [
            [pr_folder],  # PR revision
            [],  # main
        ]

        result = await get_changed_submission_names(mock_api, "refs/pr/1")
        assert result == ["agent1__gpt4"]

    @pytest.mark.asyncio
    async def test_detects_changed_folders(self):
        """Folders with different tree_ids are detected as changed."""
        mock_api = MagicMock()

        pr_folder = MagicMock(spec=RepoFolder)
        pr_folder.path = "submissions/terminal-bench/2.0/agent1__gpt4"
        pr_folder.tree_id = "new_hash"

        main_folder = MagicMock(spec=RepoFolder)
        main_folder.path = "submissions/terminal-bench/2.0/agent1__gpt4"
        main_folder.tree_id = "old_hash"

        mock_api.list_repo_tree.side_effect = [
            [pr_folder],  # PR revision
            [main_folder],  # main
        ]

        result = await get_changed_submission_names(mock_api, "refs/pr/1")
        assert result == ["agent1__gpt4"]

    @pytest.mark.asyncio
    async def test_ignores_unchanged_folders(self):
        """Folders with same tree_id in PR and main are not detected."""
        mock_api = MagicMock()

        pr_folder = MagicMock(spec=RepoFolder)
        pr_folder.path = "submissions/terminal-bench/2.0/agent1__gpt4"
        pr_folder.tree_id = "same_hash"

        main_folder = MagicMock(spec=RepoFolder)
        main_folder.path = "submissions/terminal-bench/2.0/agent1__gpt4"
        main_folder.tree_id = "same_hash"

        mock_api.list_repo_tree.side_effect = [
            [pr_folder],
            [main_folder],
        ]

        result = await get_changed_submission_names(mock_api, "refs/pr/1")
        assert result == []


class TestGetNewJobIdsBySubmission:
    """Tests for get_new_job_ids_by_submission function."""

    def test_extracts_job_ids(self):
        """The ``id`` from a job's ``result.json`` is grouped under its submission name."""
        with tempfile.TemporaryDirectory() as tmpdir:
            base = Path(tmpdir) / "submissions" / "terminal-bench" / "2.0"
            job_dir = base / "agent1__gpt4" / "job1"
            job_dir.mkdir(parents=True)
            (job_dir / "result.json").write_text(json.dumps({"id": "uuid-123"}))

            result = get_new_job_ids_by_submission(Path(tmpdir))
            assert result == {"agent1__gpt4": ["uuid-123"]}

    def test_skips_missing_id(self):
        """A ``result.json`` without an ``id`` key contributes nothing."""
        with tempfile.TemporaryDirectory() as tmpdir:
            base = Path(tmpdir) / "submissions" / "terminal-bench" / "2.0"
            job_dir = base / "agent1__gpt4" / "job1"
            job_dir.mkdir(parents=True)
            (job_dir / "result.json").write_text(json.dumps({"no_id": True}))

            result = get_new_job_ids_by_submission(Path(tmpdir))
            assert result == {}

    def test_empty_dir(self):
        with
tempfile.TemporaryDirectory() as tmpdir: result = get_new_job_ids_by_submission(Path(tmpdir)) assert result == {} class TestBatchCheckExistingJobs: """Tests for batch_check_existing_jobs function.""" @pytest.mark.asyncio async def test_returns_existing_ids(self): mock_client = MagicMock() mock_client.table.return_value.select.return_value.in_.return_value.execute = ( AsyncMock(return_value=MagicMock(data=[{"id": "uuid-1"}, {"id": "uuid-3"}])) ) result = await batch_check_existing_jobs( mock_client, ["uuid-1", "uuid-2", "uuid-3"] ) assert result == {"uuid-1", "uuid-3"} @pytest.mark.asyncio async def test_empty_input(self): mock_client = MagicMock() result = await batch_check_existing_jobs(mock_client, []) assert result == set() mock_client.table.assert_not_called() # ============================================================================ # Tests: ValidationResult # ============================================================================ class TestValidationResult: """Tests for ValidationResult dataclass.""" def test_is_valid_with_no_errors(self): result = ValidationResult() assert result.is_valid is True def test_is_valid_with_errors(self): result = ValidationResult(errors=["Error"]) assert result.is_valid is False def test_accuracy_with_trials(self): result = ValidationResult(trial_count=10, successful_trials=7) assert result.accuracy == pytest.approx(0.7) def test_accuracy_with_no_trials(self): result = ValidationResult(trial_count=0, successful_trials=0) assert result.accuracy is None # ============================================================================ # Tests: Pydantic Models # ============================================================================ class TestSubmissionMetadata: """Tests for SubmissionMetadata model.""" def test_valid_metadata(self, valid_metadata): metadata = SubmissionMetadata.model_validate(valid_metadata) assert metadata.agent_url == "https://example.com/agent" assert len(metadata.models) == 1 assert 
metadata.models[0].model_name == "gpt-4" def test_missing_agent_url(self, valid_metadata): del valid_metadata["agent_url"] with pytest.raises(Exception): # Pydantic validation error SubmissionMetadata.model_validate(valid_metadata) def test_missing_models(self, valid_metadata): del valid_metadata["models"] with pytest.raises(Exception): SubmissionMetadata.model_validate(valid_metadata) def test_optional_fields(self): metadata = SubmissionMetadata.model_validate( { "agent_url": "https://example.com", "models": [{"model_name": "gpt-4", "model_provider": "openai"}], "agent_display_name": "My Agent", "agent_org_display_name": "My Org", } ) assert metadata.agent_display_name == "My Agent" assert metadata.agent_org_display_name == "My Org" class TestModelMetadata: """Tests for ModelMetadata model.""" def test_valid_model(self): model = ModelMetadata.model_validate( { "model_name": "gpt-4", "model_provider": "openai", } ) assert model.model_name == "gpt-4" assert model.model_provider == "openai" def test_optional_fields(self): model = ModelMetadata.model_validate( { "model_name": "gpt-4", "model_provider": "openai", "model_display_name": "GPT-4", "model_org_display_name": "OpenAI", } ) assert model.model_display_name == "GPT-4" # ============================================================================ # Integration Tests # ============================================================================ class TestEndToEnd: """End-to-end integration tests.""" def test_full_validation_flow( self, temp_dir, valid_metadata, valid_job_config, ): """Test complete validation from directory to comment.""" job_id = str(uuid4()) job_result = {"id": job_id, "started_at": None, "finished_at": None} trials = make_full_trial_set(job_id) submission_dir = create_submission( temp_dir, metadata=valid_metadata, job_config=valid_job_config, trial_results=trials, job_result=job_result, ) result = validate_submission(submission_dir) comment = format_validation_comment([(submission_dir.name, 
result)]) assert result.is_valid assert "passed validation" in comment assert "100.0%" in comment # all successful def test_validation_catches_config_errors( self, temp_dir, valid_metadata, valid_trial_result, valid_job_result ): """Test that validation catches job config errors.""" job_config = { "timeout_multiplier": 2.0, # Error "agents": [{"override_timeout_sec": 100}], # Error "environment": {"override_cpus": 4}, # Error } submission_dir = create_submission( temp_dir, metadata=valid_metadata, job_config=job_config, trial_results=[valid_trial_result], job_result=valid_job_result, ) result = validate_submission(submission_dir) assert not result.is_valid assert len(result.errors) >= 3 # At least 3 config errors # Note: TestHarborModels class was removed because we no longer use JobConfig # and TrialResult models for validation. This allows us to support custom # environment types that aren't in Harbor's EnvironmentType enum. # We still use JobResult model which doesn't have enum restrictions.