alexgshaw's picture
Add reward hacking validation check.
32096df
"""
Unit tests for the Terminal-Bench Leaderboard Importer.
Run with: pytest test_app.py -v
"""
import json
import tempfile
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
from uuid import uuid4
import pytest
import yaml
from huggingface_hub import RepoFolder
from app import (
EXPECTED_TASK_COUNT,
MIN_TRIALS_PER_TASK,
ModelMetadata,
SubmissionMetadata,
ValidationResult,
check_reward_hacking,
find_metadata_file,
format_validation_comment,
batch_check_existing_jobs,
get_changed_submission_names,
get_new_job_ids_by_submission,
get_job_dirs,
get_job_id_from_dir,
get_trial_dirs,
has_metadata_file,
import_submission,
job_exists,
load_json,
load_yaml,
validate_job_config,
validate_submission,
validate_trial_result,
)
# ============================================================================
# Fixtures
# ============================================================================
@pytest.fixture
def temp_dir():
    """Yield a fresh scratch directory as a ``Path``; it is removed after the test."""
    scratch = tempfile.TemporaryDirectory()
    try:
        yield Path(scratch.name)
    finally:
        # Mirrors what leaving a ``with TemporaryDirectory()`` block would do.
        scratch.cleanup()
@pytest.fixture
def valid_metadata():
    """Return a metadata mapping with an agent URL and a single model entry."""
    model_entry = {"model_name": "gpt-4", "model_provider": "openai"}
    return {
        "agent_url": "https://example.com/agent",
        "models": [model_entry],
    }
@pytest.fixture
def valid_job_config():
    """Return a job config dict meant to satisfy Harbor's JobConfig schema.

    The verifier and environment sections are intentionally empty so that
    individual tests can inject the overrides they want to exercise.
    """
    config = {"job_name": "test-job", "timeout_multiplier": 1.0}
    config["agents"] = [{"name": "test-agent"}]
    config["verifier"] = {}
    config["environment"] = {}
    return config
@pytest.fixture
def valid_trial_result():
    """Return a trial result dict meant to satisfy Harbor's TrialResult schema.

    The generated job id appears both under ``config.job_id`` and under the
    extra ``_job_id`` key so tests can look it up without digging into config.
    """
    job_id = str(uuid4())

    def window(started_at, finished_at):
        # Every timed phase shares the same two-key shape.
        return {"started_at": started_at, "finished_at": finished_at}

    local_task = {"type": "local", "path": "/tmp/task"}
    return {
        "id": str(uuid4()),
        "task_name": "test-task",
        "trial_name": "test-trial",
        "trial_uri": "file:///tmp/trial",
        "task_id": dict(local_task),
        "task_checksum": "abc123",
        "config": {"task": dict(local_task), "job_id": job_id},
        "agent_info": {"name": "test-agent", "version": "1.0.0"},
        "verifier_result": {"rewards": {"reward": 1.0}},
        "agent_execution": window("2024-01-01T00:00:00Z", "2024-01-01T00:01:00Z"),
        "agent_setup": window("2024-01-01T00:00:00Z", "2024-01-01T00:00:30Z"),
        "environment_setup": window("2024-01-01T00:00:00Z", "2024-01-01T00:00:15Z"),
        "verifier": window("2024-01-01T00:01:00Z", "2024-01-01T00:01:30Z"),
        "agent_result": {"n_input_tokens": 1000, "n_output_tokens": 500},
        "_job_id": job_id,  # Store for later reference
    }
@pytest.fixture
def valid_job_result(valid_trial_result):
    """Return a job result dict whose id matches the trial fixture's job id."""
    stats = {"n_succeeded": 1, "n_failed": 0, "n_trials_with_exception": 0}
    job_id = valid_trial_result["config"]["job_id"]
    return {
        "id": job_id,
        "started_at": "2024-01-01T00:00:00Z",
        "finished_at": "2024-01-01T01:00:00Z",
        "n_total_trials": 1,
        "stats": stats,
    }
def make_full_trial_set(job_id: str, reward: float = 1.0) -> list[dict]:
    """Build EXPECTED_TASK_COUNT * MIN_TRIALS_PER_TASK trial dicts for one job.

    Trials for task index ``i`` share the checksum ``checksum_{i:03d}`` and the
    name ``task-{i}``; every trial gets its own random id and the given reward.
    """
    return [
        {
            "id": str(uuid4()),
            "task_name": f"task-{task_index}",
            "trial_name": "trial",
            "trial_uri": "file:///tmp/trial",
            "task_id": {"type": "local", "path": "/tmp/task"},
            "task_checksum": f"checksum_{task_index:03d}",
            "config": {
                "task": {"type": "local", "path": "/tmp/task"},
                "job_id": job_id,
            },
            "agent_info": {"name": "test-agent", "version": "1.0.0"},
            "verifier_result": {"rewards": {"reward": reward}},
        }
        for task_index in range(EXPECTED_TASK_COUNT)
        for _ in range(MIN_TRIALS_PER_TASK)
    ]
def create_submission(
temp_dir: Path,
metadata: dict | None = None,
job_config: dict | None = None,
trial_results: list[dict] | None = None,
job_result: dict | None = None,
metadata_ext: str = "yaml",
) -> Path:
"""Helper to create a complete submission directory structure."""
submission_dir = temp_dir / "test-submission"
submission_dir.mkdir(parents=True)
# Create metadata file
if metadata is not None:
metadata_path = submission_dir / f"metadata.{metadata_ext}"
with open(metadata_path, "w") as f:
yaml.dump(metadata, f)
# Create job directory
if job_config is not None or trial_results is not None:
job_dir = submission_dir / "2024-01-01__00-00-00"
job_dir.mkdir()
if job_config is not None:
with open(job_dir / "config.json", "w") as f:
json.dump(job_config, f)
if job_result is not None:
with open(job_dir / "result.json", "w") as f:
json.dump(job_result, f)
if trial_results is not None:
for i, trial_result in enumerate(trial_results):
trial_dir = job_dir / f"trial-{i}"
trial_dir.mkdir()
with open(trial_dir / "result.json", "w") as f:
json.dump(trial_result, f)
return submission_dir
# ============================================================================
# Tests: Helper Functions
# ============================================================================
class TestFindMetadataFile:
    """Checks which metadata file, if any, find_metadata_file reports."""

    def test_yaml_extension(self, temp_dir):
        expected = temp_dir / "metadata.yaml"
        expected.touch()
        assert find_metadata_file(temp_dir) == expected

    def test_yml_extension(self, temp_dir):
        expected = temp_dir / "metadata.yml"
        expected.touch()
        assert find_metadata_file(temp_dir) == expected

    def test_prefers_yaml_over_yml(self, temp_dir):
        """With both extensions present, the .yaml file wins."""
        for filename in ("metadata.yaml", "metadata.yml"):
            (temp_dir / filename).touch()
        assert find_metadata_file(temp_dir) == temp_dir / "metadata.yaml"

    def test_no_metadata_file(self, temp_dir):
        found = find_metadata_file(temp_dir)
        assert found is None

    def test_wrong_filename(self, temp_dir):
        # A YAML file with a different stem must not be picked up.
        (temp_dir / "meta.yaml").touch()
        found = find_metadata_file(temp_dir)
        assert found is None
