| """Tests for battery feature (benchmark test suite execution). | |
| Per ADR-006: BatteryRunner is orchestration layer. Tests mock MCP tools, not HTTP. | |
| """ | |
| import asyncio | |
| import json | |
| import pytest | |
| from pathlib import Path | |
| from unittest.mock import AsyncMock, patch | |
| from prompt_prix.benchmarks.base import BenchmarkCase | |
| from prompt_prix.benchmarks.custom import CustomJSONLoader | |
| from prompt_prix.battery import RunStatus, RunResult, BatteryRun, BatteryRunner | |
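
# NOTE (assumption): the async test methods below carry no asyncio marker, which
# only works if pytest is configured accordingly (e.g. pytest-asyncio with
# `asyncio_mode = auto`, or an equivalent anyio setup). Under strict mode,
# uncomment:
# pytestmark = pytest.mark.asyncio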


# ─────────────────────────────────────────────────────────────────────
# TEST DATA
# ─────────────────────────────────────────────────────────────────────

SAMPLE_BENCHMARK_JSON = {
    "test_suite": "test_battery",
    "version": "1.0",
    "prompts": [
        {
            "id": "test_1",
            "name": "Test One",
            "category": "basic",
            "system": "You are a helpful assistant.",
            "user": "What is 2 + 2?",
        },
        {
            "id": "test_2",
            "name": "Test Two",
            "category": "basic",
            "system": "You are a math tutor.",
            "user": "What is 3 + 3?",
        },
    ]
}


# ─────────────────────────────────────────────────────────────────────
# FIXTURES
# ─────────────────────────────────────────────────────────────────────

@pytest.fixture
def sample_test_cases():
    """Create sample BenchmarkCase objects."""
    return [
        BenchmarkCase(
            id="test_1",
            name="Test One",
            category="basic",
            system="You are a helpful assistant.",
            user="What is 2 + 2?"
        ),
        BenchmarkCase(
            id="test_2",
            name="Test Two",
            category="basic",
            system="You are a math tutor.",
            user="What is 3 + 3?"
        ),
    ]


@pytest.fixture
def sample_benchmark_file(tmp_path):
    """Create a temporary benchmark JSON file."""
    file_path = tmp_path / "benchmark.json"
    file_path.write_text(json.dumps(SAMPLE_BENCHMARK_JSON))
    return file_path


@pytest.fixture
def invalid_benchmark_file(tmp_path):
    """Create an invalid benchmark JSON file (missing prompts)."""
    file_path = tmp_path / "invalid.json"
    file_path.write_text(json.dumps({"version": "1.0"}))
    return file_path


@pytest.fixture
def malformed_json_file(tmp_path):
    """Create a malformed JSON file."""
    file_path = tmp_path / "malformed.json"
    file_path.write_text("{ not valid json }")
    return file_path
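

# Convenience sketch (not part of the original suite): every runner test below
# inlines the same "drain the async generator, keep the last state" loop.
# A hypothetical helper like this could replace those loops:
#
#     async def drain(runner: BatteryRunner):
#         final_state = None
#         async for state in runner.run():
#             final_state = state
#         return final_state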


# ─────────────────────────────────────────────────────────────────────
# TESTCASE MODEL TESTS
# ─────────────────────────────────────────────────────────────────────

class TestBenchmarkCase:
    """Tests for the BenchmarkCase Pydantic model."""

    def test_create_basic_test_case(self):
        """Test creating a basic test case."""
        tc = BenchmarkCase(
            id="basic_test",
            user="Hello world"
        )
        assert tc.id == "basic_test"
        assert tc.user == "Hello world"
        assert tc.system == "You are a helpful assistant."

    def test_create_full_test_case(self):
        """Test creating a test case with all fields."""
        tc = BenchmarkCase(
            id="full_test",
            name="Full Test",
            category="advanced",
            severity="critical",
            system="You are a code assistant.",
            user="Write a function",
            tools=[{"type": "function", "function": {"name": "test"}}],
            tool_choice="required"
        )
        assert tc.id == "full_test"
        assert tc.name == "Full Test"
        assert tc.category == "advanced"
        assert tc.tools is not None
        assert len(tc.tools) == 1

    def test_empty_id_fails(self):
        """Test that an empty id raises ValueError."""
        with pytest.raises(ValueError, match="cannot be empty"):
            BenchmarkCase(id="", user="Hello")

    def test_empty_user_fails(self):
        """Test that an empty user message raises ValueError."""
        with pytest.raises(ValueError, match="cannot be empty"):
            BenchmarkCase(id="test", user="")

    def test_whitespace_only_id_fails(self):
        """Test that a whitespace-only id raises ValueError."""
        with pytest.raises(ValueError, match="cannot be empty"):
            BenchmarkCase(id=" ", user="Hello")

    def test_to_messages(self):
        """Test converting a BenchmarkCase to OpenAI messages format."""
        tc = BenchmarkCase(
            id="test",
            system="Be helpful",
            user="What time is it?"
        )
        messages = tc.to_messages()
        assert len(messages) == 2
        assert messages[0]["role"] == "system"
        assert messages[0]["content"] == "Be helpful"
        assert messages[1]["role"] == "user"
        assert messages[1]["content"] == "What time is it?"

    def test_to_messages_with_multi_turn_history(self):
        """A pre-defined messages array takes precedence over system + user."""
        history = [
            {"role": "system", "content": "You are a file assistant."},
            {"role": "user", "content": "List the directory"},
            {"role": "assistant", "content": "list_directory('./sort_test')"},
            {"role": "user", "content": "[FILE] 1.txt\n[FILE] 2.txt"},
            {"role": "assistant", "content": "read_file('./sort_test/1.txt')"},
            {"role": "user", "content": "The zebra is a striped animal."},
            {"role": "user", "content": "Now move 1.txt to animals/"},
        ]
        tc = BenchmarkCase(
            id="multi_turn",
            user="Now move 1.txt to animals/",
            messages=history,
        )
        messages = tc.to_messages()
        assert messages == history
        assert len(messages) == 7

    def test_to_messages_without_messages_field_unchanged(self):
        """Without a messages field, to_messages() returns system + user (backward compat)."""
        tc = BenchmarkCase(id="single", user="Hello", system="Be helpful")
        messages = tc.to_messages()
        assert len(messages) == 2
        assert messages[0] == {"role": "system", "content": "Be helpful"}
        assert messages[1] == {"role": "user", "content": "Hello"}
        assert tc.messages is None

    def test_to_messages_returns_copy(self):
        """to_messages() returns a copy, not the original list."""
        history = [
            {"role": "user", "content": "Hello"},
            {"role": "assistant", "content": "Hi"},
            {"role": "user", "content": "Bye"},
        ]
        tc = BenchmarkCase(id="copy_test", user="Bye", messages=history)
        result = tc.to_messages()
        assert result == history
        assert result is not history  # Must be a copy

    def test_display_name_uses_name_if_set(self):
        """Test the display_name property."""
        tc = BenchmarkCase(id="test_id", name="Display Name", user="Hello")
        assert tc.display_name == "Display Name"

    def test_display_name_uses_id_if_no_name(self):
        """Test that display_name falls back to id."""
        tc = BenchmarkCase(id="test_id", user="Hello")
        assert tc.display_name == "test_id"

    def test_react_mode_fields(self):
        """BenchmarkCase accepts mode, mock_tools, max_iterations for ReAct."""
        mock_tools = {
            "read_file": {"./1.txt": "Content about animals"},
            "move_file": {"_default": "File moved"},
        }
        tc = BenchmarkCase(
            id="react_test",
            user="Organize these files",
            mode="react",
            mock_tools=mock_tools,
            max_iterations=20,
            tools=[{"type": "function", "function": {"name": "read_file"}}],
        )
        assert tc.mode == "react"
        assert tc.mock_tools == mock_tools
        assert tc.max_iterations == 20

    def test_react_mode_defaults(self):
        """mode=None and max_iterations=15 by default (backward compat)."""
        tc = BenchmarkCase(id="basic", user="Hello")
        assert tc.mode is None
        assert tc.mock_tools is None
        assert tc.max_iterations == 15
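
    # The mock_tools shape used above (and in TestReactAsAtom) maps tool name to
    # {argument value -> canned observation}, with "_default" as the fallback.
    # Illustrative lookup (an assumption about dispatch internals, not its API):
    #
    #     canned = mock_tools[tool_name]
    #     observation = canned.get(arg_value, canned.get("_default"))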

    def test_react_mode_via_json_loader(self, tmp_path):
        """JSON loader round-trips react fields through BenchmarkCase."""
        battery_file = tmp_path / "react_battery.json"
        battery_file.write_text(json.dumps({
            "prompts": [{
                "id": "react_1",
                "user": "Organize files",
                "mode": "react",
                "mock_tools": {
                    "list_directory": {"./sort_test": "[FILE] 1.txt"},
                    "read_file": {"./sort_test/1.txt": "Zebras are animals"},
                },
                "max_iterations": 10,
                "tools": [{"type": "function", "function": {"name": "list_directory"}}],
                "expected_response": "Files organized",
            }]
        }))
        cases = CustomJSONLoader.load(battery_file)
        assert len(cases) == 1
        tc = cases[0]
        assert tc.mode == "react"
        assert tc.mock_tools["read_file"]["./sort_test/1.txt"] == "Zebras are animals"
        assert tc.max_iterations == 10


# ─────────────────────────────────────────────────────────────────────
# CUSTOMJSONLOADER TESTS
# ─────────────────────────────────────────────────────────────────────

class TestCustomJSONLoader:
    """Tests for CustomJSONLoader."""

    def test_load_valid_file(self, sample_benchmark_file):
        """Test loading a valid benchmark file."""
        cases = CustomJSONLoader.load(sample_benchmark_file)
        assert len(cases) == 2
        assert cases[0].id == "test_1"
        assert cases[1].id == "test_2"

    def test_load_missing_file(self, tmp_path):
        """Test that loading a non-existent file raises FileNotFoundError."""
        with pytest.raises(FileNotFoundError):
            CustomJSONLoader.load(tmp_path / "nonexistent.json")

    def test_load_missing_prompts_key(self, invalid_benchmark_file):
        """Test that loading a file without a prompts key raises ValueError."""
        with pytest.raises(ValueError, match="missing required 'prompts' key"):
            CustomJSONLoader.load(invalid_benchmark_file)

    def test_load_malformed_json(self, malformed_json_file):
        """Test that loading malformed JSON raises JSONDecodeError."""
        with pytest.raises(json.JSONDecodeError):
            CustomJSONLoader.load(malformed_json_file)

    def test_load_empty_prompts_array(self, tmp_path):
        """Test that loading a file with an empty prompts array raises ValueError."""
        file_path = tmp_path / "empty.json"
        file_path.write_text(json.dumps({"prompts": []}))
        with pytest.raises(ValueError, match="cannot be empty"):
            CustomJSONLoader.load(file_path)

    def test_validate_valid_file(self, sample_benchmark_file):
        """Test that validate returns success for a valid file."""
        valid, message = CustomJSONLoader.validate(sample_benchmark_file)
        assert valid is True
        assert "2 tests" in message
        assert "1 categories" in message

    def test_validate_invalid_file(self, invalid_benchmark_file):
        """Test that validate returns failure for an invalid file."""
        valid, message = CustomJSONLoader.validate(invalid_benchmark_file)
        assert valid is False
        assert "❌" in message

    def test_load_jsonl_file(self, tmp_path):
        """Test loading JSONL format (one test per line)."""
        file_path = tmp_path / "tests.jsonl"
        jsonl_content = '{"id": "test_1", "user": "Hello"}\n{"id": "test_2", "user": "World"}\n'
        file_path.write_text(jsonl_content)
        cases = CustomJSONLoader.load(file_path)
        assert len(cases) == 2
        assert cases[0].id == "test_1"
        assert cases[1].id == "test_2"

    def test_load_jsonl_autodetect(self, tmp_path):
        """Test auto-detecting JSONL format from a .json file."""
        file_path = tmp_path / "tests.json"
        # Multiple JSON objects on separate lines = auto-detect as JSONL
        jsonl_content = '{"id": "test_1", "user": "Hello"}\n{"id": "test_2", "user": "World"}'
        file_path.write_text(jsonl_content)
        cases = CustomJSONLoader.load(file_path)
        assert len(cases) == 2

    def test_load_jsonl_with_empty_lines(self, tmp_path):
        """Test that empty lines in JSONL are skipped."""
        file_path = tmp_path / "tests.jsonl"
        jsonl_content = '{"id": "test_1", "user": "Hello"}\n\n{"id": "test_2", "user": "World"}\n'
        file_path.write_text(jsonl_content)
        cases = CustomJSONLoader.load(file_path)
        assert len(cases) == 2


# ─────────────────────────────────────────────────────────────────────
# RUN RESULT TESTS
# ─────────────────────────────────────────────────────────────────────

class TestRunResult:
    """Tests for the RunResult model."""

    def test_create_pending_result(self):
        """Test creating a pending result."""
        result = RunResult(test_id="t1", model_id="m1")
        assert result.status == RunStatus.PENDING
        assert result.status_symbol == "—"

    def test_status_symbols(self):
        """Test status symbols for all states."""
        assert RunResult(test_id="t", model_id="m", status=RunStatus.PENDING).status_symbol == "—"
        assert RunResult(test_id="t", model_id="m", status=RunStatus.RUNNING).status_symbol == "⏳"
        assert RunResult(test_id="t", model_id="m", status=RunStatus.COMPLETED).status_symbol == "✓"
        assert RunResult(test_id="t", model_id="m", status=RunStatus.SEMANTIC_FAILURE).status_symbol == "❌"
        assert RunResult(test_id="t", model_id="m", status=RunStatus.ERROR).status_symbol == "⚠"


# ─────────────────────────────────────────────────────────────────────
# BATTERY RUN TESTS
# ─────────────────────────────────────────────────────────────────────

class TestBatteryRun:
    """Tests for the BatteryRun state model."""

    def test_create_empty_battery_run(self):
        """Test creating an empty battery run."""
        run = BatteryRun(tests=["t1", "t2"], models=["m1", "m2"])
        assert len(run.tests) == 2
        assert len(run.models) == 2
        assert len(run.results) == 0

    def test_get_set_result(self):
        """Test getting and setting results."""
        run = BatteryRun(tests=["t1"], models=["m1"])
        result = RunResult(test_id="t1", model_id="m1", status=RunStatus.COMPLETED)
        run.set_result(result)
        retrieved = run.get_result("t1", "m1")
        assert retrieved is not None
        assert retrieved.status == RunStatus.COMPLETED

    def test_to_grid(self):
        """Test converting to DataFrame format."""
        run = BatteryRun(tests=["t1", "t2"], models=["m1", "m2"])
        run.set_result(RunResult(test_id="t1", model_id="m1", status=RunStatus.COMPLETED))
        run.set_result(RunResult(test_id="t1", model_id="m2", status=RunStatus.ERROR))
        df = run.to_grid()
        assert list(df.columns) == ["Test", "m1", "m2"]
        assert list(df.iloc[0]) == ["t1", "✓", "⚠"]  # t1 results (ERROR = ⚠)
        assert list(df.iloc[1]) == ["t2", "—", "—"]  # t2 pending

    def test_to_grid_latency_mode(self):
        """Test the grid with latency display mode."""
        from prompt_prix.battery import GridDisplayMode

        run = BatteryRun(tests=["t1", "t2"], models=["m1", "m2"])
        run.set_result(RunResult(
            test_id="t1", model_id="m1",
            status=RunStatus.COMPLETED, latency_ms=1500.0
        ))
        run.set_result(RunResult(
            test_id="t1", model_id="m2",
            status=RunStatus.ERROR, latency_ms=2500.0
        ))
        df = run.to_grid(GridDisplayMode.LATENCY)
        assert list(df.columns) == ["Test", "m1", "m2"]
        assert list(df.iloc[0]) == ["t1", "1.5s", "2.5s"]  # t1 latencies
        assert list(df.iloc[1]) == ["t2", "—", "—"]  # t2 pending

    def test_progress_tracking(self):
        """Test progress calculation."""
        run = BatteryRun(tests=["t1", "t2"], models=["m1"])
        assert run.total_count == 2
        assert run.completed_count == 0
        assert run.progress_percent == 0.0
        run.set_result(RunResult(test_id="t1", model_id="m1", status=RunStatus.COMPLETED))
        assert run.completed_count == 1
        assert run.progress_percent == 50.0


# ─────────────────────────────────────────────────────────────────────
# BATTERY RUNNER TESTS
# ─────────────────────────────────────────────────────────────────────

class TestBatteryRunner:
    """Tests for the BatteryRunner orchestrator.

    Per ADR-006: BatteryRunner is the orchestration layer - it calls MCP tools
    and doesn't know about servers or adapters. Tests mock the MCP layer.
    """

    async def test_run_completes_all_tests_via_mcp(self, sample_test_cases):
        """Test that the runner completes all tests BY CALLING MCP complete_stream.

        This test mocks the MCP layer, not HTTP. If BatteryRunner bypasses MCP
        and calls core.stream_completion directly, this test will FAIL.
        """
        models = ["model_a", "model_b"]

        # Track calls to MCP complete_stream
        mcp_calls = []

        async def mock_complete_stream(**kwargs):
            mcp_calls.append(kwargs)
            yield "Test response"

        # Patch at the MCP layer - BatteryRunner SHOULD call this
        with patch("prompt_prix.react.dispatch.complete_stream", side_effect=mock_complete_stream):
            runner = BatteryRunner(
                tests=sample_test_cases,
                models=models
            )
            final_state = None
            async for state in runner.run():
                final_state = state

        # BatteryRunner must call MCP complete_stream for each (test, model)
        assert len(mcp_calls) == 4, (
            f"Expected 4 calls to MCP complete_stream (2 tests × 2 models), "
            f"got {len(mcp_calls)}. BatteryRunner is bypassing MCP layer."
        )
        assert final_state is not None
        assert final_state.completed_count == 4

    async def test_run_handles_errors_via_mcp(self, sample_test_cases):
        """Test that the runner handles MCP-layer errors gracefully.

        This test mocks the MCP layer. If BatteryRunner bypasses MCP,
        this test will FAIL.

        Note: BatteryRunner has retry logic for transient errors, so
        LMStudioError will be retried. We verify final status, not call counts.
        """
        from prompt_prix.core import LMStudioError

        models = ["model_a", "model_b"]
        mcp_calls = []

        async def mock_complete_stream(**kwargs):
            mcp_calls.append(kwargs)
            model_id = kwargs.get("model_id", "")
            if model_id == "model_b":
                raise LMStudioError("Connection failed")
            yield "Success"

        with patch("prompt_prix.react.dispatch.complete_stream", side_effect=mock_complete_stream):
            runner = BatteryRunner(
                tests=sample_test_cases[:1],  # Just one test
                models=models
            )
            final_state = None
            async for state in runner.run():
                final_state = state

        # Must have called MCP for both models (model_b will have retries)
        model_a_calls = [c for c in mcp_calls if c.get("model_id") == "model_a"]
        model_b_calls = [c for c in mcp_calls if c.get("model_id") == "model_b"]
        assert len(model_a_calls) == 1, "Expected 1 call for model_a"
        assert len(model_b_calls) >= 1, "Expected at least 1 call for model_b (plus retries)"

        # Check model_a succeeded
        result_a = final_state.get_result("test_1", "model_a")
        assert result_a.status == RunStatus.COMPLETED

        # Check model_b errored after retries exhausted
        result_b = final_state.get_result("test_1", "model_b")
        assert result_b.status == RunStatus.ERROR

    async def test_run_yields_state_updates_via_mcp(self, sample_test_cases):
        """Test that the runner yields state updates for the UI.

        This test mocks the MCP layer. If BatteryRunner bypasses MCP,
        this test will FAIL.
        """
        models = ["m1"]
        mcp_calls = []

        async def mock_complete_stream(**kwargs):
            mcp_calls.append(kwargs)
            yield "Response"

        with patch("prompt_prix.react.dispatch.complete_stream", side_effect=mock_complete_stream):
            runner = BatteryRunner(
                tests=sample_test_cases[:1],
                models=models
            )
            state_count = 0
            async for state in runner.run():
                state_count += 1

        # Must have called MCP
        assert len(mcp_calls) == 1, (
            f"Expected 1 call to MCP complete_stream, got {len(mcp_calls)}. "
            "BatteryRunner is bypassing MCP layer."
        )
        # Should yield multiple times for progress tracking
        assert state_count >= 2

    async def test_run_records_latency_via_mcp(self, sample_test_cases):
        """Test that the runner records latency for completed tests.

        This test mocks the MCP layer. If BatteryRunner bypasses MCP,
        this test will FAIL.
        """
        models = ["m1"]
        mcp_calls = []

        async def mock_complete_stream(**kwargs):
            mcp_calls.append(kwargs)
            await asyncio.sleep(0.01)  # Small delay for latency measurement
            yield "Response"
            yield "__LATENCY_MS__:10.5"  # Latency sentinel from the adapter

        with patch("prompt_prix.react.dispatch.complete_stream", side_effect=mock_complete_stream):
            runner = BatteryRunner(
                tests=sample_test_cases[:1],
                models=models
            )
            final_state = None
            async for state in runner.run():
                final_state = state

        # Must have called MCP
        assert len(mcp_calls) == 1, (
            f"Expected 1 call to MCP complete_stream, got {len(mcp_calls)}. "
            "BatteryRunner is bypassing MCP layer."
        )
        result = final_state.get_result("test_1", "m1")
        assert result.latency_ms is not None
        assert result.latency_ms == 10.5  # From the latency sentinel
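
    # The "__LATENCY_MS__:<float>" chunk above is an in-band sentinel appended
    # by the adapter after the response text. A minimal sketch of the parsing
    # assumed here (the real logic lives in BatteryRunner and may differ):
    #
    #     if chunk.startswith("__LATENCY_MS__:"):
    #         latency_ms = float(chunk.split(":", 1)[1])
    #     else:
    #         response_chunks.append(chunk)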


# ─────────────────────────────────────────────────────────────────────
# BATTERY HANDLER TESTS
# ─────────────────────────────────────────────────────────────────────

class TestBatteryExport:
    """Tests for battery export handlers."""

    def test_export_json_no_results(self):
        """Test that export returns an error when no battery run exists."""
        from prompt_prix import state
        from prompt_prix.tabs.battery.handlers import export_json

        state.battery_run = None
        status, file_update = export_json()
        assert "❌" in status
        # Returns gr.update(visible=False, value=None)
        assert file_update["visible"] is False
        assert file_update["value"] is None

    def test_export_json_with_results(self):
        """Test that export creates a file with results."""
        import os

        from prompt_prix import state
        from prompt_prix.tabs.battery.handlers import export_json

        # Set up a battery run with results
        run = BatteryRun(tests=["t1", "t2"], models=["m1"])
        run.set_result(RunResult(
            test_id="t1", model_id="m1",
            status=RunStatus.COMPLETED,
            response="Test response",
            latency_ms=1234.5
        ))
        run.set_result(RunResult(
            test_id="t2", model_id="m1",
            status=RunStatus.ERROR,
            error="Test error"
        ))
        state.battery_run = run

        status, file_update = export_json()
        assert "✅" in status
        # Returns gr.update(visible=False, value=filepath) - triggers auto-download
        assert file_update["visible"] is False
        filepath = file_update["value"]
        assert filepath is not None
        assert os.path.exists(filepath)

        # Verify file contents
        with open(filepath) as f:
            data = json.load(f)
        assert data["tests"] == ["t1", "t2"]
        assert data["models"] == ["m1"]
        assert len(data["results"]) == 2
        # Verify the failure_reason field is present
        assert "failure_reason" in data["results"][0]

        # Cleanup
        state.battery_run = None

    def test_export_csv_with_results(self):
        """Test that CSV export creates a file with results."""
        import os

        from prompt_prix import state
        from prompt_prix.tabs.battery.handlers import export_csv

        run = BatteryRun(tests=["t1"], models=["m1"])
        run.set_result(RunResult(
            test_id="t1", model_id="m1",
            status=RunStatus.COMPLETED,
            response="Hello\nWorld",  # Test newline handling
            latency_ms=500.0
        ))
        state.battery_run = run

        status, file_update = export_csv()
        assert "✅" in status
        # Returns gr.update(visible=False, value=filepath) - triggers auto-download
        assert file_update["visible"] is False
        filepath = file_update["value"]
        assert filepath is not None
        assert filepath.endswith(".csv")

        with open(filepath) as f:
            content = f.read()
        # The CSV header now includes error and failure_reason columns
        assert "test_id" in content
        assert "failure_reason" in content
        assert "t1" in content
        assert "m1" in content
        assert "500" in content

        state.battery_run = None

    def test_export_basename_from_source_file(self):
        """Test that the export filename derives from the source file plus a timestamp."""
        import re

        from prompt_prix import state
        from prompt_prix.tabs.battery.handlers import _get_export_basename

        state.battery_source_file = "/path/to/my_test_suite.jsonl"
        basename = _get_export_basename()
        # Basename should be {stem}_results_{timestamp}
        assert re.match(r"my_test_suite_results_\d+$", basename)

        state.battery_source_file = None
        basename = _get_export_basename()
        assert re.match(r"battery_results_\d+$", basename)


class TestBatteryStateClearing:
    """Tests for state clearing when the file changes."""

    def test_state_cleared_on_file_upload(self):
        """Test that battery_run is cleared when a new file is uploaded."""
        from prompt_prix import state

        # Set up an existing battery run
        state.battery_run = BatteryRun(tests=["old"], models=["old"])
        state.battery_source_file = "/old/file.json"

        # Simulate a file change (what on_battery_file_change does)
        state.battery_run = None
        state.battery_source_file = "/new/file.json"

        assert state.battery_run is None
        assert state.battery_source_file == "/new/file.json"

    def test_state_cleared_on_file_removal(self):
        """Test that state is cleared when the file is removed."""
        from prompt_prix import state

        state.battery_run = BatteryRun(tests=["test"], models=["model"])
        state.battery_source_file = "/some/file.json"

        # Simulate file removal
        state.battery_run = None
        state.battery_source_file = None

        assert state.battery_run is None
        assert state.battery_source_file is None


class TestCooperativeCancellation:
    """Tests for cooperative cancellation via state.should_stop()."""

    def test_stop_flag_default_false(self):
        """Test that the stop flag starts as False."""
        from prompt_prix import state

        state.clear_stop()
        assert state.should_stop() is False

    def test_request_stop_sets_flag(self):
        """Test that request_stop sets the flag."""
        from prompt_prix import state

        state.clear_stop()
        state.request_stop()
        assert state.should_stop() is True

    def test_clear_stop_resets_flag(self):
        """Test that clear_stop resets the flag."""
        from prompt_prix import state

        state.request_stop()
        assert state.should_stop() is True
        state.clear_stop()
        assert state.should_stop() is False


# ─────────────────────────────────────────────────────────────────────
# SEMANTIC VALIDATION INTEGRATION TESTS
# ─────────────────────────────────────────────────────────────────────

class TestBatterySemanticValidation:
    """Tests verifying that BatteryRunner integrates semantic validation."""

    async def test_refusal_with_required_tools_is_semantic_failure(self):
        """A refusal when tool_choice='required' must be SEMANTIC_FAILURE.

        If a model says "I'm sorry, but I cannot..." when tools were required,
        that's a semantic failure - the model completed HTTP but failed the task.
        """
        models = ["model_a"]

        # MCP returns a refusal (no tool call made)
        refusal_text = "I'm sorry, but I cannot execute scripts or delete files."

        async def mock_complete_stream(**kwargs):
            yield refusal_text

        # Create a test with tool_choice="required"
        test_with_tools = BenchmarkCase(
            id="tool_test",
            user="Delete the file report.pdf",
            tools=[{
                "type": "function",
                "function": {
                    "name": "delete_file",
                    "description": "Delete a file",
                    "parameters": {
                        "type": "object",
                        "properties": {"path": {"type": "string"}},
                        "required": ["path"]
                    }
                }
            }],
            tool_choice="required"
        )

        with patch("prompt_prix.react.dispatch.complete_stream", side_effect=mock_complete_stream):
            runner = BatteryRunner(
                tests=[test_with_tools],
                models=models
            )
            final_state = None
            async for state in runner.run():
                final_state = state

        result = final_state.get_result("tool_test", "model_a")
        # Must be SEMANTIC_FAILURE, not COMPLETED
        assert result.status == RunStatus.SEMANTIC_FAILURE, (
            f"Expected SEMANTIC_FAILURE for refusal with tool_choice='required', "
            f"got {result.status}. Response: {result.response}"
        )


# ─────────────────────────────────────────────────────────────────────
# PIPELINED JUDGING TESTS
# ─────────────────────────────────────────────────────────────────────

class TestPipelinedJudging:
    """Tests for pipelined judge execution during inference.

    When judge_model is set, judge tasks are submitted eagerly as inference
    results complete — not batched after all inference finishes.
    """

    @pytest.fixture
    def tests_with_criteria(self):
        """BenchmarkCases that have pass_criteria (which triggers judging)."""
        return [
            BenchmarkCase(
                id="judged_1",
                user="What is 2 + 2?",
                pass_criteria="Answer must contain the number 4",
            ),
            BenchmarkCase(
                id="judged_2",
                user="What is 3 + 3?",
                pass_criteria="Answer must contain the number 6",
            ),
        ]

    async def test_pipelined_all_results_judged(self, tests_with_criteria):
        """All COMPLETED results with criteria get judged in pipelined mode."""
        async def mock_complete_stream(**kwargs):
            yield "The answer is 4"

        async def mock_judge(**kwargs):
            return {"pass": True, "reason": "Correct", "score": 10}

        with patch("prompt_prix.react.dispatch.complete_stream", side_effect=mock_complete_stream), \
             patch("prompt_prix.battery.judge", side_effect=mock_judge):
            runner = BatteryRunner(
                tests=tests_with_criteria,
                models=["model_a"],
                judge_model="judge-model",
            )
            final_state = None
            async for state in runner.run():
                final_state = state

        # Both results should be COMPLETED with judge verdicts
        for test_id in ["judged_1", "judged_2"]:
            result = final_state.get_result(test_id, "model_a")
            assert result.status == RunStatus.COMPLETED
            assert result.judge_result is not None
            assert result.judge_result["pass"] is True
            assert result.judge_latency_ms is not None

    async def test_pipelined_judge_fail_downgrades_status(self, tests_with_criteria):
        """A judge failure downgrades COMPLETED to SEMANTIC_FAILURE."""
        async def mock_complete_stream(**kwargs):
            yield "I don't know"

        async def mock_judge(**kwargs):
            return {"pass": False, "reason": "Missing expected number", "score": 0}

        with patch("prompt_prix.react.dispatch.complete_stream", side_effect=mock_complete_stream), \
             patch("prompt_prix.battery.judge", side_effect=mock_judge):
            runner = BatteryRunner(
                tests=tests_with_criteria[:1],
                models=["model_a"],
                judge_model="judge-model",
            )
            final_state = None
            async for state in runner.run():
                final_state = state

        result = final_state.get_result("judged_1", "model_a")
        assert result.status == RunStatus.SEMANTIC_FAILURE
        assert result.failure_reason == "Missing expected number"

    async def test_no_judge_uses_inference_only(self, tests_with_criteria):
        """Without judge_model, only inference runs (no _execute_pipelined)."""
        mcp_calls = []

        async def mock_complete_stream(**kwargs):
            mcp_calls.append(kwargs["model_id"])
            yield "Response"

        with patch("prompt_prix.react.dispatch.complete_stream", side_effect=mock_complete_stream), \
             patch("prompt_prix.battery.judge") as mock_judge:
            runner = BatteryRunner(
                tests=tests_with_criteria,
                models=["model_a"],
                # No judge_model
            )
            final_state = None
            async for state in runner.run():
                final_state = state

        # Inference happened
        assert len(mcp_calls) == 2
        # The judge was never called
        mock_judge.assert_not_called()
        # The phase stays as inference
        assert final_state.phase == "inference"

    async def test_pipelined_skips_failed_results(self, tests_with_criteria):
        """Results that fail during inference are not judged."""
        call_count = {"judge": 0}

        async def mock_complete_stream(**kwargs):
            model_id = kwargs.get("model_id", "")
            if model_id == "model_fail":
                raise Exception("Connection failed")
            yield "The answer is 4"

        async def mock_judge(**kwargs):
            call_count["judge"] += 1
            return {"pass": True, "reason": "OK", "score": 10}

        with patch("prompt_prix.react.dispatch.complete_stream", side_effect=mock_complete_stream), \
             patch("prompt_prix.battery.judge", side_effect=mock_judge):
            runner = BatteryRunner(
                tests=tests_with_criteria[:1],
                models=["model_ok", "model_fail"],
                judge_model="judge-model",
            )
            final_state = None
            async for state in runner.run():
                final_state = state

        # Only the successful model's result gets judged
        assert call_count["judge"] == 1
        result_ok = final_state.get_result("judged_1", "model_ok")
        assert result_ok.judge_result is not None
        result_fail = final_state.get_result("judged_1", "model_fail")
        assert result_fail.status == RunStatus.ERROR
        assert result_fail.judge_result is None

    async def test_pipelined_judge_total_increments_during_inference(self, tests_with_criteria):
        """judge_total grows as inference results complete (not set all at once)."""
        judge_totals_observed = []

        async def mock_complete_stream(**kwargs):
            yield "The answer is 4"

        async def mock_judge(**kwargs):
            await asyncio.sleep(0.05)  # Small delay so we can observe state
            return {"pass": True, "reason": "OK", "score": 10}

        with patch("prompt_prix.react.dispatch.complete_stream", side_effect=mock_complete_stream), \
             patch("prompt_prix.battery.judge", side_effect=mock_judge):
            runner = BatteryRunner(
                tests=tests_with_criteria,
                models=["model_a"],
                judge_model="judge-model",
            )
            async for state in runner.run():
                if state.judge_total > 0:
                    judge_totals_observed.append(state.judge_total)

        # judge_total should have been observed at least once
        assert len(judge_totals_observed) > 0
        # The final judge_total should match the number of results that needed judging
        assert judge_totals_observed[-1] == 2

    async def test_pipelined_phase_transition(self, tests_with_criteria):
        """The phase transitions from 'inference' to 'judging' once inference is done."""
        phases_observed = []

        async def mock_complete_stream(**kwargs):
            yield "The answer is 4"

        async def mock_judge(**kwargs):
            await asyncio.sleep(0.1)  # The judge takes longer than inference
            return {"pass": True, "reason": "OK", "score": 10}

        with patch("prompt_prix.react.dispatch.complete_stream", side_effect=mock_complete_stream), \
             patch("prompt_prix.battery.judge", side_effect=mock_judge):
            runner = BatteryRunner(
                tests=tests_with_criteria,
                models=["model_a"],
                judge_model="judge-model",
            )
            async for state in runner.run():
                phases_observed.append(state.phase)

        # Should have seen the inference phase
        assert "inference" in phases_observed
        # If judge tasks outlast inference, we also see the "judging" phase
        # (this depends on timing — the judge sleeps 0.1s, inference is instant)


# ─────────────────────────────────────────────────────────────────────
# REACT-AS-ATOM: UNIFIED PIPELINE TESTS
# ─────────────────────────────────────────────────────────────────────

class TestReactAsAtom:
    """Tests proving react tests flow through BatteryRunner as standard RunResults.

    React-as-atom: a react loop is an execution detail, not an orchestration path.
    BatteryRunner has zero mode awareness — dispatch handles mode internally.
    """

    @pytest.fixture
    def react_test(self):
        """A react-mode BenchmarkCase."""
        return BenchmarkCase(
            id="react_categorize",
            user="Organize the files",
            mode="react",
            system="You are a file organizer.",
            mock_tools={
                "read_file": {"./1.txt": "Content about animals"},
                "move_file": {"_default": "File moved"},
            },
            tools=[
                {"type": "function", "function": {"name": "read_file"}},
                {"type": "function", "function": {"name": "move_file"}},
            ],
            max_iterations=10,
        )

    @pytest.fixture
    def mixed_tests(self, react_test):
        """A mixed battery: standard + react tests in one file."""
        return [
            BenchmarkCase(id="simple_math", user="What is 2+2?"),
            react_test,
        ]

    async def test_react_completed_produces_run_result(self, react_test):
        """A completed react loop produces a standard RunResult with a react_trace."""
        from prompt_prix.react.schemas import ReActIteration, ToolCall

        step_count = {"n": 0}

        async def mock_step(**kwargs):
            step_count["n"] += 1
            if step_count["n"] <= 2:
                return {
                    "completed": False,
                    "final_response": None,
                    "new_iterations": [
                        ReActIteration(
                            iteration=1,
                            tool_call=ToolCall(
                                id=f"call_{step_count['n']}",
                                name="read_file",
                                args={"path": f"./file{step_count['n']}.txt"},
                            ),
                            observation="mock data",
                            success=True,
                            latency_ms=50.0,
                        )
                    ],
                    "call_counter": step_count["n"],
                    "latency_ms": 50.0,
                }
            return {
                "completed": True,
                "final_response": "All files organized.",
                "new_iterations": [],
                "call_counter": step_count["n"],
                "latency_ms": 30.0,
            }

        with patch("prompt_prix.react.dispatch.react_step", side_effect=mock_step):
            runner = BatteryRunner(tests=[react_test], models=["model_a"])
            final_state = None
            async for state in runner.run():
                final_state = state

        result = final_state.get_result("react_categorize", "model_a")
        assert result.status == RunStatus.COMPLETED
        assert result.response == "All files organized."
        assert result.react_trace is not None
        assert result.react_trace["completed"] is True
        assert result.react_trace["total_iterations"] == 2

    async def test_react_incomplete_is_semantic_failure(self, react_test):
        """A react loop hitting max_iterations produces SEMANTIC_FAILURE."""
        from prompt_prix.react.schemas import ReActIteration, ToolCall

        step_count = {"n": 0}

        async def mock_step(**kwargs):
            step_count["n"] += 1
            return {
                "completed": False,
                "final_response": None,
                "new_iterations": [
                    ReActIteration(
                        iteration=1,
                        tool_call=ToolCall(
                            id=f"call_{step_count['n']}",
                            name="read_file",
                            args={"path": f"./unique_{step_count['n']}.txt"},
                        ),
                        observation="data",
                        success=True,
                        latency_ms=50.0,
                    )
                ],
                "call_counter": step_count["n"],
                "latency_ms": 50.0,
            }

        with patch("prompt_prix.react.dispatch.react_step", side_effect=mock_step):
            runner = BatteryRunner(tests=[react_test], models=["model_a"])
            final_state = None
            async for state in runner.run():
                final_state = state

        result = final_state.get_result("react_categorize", "model_a")
        assert result.status == RunStatus.SEMANTIC_FAILURE
        assert "max_iterations" in result.failure_reason
        assert result.react_trace is not None
        assert result.react_trace["termination_reason"] == "max_iterations"

    async def test_mixed_battery_unified_grid(self, mixed_tests):
        """Standard and react tests appear in the same grid."""
        async def mock_stream(**kwargs):
            yield "The answer is 4."
            yield "__LATENCY_MS__:100"

        async def mock_step(**kwargs):
            return {
                "completed": True,
                "final_response": "Files organized.",
                "new_iterations": [],
                "call_counter": 0,
                "latency_ms": 50.0,
            }

        with patch("prompt_prix.react.dispatch.complete_stream", side_effect=mock_stream), \
             patch("prompt_prix.react.dispatch.react_step", side_effect=mock_step):
            runner = BatteryRunner(tests=mixed_tests, models=["model_a"])
            final_state = None
            async for state in runner.run():
                final_state = state

        # Both tests in the same grid
        assert final_state.completed_count == 2
        assert final_state.total_count == 2

        # Standard test: no react_trace
        simple = final_state.get_result("simple_math", "model_a")
        assert simple.status == RunStatus.COMPLETED
        assert simple.react_trace is None

        # React test: has a react_trace
        react = final_state.get_result("react_categorize", "model_a")
        assert react.status == RunStatus.COMPLETED
        assert react.react_trace is not None
        assert react.react_trace["completed"] is True

    async def test_react_with_drift_validation(self):
        """A react test with expected_response gets drift validation on final_response."""
        test = BenchmarkCase(
            id="react_drift",
            user="Organize files",
            mode="react",
            mock_tools={"read_file": {"_default": "data"}},
            tools=[{"type": "function", "function": {"name": "read_file"}}],
            expected_response="Files organized into animals and fruits",
            max_iterations=10,
        )

        async def mock_step(**kwargs):
            return {
                "completed": True,
                "final_response": "Something completely different about weather.",
                "new_iterations": [],
                "call_counter": 0,
                "latency_ms": 50.0,
            }

        async def mock_drift(response, expected):
            return 0.8  # High drift

        with patch("prompt_prix.react.dispatch.react_step", side_effect=mock_step), \
             patch("prompt_prix.mcp.tools.drift.calculate_drift", side_effect=mock_drift):
            runner = BatteryRunner(
                tests=[test], models=["model_a"],
                drift_threshold=0.3,
            )
            final_state = None
            async for state in runner.run():
                final_state = state

        result = final_state.get_result("react_drift", "model_a")
        assert result.status == RunStatus.SEMANTIC_FAILURE
        assert "Drift" in result.failure_reason
        assert result.drift_score == 0.8
        assert result.react_trace is not None  # Trace preserved even on drift failure