Spaces:
Sleeping
Sleeping
| """Generic search problem abstract base class.""" | |
| from abc import ABC, abstractmethod | |
| from typing import List, Tuple, Optional | |
| from .node import SearchNode | |
| from .frontier import Frontier | |
| from ..models.state import PathResult, SearchStep | |
| class GenericSearch(ABC): | |
| """ | |
| Abstract base class for search problems. | |
| Subclasses must implement: | |
| - initial_state(): Return the starting state | |
| - goal_test(state): Return True if state is a goal | |
| - actions(state): Return list of valid actions from state | |
| - result(state, action): Return new state after action | |
| - step_cost(state, action, next_state): Return cost of action | |
| """ | |
| def initial_state(self) -> Tuple[int, int]: | |
| """Return the initial state.""" | |
| pass | |
| def goal_test(self, state: Tuple[int, int]) -> bool: | |
| """Return True if state is a goal state.""" | |
| pass | |
| def actions(self, state: Tuple[int, int]) -> List[str]: | |
| """Return list of valid actions from given state.""" | |
| pass | |
| def result(self, state: Tuple[int, int], action: str) -> Tuple[int, int]: | |
| """Return the state resulting from taking action in given state.""" | |
| pass | |
| def step_cost( | |
| self, state: Tuple[int, int], action: str, next_state: Tuple[int, int] | |
| ) -> float: | |
| """Return the cost of taking action from state to next_state.""" | |
| pass | |
| def heuristic(self, state: Tuple[int, int]) -> float: | |
| """ | |
| Heuristic function h(n) estimating cost from state to goal. | |
| Override in subclass for informed search. | |
| """ | |
| return 0.0 | |
| def solve( | |
| self, strategy: str, visualize: bool = False | |
| ) -> Tuple[PathResult, Optional[List[SearchStep]]]: | |
| """ | |
| Solve the search problem using the specified strategy. | |
| Args: | |
| strategy: One of 'BF', 'DF', 'ID', 'UC', 'GR1', 'GR2', 'AS1', 'AS2' | |
| visualize: If True, collect visualization steps | |
| Returns: | |
| Tuple of (PathResult, Optional[List[SearchStep]]) | |
| """ | |
| from ..algorithms import ( | |
| bfs_search, | |
| dfs_search, | |
| ids_search, | |
| ucs_search, | |
| greedy_search, | |
| astar_search, | |
| ) | |
| from ..heuristics import ( | |
| manhattan_heuristic, | |
| euclidean_heuristic, | |
| ) | |
| # Wrap instance heuristic to match expected signature (state, goal) -> float | |
| def tunnel_aware_wrapper(state, goal): | |
| return self.heuristic(state) | |
| # Map strategy codes to search functions | |
| strategy_map = { | |
| "BF": lambda: bfs_search(self, visualize), | |
| "DF": lambda: dfs_search(self, visualize), | |
| "ID": lambda: ids_search(self, visualize), | |
| "UC": lambda: ucs_search(self, visualize), | |
| "GR1": lambda: greedy_search(self, manhattan_heuristic, visualize), | |
| "GR2": lambda: greedy_search(self, euclidean_heuristic, visualize), | |
| "AS1": lambda: astar_search(self, manhattan_heuristic, visualize), | |
| "AS2": lambda: astar_search( | |
| self, tunnel_aware_wrapper, visualize | |
| ), # Tunnel-aware | |
| } | |
| if strategy not in strategy_map: | |
| raise ValueError(f"Unknown strategy: {strategy}") | |
| return strategy_map[strategy]() | |
| def graph_search( | |
| problem: GenericSearch, frontier: Frontier, visualize: bool = False | |
| ) -> Tuple[PathResult, Optional[List[SearchStep]]]: | |
| """ | |
| Generic graph search algorithm. | |
| Args: | |
| problem: The search problem to solve | |
| frontier: The frontier data structure (Queue, Stack, or PriorityQueue) | |
| visualize: If True, collect visualization steps | |
| Returns: | |
| Tuple of (PathResult, Optional[List[SearchStep]]) | |
| """ | |
| # Initialize | |
| start = problem.initial_state() | |
| start_node = SearchNode(state=start, path_cost=0, depth=0) | |
| frontier.push(start_node) | |
| explored: set = set() | |
| nodes_expanded = 0 | |
| steps: List[SearchStep] = [] if visualize else None | |
| while not frontier.is_empty(): | |
| # Get next node | |
| node = frontier.pop() | |
| # Record step for visualization | |
| if visualize: | |
| steps.append( | |
| SearchStep( | |
| step_number=nodes_expanded, | |
| current_node=node.state, | |
| action=node.action, | |
| frontier=frontier.get_states(), | |
| explored=list(explored), | |
| current_path=node.get_path(), | |
| path_cost=node.path_cost, | |
| ) | |
| ) | |
| # Goal test | |
| if problem.goal_test(node.state): | |
| return ( | |
| PathResult( | |
| plan=node.get_solution(), | |
| cost=node.path_cost, | |
| nodes_expanded=nodes_expanded, | |
| path=node.get_path(), | |
| ), | |
| steps, | |
| ) | |
| # Skip if already explored | |
| if node.state in explored: | |
| continue | |
| # Mark as explored and count | |
| explored.add(node.state) | |
| nodes_expanded += 1 | |
| # Expand node | |
| for action in problem.actions(node.state): | |
| child_state = problem.result(node.state, action) | |
| if child_state not in explored and not frontier.contains_state(child_state): | |
| step_cost = problem.step_cost(node.state, action, child_state) | |
| child = SearchNode( | |
| state=child_state, | |
| parent=node, | |
| action=action, | |
| path_cost=node.path_cost + step_cost, | |
| depth=node.depth + 1, | |
| ) | |
| frontier.push(child) | |
| # No solution found | |
| return ( | |
| PathResult(plan="", cost=float("inf"), nodes_expanded=nodes_expanded, path=[]), | |
| steps, | |
| ) | |