class TestHasMetadataFile:
    """Checks the boolean result of has_metadata_file."""

    def test_returns_true_when_exists(self, temp_dir):
        metadata_path = temp_dir / "metadata.yaml"
        metadata_path.touch()
        present = has_metadata_file(temp_dir)
        assert present is True

    def test_returns_false_when_missing(self, temp_dir):
        present = has_metadata_file(temp_dir)
        assert present is False
class TestGetJobDirs:
    """Checks that get_job_dirs only reports directories holding a config.json."""

    def test_finds_dirs_with_config_json(self, temp_dir):
        for name in ("job1", "job2"):
            job = temp_dir / name
            job.mkdir()
            (job / "config.json").touch()

        found = get_job_dirs(temp_dir)

        assert len(found) == 2
        assert {d.name for d in found} == {"job1", "job2"}

    def test_ignores_dirs_without_config_json(self, temp_dir):
        with_config = temp_dir / "job1"
        with_config.mkdir()
        (with_config / "config.json").touch()
        # job2 is a bare directory: no config.json inside.
        (temp_dir / "job2").mkdir()

        found = get_job_dirs(temp_dir)

        assert len(found) == 1
        assert found[0].name == "job1"

    def test_ignores_files(self, temp_dir):
        # config.json directly in the root is a file, not a job directory.
        (temp_dir / "config.json").touch()
        found = get_job_dirs(temp_dir)
        assert found == []

    def test_empty_directory(self, temp_dir):
        found = get_job_dirs(temp_dir)
        assert found == []
class TestGetTrialDirs:
    """Checks that get_trial_dirs only reports directories holding a result.json."""

    def test_finds_dirs_with_result_json(self, temp_dir):
        trial = temp_dir / "trial1"
        trial.mkdir()
        (trial / "result.json").touch()

        found = get_trial_dirs(temp_dir)

        assert len(found) == 1
        assert found[0].name == "trial1"

    def test_ignores_dirs_without_result_json(self, temp_dir):
        with_result = temp_dir / "trial1"
        with_result.mkdir()
        (with_result / "result.json").touch()
        # trial2 is a bare directory: no result.json inside.
        (temp_dir / "trial2").mkdir()

        found = get_trial_dirs(temp_dir)

        assert len(found) == 1
class TestLoadJson:
    """Checks load_json on valid, malformed and missing files."""

    def test_loads_valid_json(self, temp_dir):
        json_path = temp_dir / "test.json"
        json_path.write_text(json.dumps({"key": "value"}))
        loaded = load_json(json_path)
        assert loaded == {"key": "value"}

    def test_raises_on_invalid_json(self, temp_dir):
        json_path = temp_dir / "test.json"
        json_path.write_text("not valid json")
        with pytest.raises(json.JSONDecodeError):
            load_json(json_path)

    def test_raises_on_missing_file(self, temp_dir):
        absent = temp_dir / "missing.json"
        with pytest.raises(FileNotFoundError):
            load_json(absent)
class TestLoadYaml:
    """Checks that load_yaml round-trips a simple mapping."""

    def test_loads_yaml_file(self, temp_dir):
        payload = {"key": "value"}
        yaml_path = temp_dir / "test.yaml"
        with open(yaml_path, "w") as handle:
            yaml.dump(payload, handle)
        loaded = load_yaml(yaml_path)
        assert loaded == {"key": "value"}
class TestGetJobIdFromDir:
    """Checks where get_job_id_from_dir takes the job id from."""

    def test_gets_id_from_job_result(self, temp_dir):
        job_dir = temp_dir / "job"
        trial_dir = job_dir / "trial1"
        job_dir.mkdir()
        trial_dir.mkdir()
        (job_dir / "result.json").write_text(json.dumps({"id": "job-uuid-123"}))
        # The trial disagrees on purpose: the job-level result must take priority.
        (trial_dir / "result.json").write_text(
            json.dumps({"config": {"job_id": "different-uuid"}})
        )

        job_id = get_job_id_from_dir(job_dir, [trial_dir])

        assert job_id == "job-uuid-123"

    def test_falls_back_to_trial_config(self, temp_dir):
        job_dir = temp_dir / "job"
        trial_dir = job_dir / "trial1"
        job_dir.mkdir()
        trial_dir.mkdir()
        # Only the trial carries a result.json; the job directory has none.
        (trial_dir / "result.json").write_text(
            json.dumps({"config": {"job_id": "trial-job-uuid"}})
        )

        job_id = get_job_id_from_dir(job_dir, [trial_dir])

        assert job_id == "trial-job-uuid"

    def test_returns_none_when_no_job_id(self, temp_dir):
        job_dir = temp_dir / "job"
        trial_dir = job_dir / "trial1"
        job_dir.mkdir()
        trial_dir.mkdir()
        (trial_dir / "result.json").write_text(json.dumps({"config": {}}))

        job_id = get_job_id_from_dir(job_dir, [trial_dir])

        assert job_id is None

    def test_handles_empty_trial_dirs(self, temp_dir):
        job_dir = temp_dir / "job"
        job_dir.mkdir()

        job_id = get_job_id_from_dir(job_dir, [])

        assert job_id is None
