Spaces:
Running
Running
| """ | |
| API integration tests for the Helpdesk Ticket Routing OpenEnv server. | |
| Uses FastAPI's TestClient (via starlette) to test the live app without | |
| needing a running server. | |
| Run with: | |
| pytest meta-AIHack/tests/test_api_integration.py -v | |
| """ | |
| from __future__ import annotations | |
| import sys | |
| import os | |
| import types | |
| import unittest | |
| from typing import Any, Optional | |
| # Ensure the repo root (parent of tests/) is on sys.path. | |
| sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) | |
| # ----------------------------------------------------------------------- | |
| # Step 1: Install openenv type stubs BEFORE any openenv imports. | |
| # ----------------------------------------------------------------------- | |
| import openenv_test_stubs # noqa: F401 | |
| # ----------------------------------------------------------------------- | |
| # Step 2: Install the interfaces stub (Environment base class). | |
| # ----------------------------------------------------------------------- | |
| if "openenv.core.env_server.interfaces" not in sys.modules: | |
| _interfaces_mod = types.ModuleType("openenv.core.env_server.interfaces") | |
| class _Environment: | |
| """Minimal stub matching the openenv-core Environment base class.""" | |
| def __init__(self) -> None: | |
| pass | |
| def __init_subclass__(cls, **kwargs: object) -> None: | |
| super().__init_subclass__(**kwargs) | |
| def __class_getitem__(cls, item: object) -> type: | |
| return cls | |
| _interfaces_mod.Environment = _Environment # type: ignore[attr-defined] | |
| sys.modules["openenv.core.env_server.interfaces"] = _interfaces_mod | |
| # ----------------------------------------------------------------------- | |
| # Step 3: Install a create_app stub into openenv.core.env_server. | |
| # | |
| # The stub creates a real FastAPI app with the standard OpenEnv routes: | |
| # GET /health → {"status": "ok"} | |
| # POST /reset → calls env.reset(seed=..., task_id=...) → observation JSON | |
| # POST /step → calls env.step(action) → observation JSON | |
| # GET /state → calls env.state → state JSON | |
| # ----------------------------------------------------------------------- | |
| _env_server_mod = sys.modules["openenv.core.env_server"] | |
| if not hasattr(_env_server_mod, "create_app"): | |
| from fastapi import FastAPI, Request | |
| from pydantic import BaseModel | |
| # Define request models at module level so FastAPI/Pydantic can resolve them. | |
| class _ResetRequest(BaseModel): | |
| task_id: Optional[int] = 1 | |
| seed: Optional[int] = None | |
| def _create_app_stub(env_class, action_model, observation_model, env_name: str = ""): | |
| """ | |
| Stub for openenv.core.env_server.create_app. | |
| Returns a real FastAPI app with the standard OpenEnv routes wired up. | |
| The environment instance is shared across all requests within a session. | |
| """ | |
| _app = FastAPI(title=env_name) | |
| _env_instance = env_class() | |
| def health(): | |
| return {"status": "ok"} | |
| def reset(body: _ResetRequest): | |
| obs = _env_instance.reset(seed=body.seed, task_id=body.task_id) | |
| return obs.model_dump() | |
| async def step(request: Request): | |
| payload = await request.json() | |
| action = action_model.model_validate(payload) | |
| obs = _env_instance.step(action) | |
| return obs.model_dump() | |
| def state(): | |
| return _env_instance.state.model_dump() | |
| return _app | |
| _env_server_mod.create_app = _create_app_stub | |
| # ----------------------------------------------------------------------- | |
| # Now it is safe to import the app (which calls create_app internally). | |
| # ----------------------------------------------------------------------- | |
| from starlette.testclient import TestClient | |
| from server.app import app | |
| client = TestClient(app) | |
| # ----------------------------------------------------------------------- | |
| # Helper | |
| # ----------------------------------------------------------------------- | |
| def _reset(task_id: int = 1, seed: int = 42): | |
| return client.post("/reset", json={"task_id": task_id, "seed": seed}) | |
| # ----------------------------------------------------------------------- | |
| # Test classes | |
| # ----------------------------------------------------------------------- | |
| class TestHealthEndpoint(unittest.TestCase): | |
| """2.1.1 — GET /health returns HTTP 200 with {"status": "ok"}.""" | |
| def test_health_returns_200(self): | |
| resp = client.get("/health") | |
| self.assertEqual(resp.status_code, 200) | |
| def test_health_returns_ok_body(self): | |
| resp = client.get("/health") | |
| self.assertEqual(resp.json(), {"status": "ok"}) | |
| class TestTasksEndpoint(unittest.TestCase): | |
| """2.1.2 — GET /tasks returns HTTP 200 with exactly 3 tasks with IDs 1, 2, 3.""" | |
| def test_tasks_returns_200(self): | |
| resp = client.get("/tasks") | |
| self.assertEqual(resp.status_code, 200) | |
| def test_tasks_returns_exactly_3_tasks(self): | |
| resp = client.get("/tasks") | |
| data = resp.json() | |
| self.assertIn("tasks", data) | |
| self.assertEqual(len(data["tasks"]), 3) | |
| def test_tasks_have_ids_1_2_3(self): | |
| resp = client.get("/tasks") | |
| ids = {t["id"] for t in resp.json()["tasks"]} | |
| self.assertEqual(ids, {1, 2, 3}) | |
| class TestResetEndpoint(unittest.TestCase): | |
| """2.1.3 — POST /reset returns a valid observation JSON.""" | |
| def setUp(self): | |
| self.resp = _reset(task_id=1, seed=42) | |
| self.data = self.resp.json() | |
| def test_reset_returns_200(self): | |
| self.assertEqual(self.resp.status_code, 200) | |
| def test_reset_done_is_false(self): | |
| self.assertFalse(self.data["done"]) | |
| def test_reset_reward_is_null(self): | |
| self.assertIsNone(self.data["reward"]) | |
| def test_reset_rubric_reward_is_null(self): | |
| self.assertIsNone(self.data["rubric_reward"]) | |
| def test_reset_task_id_is_1(self): | |
| self.assertEqual(self.data["task_id"], 1) | |
| def test_reset_tickets_processed_is_0(self): | |
| self.assertEqual(self.data["tickets_processed"], 0) | |
| def test_reset_allowed_fields_non_empty(self): | |
| self.assertIsInstance(self.data["allowed_fields"], list) | |
| self.assertGreater(len(self.data["allowed_fields"]), 0) | |
| def test_reset_available_action_types_exposed(self): | |
| self.assertEqual(self.data["available_action_types"], ["submit", "investigate"]) | |
| def test_reset_progress_metrics_start_at_zero(self): | |
| self.assertEqual(self.data["average_score_so_far"], 0.0) | |
| self.assertEqual(self.data["progress_fraction"], 0.0) | |
| class TestStepEndpoint(unittest.TestCase): | |
| """2.1.4 — POST /step returns observation JSON with reward in [0.0, 1.0].""" | |
| def setUp(self): | |
| # Reset first so the environment is in a known state. | |
| _reset(task_id=1, seed=42) | |
| self.resp = client.post("/step", json={"issue_type": "billing_license"}) | |
| self.data = self.resp.json() | |
| def test_step_returns_200(self): | |
| self.assertEqual(self.resp.status_code, 200) | |
| def test_step_reward_is_float_in_unit_interval(self): | |
| reward = self.data["reward"] | |
| self.assertIsNotNone(reward) | |
| self.assertIsInstance(reward, float) | |
| self.assertGreaterEqual(reward, 0.0) | |
| self.assertLessEqual(reward, 1.0) | |
| def test_step_tickets_processed_is_1(self): | |
| self.assertEqual(self.data["tickets_processed"], 1) | |
| def test_step_metadata_exposes_last_feedback_summary(self): | |
| metadata = self.data.get("metadata", {}) | |
| self.assertIn("last_feedback_summary", metadata) | |
| self.assertIsInstance(metadata["last_feedback_summary"], str) | |
| self.assertTrue(metadata["last_feedback_summary"]) | |
| def test_step_history_entry_includes_feedback_summary(self): | |
| history = self.data.get("history", []) | |
| self.assertGreater(len(history), 0) | |
| self.assertIn("feedback_summary", history[-1]) | |
| self.assertIsInstance(history[-1]["feedback_summary"], str) | |
| self.assertTrue(history[-1]["feedback_summary"]) | |
| def test_step_exposes_structured_reward_components(self): | |
| self.assertIn("last_reward_components", self.data) | |
| self.assertIsInstance(self.data["last_reward_components"], dict) | |
| self.assertIn("ticket_score", self.data["last_reward_components"]) | |
| self.assertIn("final_reward", self.data["last_reward_components"]) | |
| self.assertEqual( | |
| self.data["metadata"].get("last_reward_components"), | |
| self.data["last_reward_components"], | |
| ) | |
| def test_step_progress_metrics_are_exposed(self): | |
| self.assertIn("average_score_so_far", self.data) | |
| self.assertIn("progress_fraction", self.data) | |
| self.assertGreaterEqual(self.data["progress_fraction"], 0.0) | |
| self.assertLessEqual(self.data["progress_fraction"], 1.0) | |
| class TestStateEndpoint(unittest.TestCase): | |
| """2.1.5 — GET /state returns current episode state JSON after a reset.""" | |
| def setUp(self): | |
| _reset(task_id=2, seed=7) | |
| self.resp = client.get("/state") | |
| self.data = self.resp.json() | |
| def test_state_returns_200(self): | |
| self.assertEqual(self.resp.status_code, 200) | |
| def test_state_current_task_id_is_2(self): | |
| self.assertEqual(self.data["current_task_id"], 2) | |
| def test_state_step_count_is_0(self): | |
| self.assertEqual(self.data["step_count"], 0) | |
| def test_state_queue_ticket_ids_non_empty(self): | |
| self.assertIsInstance(self.data["queue_ticket_ids"], list) | |
| self.assertGreater(len(self.data["queue_ticket_ids"]), 0) | |
| # ----------------------------------------------------------------------- | |
| # Task 4.1 — Full seeded episode and mid-episode state tests | |
| # ----------------------------------------------------------------------- | |
| class TestFullSeededEpisode(unittest.TestCase): | |
| """2.1.6 — One end-to-end seeded episode over HTTP completes all steps | |
| and returns a final trajectory reward in [0.0, 1.0]. | |
| Validates: Requirements 2.1.6 | |
| """ | |
| def test_full_episode_final_reward_in_unit_interval(self): | |
| """4.1.1 — reset → step loop until done → final trajectory reward in [0.0, 1.0].""" | |
| # Reset with a fixed seed for determinism. | |
| reset_resp = _reset(task_id=1, seed=42) | |
| self.assertEqual(reset_resp.status_code, 200) | |
| obs = reset_resp.json() | |
| self.assertFalse(obs["done"]) | |
| # Retrieve allowed_fields from the observation so we can build a valid action. | |
| allowed_fields = obs["allowed_fields"] | |
| self.assertGreater(len(allowed_fields), 0) | |
| final_reward = None | |
| max_steps = 20 # safety cap — queue is at most 5 tickets | |
| for _ in range(max_steps): | |
| # Build a minimal valid action using the first allowed field. | |
| action_payload: dict = {} | |
| if "issue_type" in allowed_fields: | |
| action_payload["issue_type"] = "general_inquiry" | |
| if "priority" in allowed_fields: | |
| action_payload["priority"] = "medium" | |
| if "assignment_group" in allowed_fields: | |
| action_payload["assignment_group"] = "service_desk" | |
| if "resolution_action" in allowed_fields: | |
| action_payload["resolution_action"] = "acknowledge" | |
| step_resp = client.post("/step", json=action_payload) | |
| self.assertEqual(step_resp.status_code, 200) | |
| obs = step_resp.json() | |
| reward = obs.get("reward") | |
| self.assertIsNotNone(reward) | |
| self.assertIsInstance(reward, float) | |
| self.assertGreaterEqual(reward, 0.0) | |
| self.assertLessEqual(reward, 1.0) | |
| if obs["done"]: | |
| final_reward = reward | |
| break | |
| self.assertIsNotNone(final_reward, "Episode did not complete within max_steps") | |
| self.assertGreaterEqual(final_reward, 0.0) | |
| self.assertLessEqual(final_reward, 1.0) | |
| def test_full_episode_terminal_rubric_reward_in_unit_interval(self): | |
| reset_resp = _reset(task_id=1, seed=42) | |
| self.assertEqual(reset_resp.status_code, 200) | |
| obs = reset_resp.json() | |
| allowed_fields = obs["allowed_fields"] | |
| final_rubric_reward = None | |
| for _ in range(20): | |
| action_payload: dict = {} | |
| if "issue_type" in allowed_fields: | |
| action_payload["issue_type"] = "general_inquiry" | |
| if "priority" in allowed_fields: | |
| action_payload["priority"] = "medium" | |
| if "assignment_group" in allowed_fields: | |
| action_payload["assignment_group"] = "service_desk" | |
| if "resolution_action" in allowed_fields: | |
| action_payload["resolution_action"] = "acknowledge" | |
| step_resp = client.post("/step", json=action_payload) | |
| self.assertEqual(step_resp.status_code, 200) | |
| obs = step_resp.json() | |
| if obs["done"]: | |
| final_rubric_reward = obs.get("rubric_reward") | |
| break | |
| self.assertIsNotNone( | |
| final_rubric_reward, "Terminal observation did not include rubric_reward" | |
| ) | |
| self.assertGreaterEqual(final_rubric_reward, 0.0) | |
| self.assertLessEqual(final_rubric_reward, 1.0) | |
| def test_full_episode_all_tasks_complete(self): | |
| """4.1.1 — Full seeded episode completes for each task ID (1, 2, 3).""" | |
| for task_id in (1, 2, 3): | |
| with self.subTest(task_id=task_id): | |
| reset_resp = _reset(task_id=task_id, seed=42) | |
| self.assertEqual(reset_resp.status_code, 200) | |
| obs = reset_resp.json() | |
| allowed_fields = obs["allowed_fields"] | |
| action_payload: dict = {} | |
| if "issue_type" in allowed_fields: | |
| action_payload["issue_type"] = "general_inquiry" | |
| if "priority" in allowed_fields: | |
| action_payload["priority"] = "medium" | |
| if "assignment_group" in allowed_fields: | |
| action_payload["assignment_group"] = "service_desk" | |
| if "resolution_action" in allowed_fields: | |
| action_payload["resolution_action"] = "acknowledge" | |
| completed = False | |
| for _ in range(20): | |
| step_resp = client.post("/step", json=action_payload) | |
| self.assertEqual(step_resp.status_code, 200) | |
| obs = step_resp.json() | |
| if obs["done"]: | |
| completed = True | |
| break | |
| self.assertTrue(completed, f"Task {task_id} episode did not complete") | |
| class TestStateMidEpisode(unittest.TestCase): | |
| """4.1.2 — GET /state reflects correct state mid-episode. | |
| After reset, step_count is 0. After one step, step_count increments to 1. | |
| Validates: Requirements 2.1.5 | |
| """ | |
| def test_state_step_count_is_0_after_reset(self): | |
| """step_count is 0 immediately after reset.""" | |
| _reset(task_id=1, seed=99) | |
| state_resp = client.get("/state") | |
| self.assertEqual(state_resp.status_code, 200) | |
| state = state_resp.json() | |
| self.assertEqual(state["step_count"], 0) | |
| def test_state_step_count_increments_after_step(self): | |
| """step_count increments from 0 to 1 after one step.""" | |
| _reset(task_id=1, seed=99) | |
| # Confirm step_count is 0 before stepping. | |
| state_before = client.get("/state").json() | |
| self.assertEqual(state_before["step_count"], 0) | |
| # Take one step. | |
| client.post("/step", json={"issue_type": "general_inquiry"}) | |
| # Confirm step_count is now 1. | |
| state_after = client.get("/state").json() | |
| self.assertEqual(state_after["step_count"], 1) | |
| def test_state_task_id_matches_reset(self): | |
| """current_task_id in state matches the task_id used in reset.""" | |
| for task_id in (1, 2, 3): | |
| with self.subTest(task_id=task_id): | |
| _reset(task_id=task_id, seed=42) | |
| state = client.get("/state").json() | |
| self.assertEqual(state["current_task_id"], task_id) | |
| # ----------------------------------------------------------------------- | |
| # Task 4.2 — Heuristic inference regression check | |
| # ----------------------------------------------------------------------- | |
| class TestHeuristicInferenceRegression(unittest.TestCase): | |
| """2.2 — Heuristic inference regression: all 3 tasks complete without error | |
| and overall average reward is in [0.8, 1.0]. | |
| This test drives the inference loop directly against the TestClient app, | |
| using the same heuristic_action logic as inference.py but routing HTTP | |
| calls through the in-process TestClient instead of a live server. | |
| Validates: Requirements 2.2.1, 2.2.2 | |
| """ | |
| # Import heuristic helpers from inference.py at class level so they are | |
| # available without a live server. | |
| def setUpClass(cls): | |
| import sys | |
| import os | |
| import types as _types | |
| # Ensure the repo root is on sys.path so inference.py is importable. | |
| repo_root = os.path.join(os.path.dirname(__file__), "..") | |
| if repo_root not in sys.path: | |
| sys.path.insert(0, repo_root) | |
| # The test stubs only cover openenv.core.env_server. inference.py | |
| # imports client.py which needs openenv.core.env_client. Install a | |
| # minimal stub so the import succeeds without a live openenv install. | |
| if "openenv.core.env_client" not in sys.modules: | |
| _ec_mod = _types.ModuleType("openenv.core.env_client") | |
| class _StepResult: | |
| def __init__(self, observation=None, reward=None, done=False): | |
| self.observation = observation | |
| self.reward = reward | |
| self.done = done | |
| class _EnvClient: | |
| def __class_getitem__(cls, item): | |
| return cls | |
| _ec_mod.EnvClient = _EnvClient # type: ignore[attr-defined] | |
| _ec_mod.StepResult = _StepResult # type: ignore[attr-defined] | |
| sys.modules["openenv.core.env_client"] = _ec_mod | |
| import inference as _inf | |
| cls._heuristic_action = staticmethod(_inf.heuristic_action) | |
| cls._SEED = _inf.SEED | |
| cls._TASKS = list(_inf.TASK_IDS) | |
| def _run_heuristic_episode(self, task_id: int) -> float: | |
| """Run one full heuristic episode for the given task_id via TestClient. | |
| Returns the final trajectory reward. | |
| """ | |
| reset_resp = client.post("/reset", json={"task_id": task_id, "seed": self._SEED}) | |
| self.assertEqual(reset_resp.status_code, 200, f"reset failed for task {task_id}") | |
| obs = reset_resp.json() | |
| self.assertFalse(obs["done"]) | |
| allowed_fields: list = obs["allowed_fields"] | |
| final_reward = 0.0 | |
| for _ in range(20): # safety cap | |
| ticket = obs.get("current_ticket") | |
| if ticket is None: | |
| break | |
| action_dict = self._heuristic_action(ticket, allowed_fields) | |
| step_resp = client.post("/step", json=action_dict) | |
| self.assertEqual(step_resp.status_code, 200, f"step failed for task {task_id}") | |
| obs = step_resp.json() | |
| reward = obs.get("reward") | |
| self.assertIsNotNone(reward) | |
| self.assertIsInstance(reward, float) | |
| self.assertGreaterEqual(reward, 0.0) | |
| self.assertLessEqual(reward, 1.0) | |
| if obs["done"]: | |
| final_reward = float(reward) | |
| break | |
| return final_reward | |
| def test_all_tasks_complete_without_error(self): | |
| """4.2.1 — All 3 tasks complete without raising an exception.""" | |
| for task_id in self._TASKS: | |
| with self.subTest(task_id=task_id): | |
| # Should not raise. | |
| reward = self._run_heuristic_episode(task_id) | |
| self.assertIsInstance(reward, float) | |
| def test_overall_average_reward_in_expected_range(self): | |
| """4.2.2 — Overall average reward across all 3 tasks stays in a healthy | |
| smoke-test range for the plain no-investigation heuristic baseline. | |
| """ | |
| rewards = [] | |
| for task_id in self._TASKS: | |
| reward = self._run_heuristic_episode(task_id) | |
| rewards.append(reward) | |
| self.assertEqual(len(rewards), 3, "Expected rewards for all 3 tasks") | |
| overall_avg = sum(rewards) / len(rewards) | |
| self.assertGreaterEqual( | |
| overall_avg, | |
| 0.25, | |
| f"Overall average reward {overall_avg:.4f} is below the smoke-test floor of 0.25", | |
| ) | |
| self.assertLessEqual( | |
| overall_avg, | |
| 1.0, | |
| f"Overall average reward {overall_avg:.4f} exceeds 1.0", | |
| ) | |
| if __name__ == "__main__": | |
| unittest.main() | |