SearchAlgorithms/backend/app/core/generic_search.py
Kacemath's picture
feat: update with latest changes
47bba68
"""Generic search problem abstract base class."""
from abc import ABC, abstractmethod
from typing import List, Tuple, Optional
from .node import SearchNode
from .frontier import Frontier
from ..models.state import PathResult, SearchStep
class GenericSearch(ABC):
    """
    Contract that every concrete search problem has to fulfil.

    States are annotated as ``(int, int)`` tuples and actions as strings.
    A subclass supplies the five abstract hooks below; ``heuristic`` is an
    optional override, and ``solve`` dispatches to the search routines.

    Required overrides:
        - initial_state(): the state the search starts from
        - goal_test(state): whether ``state`` satisfies the goal
        - actions(state): the actions applicable in ``state``
        - result(state, action): the successor reached by ``action``
        - step_cost(state, action, next_state): the cost of that transition
    """

    @abstractmethod
    def initial_state(self) -> Tuple[int, int]:
        """Return the state the search starts from."""
        ...

    @abstractmethod
    def goal_test(self, state: Tuple[int, int]) -> bool:
        """Return True when ``state`` is a goal state."""
        ...

    @abstractmethod
    def actions(self, state: Tuple[int, int]) -> List[str]:
        """Return the actions that may be taken in ``state``."""
        ...

    @abstractmethod
    def result(self, state: Tuple[int, int], action: str) -> Tuple[int, int]:
        """Return the successor state produced by ``action`` in ``state``."""
        ...

    @abstractmethod
    def step_cost(
        self, state: Tuple[int, int], action: str, next_state: Tuple[int, int]
    ) -> float:
        """Return the cost of moving from ``state`` to ``next_state`` via ``action``."""
        ...

    def heuristic(self, state: Tuple[int, int]) -> float:
        """
        Estimate h(n), the remaining cost from ``state`` to a goal.

        The base implementation always returns 0.0; subclasses override it
        to provide an informed estimate (it is the heuristic used by 'AS2').
        """
        return 0.0

    def solve(
        self, strategy: str, visualize: bool = False
    ) -> Tuple[PathResult, Optional[List[SearchStep]]]:
        """
        Run the search routine selected by ``strategy`` on this problem.

        Strategy codes and what they dispatch to:
            'BF'  -> bfs_search
            'DF'  -> dfs_search
            'ID'  -> ids_search
            'UC'  -> ucs_search
            'GR1' -> greedy_search with manhattan_heuristic
            'GR2' -> greedy_search with euclidean_heuristic
            'AS1' -> astar_search with manhattan_heuristic
            'AS2' -> astar_search with this instance's ``heuristic``

        Args:
            strategy: One of the codes listed above.
            visualize: Forwarded to the search routine; if True, that routine
                is asked to collect visualization steps.

        Returns:
            Whatever the selected routine returns, annotated as
            ``(PathResult, Optional[List[SearchStep]])``.

        Raises:
            ValueError: If ``strategy`` is not one of the known codes.
        """
        # Imported at call time rather than module import time
        # (presumably to avoid a circular import -- confirm).
        from ..algorithms import (
            bfs_search,
            dfs_search,
            ids_search,
            ucs_search,
            greedy_search,
            astar_search,
        )
        from ..heuristics import (
            manhattan_heuristic,
            euclidean_heuristic,
        )

        # Adapter: the search routines call heuristics as (state, goal),
        # while the instance heuristic only takes the state.
        def tunnel_aware_wrapper(state, goal):
            return self.heuristic(state)

        # Routines invoked as routine(problem, visualize).
        uninformed = {
            "BF": bfs_search,
            "DF": dfs_search,
            "ID": ids_search,
            "UC": ucs_search,
        }
        # Routines invoked as routine(problem, heuristic, visualize).
        informed = {
            "GR1": (greedy_search, manhattan_heuristic),
            "GR2": (greedy_search, euclidean_heuristic),
            "AS1": (astar_search, manhattan_heuristic),
            "AS2": (astar_search, tunnel_aware_wrapper),  # Tunnel-aware
        }

        if strategy in uninformed:
            return uninformed[strategy](self, visualize)
        if strategy in informed:
            routine, estimate = informed[strategy]
            return routine(self, estimate, visualize)
        raise ValueError(f"Unknown strategy: {strategy}")
def graph_search(
    problem: GenericSearch, frontier: Frontier, visualize: bool = False
) -> Tuple[PathResult, Optional[List[SearchStep]]]:
    """
    Run graph search over ``problem``, with ``frontier`` deciding pop order.

    Nodes are popped one at a time. Each popped node is (optionally) recorded
    as a visualization step, goal-tested, and then expanded unless its state
    has already been explored.

    Args:
        problem: The search problem supplying states, actions and costs.
        frontier: Container of pending nodes (Queue, Stack, or PriorityQueue).
        visualize: If True, a SearchStep is recorded for every popped node.

    Returns:
        ``(PathResult, steps)``. ``steps`` is the list of recorded SearchStep
        objects when ``visualize`` is True, otherwise None. If the frontier
        empties without reaching a goal, the PathResult has an empty plan,
        infinite cost and an empty path.
    """
    # Seed the frontier with the root node.
    root = SearchNode(state=problem.initial_state(), path_cost=0, depth=0)
    frontier.push(root)

    visited: set = set()
    expanded_count = 0
    trace: Optional[List[SearchStep]] = [] if visualize else None

    while not frontier.is_empty():
        current = frontier.pop()

        # The snapshot is taken before the goal test and before the
        # already-explored check, so every popped node yields a step.
        if visualize:
            snapshot = SearchStep(
                step_number=expanded_count,
                current_node=current.state,
                action=current.action,
                frontier=frontier.get_states(),
                explored=list(visited),
                current_path=current.get_path(),
                path_cost=current.path_cost,
            )
            trace.append(snapshot)

        # Goal test happens on pop, not on generation.
        if problem.goal_test(current.state):
            solution = PathResult(
                plan=current.get_solution(),
                cost=current.path_cost,
                nodes_expanded=expanded_count,
                path=current.get_path(),
            )
            return solution, trace

        # A state is expanded at most once.
        if current.state in visited:
            continue
        visited.add(current.state)
        expanded_count += 1

        for move in problem.actions(current.state):
            successor = problem.result(current.state, move)
            # NOTE(review): a successor whose state is already in the frontier
            # is dropped without comparing path costs. With a cost-ordered
            # frontier this could keep a more expensive path unless Frontier
            # handles replacement itself -- confirm.
            if successor in visited or frontier.contains_state(successor):
                continue
            edge_cost = problem.step_cost(current.state, move, successor)
            frontier.push(
                SearchNode(
                    state=successor,
                    parent=current,
                    action=move,
                    path_cost=current.path_cost + edge_cost,
                    depth=current.depth + 1,
                )
            )

    # Frontier exhausted: report failure.
    failure = PathResult(
        plan="", cost=float("inf"), nodes_expanded=expanded_count, path=[]
    )
    return failure, trace