# ============================================================================
# Tests: Validation Functions
# ============================================================================
class TestValidateJobConfig:
    """Tests for validate_job_config function.

    Note: validate_job_config is called with raw dicts here rather than
    JobConfig models, so custom environment types outside Harbor's
    EnvironmentType enum can be supported.
    """

    JOB_NAME = "test-job"

    def test_valid_config_returns_no_errors(self, valid_job_config):
        problems = validate_job_config(valid_job_config, self.JOB_NAME)
        assert problems == []

    def test_invalid_timeout_multiplier(self, valid_job_config):
        valid_job_config.update(timeout_multiplier=2.0)
        problems = validate_job_config(valid_job_config, self.JOB_NAME)
        assert len(problems) == 1
        assert "timeout_multiplier" in problems[0]

    def test_agent_override_timeout(self, valid_job_config):
        valid_job_config.update(agents=[{"override_timeout_sec": 3600}])
        problems = validate_job_config(valid_job_config, self.JOB_NAME)
        assert len(problems) == 1
        assert "override_timeout_sec" in problems[0]

    def test_agent_max_timeout(self, valid_job_config):
        valid_job_config.update(agents=[{"max_timeout_sec": 7200}])
        problems = validate_job_config(valid_job_config, self.JOB_NAME)
        assert len(problems) == 1
        assert "max_timeout_sec" in problems[0]

    def test_verifier_override_timeout(self, valid_job_config):
        valid_job_config.update(verifier={"override_timeout_sec": 600})
        problems = validate_job_config(valid_job_config, self.JOB_NAME)
        assert len(problems) == 1
        assert "verifier.override_timeout_sec" in problems[0]

    def test_verifier_max_timeout(self, valid_job_config):
        valid_job_config.update(verifier={"max_timeout_sec": 1200})
        problems = validate_job_config(valid_job_config, self.JOB_NAME)
        assert len(problems) == 1
        assert "verifier.max_timeout_sec" in problems[0]

    def test_environment_overrides(self, valid_job_config):
        # Three separate overrides are expected to yield three separate errors.
        overrides = {
            "override_cpus": 4,
            "override_memory_mb": 8192,
            "override_storage_mb": 10240,
        }
        valid_job_config["environment"] = overrides
        problems = validate_job_config(valid_job_config, self.JOB_NAME)
        assert len(problems) == 3

    def test_multiple_agents_with_issues(self, valid_job_config):
        offending_first = {"override_timeout_sec": 100}
        offending_second = {"max_timeout_sec": 200}
        clean_agent = {}  # Valid agent
        valid_job_config["agents"] = [offending_first, offending_second, clean_agent]
        problems = validate_job_config(valid_job_config, self.JOB_NAME)
        assert len(problems) == 2
        assert "agents[0]" in problems[0]
        assert "agents[1]" in problems[1]
class TestValidateTrialResult:
    """Tests for validate_trial_result function.

    Note: validate_trial_result is called with raw dicts here rather than
    TrialResult models, so custom environment types outside Harbor's
    EnvironmentType enum can be supported.
    """

    def test_valid_trial_returns_no_errors(self, valid_trial_result):
        problems = validate_trial_result(valid_trial_result, "job1", "trial1")
        assert problems == []

    def test_missing_required_fields(self):
        """A trial lacking required fields must produce validation errors."""
        # Only task_name is present: id, trial_name, task_checksum and
        # agent_info are all absent.
        sparse_trial = {"task_name": "test"}
        problems = validate_trial_result(sparse_trial, "job1", "trial1")
        assert len(problems) >= 1
        # The check is deliberately loose: it passes as soon as some error
        # message mentions either "id" or "agent_info".
        mentions_id = any("id" in e for e in problems)
        assert mentions_id or any("agent_info" in e for e in problems)

    def test_missing_agent_info(self):
        """A trial without agent_info must be reported."""
        trial_without_agent = {
            "id": str(uuid4()),
            "trial_name": "test",
            "task_checksum": "abc",
        }
        problems = validate_trial_result(trial_without_agent, "job1", "trial1")
        assert any("agent_info" in e for e in problems)

    def test_missing_agent_name_or_version(self):
        """An agent_info block without a version must be reported."""
        versionless_agent = {"name": "test"}  # Missing version
        trial = {
            "id": str(uuid4()),
            "trial_name": "test",
            "task_checksum": "abc",
            "agent_info": versionless_agent,
        }
        problems = validate_trial_result(trial, "job1", "trial1")
        assert any("version" in e for e in problems)
class TestCheckRewardHacking:
    """Tests for check_reward_hacking function."""

    @staticmethod
    def _make_agent_dir(root, *subdirs):
        """Create ``trial-0/agent[/subdirs...]`` under *root*.

        Returns the trial directory and the innermost directory created.
        """
        trial_dir = root / "trial-0"
        leaf_dir = trial_dir.joinpath("agent", *subdirs)
        leaf_dir.mkdir(parents=True)
        return trial_dir, leaf_dir

    def test_clean_agent_files(self, temp_dir):
        """Agent files without forbidden strings yield no errors."""
        trial_dir, agent_dir = self._make_agent_dir(temp_dir)
        trajectory = agent_dir / "trajectory.json"
        trajectory.write_text('{"steps": ["ls", "cat foo.txt"]}')
        assert check_reward_hacking(trial_dir, "job1", "trial-0") == []

    def test_no_agent_dir(self, temp_dir):
        """A trial directory without an agent folder yields no errors."""
        trial_dir = temp_dir / "trial-0"
        trial_dir.mkdir()
        assert check_reward_hacking(trial_dir, "job1", "trial-0") == []

    def test_detects_harbor_framework(self, temp_dir):
        trial_dir, agent_dir = self._make_agent_dir(temp_dir)
        trajectory = agent_dir / "trajectory.json"
        trajectory.write_text("curl harbor-framework.com/api")
        findings = check_reward_hacking(trial_dir, "job1", "trial-0")
        assert len(findings) == 1
        assert "Reward hacking" in findings[0]

    def test_detects_laude_institute(self, temp_dir):
        trial_dir, agent_dir = self._make_agent_dir(temp_dir)
        trajectory = agent_dir / "trajectory.json"
        trajectory.write_text("contact laude-institute for help")
        findings = check_reward_hacking(trial_dir, "job1", "trial-0")
        assert len(findings) == 1

    def test_case_insensitive(self, temp_dir):
        trial_dir, agent_dir = self._make_agent_dir(temp_dir)
        trajectory = agent_dir / "trajectory.json"
        trajectory.write_text("HARBOR-FRAMEWORK is great")
        findings = check_reward_hacking(trial_dir, "job1", "trial-0")
        assert len(findings) == 1

    def test_binary_files_dont_crash(self, temp_dir):
        trial_dir, agent_dir = self._make_agent_dir(temp_dir)
        # Bytes that are not valid UTF-8 text.
        blob = agent_dir / "data.bin"
        blob.write_bytes(b"\x00\x01\x02\xff\xfe")
        assert check_reward_hacking(trial_dir, "job1", "trial-0") == []

    def test_nested_agent_files(self, temp_dir):
        """Forbidden strings inside nested subdirectories are reported."""
        trial_dir, nested_dir = self._make_agent_dir(temp_dir, "subdir")
        log_file = nested_dir / "log.txt"
        log_file.write_text("harborframework exploit")
        findings = check_reward_hacking(trial_dir, "job1", "trial-0")
        assert len(findings) == 1
        assert "subdir/log.txt" in findings[0]
