Spaces:
Sleeping
Sleeping
| """Tests for core modules (delivery_search, delivery_planner).""" | |
| import pytest | |
| from app.core.delivery_search import DeliverySearch | |
| from app.core.delivery_planner import DeliveryPlanner | |
| from app.core.node import SearchNode | |
| from app.core.frontier import QueueFrontier, StackFrontier, PriorityQueueFrontier | |
| from app.models.grid import Grid | |
| from app.models.entities import Store, Destination, Tunnel | |
| class TestSearchNode: | |
| """Tests for SearchNode.""" | |
| def test_node_creation(self): | |
| """Test basic node creation.""" | |
| node = SearchNode(state=(0, 0), path_cost=0, depth=0) | |
| assert node.state == (0, 0) | |
| assert node.path_cost == 0 | |
| assert node.depth == 0 | |
| assert node.parent is None | |
| assert node.action is None | |
| def test_node_with_parent(self): | |
| """Test node with parent.""" | |
| parent = SearchNode(state=(0, 0), path_cost=0, depth=0) | |
| child = SearchNode( | |
| state=(1, 0), | |
| parent=parent, | |
| action="right", | |
| path_cost=1, | |
| depth=1, | |
| ) | |
| assert child.parent == parent | |
| assert child.action == "right" | |
| assert child.depth == 1 | |
| def test_get_path(self): | |
| """Test path reconstruction.""" | |
| root = SearchNode(state=(0, 0), path_cost=0, depth=0) | |
| child1 = SearchNode(state=(1, 0), parent=root, action="right", path_cost=1, depth=1) | |
| child2 = SearchNode(state=(2, 0), parent=child1, action="right", path_cost=2, depth=2) | |
| path = child2.get_path() | |
| assert path == [(0, 0), (1, 0), (2, 0)] | |
| def test_get_solution(self): | |
| """Test solution string generation.""" | |
| root = SearchNode(state=(0, 0), path_cost=0, depth=0) | |
| child1 = SearchNode(state=(1, 0), parent=root, action="right", path_cost=1, depth=1) | |
| child2 = SearchNode(state=(1, 1), parent=child1, action="up", path_cost=2, depth=2) | |
| solution = child2.get_solution() | |
| assert solution == "right,up" | |
| def test_get_solution_single_node(self): | |
| """Test solution for single node (start = goal).""" | |
| root = SearchNode(state=(0, 0), path_cost=0, depth=0) | |
| assert root.get_solution() == "" | |
| class TestFrontiers: | |
| """Tests for frontier data structures.""" | |
| def test_queue_frontier_fifo(self): | |
| """Test queue frontier is FIFO.""" | |
| frontier = QueueFrontier() | |
| node1 = SearchNode(state=(0, 0), path_cost=0, depth=0) | |
| node2 = SearchNode(state=(1, 0), path_cost=1, depth=1) | |
| node3 = SearchNode(state=(2, 0), path_cost=2, depth=2) | |
| frontier.push(node1) | |
| frontier.push(node2) | |
| frontier.push(node3) | |
| assert frontier.pop().state == (0, 0) | |
| assert frontier.pop().state == (1, 0) | |
| assert frontier.pop().state == (2, 0) | |
| def test_stack_frontier_lifo(self): | |
| """Test stack frontier is LIFO.""" | |
| frontier = StackFrontier() | |
| node1 = SearchNode(state=(0, 0), path_cost=0, depth=0) | |
| node2 = SearchNode(state=(1, 0), path_cost=1, depth=1) | |
| node3 = SearchNode(state=(2, 0), path_cost=2, depth=2) | |
| frontier.push(node1) | |
| frontier.push(node2) | |
| frontier.push(node3) | |
| assert frontier.pop().state == (2, 0) | |
| assert frontier.pop().state == (1, 0) | |
| assert frontier.pop().state == (0, 0) | |
| def test_priority_queue_frontier(self): | |
| """Test priority queue frontier orders by priority.""" | |
| frontier = PriorityQueueFrontier() | |
| # Priority is stored in node.priority attribute | |
| node1 = SearchNode(state=(0, 0), path_cost=5, depth=0) | |
| node1.priority = 5 | |
| node2 = SearchNode(state=(1, 0), path_cost=1, depth=1) | |
| node2.priority = 1 | |
| node3 = SearchNode(state=(2, 0), path_cost=3, depth=2) | |
| node3.priority = 3 | |
| frontier.push(node1) | |
| frontier.push(node2) | |
| frontier.push(node3) | |
| assert frontier.pop().state == (1, 0) # Lowest priority first | |
| assert frontier.pop().state == (2, 0) | |
| assert frontier.pop().state == (0, 0) | |
| def test_frontier_is_empty(self): | |
| """Test frontier empty check.""" | |
| frontier = QueueFrontier() | |
| assert frontier.is_empty() is True | |
| frontier.push(SearchNode(state=(0, 0), path_cost=0, depth=0)) | |
| assert frontier.is_empty() is False | |
| frontier.pop() | |
| assert frontier.is_empty() is True | |
| def test_frontier_contains_state(self): | |
| """Test frontier state containment check.""" | |
| frontier = QueueFrontier() | |
| frontier.push(SearchNode(state=(0, 0), path_cost=0, depth=0)) | |
| frontier.push(SearchNode(state=(1, 0), path_cost=1, depth=1)) | |
| assert frontier.contains_state((0, 0)) is True | |
| assert frontier.contains_state((1, 0)) is True | |
| assert frontier.contains_state((2, 0)) is False | |
| def test_frontier_get_states(self): | |
| """Test getting all states in frontier.""" | |
| frontier = QueueFrontier() | |
| frontier.push(SearchNode(state=(0, 0), path_cost=0, depth=0)) | |
| frontier.push(SearchNode(state=(1, 0), path_cost=1, depth=1)) | |
| states = frontier.get_states() | |
| assert (0, 0) in states | |
| assert (1, 0) in states | |
| class TestDeliverySearch: | |
| """Tests for DeliverySearch.""" | |
| def test_initial_state(self, simple_grid): | |
| """Test initial state is set correctly.""" | |
| search = DeliverySearch(simple_grid, (0, 0), (2, 2), []) | |
| assert search.initial_state() == (0, 0) | |
| def test_goal_test(self, simple_grid): | |
| """Test goal test function.""" | |
| search = DeliverySearch(simple_grid, (0, 0), (2, 2), []) | |
| assert search.goal_test((2, 2)) is True | |
| assert search.goal_test((0, 0)) is False | |
| assert search.goal_test((1, 1)) is False | |
| def test_actions(self, simple_grid): | |
| """Test available actions.""" | |
| search = DeliverySearch(simple_grid, (0, 0), (2, 2), []) | |
| # Corner has 2 actions | |
| actions = search.actions((0, 0)) | |
| assert "up" in actions | |
| assert "right" in actions | |
| assert "down" not in actions | |
| assert "left" not in actions | |
| # Center has 4 actions | |
| actions = search.actions((1, 1)) | |
| assert len(actions) == 4 | |
| def test_actions_with_tunnel(self, simple_grid): | |
| """Test tunnel action availability.""" | |
| tunnel = Tunnel(entrance1=(0, 0), entrance2=(2, 2)) | |
| search = DeliverySearch(simple_grid, (0, 0), (2, 2), [tunnel]) | |
| actions = search.actions((0, 0)) | |
| assert "tunnel" in actions | |
| actions = search.actions((1, 1)) | |
| assert "tunnel" not in actions | |
| def test_result_movement(self, simple_grid): | |
| """Test result of movement actions.""" | |
| search = DeliverySearch(simple_grid, (0, 0), (2, 2), []) | |
| assert search.result((1, 1), "up") == (1, 2) | |
| assert search.result((1, 1), "down") == (1, 0) | |
| assert search.result((1, 1), "left") == (0, 1) | |
| assert search.result((1, 1), "right") == (2, 1) | |
| def test_result_tunnel(self, simple_grid): | |
| """Test result of tunnel action.""" | |
| tunnel = Tunnel(entrance1=(0, 0), entrance2=(2, 2)) | |
| search = DeliverySearch(simple_grid, (0, 0), (2, 2), [tunnel]) | |
| assert search.result((0, 0), "tunnel") == (2, 2) | |
| assert search.result((2, 2), "tunnel") == (0, 0) | |
| def test_step_cost_movement(self, grid_with_varied_traffic): | |
| """Test step cost for movement.""" | |
| search = DeliverySearch(grid_with_varied_traffic, (0, 0), (2, 2), []) | |
| # Cost should equal traffic level | |
| cost = search.step_cost((0, 0), "right", (1, 0)) | |
| assert cost == grid_with_varied_traffic.get_traffic((0, 0), (1, 0)) | |
| def test_step_cost_tunnel(self, simple_grid): | |
| """Test step cost for tunnel (Manhattan distance).""" | |
| tunnel = Tunnel(entrance1=(0, 0), entrance2=(2, 2)) | |
| search = DeliverySearch(simple_grid, (0, 0), (2, 2), [tunnel]) | |
| cost = search.step_cost((0, 0), "tunnel", (2, 2)) | |
| assert cost == tunnel.cost # Manhattan distance = 4 | |
| def test_solve_all_strategies(self, simple_grid): | |
| """Test solve method with all strategies.""" | |
| search = DeliverySearch(simple_grid, (0, 0), (2, 2), []) | |
| strategies = ["BF", "DF", "ID", "UC", "GR1", "GR2", "AS1", "AS2"] | |
| for strategy in strategies: | |
| result, _ = search.solve(strategy, visualize=False) | |
| assert result.path[-1] == (2, 2), f"Strategy {strategy} failed" | |
| def test_solve_invalid_strategy(self, simple_grid): | |
| """Test solve with invalid strategy.""" | |
| search = DeliverySearch(simple_grid, (0, 0), (2, 2), []) | |
| with pytest.raises(ValueError): | |
| search.solve("INVALID", visualize=False) | |
| def test_path_static_method(self, simple_grid): | |
| """Test static path method.""" | |
| result, steps = DeliverySearch.path( | |
| simple_grid, | |
| (0, 0), | |
| (2, 2), | |
| [], | |
| "BF", | |
| visualize=False, | |
| ) | |
| assert result.path[0] == (0, 0) | |
| assert result.path[-1] == (2, 2) | |
| class TestDeliveryPlanner: | |
| """Tests for DeliveryPlanner.""" | |
| def test_planner_single_destination(self, simple_grid): | |
| """Test planner with single destination.""" | |
| stores = [Store(id=1, position=(0, 0))] | |
| destinations = [Destination(id=1, position=(2, 2))] | |
| planner = DeliveryPlanner(simple_grid, stores, destinations, []) | |
| result, _ = planner.plan("BF", visualize=False) | |
| assert len(result.assignments) == 1 | |
| assert result.assignments[0].store_id == 1 | |
| assert result.assignments[0].destination_id == 1 | |
| assert result.total_cost > 0 | |
| def test_planner_multiple_destinations(self, simple_grid): | |
| """Test planner with multiple destinations.""" | |
| stores = [Store(id=1, position=(0, 0))] | |
| destinations = [ | |
| Destination(id=1, position=(2, 0)), | |
| Destination(id=2, position=(0, 2)), | |
| Destination(id=3, position=(2, 2)), | |
| ] | |
| planner = DeliveryPlanner(simple_grid, stores, destinations, []) | |
| result, _ = planner.plan("BF", visualize=False) | |
| assert len(result.assignments) == 3 | |
| assert result.total_cost > 0 | |
| def test_planner_multiple_stores(self, simple_grid): | |
| """Test planner assigns to nearest store.""" | |
| stores = [ | |
| Store(id=1, position=(0, 0)), | |
| Store(id=2, position=(2, 2)), | |
| ] | |
| destinations = [Destination(id=1, position=(2, 0))] | |
| planner = DeliveryPlanner(simple_grid, stores, destinations, []) | |
| result, _ = planner.plan("UC", visualize=False) | |
| # Should assign to store 1 (closer to destination (2,0)) | |
| assert len(result.assignments) == 1 | |
| def test_planner_with_tunnels(self, simple_grid): | |
| """Test planner uses tunnels.""" | |
| stores = [Store(id=1, position=(0, 0))] | |
| destinations = [Destination(id=1, position=(2, 2))] | |
| tunnels = [Tunnel(entrance1=(0, 0), entrance2=(2, 2))] | |
| planner = DeliveryPlanner(simple_grid, stores, destinations, tunnels) | |
| result, _ = planner.plan("UC", visualize=False) | |
| # Tunnel cost is 4 (Manhattan distance) | |
| # Without tunnel, minimum cost would be 4 (each segment has traffic 1) | |
| assert result.total_cost == 4 | |
| def test_planner_from_state(self, simple_grid): | |
| """Test static plan_from_state method.""" | |
| stores = [Store(id=1, position=(0, 0))] | |
| destinations = [Destination(id=1, position=(2, 2))] | |
| result, _ = DeliveryPlanner.plan_from_state( | |
| simple_grid, | |
| stores, | |
| destinations, | |
| [], | |
| "BF", | |
| visualize=False, | |
| ) | |
| assert len(result.assignments) == 1 | |
| assert result.total_cost > 0 | |
| def test_planner_total_nodes(self, simple_grid): | |
| """Test total nodes expanded tracking.""" | |
| stores = [Store(id=1, position=(0, 0))] | |
| destinations = [ | |
| Destination(id=1, position=(2, 0)), | |
| Destination(id=2, position=(0, 2)), | |
| ] | |
| planner = DeliveryPlanner(simple_grid, stores, destinations, []) | |
| result, _ = planner.plan("BF", visualize=False) | |
| assert result.total_nodes_expanded > 0 | |
| def test_planner_visualization(self, simple_grid): | |
| """Test planner with visualization.""" | |
| stores = [Store(id=1, position=(0, 0))] | |
| destinations = [Destination(id=1, position=(2, 2))] | |
| planner = DeliveryPlanner(simple_grid, stores, destinations, []) | |
| result, viz_data = planner.plan("BF", visualize=True) | |
| assert viz_data is not None | |
| assert 1 in viz_data # Destination ID 1 should have steps | |
| def test_planner_result_to_string(self, simple_grid): | |
| """Test plan result string formatting.""" | |
| stores = [Store(id=1, position=(0, 0))] | |
| destinations = [Destination(id=1, position=(2, 2))] | |
| planner = DeliveryPlanner(simple_grid, stores, destinations, []) | |
| result, _ = planner.plan("BF", visualize=False) | |
| output = result.to_string() | |
| assert isinstance(output, str) | |
| assert len(output) > 0 | |
| def test_planner_all_strategies(self, simple_grid): | |
| """Test planner works with all strategies.""" | |
| stores = [Store(id=1, position=(0, 0))] | |
| destinations = [Destination(id=1, position=(2, 2))] | |
| strategies = ["BF", "DF", "ID", "UC", "GR1", "GR2", "AS1", "AS2"] | |
| for strategy in strategies: | |
| result, _ = DeliveryPlanner.plan_from_state( | |
| simple_grid, stores, destinations, [], strategy, False | |
| ) | |
| assert len(result.assignments) == 1, f"Strategy {strategy} failed" | |