File size: 6,022 Bytes
e067c2d
47bba68
e067c2d
47bba68
e067c2d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
47bba68
e067c2d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
47bba68
 
 
 
 
 
 
 
 
 
e067c2d
 
 
 
 
 
 
 
 
47bba68
e067c2d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
47bba68
 
 
 
 
 
 
 
 
 
 
e067c2d
 
 
47bba68
 
 
 
 
 
 
 
 
e067c2d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
47bba68
e067c2d
 
 
 
47bba68
 
 
e067c2d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
"""Generic search problem abstract base class."""

from abc import ABC, abstractmethod
from typing import List, Tuple, Optional
from .node import SearchNode
from .frontier import Frontier
from ..models.state import PathResult, SearchStep


class GenericSearch(ABC):
    """
    Abstract base class for search problems.

    Subclasses must implement:
        - initial_state(): Return the starting state
        - goal_test(state): Return True if state is a goal
        - actions(state): Return list of valid actions from state
        - result(state, action): Return new state after action
        - step_cost(state, action, next_state): Return cost of action
    """

    @abstractmethod
    def initial_state(self) -> Tuple[int, int]:
        """Return the initial state."""
        pass

    @abstractmethod
    def goal_test(self, state: Tuple[int, int]) -> bool:
        """Return True if state is a goal state."""
        pass

    @abstractmethod
    def actions(self, state: Tuple[int, int]) -> List[str]:
        """Return list of valid actions from given state."""
        pass

    @abstractmethod
    def result(self, state: Tuple[int, int], action: str) -> Tuple[int, int]:
        """Return the state resulting from taking action in given state."""
        pass

    @abstractmethod
    def step_cost(
        self, state: Tuple[int, int], action: str, next_state: Tuple[int, int]
    ) -> float:
        """Return the cost of taking action from state to next_state."""
        pass

    def heuristic(self, state: Tuple[int, int]) -> float:
        """
        Heuristic function h(n) estimating cost from state to goal.
        Override in subclass for informed search.
        """
        return 0.0

    def solve(
        self, strategy: str, visualize: bool = False
    ) -> Tuple[PathResult, Optional[List[SearchStep]]]:
        """
        Solve the search problem using the specified strategy.

        Args:
            strategy: One of 'BF', 'DF', 'ID', 'UC', 'GR1', 'GR2', 'AS1', 'AS2'
            visualize: If True, collect visualization steps

        Returns:
            Tuple of (PathResult, Optional[List[SearchStep]])
        """
        from ..algorithms import (
            bfs_search,
            dfs_search,
            ids_search,
            ucs_search,
            greedy_search,
            astar_search,
        )
        from ..heuristics import (
            manhattan_heuristic,
            euclidean_heuristic,
        )

        # Wrap instance heuristic to match expected signature (state, goal) -> float
        def tunnel_aware_wrapper(state, goal):
            return self.heuristic(state)

        # Map strategy codes to search functions
        strategy_map = {
            "BF": lambda: bfs_search(self, visualize),
            "DF": lambda: dfs_search(self, visualize),
            "ID": lambda: ids_search(self, visualize),
            "UC": lambda: ucs_search(self, visualize),
            "GR1": lambda: greedy_search(self, manhattan_heuristic, visualize),
            "GR2": lambda: greedy_search(self, euclidean_heuristic, visualize),
            "AS1": lambda: astar_search(self, manhattan_heuristic, visualize),
            "AS2": lambda: astar_search(
                self, tunnel_aware_wrapper, visualize
            ),  # Tunnel-aware
        }

        if strategy not in strategy_map:
            raise ValueError(f"Unknown strategy: {strategy}")

        return strategy_map[strategy]()


def graph_search(
    problem: GenericSearch, frontier: Frontier, visualize: bool = False
) -> Tuple[PathResult, Optional[List[SearchStep]]]:
    """
    Generic graph search algorithm.

    Args:
        problem: The search problem to solve
        frontier: The frontier data structure (Queue, Stack, or PriorityQueue)
        visualize: If True, collect visualization steps

    Returns:
        Tuple of (PathResult, Optional[List[SearchStep]])
    """
    # Initialize
    start = problem.initial_state()
    start_node = SearchNode(state=start, path_cost=0, depth=0)
    frontier.push(start_node)
    explored: set = set()
    nodes_expanded = 0
    steps: List[SearchStep] = [] if visualize else None

    while not frontier.is_empty():
        # Get next node
        node = frontier.pop()

        # Record step for visualization
        if visualize:
            steps.append(
                SearchStep(
                    step_number=nodes_expanded,
                    current_node=node.state,
                    action=node.action,
                    frontier=frontier.get_states(),
                    explored=list(explored),
                    current_path=node.get_path(),
                    path_cost=node.path_cost,
                )
            )

        # Goal test
        if problem.goal_test(node.state):
            return (
                PathResult(
                    plan=node.get_solution(),
                    cost=node.path_cost,
                    nodes_expanded=nodes_expanded,
                    path=node.get_path(),
                ),
                steps,
            )

        # Skip if already explored
        if node.state in explored:
            continue

        # Mark as explored and count
        explored.add(node.state)
        nodes_expanded += 1

        # Expand node
        for action in problem.actions(node.state):
            child_state = problem.result(node.state, action)
            if child_state not in explored and not frontier.contains_state(child_state):
                step_cost = problem.step_cost(node.state, action, child_state)
                child = SearchNode(
                    state=child_state,
                    parent=node,
                    action=action,
                    path_cost=node.path_cost + step_cost,
                    depth=node.depth + 1,
                )
                frontier.push(child)

    # No solution found
    return (
        PathResult(plan="", cost=float("inf"), nodes_expanded=nodes_expanded, path=[]),
        steps,
    )