class TestValidateSubmission:
    """Tests for validate_submission function."""

    @staticmethod
    def _job_result(job_id):
        """Return a minimal job result carrying only the id and null timestamps."""
        return {"id": job_id, "started_at": None, "finished_at": None}

    def test_valid_submission(
        self,
        temp_dir,
        valid_metadata,
        valid_job_config,
    ):
        job_id = str(uuid4())
        expected_total = EXPECTED_TASK_COUNT * MIN_TRIALS_PER_TASK
        submission_dir = create_submission(
            temp_dir,
            metadata=valid_metadata,
            job_config=valid_job_config,
            trial_results=make_full_trial_set(job_id),
            job_result=self._job_result(job_id),
        )

        outcome = validate_submission(submission_dir)

        assert outcome.is_valid
        assert outcome.job_count == 1
        assert outcome.trial_count == expected_total
        assert outcome.successful_trials == expected_total

    def test_missing_metadata(self, temp_dir, valid_job_config, valid_trial_result):
        submission_dir = create_submission(
            temp_dir,
            metadata=None,
            job_config=valid_job_config,
            trial_results=[valid_trial_result],
        )

        outcome = validate_submission(submission_dir)

        assert not outcome.is_valid
        assert "Missing `metadata.yaml`" in outcome.errors[0]

    def test_invalid_metadata(self, temp_dir, valid_job_config, valid_trial_result):
        # The mapping lacks the fields the metadata schema requires.
        broken_metadata = {"invalid": "metadata"}
        submission_dir = create_submission(
            temp_dir,
            metadata=broken_metadata,
            job_config=valid_job_config,
            trial_results=[valid_trial_result],
        )

        outcome = validate_submission(submission_dir)

        assert not outcome.is_valid
        assert "Invalid `metadata.yaml`" in outcome.errors[0]

    def test_no_job_directories(self, temp_dir, valid_metadata):
        submission_dir = create_submission(temp_dir, metadata=valid_metadata)

        outcome = validate_submission(submission_dir)

        assert not outcome.is_valid
        assert "No job directories found" in outcome.errors[0]

    def test_no_trial_directories(self, temp_dir, valid_metadata, valid_job_config):
        # A job directory with a config but not a single trial.
        submission_dir = create_submission(
            temp_dir,
            metadata=valid_metadata,
            job_config=valid_job_config,
            trial_results=None,
        )

        outcome = validate_submission(submission_dir)

        assert not outcome.is_valid
        assert "No trial directories found" in outcome.errors[0]

    def test_job_id_mismatch(
        self,
        temp_dir,
        valid_metadata,
        valid_job_config,
        valid_trial_result,
        valid_job_result,
    ):
        # Two trials that claim different jobs; both ids are real UUIDs.
        matching_job_id = str(uuid4())
        foreign_job_id = str(uuid4())
        base_config = valid_trial_result["config"]
        matching_trial = {
            **valid_trial_result,
            "config": {**base_config, "job_id": matching_job_id},
        }
        foreign_trial = {
            **valid_trial_result,
            "id": str(uuid4()),
            "config": {**base_config, "job_id": foreign_job_id},
        }
        # The job itself agrees with the first trial only.
        job_result = {**valid_job_result, "id": matching_job_id}
        submission_dir = create_submission(
            temp_dir,
            metadata=valid_metadata,
            job_config=valid_job_config,
            trial_results=[matching_trial, foreign_trial],
            job_result=job_result,
        )

        outcome = validate_submission(submission_dir)

        assert not outcome.is_valid
        assert "does not match job's id" in outcome.errors[0]

    def test_yml_metadata_extension(
        self,
        temp_dir,
        valid_metadata,
        valid_job_config,
    ):
        """A metadata file with the .yml extension is accepted as well."""
        job_id = str(uuid4())
        submission_dir = create_submission(
            temp_dir,
            metadata=valid_metadata,
            job_config=valid_job_config,
            trial_results=make_full_trial_set(job_id),
            job_result=self._job_result(job_id),
            metadata_ext="yml",
        )

        outcome = validate_submission(submission_dir)

        assert outcome.is_valid

    def test_trial_with_optional_null_fields(
        self, temp_dir, valid_metadata, valid_job_config
    ):
        """A trial that omits every optional field still validates."""
        job_id = str(uuid4())
        trials = make_full_trial_set(job_id)
        # Swap the final trial for a bare one, keeping its checksum so task
        # coverage is unchanged.
        kept_checksum = trials[-1]["task_checksum"]
        bare_trial = {
            "id": str(uuid4()),
            "task_name": "test-task",
            "trial_name": "test-trial",
            "trial_uri": "file:///tmp/trial",
            "task_id": {"type": "local", "path": "/tmp/task"},
            "task_checksum": kept_checksum,
            "config": {
                "task": {"type": "local", "path": "/tmp/task"},
                "job_id": job_id,
            },
            "agent_info": {"name": "agent", "version": "1.0"},
        }
        trials[-1] = bare_trial
        submission_dir = create_submission(
            temp_dir,
            metadata=valid_metadata,
            job_config=valid_job_config,
            trial_results=trials,
            job_result=self._job_result(job_id),
        )

        outcome = validate_submission(submission_dir)

        assert outcome.is_valid
        # The bare trial has no verifier_result, so it is not counted as a success.
        assert outcome.successful_trials == EXPECTED_TASK_COUNT * MIN_TRIALS_PER_TASK - 1

    def test_accuracy_calculation(self, temp_dir, valid_metadata, valid_job_config):
        """Accuracy equals successful trials divided by all trials."""
        job_id = str(uuid4())
        trials = make_full_trial_set(job_id, reward=1.0)
        total = len(trials)

        def unsuccessful(task_checksum, verifier_result):
            # Same shape as the generated trials, but with a caller-chosen verdict.
            return {
                "id": str(uuid4()),
                "task_name": "test",
                "trial_name": "t",
                "trial_uri": "file:///tmp/t",
                "task_id": {"type": "local", "path": "/tmp/task"},
                "task_checksum": task_checksum,
                "config": {
                    "task": {"type": "local", "path": "/tmp/task"},
                    "job_id": job_id,
                },
                "agent_info": {"name": "a", "version": "1"},
                "verifier_result": verifier_result,
            }

        # Second-to-last trial: explicit zero reward. Last trial: no verdict at all.
        trials[-2] = unsuccessful(
            trials[-2]["task_checksum"], {"rewards": {"reward": 0.0}}
        )
        trials[-1] = unsuccessful(trials[-1]["task_checksum"], None)
        submission_dir = create_submission(
            temp_dir,
            metadata=valid_metadata,
            job_config=valid_job_config,
            trial_results=trials,
            job_result=self._job_result(job_id),
        )

        outcome = validate_submission(submission_dir)

        assert outcome.is_valid
        assert outcome.trial_count == total
        assert outcome.successful_trials == total - 2
        assert outcome.accuracy == pytest.approx((total - 2) / total)
