""" Tests for the helpdesk-competitive-upgrade spec (Task 9). Covers: 9.1 test_inference_single_task_mode 9.2 test_state_has_reward_and_done 9.3 test_history_has_title_and_predicted 9.4 test_milestone_reward_shaping 9.5 test_trajectory_reward_no_overshoot 9.6 test_ambiguity_note_in_observation 9.7 test_dataset_nondefault_routing 9.9 test_concurrent_sessions_flag 9.10 test_web_ui_endpoint Run with: pytest tests/test_competitive_upgrade.py """ from __future__ import annotations import os import sys import types as _types import unittest # Ensure repo root is on sys.path sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) import openenv_test_stubs # noqa: F401 — must come before any openenv imports # Patch in the interfaces module so environment.py can import Environment. if "openenv.core.env_server.interfaces" not in sys.modules: _interfaces_mod = _types.ModuleType("openenv.core.env_server.interfaces") class _Environment: """Minimal stub matching the openenv-core Environment base class.""" def __init__(self) -> None: pass def __init_subclass__(cls, **kwargs: object) -> None: super().__init_subclass__(**kwargs) @classmethod def __class_getitem__(cls, item: object) -> type: return cls _interfaces_mod.Environment = _Environment # type: ignore[attr-defined] sys.modules["openenv.core.env_server.interfaces"] = _interfaces_mod from models import HelpdeskTicketAction, HelpdeskTicketObservation, HelpdeskTicketState from server.environment import HelpdeskTicketRoutingEnvironment from server.reward import compute_step_reward, compute_trajectory_reward from server.tasks import get_task_definition, load_dataset from vocabulary import ISSUE_TYPES, PRIORITIES, ASSIGNMENT_GROUPS, RESOLUTION_ACTIONS, TASK_IDS # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_env() -> HelpdeskTicketRoutingEnvironment: return HelpdeskTicketRoutingEnvironment() def _heuristic_action(obs: HelpdeskTicketObservation) -> HelpdeskTicketAction: allowed = obs.allowed_fields kwargs: dict = {} if "issue_type" in allowed: kwargs["issue_type"] = ISSUE_TYPES[0] if "priority" in allowed: kwargs["priority"] = PRIORITIES[0] if "assignment_group" in allowed: kwargs["assignment_group"] = ASSIGNMENT_GROUPS[0] if "resolution_action" in allowed: kwargs["resolution_action"] = RESOLUTION_ACTIONS[0] return HelpdeskTicketAction(**kwargs) # --------------------------------------------------------------------------- # 9.1 — Inference single-task mode # --------------------------------------------------------------------------- def _get_tasks_to_run_impl( task_id_env: str | None, available_tasks: dict, run_all_tasks: bool = False, ) -> list[int]: """ Standalone re-implementation of inference.get_tasks_to_run() logic for testing. This mirrors the logic in inference.py without importing the full module (which has heavy dependencies like openai, httpx, and client.py). """ if task_id_env: try: task_id = int(task_id_env) except ValueError: raise SystemExit(1) if task_id not in available_tasks: raise SystemExit(1) return [task_id] if not available_tasks: return [] if run_all_tasks: return sorted(available_tasks) return sorted(available_tasks) class TestInferenceSingleTaskMode(unittest.TestCase): """9.1 — get_tasks_to_run() respects TASK_ID env var.""" def test_task_id_set_to_valid_id_returns_single_element_list(self) -> None: available = {1: {}, 2: {}, 3: {}} result = _get_tasks_to_run_impl("1", available) self.assertEqual(result, [1]) def test_task_id_set_to_unavailable_id_exits(self) -> None: available = {1: {}, 2: {}, 3: {}} with self.assertRaises(SystemExit): _get_tasks_to_run_impl("999", available) def test_task_id_unset_defaults_to_all_available_tasks(self) -> None: available = {1: {}, 2: {}, 3: {}} result = _get_tasks_to_run_impl(None, available) self.assertEqual(result, [1, 2, 3]) def test_run_all_tasks_override_returns_all_task_ids(self) -> None: available = {1: {}, 2: {}, 3: {}} result = _get_tasks_to_run_impl(None, available, run_all_tasks=True) self.assertEqual(sorted(result), sorted(list(TASK_IDS))) def test_task_id_set_to_2_returns_only_task_2(self) -> None: available = {1: {}, 2: {}, 3: {}} result = _get_tasks_to_run_impl("2", available) self.assertEqual(result, [2]) def test_task_id_set_to_3_returns_only_task_3(self) -> None: available = {1: {}, 2: {}, 3: {}} result = _get_tasks_to_run_impl("3", available) self.assertEqual(result, [3]) # --------------------------------------------------------------------------- # 9.2 — State has last_step_reward and done after step() # --------------------------------------------------------------------------- class TestStateHasRewardAndDone(unittest.TestCase): """9.2 — state.last_step_reward and state.done are set after step().""" def test_last_step_reward_is_none_after_reset(self) -> None: env = _make_env() env.reset(seed=42, task_id=1) self.assertIsNone(env.state.last_step_reward) def test_done_is_false_after_reset(self) -> None: env = _make_env() env.reset(seed=42, task_id=1) self.assertFalse(env.state.done) def test_last_step_reward_set_after_step(self) -> None: env = _make_env() obs = env.reset(seed=42, task_id=1) action = _heuristic_action(obs) env.step(action) state = env.state self.assertIsNotNone(state.last_step_reward) self.assertGreaterEqual(state.last_step_reward, 0.0) self.assertLessEqual(state.last_step_reward, 1.0) def test_done_is_true_after_last_ticket(self) -> None: env = _make_env() obs = env.reset(seed=42, task_id=1) while not obs.done: obs = env.step(_heuristic_action(obs)) self.assertTrue(env.state.done) def test_done_is_false_before_last_ticket(self) -> None: env = _make_env() obs = env.reset(seed=42, task_id=1) if obs.queue_size > 1: obs = env.step(_heuristic_action(obs)) self.assertFalse(env.state.done) def test_state_tracks_average_score_and_reward_components(self) -> None: env = _make_env() obs = env.reset(seed=42, task_id=1) env.step(_heuristic_action(obs)) state = env.state self.assertGreaterEqual(state.average_score_so_far, 0.0) self.assertLessEqual(state.average_score_so_far, 1.0) self.assertIsInstance(state.last_reward_components, dict) self.assertIn("final_reward", state.last_reward_components) # --------------------------------------------------------------------------- # 9.3 — History entry contains title and predicted # --------------------------------------------------------------------------- class TestHistoryHasTitleAndPredicted(unittest.TestCase): """9.3 — observation.history[0] contains 'title' and 'predicted' keys.""" def test_history_entry_has_title(self) -> None: env = _make_env() obs = env.reset(seed=42, task_id=1) action = _heuristic_action(obs) obs2 = env.step(action) self.assertEqual(len(obs2.history), 1) self.assertIn("title", obs2.history[0]) self.assertIsInstance(obs2.history[0]["title"], str) self.assertTrue(obs2.history[0]["title"]) # non-empty def test_history_entry_has_predicted(self) -> None: env = _make_env() obs = env.reset(seed=42, task_id=1) action = _heuristic_action(obs) obs2 = env.step(action) self.assertIn("predicted", obs2.history[0]) self.assertIsInstance(obs2.history[0]["predicted"], dict) def test_history_predicted_matches_action(self) -> None: env = _make_env() obs = env.reset(seed=42, task_id=1) action = _heuristic_action(obs) obs2 = env.step(action) predicted = obs2.history[0]["predicted"] action_dict = action.model_dump(exclude_none=True) self.assertEqual(predicted, action_dict) def test_history_entry_has_ticket_id_and_score(self) -> None: env = _make_env() obs = env.reset(seed=42, task_id=1) obs2 = env.step(_heuristic_action(obs)) entry = obs2.history[0] self.assertIn("ticket_id", entry) self.assertIn("score", entry) # --------------------------------------------------------------------------- # 9.4 — Milestone reward shaping # --------------------------------------------------------------------------- class TestMilestoneRewardShaping(unittest.TestCase): """9.4 — compute_step_reward applies bonus at high scores, penalty at low scores.""" def test_high_score_gets_bonus(self) -> None: # score=0.9 >= 0.8 threshold → base=0.9, bonus=0.05 → 0.95 result = compute_step_reward(0.9, previous_average=0.9) self.assertAlmostEqual(result, 0.95, places=9) def test_low_score_gets_penalty(self) -> None: # score=0.1 < 0.2 threshold → base=0.1, penalty=0.05 → 0.05 result = compute_step_reward(0.1, previous_average=0.1) self.assertAlmostEqual(result, 0.05, places=9) def test_mid_score_is_neutral(self) -> None: # score=0.5 is in [0.2, 0.8) → no shaping → 0.5 result = compute_step_reward(0.5, previous_average=0.5) self.assertAlmostEqual(result, 0.5, places=9) def test_boundary_high_threshold_gets_bonus(self) -> None: # score=0.8 exactly → bonus applies → 0.85 result = compute_step_reward(0.8, previous_average=0.8) self.assertAlmostEqual(result, 0.85, places=9) def test_boundary_low_threshold_is_neutral(self) -> None: # score=0.2 exactly → not < 0.2, so neutral → 0.2 result = compute_step_reward(0.2, previous_average=0.2) self.assertAlmostEqual(result, 0.2, places=9) def test_reward_clamped_to_unit_interval(self) -> None: # score=1.0 → base=1.0, bonus would push to 1.05 → clamped to 1.0 result = compute_step_reward(1.0) self.assertLessEqual(result, 1.0) self.assertGreaterEqual(result, 0.0) def test_improvement_delta_adds_small_bonus(self) -> None: improved = compute_step_reward(0.7, previous_average=0.2) flat = compute_step_reward(0.7, previous_average=0.7) self.assertGreater(improved, flat) def test_zero_score_clamped_to_zero(self) -> None: # score=0.0 < 0.2 → base=0.0, penalty → max(0.0, -0.05) = 0.0 result = compute_step_reward(0.0) self.assertGreaterEqual(result, 0.0) # --------------------------------------------------------------------------- # 9.5 — Trajectory reward has no overshoot penalty # --------------------------------------------------------------------------- class TestTrajectoryRewardNoOvershoot(unittest.TestCase): """9.5 — compute_trajectory_reward does not penalise when steps > queue_size.""" def test_no_penalty_when_steps_exceed_queue_size(self) -> None: scores = [0.8, 0.9, 0.7] queue_size = 3 steps_taken = 10 # more steps than queue_size result = compute_trajectory_reward(scores, queue_size, steps_taken) expected_avg = sum(scores) / len(scores) self.assertAlmostEqual(result, expected_avg, places=9) def test_result_equals_average_regardless_of_steps(self) -> None: scores = [0.5, 0.6] for steps in [1, 2, 5, 100]: result = compute_trajectory_reward(scores, len(scores), steps) self.assertAlmostEqual(result, 0.55, places=9, msg=f"Failed for steps={steps}") def test_empty_scores_returns_zero(self) -> None: self.assertEqual(compute_trajectory_reward([], 3, 3), 0.0) def test_result_in_unit_interval(self) -> None: scores = [0.9, 1.0, 0.95] result = compute_trajectory_reward(scores, 3, 3) self.assertGreaterEqual(result, 0.0) self.assertLessEqual(result, 1.0) # --------------------------------------------------------------------------- # 9.6 — ambiguity_note appears in current_ticket observation # --------------------------------------------------------------------------- class TestAmbiguityNoteInObservation(unittest.TestCase): """9.6 — current_ticket includes ambiguity_note when the ticket has one.""" def _find_seed_with_ambiguity_note(self, task_id: int = 3) -> int | None: """Try seeds 0..999 to find one where the first ticket has ambiguity_note.""" env = _make_env() for seed in range(1000): obs = env.reset(seed=seed, task_id=task_id) if obs.current_ticket and obs.current_ticket.get("ambiguity_note"): return seed return None def test_ambiguity_note_hidden_until_internal_note_lookup(self) -> None: """Force a ticket with ambiguity_note by patching the dataset.""" from unittest.mock import patch from server.tasks import load_dataset dataset = load_dataset() # Find a ticket with ambiguity_note ambiguous_tickets = [t for t in dataset if t.ambiguity_note is not None] self.assertGreater(len(ambiguous_tickets), 0, "No tickets with ambiguity_note in dataset") target = ambiguous_tickets[0] env = _make_env() # Patch the dataset to only contain the ambiguous ticket with patch.object(env, "_dataset", [target]): obs = env.reset(seed=0, task_id=3) self.assertIsNotNone(obs.current_ticket) self.assertNotIn("ambiguity_note", obs.current_ticket) self.assertIn("context_status", obs.current_ticket) self.assertTrue(obs.current_ticket["context_status"]["hidden_context_remaining"]) self.assertGreater(obs.current_ticket["context_status"]["context_gap_count"], 0) obs = env.step( HelpdeskTicketAction( action_type="investigate", tool_name="lookup_internal_routing_note", ) ) self.assertEqual(obs.current_ticket["ambiguity_note"], target.ambiguity_note) self.assertGreater(obs.reward or 0.0, 0.0) def test_ambiguity_note_absent_when_ticket_has_none(self) -> None: """Tickets without ambiguity_note should not expose the key.""" from unittest.mock import patch from server.tasks import load_dataset dataset = load_dataset() non_ambiguous = [t for t in dataset if t.ambiguity_note is None] self.assertGreater(len(non_ambiguous), 0) target = non_ambiguous[0] env = _make_env() with patch.object(env, "_dataset", [target]): obs = env.reset(seed=0, task_id=3) self.assertIsNotNone(obs.current_ticket) self.assertNotIn("ambiguity_note", obs.current_ticket) def test_tkt_nondefault_001_has_ambiguity_note(self) -> None: """TKT-NONDEFAULT-001 specifically has ambiguity_note set.""" from unittest.mock import patch from server.tasks import load_dataset dataset = load_dataset() ticket = next((t for t in dataset if t.ticket_id == "TKT-NONDEFAULT-001"), None) self.assertIsNotNone(ticket, "TKT-NONDEFAULT-001 not found in dataset") self.assertIsNotNone(ticket.ambiguity_note) env = _make_env() with patch.object(env, "_dataset", [ticket]): obs = env.reset(seed=0, task_id=3) self.assertNotIn("ambiguity_note", obs.current_ticket) obs = env.step( HelpdeskTicketAction( action_type="investigate", tool_name="lookup_internal_routing_note", ) ) self.assertIn("ambiguity_note", obs.current_ticket) class TestRelatedTicketPreviewInObservation(unittest.TestCase): """Follow-up tickets expose a lightweight preview of the linked ticket.""" def _reset_linked_ticket_env(self): from unittest.mock import patch dataset = load_dataset() ticket = next((t for t in dataset if t.related_ticket_id is not None), None) self.assertIsNotNone(ticket, "No follow-up ticket found in dataset") related = next( (t for t in dataset if t.ticket_id == ticket.related_ticket_id), None, ) self.assertIsNotNone(related, "Linked ticket missing from dataset") env = _make_env() with patch.object(env, "_dataset", [ticket]): with patch.object( env, "_tickets_by_id", {ticket.ticket_id: ticket, related.ticket_id: related}, ): obs = env.reset(seed=0, task_id=3, queue_size=1) return env, obs, ticket, related def test_related_ticket_preview_present_when_ticket_has_link(self) -> None: env, obs, ticket, related = self._reset_linked_ticket_env() self.assertIsNotNone(obs.current_ticket) self.assertNotIn("related_ticket_preview", obs.current_ticket) self.assertIn("context_status", obs.current_ticket) self.assertTrue(obs.current_ticket["context_status"]["hidden_context_remaining"]) self.assertGreater(obs.current_ticket["context_status"]["context_gap_count"], 0) obs = env.step( HelpdeskTicketAction( action_type="investigate", tool_name="lookup_related_ticket", tool_target_ticket_id=ticket.related_ticket_id, ) ) self.assertIn("related_ticket_preview", obs.current_ticket) self.assertEqual( obs.current_ticket["related_ticket_preview"]["ticket_id"], related.ticket_id, ) self.assertEqual( obs.current_ticket["related_ticket_preview"]["title"], related.title, ) def test_history_keeps_related_ticket_preview_after_step(self) -> None: env, obs, ticket, related = self._reset_linked_ticket_env() env.step( HelpdeskTicketAction( action_type="investigate", tool_name="lookup_related_ticket", tool_target_ticket_id=ticket.related_ticket_id, ) ) next_obs = env.step( HelpdeskTicketAction( issue_type=ticket.issue_type, priority=ticket.priority, assignment_group=ticket.assignment_group, resolution_action=ticket.resolution_action, ) ) self.assertGreaterEqual(len(next_obs.history), 1) self.assertIn("related_ticket_preview", next_obs.history[0]) self.assertEqual( next_obs.history[0]["related_ticket_preview"]["ticket_id"], related.ticket_id, ) class TestObservationQueueContext(unittest.TestCase): """Observation includes clearer queue-position counters.""" def test_reset_sets_queue_position_and_after_current_counts(self) -> None: env = _make_env() obs = env.reset(seed=0, task_id=1, queue_size=3) self.assertEqual(obs.queue_position, 1) self.assertEqual(obs.tickets_remaining, 3) self.assertEqual(obs.tickets_after_current, 2) def test_step_updates_queue_position_and_after_current_counts(self) -> None: env = _make_env() obs = env.reset(seed=0, task_id=1, queue_size=3) obs = env.step(_heuristic_action(obs)) if obs.done: self.assertEqual(obs.queue_position, 0) self.assertEqual(obs.tickets_after_current, 0) else: self.assertEqual(obs.queue_position, 2) self.assertEqual(obs.tickets_remaining, 2) self.assertEqual(obs.tickets_after_current, 1) # --------------------------------------------------------------------------- # 9.6b — investigation actions and queue economics # --------------------------------------------------------------------------- class TestInvestigationActions(unittest.TestCase): """Minimal tool-assisted investigate/submit flow works and stays backwards compatible.""" def _make_linked_env(self): from unittest.mock import patch dataset = load_dataset() ticket = next((t for t in dataset if t.related_ticket_id is not None), None) self.assertIsNotNone(ticket, "No follow-up ticket found in dataset") related = next( (t for t in dataset if t.ticket_id == ticket.related_ticket_id), None, ) self.assertIsNotNone(related, "Linked ticket missing from dataset") env = _make_env() patch_dataset = patch.object(env, "_dataset", [ticket]) patch_lookup = patch.object( env, "_tickets_by_id", {ticket.ticket_id: ticket, related.ticket_id: related}, ) patch_dataset.start() patch_lookup.start() self.addCleanup(patch_dataset.stop) self.addCleanup(patch_lookup.stop) obs = env.reset(seed=0, task_id=3, queue_size=1) return env, obs, ticket, related def _make_cluster_env(self): from unittest.mock import patch dataset = load_dataset() cluster_tickets = [ ticket for ticket in dataset if ticket.service_cluster_id == "atlasbank_lockout_bridge" ] self.assertGreaterEqual(len(cluster_tickets), 2, "Expected atlasbank cluster tickets") root = next( ( ticket for ticket in cluster_tickets if ticket.related_ticket_id is None ), None, ) follow_up = next( ( ticket for ticket in cluster_tickets if ticket.related_ticket_id is not None ), None, ) self.assertIsNotNone(root, "Cluster root ticket missing") self.assertIsNotNone(follow_up, "Cluster follow-up ticket missing") env = _make_env() patch_dataset = patch.object(env, "_dataset", [root, follow_up]) patch_lookup = patch.object( env, "_tickets_by_id", {root.ticket_id: root, follow_up.ticket_id: follow_up}, ) patch_dataset.start() patch_lookup.start() self.addCleanup(patch_dataset.stop) self.addCleanup(patch_lookup.stop) env.reset(seed=0, task_id=3, queue_size=2) env._queue = [root, follow_up] env._sync_queue_ticket_ids() env._state.current_ticket_index = 0 obs = env._build_observation(get_task_definition(3)) return env, obs, root, follow_up def test_investigation_action_does_not_advance_queue(self) -> None: env, obs, ticket, related = self._make_linked_env() investigate = HelpdeskTicketAction( action_type="investigate", tool_name="lookup_related_ticket", tool_target_ticket_id=ticket.related_ticket_id, ) obs2 = env.step(investigate) self.assertFalse(obs2.done) self.assertEqual(obs2.tickets_processed, 0) self.assertEqual(obs2.queue_position, 1) self.assertIsNotNone(obs2.last_tool_result) self.assertTrue(obs2.last_tool_result["found"]) self.assertEqual( obs2.last_tool_result["ticket"]["ticket_id"], related.ticket_id, ) def test_submit_after_investigation_completes_episode(self) -> None: env, obs, ticket, related = self._make_linked_env() obs = env.step( HelpdeskTicketAction( action_type="investigate", tool_name="lookup_related_ticket", tool_target_ticket_id=ticket.related_ticket_id, ) ) operational_context = (obs.current_ticket or {}).get("operational_context", {}) if operational_context.get("incident_recommended"): obs = env.step(HelpdeskTicketAction(action_type="open_incident")) final_obs = env.step( HelpdeskTicketAction( issue_type=ticket.issue_type, priority=ticket.priority, assignment_group=ticket.assignment_group, resolution_action=ticket.resolution_action, ) ) self.assertTrue(final_obs.done) self.assertEqual(final_obs.tickets_processed, 1) self.assertGreaterEqual(final_obs.reward, 0.0) self.assertLessEqual(final_obs.reward, 1.0) def test_requester_history_tool_returns_matches_for_same_requester(self) -> None: from unittest.mock import patch dataset = load_dataset() requester_counts: dict[str, int] = {} for ticket in dataset: requester_counts[ticket.requester] = requester_counts.get(ticket.requester, 0) + 1 target_requester = next( (requester for requester, count in requester_counts.items() if count >= 2), None, ) self.assertIsNotNone(target_requester, "Dataset has no repeated requester") duplicate_requester_group = [ ticket for ticket in dataset if ticket.requester == target_requester ] self.assertGreaterEqual(len(duplicate_requester_group), 2) env = _make_env() with patch.object(env, "_dataset", duplicate_requester_group): with patch.object( env, "_tickets_by_id", {ticket.ticket_id: ticket for ticket in duplicate_requester_group}, ): obs = env.reset(seed=0, task_id=2, queue_size=1) obs2 = env.step( HelpdeskTicketAction( action_type="investigate", tool_name="lookup_requester_history", ) ) self.assertIsNotNone(obs2.last_tool_result) self.assertEqual(obs2.last_tool_result["tool_name"], "lookup_requester_history") self.assertTrue(obs2.last_tool_result["found"]) self.assertGreaterEqual(len(obs2.last_tool_result["matches"]), 1) def test_internal_note_tool_reveals_hidden_hard_task_context(self) -> None: from unittest.mock import patch dataset = load_dataset() ticket = next((t for t in dataset if t.ticket_id == "TKT-NONDEFAULT-003"), None) self.assertIsNotNone(ticket) env = _make_env() with patch.object(env, "_dataset", [ticket]): with patch.object(env, "_tickets_by_id", {ticket.ticket_id: ticket}): obs = env.reset(seed=0, task_id=3, queue_size=1) self.assertNotIn("ambiguity_note", obs.current_ticket) obs = env.step( HelpdeskTicketAction( action_type="investigate", tool_name="lookup_internal_routing_note", ) ) self.assertIn(ticket.ambiguity_note, obs.last_tool_result["routing_note"]) self.assertEqual(obs.current_ticket["ambiguity_note"], ticket.ambiguity_note) self.assertGreater(obs.reward or 0.0, 0.0) def test_queue_capacity_forecast_reveals_routing_options(self) -> None: from unittest.mock import patch dataset = load_dataset() ticket = next( (t for t in dataset if t.alternate_route_score_multiplier > 0.0), None, ) self.assertIsNotNone(ticket) env = _make_env() with patch.object(env, "_dataset", [ticket]): with patch.object(env, "_tickets_by_id", {ticket.ticket_id: ticket}): obs = env.reset(seed=0, task_id=3, queue_size=1) self.assertNotIn("routing_options", obs.current_ticket) obs = env.step( HelpdeskTicketAction( action_type="investigate", tool_name="lookup_queue_capacity_forecast", ) ) self.assertEqual(obs.last_tool_result["tool_name"], "lookup_queue_capacity_forecast") self.assertTrue(obs.last_tool_result["found"]) self.assertIn("preferred_route_label", obs.last_tool_result) self.assertIn("routing_options", obs.current_ticket) self.assertGreaterEqual(len(obs.current_ticket["routing_options"]), 2) def test_queue_cluster_summary_reveals_future_cluster_load(self) -> None: env, obs, root, follow_up = self._make_cluster_env() self.assertNotIn("future_cluster_ticket_count", obs.current_ticket["operational_context"]) self.assertTrue(obs.current_ticket["operational_context"]["cluster_coordination_hint"]) obs = env.step( HelpdeskTicketAction( action_type="investigate", tool_name="lookup_queue_cluster_summary", ) ) self.assertEqual(obs.last_tool_result["tool_name"], "lookup_queue_cluster_summary") self.assertTrue(obs.last_tool_result["found"]) self.assertEqual(obs.last_tool_result["future_cluster_ticket_count"], 1) self.assertEqual( obs.last_tool_result["future_cluster_ticket_ids"], [follow_up.ticket_id], ) self.assertIn("cluster_summary", obs.current_ticket) def test_good_cluster_handling_stabilizes_future_follow_up(self) -> None: env, obs, root, follow_up = self._make_cluster_env() while (obs.current_ticket or {}).get("context_status", {}).get( "hidden_context_remaining" ): tool_name = ( (obs.current_ticket or {}) .get("context_status", {}) .get("recommended_tools", [None])[0] ) self.assertIsNotNone(tool_name) obs = env.step( HelpdeskTicketAction( action_type="investigate", tool_name=tool_name, ) ) obs = env.step(HelpdeskTicketAction(action_type="open_incident")) obs = env.step( HelpdeskTicketAction( issue_type=root.issue_type, priority=root.priority, assignment_group=root.assignment_group, resolution_action=root.resolution_action, ) ) self.assertFalse(obs.done) self.assertEqual(obs.current_ticket["ticket_id"], follow_up.ticket_id) self.assertTrue(obs.current_ticket["operational_context"]["incident_open"]) self.assertIn( follow_up.ticket_id, obs.history[-1]["reward_components"]["cluster_stabilized_ticket_ids"], ) stabilized_follow_up = env._queue[env.state.current_ticket_index] self.assertEqual(stabilized_follow_up.alternate_assignment_group, "service_desk") self.assertGreaterEqual(stabilized_follow_up.alternate_route_score_multiplier, 0.9) def test_bad_cluster_handling_escalates_future_follow_up(self) -> None: env, obs, root, follow_up = self._make_cluster_env() obs = env.step( HelpdeskTicketAction( issue_type="general_inquiry", priority="low", assignment_group="service_desk", resolution_action="acknowledge", ) ) self.assertFalse(obs.done) self.assertEqual(obs.current_ticket["ticket_id"], follow_up.ticket_id) self.assertIn( follow_up.ticket_id, obs.history[-1]["reward_components"]["cluster_destabilized_ticket_ids"], ) escalated_follow_up = env._queue[env.state.current_ticket_index] self.assertNotEqual(escalated_follow_up.priority, follow_up.priority) self.assertIn( "did not fully resolve the blocker", escalated_follow_up.customer_update_note or "", ) def test_submit_without_required_investigation_gets_shaping_penalty(self) -> None: from unittest.mock import patch dataset = load_dataset() ticket = next((t for t in dataset if t.ticket_id == "TKT-NONDEFAULT-003"), None) self.assertIsNotNone(ticket) env = _make_env() with patch.object(env, "_dataset", [ticket]): with patch.object(env, "_tickets_by_id", {ticket.ticket_id: ticket}): obs = env.reset(seed=0, task_id=3, queue_size=1) final_obs = env.step( HelpdeskTicketAction( issue_type=ticket.issue_type, priority=ticket.priority, assignment_group=ticket.assignment_group, resolution_action=ticket.resolution_action, ) ) self.assertTrue(final_obs.done) self.assertIsNotNone(final_obs.rubric_reward) self.assertLess(final_obs.reward, final_obs.rubric_reward) self.assertGreater( final_obs.last_reward_components.get("context_gap_penalty", 0.0), 0.0, ) def test_terminal_rubric_reports_queue_management_score(self) -> None: from unittest.mock import patch dataset = load_dataset() ticket = next((t for t in dataset if t.ticket_id == "TKT-NONDEFAULT-003"), None) self.assertIsNotNone(ticket) env = _make_env() with patch.object(env, "_dataset", [ticket]): with patch.object(env, "_tickets_by_id", {ticket.ticket_id: ticket}): obs = env.reset(seed=0, task_id=3, queue_size=1) final_obs = env.step( HelpdeskTicketAction( issue_type=ticket.issue_type, priority=ticket.priority, assignment_group=ticket.assignment_group, resolution_action=ticket.resolution_action, ) ) self.assertTrue(final_obs.done) self.assertIn("queue_management_score", final_obs.last_reward_components) self.assertIn("queue_management_breakdown", final_obs.last_reward_components) self.assertIn("context_resolution", final_obs.last_reward_components["queue_management_breakdown"]) def test_capacity_forecast_hides_future_demand_until_tool_use(self) -> None: from unittest.mock import patch dataset = load_dataset() ticket = next( (t for t in dataset if t.alternate_route_score_multiplier > 0.0), None, ) self.assertIsNotNone(ticket) env = _make_env() with patch.object(env, "_dataset", [ticket]): with patch.object(env, "_tickets_by_id", {ticket.ticket_id: ticket}): obs = env.reset(seed=0, task_id=3, queue_size=1) self.assertNotIn("future_queue_demand", obs.metadata) obs = env.step( HelpdeskTicketAction( action_type="investigate", tool_name="lookup_queue_capacity_forecast", ) ) self.assertIn("future_queue_demand", obs.last_tool_result) class TestQueueEconomics(unittest.TestCase): """Free investigations are allowed, but excessive investigation gets a queue-level penalty.""" def test_extra_investigations_reduce_final_reward(self) -> None: from unittest.mock import patch dataset = load_dataset() ticket = dataset[0] env = _make_env() with patch.object(env, "_dataset", [ticket]): with patch.object(env, "_tickets_by_id", {ticket.ticket_id: ticket}): obs = env.reset(seed=0, task_id=1, queue_size=1) obs = env.step( HelpdeskTicketAction( action_type="investigate", tool_name="lookup_requester_history", ) ) self.assertEqual(env.state.investigation_steps, 1) self.assertEqual(env.state.investigation_budget_remaining, 0) obs = env.step( HelpdeskTicketAction( action_type="investigate", tool_name="lookup_requester_history", ) ) self.assertEqual(env.state.investigation_steps, 2) final_obs = env.step(HelpdeskTicketAction(issue_type=ticket.issue_type)) self.assertTrue(final_obs.done) self.assertLess(final_obs.reward, 1.0) self.assertAlmostEqual( final_obs.last_reward_components.get("investigation_penalty_applied", 0.0), 0.04, places=9, ) class TestTerminalInvalidActionFinalReward(unittest.TestCase): """Terminal invalid submit actions should still return the queue-level final reward.""" def test_last_invalid_submit_returns_trajectory_reward_not_zero(self) -> None: from unittest.mock import patch from server.tasks import get_task_definition as base_get_task_definition dataset = load_dataset() first = dataset[0] second = dataset[1] env = _make_env() with patch.object(env, "_dataset", [first, second]): with patch.object( env, "_tickets_by_id", {first.ticket_id: first, second.ticket_id: second}, ): with patch( "server.environment.get_task_definition", side_effect=lambda task_id: ( { **base_get_task_definition(task_id), "allowed_fields": ["issue_type"], } if task_id == 1 else base_get_task_definition(task_id) ), ): obs = env.reset(seed=0, task_id=1, queue_size=2) tickets_by_id = {first.ticket_id: first, second.ticket_id: second} current = tickets_by_id[obs.current_ticket["ticket_id"]] obs = env.step(HelpdeskTicketAction(issue_type=current.issue_type)) self.assertFalse(obs.done) current = tickets_by_id[obs.current_ticket["ticket_id"]] final_obs = env.step( HelpdeskTicketAction( issue_type=current.issue_type, priority="medium", ) ) self.assertTrue(final_obs.done) expected_average = sum(env.state.per_ticket_scores) / len( env.state.per_ticket_scores ) self.assertGreater(final_obs.reward, 0.0) self.assertAlmostEqual(final_obs.reward, expected_average, places=9) self.assertAlmostEqual(env.state.total_reward, expected_average, places=9) self.assertAlmostEqual(env.state.reward or 0.0, expected_average, places=9) # --------------------------------------------------------------------------- # 9.7 — Dataset has >= 3 non-default routing tickets # --------------------------------------------------------------------------- class TestDatasetNonDefaultRouting(unittest.TestCase): """9.7 — Dataset contains at least 3 tickets with non-default assignment_group.""" def test_at_least_three_nondefault_routing_tickets(self) -> None: from vocabulary import ISSUE_TYPE_TO_ASSIGNMENT_GROUP dataset = load_dataset() non_default = [ t for t in dataset if t.assignment_group != ISSUE_TYPE_TO_ASSIGNMENT_GROUP.get(t.issue_type) ] self.assertGreaterEqual( len(non_default), 10, f"Expected >= 10 non-default routing tickets, found {len(non_default)}: " + str([(t.ticket_id, t.issue_type, t.assignment_group) for t in non_default]) ) def test_tkt_nondefault_tickets_exist(self) -> None: dataset = load_dataset() ids = {t.ticket_id for t in dataset} for expected_id in ("TKT-NONDEFAULT-001", "TKT-NONDEFAULT-002", "TKT-NONDEFAULT-003"): self.assertIn(expected_id, ids, f"{expected_id} not found in dataset") # --------------------------------------------------------------------------- # 9.9 — SUPPORTS_CONCURRENT_SESSIONS is True # --------------------------------------------------------------------------- class TestConcurrentSessionsFlag(unittest.TestCase): """9.9 — HelpdeskTicketRoutingEnvironment.SUPPORTS_CONCURRENT_SESSIONS is True.""" def test_supports_concurrent_sessions_is_true(self) -> None: self.assertTrue(HelpdeskTicketRoutingEnvironment.SUPPORTS_CONCURRENT_SESSIONS) def test_flag_is_boolean_true(self) -> None: flag = HelpdeskTicketRoutingEnvironment.SUPPORTS_CONCURRENT_SESSIONS self.assertIs(flag, True) # --------------------------------------------------------------------------- # 9.10 — GET /web returns 200 with HTML content # --------------------------------------------------------------------------- def _build_web_test_app(): """Build a minimal FastAPI app with only the /web route for testing.""" from fastapi import FastAPI from fastapi.responses import HTMLResponse from server.tasks import TASKS from vocabulary import APP_ENV_NAME _app = FastAPI() @_app.get("/web", response_class=HTMLResponse) def web_ui(): task_rows = "".join( f"
Version: 0.1.0 | Health | API Docs
| ID | Name | Difficulty |
|---|