"""Generic search problem abstract base class.""" from abc import ABC, abstractmethod from typing import List, Tuple, Optional from .node import SearchNode from .frontier import Frontier from ..models.state import PathResult, SearchStep class GenericSearch(ABC): """ Abstract base class for search problems. Subclasses must implement: - initial_state(): Return the starting state - goal_test(state): Return True if state is a goal - actions(state): Return list of valid actions from state - result(state, action): Return new state after action - step_cost(state, action, next_state): Return cost of action """ @abstractmethod def initial_state(self) -> Tuple[int, int]: """Return the initial state.""" pass @abstractmethod def goal_test(self, state: Tuple[int, int]) -> bool: """Return True if state is a goal state.""" pass @abstractmethod def actions(self, state: Tuple[int, int]) -> List[str]: """Return list of valid actions from given state.""" pass @abstractmethod def result(self, state: Tuple[int, int], action: str) -> Tuple[int, int]: """Return the state resulting from taking action in given state.""" pass @abstractmethod def step_cost( self, state: Tuple[int, int], action: str, next_state: Tuple[int, int] ) -> float: """Return the cost of taking action from state to next_state.""" pass def heuristic(self, state: Tuple[int, int]) -> float: """ Heuristic function h(n) estimating cost from state to goal. Override in subclass for informed search. """ return 0.0 def solve( self, strategy: str, visualize: bool = False ) -> Tuple[PathResult, Optional[List[SearchStep]]]: """ Solve the search problem using the specified strategy. Args: strategy: One of 'BF', 'DF', 'ID', 'UC', 'GR1', 'GR2', 'AS1', 'AS2' visualize: If True, collect visualization steps Returns: Tuple of (PathResult, Optional[List[SearchStep]]) """ from ..algorithms import ( bfs_search, dfs_search, ids_search, ucs_search, greedy_search, astar_search, ) from ..heuristics import ( manhattan_heuristic, euclidean_heuristic, ) # Wrap instance heuristic to match expected signature (state, goal) -> float def tunnel_aware_wrapper(state, goal): return self.heuristic(state) # Map strategy codes to search functions strategy_map = { "BF": lambda: bfs_search(self, visualize), "DF": lambda: dfs_search(self, visualize), "ID": lambda: ids_search(self, visualize), "UC": lambda: ucs_search(self, visualize), "GR1": lambda: greedy_search(self, manhattan_heuristic, visualize), "GR2": lambda: greedy_search(self, euclidean_heuristic, visualize), "AS1": lambda: astar_search(self, manhattan_heuristic, visualize), "AS2": lambda: astar_search( self, tunnel_aware_wrapper, visualize ), # Tunnel-aware } if strategy not in strategy_map: raise ValueError(f"Unknown strategy: {strategy}") return strategy_map[strategy]() def graph_search( problem: GenericSearch, frontier: Frontier, visualize: bool = False ) -> Tuple[PathResult, Optional[List[SearchStep]]]: """ Generic graph search algorithm. Args: problem: The search problem to solve frontier: The frontier data structure (Queue, Stack, or PriorityQueue) visualize: If True, collect visualization steps Returns: Tuple of (PathResult, Optional[List[SearchStep]]) """ # Initialize start = problem.initial_state() start_node = SearchNode(state=start, path_cost=0, depth=0) frontier.push(start_node) explored: set = set() nodes_expanded = 0 steps: List[SearchStep] = [] if visualize else None while not frontier.is_empty(): # Get next node node = frontier.pop() # Record step for visualization if visualize: steps.append( SearchStep( step_number=nodes_expanded, current_node=node.state, action=node.action, frontier=frontier.get_states(), explored=list(explored), current_path=node.get_path(), path_cost=node.path_cost, ) ) # Goal test if problem.goal_test(node.state): return ( PathResult( plan=node.get_solution(), cost=node.path_cost, nodes_expanded=nodes_expanded, path=node.get_path(), ), steps, ) # Skip if already explored if node.state in explored: continue # Mark as explored and count explored.add(node.state) nodes_expanded += 1 # Expand node for action in problem.actions(node.state): child_state = problem.result(node.state, action) if child_state not in explored and not frontier.contains_state(child_state): step_cost = problem.step_cost(node.state, action, child_state) child = SearchNode( state=child_state, parent=node, action=action, path_cost=node.path_cost + step_cost, depth=node.depth + 1, ) frontier.push(child) # No solution found return ( PathResult(plan="", cost=float("inf"), nodes_expanded=nodes_expanded, path=[]), steps, )