class TestTaskCoverageValidation:
    """Tests for task coverage and minimum trials per task validation.

    Task counts are derived from EXPECTED_TASK_COUNT instead of being spelled
    out as literals, so the scenarios stay meaningful if that constant changes.
    The "short" tasks use 3 and 2 trials, which assumes MIN_TRIALS_PER_TASK is
    larger than 3.
    """

    def _make_trial(self, job_id: str, task_checksum: str) -> dict:
        """Create a valid trial result with a specific task_checksum."""
        return {
            "id": str(uuid4()),
            "task_name": "test-task",
            "trial_name": "test-trial",
            "trial_uri": "file:///tmp/trial",
            "task_id": {"type": "local", "path": "/tmp/task"},
            "task_checksum": task_checksum,
            "config": {
                "task": {"type": "local", "path": "/tmp/task"},
                "job_id": job_id,
            },
            "agent_info": {"name": "test-agent", "version": "1.0.0"},
            "verifier_result": {"rewards": {"reward": 1.0}},
        }

    def _make_trials(self, job_id: str, task_checksum: str, count: int) -> list[dict]:
        """Create *count* trials that all belong to the task *task_checksum*."""
        return [self._make_trial(job_id, task_checksum) for _ in range(count)]

    def _fully_covered_tasks(self, job_id: str, n_tasks: int) -> list[dict]:
        """Create MIN_TRIALS_PER_TASK trials for each of tasks 0..n_tasks-1."""
        trials: list[dict] = []
        for i in range(n_tasks):
            trials.extend(
                self._make_trials(job_id, f"checksum_{i:03d}", MIN_TRIALS_PER_TASK)
            )
        return trials

    @staticmethod
    def _job_result(job_id: str) -> dict:
        """Return a minimal job result carrying only the id and null timestamps."""
        return {"id": job_id, "started_at": None, "finished_at": None}

    def test_minimum_trials_per_task(self, temp_dir, valid_metadata, valid_job_config):
        """A task with fewer than MIN_TRIALS_PER_TASK trials should fail."""
        job_id = str(uuid4())
        # All tasks but the last are fully covered; the last gets only 3 trials.
        short_checksum = f"checksum_{EXPECTED_TASK_COUNT - 1:03d}"
        trials = self._fully_covered_tasks(job_id, EXPECTED_TASK_COUNT - 1)
        trials.extend(self._make_trials(job_id, short_checksum, 3))
        submission_dir = create_submission(
            temp_dir,
            metadata=valid_metadata,
            job_config=valid_job_config,
            trial_results=trials,
            job_result=self._job_result(job_id),
        )

        result = validate_submission(submission_dir)

        assert not result.is_valid
        assert any("has only 3 trial(s)" in e for e in result.errors)
        assert any(short_checksum in e for e in result.errors)

    def test_minimum_trials_per_task_passes(
        self, temp_dir, valid_metadata, valid_job_config
    ):
        """Exactly MIN_TRIALS_PER_TASK trials for each of the EXPECTED_TASK_COUNT tasks should pass."""
        job_id = str(uuid4())
        trials = self._fully_covered_tasks(job_id, EXPECTED_TASK_COUNT)
        submission_dir = create_submission(
            temp_dir,
            metadata=valid_metadata,
            job_config=valid_job_config,
            trial_results=trials,
            job_result=self._job_result(job_id),
        )

        result = validate_submission(submission_dir)

        assert result.is_valid
        assert result.trial_count == EXPECTED_TASK_COUNT * MIN_TRIALS_PER_TASK

    def test_insufficient_unique_tasks(
        self, temp_dir, valid_metadata, valid_job_config
    ):
        """A submission with fewer than EXPECTED_TASK_COUNT unique tasks should fail."""
        job_id = str(uuid4())
        # Only 10 unique tasks, each with the full number of trials.
        trials = self._fully_covered_tasks(job_id, 10)
        submission_dir = create_submission(
            temp_dir,
            metadata=valid_metadata,
            job_config=valid_job_config,
            trial_results=trials,
            job_result=self._job_result(job_id),
        )

        result = validate_submission(submission_dir)

        assert not result.is_valid
        assert any(
            f"covers 10 unique task(s), expected {EXPECTED_TASK_COUNT}" in e
            for e in result.errors
        )

    def test_multiple_tasks_mixed(self, temp_dir, valid_metadata, valid_job_config):
        """Only tasks with fewer than MIN_TRIALS_PER_TASK trials trigger per-task errors."""
        job_id = str(uuid4())
        # All tasks but one are fully covered; the extra task gets only 2 trials.
        trials = self._fully_covered_tasks(job_id, EXPECTED_TASK_COUNT - 1)
        trials.extend(self._make_trials(job_id, "checksum_short", 2))
        submission_dir = create_submission(
            temp_dir,
            metadata=valid_metadata,
            job_config=valid_job_config,
            trial_results=trials,
            job_result=self._job_result(job_id),
        )

        result = validate_submission(submission_dir)

        assert not result.is_valid
        # Only the under-count task should trigger a per-task error
        per_task_errors = [e for e in result.errors if "has only" in e]
        assert len(per_task_errors) == 1
        assert "checksum_short" in per_task_errors[0]
        assert "has only 2 trial(s)" in per_task_errors[0]
class TestFormatValidationComment:
    """Tests for format_validation_comment function."""

    def test_all_valid(self):
        passing = ValidationResult(
            models=["gpt-4 (openai)"],
            job_count=1,
            trial_count=10,
            successful_trials=8,
        )
        text = format_validation_comment([("my-agent", passing)])
        # 8 successes out of 10 trials is rendered as a percentage.
        for fragment in (
            "passed validation",
            "Ready to merge",
            "80.0%",
            "gpt-4 (openai)",
        ):
            assert fragment in text

    def test_validation_failed(self):
        failing = ValidationResult(errors=["Error 1", "Error 2"])
        text = format_validation_comment([("my-agent", failing)])
        for fragment in ("Validation failed", "Error 1", "Error 2", "fix the errors"):
            assert fragment in text

    def test_truncates_errors(self):
        # With 30 errors supplied, the comment reports 10 of them as omitted.
        many_errors = [f"Error {i}" for i in range(30)]
        failing = ValidationResult(errors=many_errors)
        text = format_validation_comment([("my-agent", failing)])
        assert "... and 10 more errors" in text

    def test_multiple_submissions(self):
        first = ValidationResult(models=["gpt-4 (openai)"], job_count=1, trial_count=5)
        second = ValidationResult(
            models=["claude-3 (anthropic)"], job_count=2, trial_count=10
        )
        submissions = [("agent1", first), ("agent2", second)]
        text = format_validation_comment(submissions)
        for fragment in ("agent1", "agent2", "gpt-4", "claude-3"):
            assert fragment in text
# ============================================================================
# Tests: Import Functions
# ============================================================================
class TestJobExists:
    """Tests for job_exists function."""

    @pytest.mark.asyncio
    async def test_returns_true_when_job_exists(self):
        client = MagicMock()
        # Stub the table().select().eq().execute() chain to return one row.
        query = client.table.return_value.select.return_value.eq.return_value
        query.execute = AsyncMock(return_value=MagicMock(data=[{"id": "123"}]))
        found = await job_exists(client, "123")
        assert found is True

    @pytest.mark.asyncio
    async def test_returns_false_when_job_missing(self):
        client = MagicMock()
        # Same chain, but the response carries no rows.
        query = client.table.return_value.select.return_value.eq.return_value
        query.execute = AsyncMock(return_value=MagicMock(data=[]))
        found = await job_exists(client, "123")
        assert found is False
class TestImportSubmission:
"""Tests for import_submission function."""
@pytest.mark.asyncio
@patch("app.get_supabase_client", new_callable=AsyncMock)
@patch("app.job_exists", new_callable=AsyncMock)
@patch("app.upsert_with_retry", new_callable=AsyncMock)
@patch("app.insert_ignore_conflicts", new_callable=AsyncMock)
async def test_skips_existing_job(
self,
mock_insert_ignore,
mock_upsert,
mock_job_exists,
mock_get_client,
temp_dir,
valid_metadata,
valid_job_config,
valid_trial_result,
valid_job_result,
):
"""Test that existing jobs are skipped."""
mock_client = AsyncMock()
mock_get_client.return_value = mock_client
mock_job_exists.return_value = True
submission_dir = create_submission(
temp_dir,
metadata=valid_metadata,
job_config=valid_job_config,
trial_results=[valid_trial_result],
job_result=valid_job_result,
)
metadata = SubmissionMetadata.model_validate(valid_metadata)
stats = await import_submission(submission_dir, metadata)
assert stats["jobs_imported"] == 0
assert stats["trials_imported"] == 0
mock_upsert.assert_not_called()
@pytest.mark.asyncio
@patch("app.get_supabase_client", new_callable=AsyncMock)
@patch("app.job_exists", new_callable=AsyncMock)
@patch("app.upsert_with_retry", new_callable=AsyncMock)
@patch("app.insert_ignore_conflicts", new_callable=AsyncMock)
@patch(
"app.upload_trial_to_storage",
new_callable=AsyncMock,
return_value="https://storage.example.com/trial.tar.gz",
)
async def test_imports_new_job(
self,
mock_upload,
mock_insert_ignore,
mock_upsert,
mock_job_exists,
mock_get_client,
temp_dir,
valid_metadata,
valid_job_config,
valid_trial_result,
valid_job_result,
):
"""Test that new jobs are imported."""
mock_client = AsyncMock()
mock_get_client.return_value = mock_client
mock_job_exists.return_value = False
submission_dir = create_submission(
temp_dir,
metadata=valid_metadata,
job_config=valid_job_config,
trial_results=[valid_trial_result],
job_result=valid_job_result,
)
metadata = SubmissionMetadata.model_validate(valid_metadata)
stats = await import_submission(submission_dir, metadata)
assert stats["jobs_imported"] == 1
assert stats["trials_imported"] == 1
assert mock_upsert.call_count >= 3 # job, trial, trial_model at minimum
@pytest.mark.asyncio
@patch("app.get_supabase_client", new_callable=AsyncMock)
@patch("app.job_exists", new_callable=AsyncMock)
@patch("app.upsert_with_retry", new_callable=AsyncMock)
@patch("app.insert_ignore_conflicts", new_callable=AsyncMock)
@patch(
"app.upload_trial_to_storage",
new_callable=AsyncMock,
return_value="https://storage.example.com/trial.tar.gz",
)
async def test_handles_trial_without_optional_fields(
self,
mock_upload,
mock_insert_ignore,
mock_upsert,
mock_job_exists,
mock_get_client,
temp_dir,
valid_metadata,
valid_job_config,
valid_job_result,
):
"""Test that trials without optional fields are handled correctly."""
mock_client = AsyncMock()
mock_get_client.return_value = mock_client
mock_job_exists.return_value = False
job_id = valid_job_result["id"]
# Minimal trial with only required fields
trial = {
"id": str(uuid4()),
"task_name": "test",
"trial_name": "test",
"trial_uri": "file:///tmp/trial",
"task_id": {"type": "local", "path": "/tmp/task"},
"task_checksum": "abc",
"config": {
"task": {"type": "local", "path": "/tmp/task"},
"job_id": job_id,
},
"agent_info": {"name": "agent", "version": "1.0"},
# No optional fields
}
submission_dir = create_submission(
temp_dir,
metadata=valid_metadata,
job_config=valid_job_config,
trial_results=[trial],
job_result=valid_job_result,
)
metadata = SubmissionMetadata.model_validate(valid_metadata)
# This should not raise an error
stats = await import_submission(submission_dir, metadata)
assert stats["jobs_imported"] == 1
assert stats["trials_imported"] == 1
assert len(stats["errors"]) == 0
@pytest.mark.asyncio
@patch("app.get_supabase_client", new_callable=AsyncMock)
@patch("app.job_exists", new_callable=AsyncMock)
@patch("app.upsert_with_retry", new_callable=AsyncMock)
@patch("app.insert_ignore_conflicts", new_callable=AsyncMock)
async def test_parallel_uploads_get_correct_urls(
    self,
    mock_insert_ignore,
    mock_upsert,
    mock_job_exists,
    mock_get_client,
    temp_dir,
    valid_metadata,
    valid_job_config,
    valid_job_result,
):
    """Test that every trial is uploaded and stored with its own storage URL.

    Three trials are imported with an upload stub that derives the URL from
    the trial id, so a URL attached to the wrong trial is detectable. The
    test checks the pairing of trial and URL; it does not measure whether
    the uploads actually ran concurrently.
    """
    mock_client = AsyncMock()
    mock_get_client.return_value = mock_client
    mock_job_exists.return_value = False

    job_id = valid_job_result["id"]
    trial_ids = [str(uuid4()) for _ in range(3)]
    trials = [
        {
            "id": tid,
            "task_name": "test",
            "trial_name": "test",
            "trial_uri": "file:///tmp/trial",
            "task_id": {"type": "local", "path": "/tmp/task"},
            "task_checksum": "abc",
            "config": {
                "task": {"type": "local", "path": "/tmp/task"},
                "job_id": job_id,
            },
            "agent_info": {"name": "agent", "version": "1.0"},
        }
        for tid in trial_ids
    ]

    submission_dir = create_submission(
        temp_dir,
        metadata=valid_metadata,
        job_config=valid_job_config,
        trial_results=trials,
        job_result=valid_job_result,
    )
    metadata = SubmissionMetadata.model_validate(valid_metadata)

    # upload_trial_to_storage returns a URL based on the trial_id
    async def fake_upload(client, trial_dir, trial_id):
        return f"https://storage.example.com/{trial_id}.tar.gz"

    with patch(
        "app.upload_trial_to_storage", side_effect=fake_upload
    ) as mock_upload:
        stats = await import_submission(submission_dir, metadata)

    assert stats["jobs_imported"] == 1
    assert stats["trials_imported"] == 3
    assert len(stats["errors"]) == 0

    # Verify upload was called for each trial
    assert mock_upload.call_count == 3
    uploaded_ids = {call.args[2] for call in mock_upload.call_args_list}
    assert uploaded_ids == set(trial_ids)

    # Find the upsert call for the "trial" table
    trial_upsert_calls = [
        call for call in mock_upsert.call_args_list if call.args[1] == "trial"
    ]
    assert len(trial_upsert_calls) == 1
    trial_data = trial_upsert_calls[0].args[2]

    # Guard against a vacuous pass: without this, an empty or partial batch
    # would let the per-row URL loop below succeed without checking anything.
    assert {str(t["id"]) for t in trial_data} == set(trial_ids)

    # Verify each trial insert has the correct URL
    for t in trial_data:
        expected_url = f"https://storage.example.com/{t['id']}.tar.gz"
        assert t["trial_uri"] == expected_url
@pytest.mark.asyncio
@patch("app.get_supabase_client", new_callable=AsyncMock)
@patch("app.job_exists", new_callable=AsyncMock)
@patch("app.upsert_with_retry", new_callable=AsyncMock)
@patch("app.insert_ignore_conflicts", new_callable=AsyncMock)
async def test_handles_missing_job_id(
    self,
    mock_insert_ignore,
    mock_upsert,
    mock_job_exists,
    mock_get_client,
    temp_dir,
    valid_metadata,
    valid_job_config,
):
    """A submission with no discoverable job_id imports nothing and reports an error."""
    mock_get_client.return_value = AsyncMock()

    # The trial's config has no "job_id" key ...
    trial_config = {"task": {"type": "local", "path": "/tmp/task"}}
    orphan_trial = {
        "id": str(uuid4()),
        "task_name": "test",
        "trial_name": "test",
        "trial_uri": "file:///tmp/trial",
        "task_id": {"type": "local", "path": "/tmp/task"},
        "task_checksum": "abc",
        "config": trial_config,
        "agent_info": {"name": "agent", "version": "1.0"},
    }

    # ... and no job_result is passed either, so there is no other source
    # for the id.
    submission_path = create_submission(
        temp_dir,
        metadata=valid_metadata,
        job_config=valid_job_config,
        trial_results=[orphan_trial],
    )
    parsed_metadata = SubmissionMetadata.model_validate(valid_metadata)

    import_stats = await import_submission(submission_path, parsed_metadata)

    assert import_stats["jobs_imported"] == 0
    first_error = import_stats["errors"][0]
    assert "Could not find job_id" in first_error
# ============================================================================
# Tests: HF Hub Operations
# ============================================================================
class TestGetChangedSubmissionNames:
    """Exercise get_changed_submission_names against a mocked Hub API."""

    # Every test uses the same submission folder; only tree ids vary.
    SUBMISSION_PATH = "submissions/terminal-bench/2.0/agent1__gpt4"

    @classmethod
    def _folder(cls, tree_id):
        """Return a RepoFolder-spec'd mock at SUBMISSION_PATH with *tree_id*."""
        folder = MagicMock(spec=RepoFolder)
        folder.path = cls.SUBMISSION_PATH
        folder.tree_id = tree_id
        return folder

    @staticmethod
    def _api(pr_listing, main_listing):
        """Return a mock API whose list_repo_tree yields the PR listing, then main's."""
        api = MagicMock()
        api.list_repo_tree.side_effect = [pr_listing, main_listing]
        return api

    @pytest.mark.asyncio
    async def test_detects_new_folders(self):
        """A folder that exists only in the PR listing is reported."""
        api = self._api([self._folder("abc123")], [])

        changed = await get_changed_submission_names(api, "refs/pr/1")

        assert changed == ["agent1__gpt4"]

    @pytest.mark.asyncio
    async def test_detects_changed_folders(self):
        """A folder whose tree_id differs between PR and main is reported."""
        api = self._api([self._folder("new_hash")], [self._folder("old_hash")])

        changed = await get_changed_submission_names(api, "refs/pr/1")

        assert changed == ["agent1__gpt4"]

    @pytest.mark.asyncio
    async def test_ignores_unchanged_folders(self):
        """A folder with the same tree_id in PR and main is not reported."""
        api = self._api([self._folder("same_hash")], [self._folder("same_hash")])

        changed = await get_changed_submission_names(api, "refs/pr/1")

        assert changed == []
class TestGetNewJobIdsBySubmission:
    """Exercise get_new_job_ids_by_submission on small on-disk trees."""

    @staticmethod
    def _scan_with_result(payload):
        """Write *payload* to agent1__gpt4/job1/result.json and scan the tree."""
        with tempfile.TemporaryDirectory() as tmpdir:
            root = Path(tmpdir)
            job_dir = root.joinpath(
                "submissions", "terminal-bench", "2.0", "agent1__gpt4", "job1"
            )
            job_dir.mkdir(parents=True)
            (job_dir / "result.json").write_text(json.dumps(payload))
            return get_new_job_ids_by_submission(root)

    def test_extracts_job_ids(self):
        """The "id" in result.json is listed under its submission name."""
        found = self._scan_with_result({"id": "uuid-123"})
        assert found == {"agent1__gpt4": ["uuid-123"]}

    def test_skips_missing_id(self):
        """A result.json without an "id" key yields no entry."""
        found = self._scan_with_result({"no_id": True})
        assert found == {}

    def test_empty_dir(self):
        """An empty root directory yields an empty mapping."""
        with tempfile.TemporaryDirectory() as tmpdir:
            found = get_new_job_ids_by_submission(Path(tmpdir))
            assert found == {}
class TestBatchCheckExistingJobs:
    """Exercise batch_check_existing_jobs with a mocked database client."""

    @pytest.mark.asyncio
    async def test_returns_existing_ids(self):
        """Only the ids present in the query response come back, as a set."""
        response = MagicMock(data=[{"id": "uuid-1"}, {"id": "uuid-3"}])
        client = MagicMock()
        # Stub the end of the table().select().in_() chain with an awaitable
        # execute() that returns the canned response.
        query = client.table.return_value.select.return_value.in_.return_value
        query.execute = AsyncMock(return_value=response)

        existing = await batch_check_existing_jobs(
            client, ["uuid-1", "uuid-2", "uuid-3"]
        )

        assert existing == {"uuid-1", "uuid-3"}

    @pytest.mark.asyncio
    async def test_empty_input(self):
        """An empty id list returns an empty set without calling client.table."""
        client = MagicMock()

        existing = await batch_check_existing_jobs(client, [])

        assert existing == set()
        client.table.assert_not_called()
# ============================================================================
# Tests: ValidationResult
# ============================================================================
class TestValidationResult:
    """Checks for the derived properties of ValidationResult."""

    def test_is_valid_with_no_errors(self):
        """A default-constructed result is valid."""
        assert ValidationResult().is_valid is True

    def test_is_valid_with_errors(self):
        """Any recorded error makes the result invalid."""
        failing = ValidationResult(errors=["Error"])
        assert failing.is_valid is False

    def test_accuracy_with_trials(self):
        """Accuracy is the share of successful trials (7 of 10 -> 0.7)."""
        scored = ValidationResult(trial_count=10, successful_trials=7)
        assert scored.accuracy == pytest.approx(0.7)

    def test_accuracy_with_no_trials(self):
        """With zero trials, accuracy is None."""
        unscored = ValidationResult(trial_count=0, successful_trials=0)
        assert unscored.accuracy is None
# ============================================================================
# Tests: Pydantic Models
# ============================================================================
class TestSubmissionMetadata:
    """Tests for SubmissionMetadata model."""

    def test_valid_metadata(self, valid_metadata):
        """The valid_metadata fixture parses and exposes its fields."""
        metadata = SubmissionMetadata.model_validate(valid_metadata)
        assert metadata.agent_url == "https://example.com/agent"
        assert len(metadata.models) == 1
        assert metadata.models[0].model_name == "gpt-4"

    def test_missing_agent_url(self, valid_metadata):
        """Dropping the required agent_url key is rejected by validation."""
        del valid_metadata["agent_url"]
        # pydantic's ValidationError derives from ValueError. Expecting
        # ValueError rather than bare Exception keeps an unrelated failure
        # (AttributeError, TypeError, ...) from satisfying the test.
        with pytest.raises(ValueError):
            SubmissionMetadata.model_validate(valid_metadata)

    def test_missing_models(self, valid_metadata):
        """Dropping the required models key is rejected by validation."""
        del valid_metadata["models"]
        # See test_missing_agent_url for why ValueError is expected here.
        with pytest.raises(ValueError):
            SubmissionMetadata.model_validate(valid_metadata)

    def test_optional_fields(self):
        """The optional display-name fields are kept when supplied."""
        metadata = SubmissionMetadata.model_validate(
            {
                "agent_url": "https://example.com",
                "models": [{"model_name": "gpt-4", "model_provider": "openai"}],
                "agent_display_name": "My Agent",
                "agent_org_display_name": "My Org",
            }
        )
        assert metadata.agent_display_name == "My Agent"
        assert metadata.agent_org_display_name == "My Org"
class TestModelMetadata:
    """Tests for ModelMetadata model."""

    def test_valid_model(self):
        """The two required keys parse and are exposed as attributes."""
        model = ModelMetadata.model_validate(
            {
                "model_name": "gpt-4",
                "model_provider": "openai",
            }
        )
        assert model.model_name == "gpt-4"
        assert model.model_provider == "openai"

    def test_optional_fields(self):
        """Both optional display-name keys are kept when supplied."""
        model = ModelMetadata.model_validate(
            {
                "model_name": "gpt-4",
                "model_provider": "openai",
                "model_display_name": "GPT-4",
                "model_org_display_name": "OpenAI",
            }
        )
        assert model.model_display_name == "GPT-4"
        # The org display name is supplied above, so check it too; previously
        # it was passed in but never asserted.
        # NOTE(review): assumes ModelMetadata declares model_org_display_name
        # as a field - confirm against app.py.
        assert model.model_org_display_name == "OpenAI"
# ============================================================================
# Integration Tests
# ============================================================================
class TestEndToEnd:
    """Integration tests running validation on a submission directory."""

    def test_full_validation_flow(
        self,
        temp_dir,
        valid_metadata,
        valid_job_config,
    ):
        """Validate a full trial set and render the resulting comment."""
        job_uuid = str(uuid4())
        submission_path = create_submission(
            temp_dir,
            metadata=valid_metadata,
            job_config=valid_job_config,
            trial_results=make_full_trial_set(job_uuid),
            job_result={"id": job_uuid, "started_at": None, "finished_at": None},
        )

        outcome = validate_submission(submission_path)
        rendered = format_validation_comment([(submission_path.name, outcome)])

        assert outcome.is_valid
        assert "passed validation" in rendered
        # Every trial in the set is successful, so the comment shows 100%.
        assert "100.0%" in rendered

    def test_validation_catches_config_errors(
        self, temp_dir, valid_metadata, valid_trial_result, valid_job_result
    ):
        """A job config with three disallowed settings fails validation."""
        # Each of the three entries below is expected to produce an error.
        bad_config = {
            "timeout_multiplier": 2.0,
            "agents": [{"override_timeout_sec": 100}],
            "environment": {"override_cpus": 4},
        }
        submission_path = create_submission(
            temp_dir,
            metadata=valid_metadata,
            job_config=bad_config,
            trial_results=[valid_trial_result],
            job_result=valid_job_result,
        )

        outcome = validate_submission(submission_path)

        assert not outcome.is_valid
        # At least one error per offending entry.
        assert len(outcome.errors) >= 3
# Note: TestHarborModels class was removed because we no longer use JobConfig
# and TrialResult models for validation. This allows us to support custom
# environment types that aren't in Harbor's EnvironmentType enum.
# We still use JobResult model which doesn't have enum restrictions.