Kacemath's picture
Simple deployment: Grid Search Pathfinding with frontend and backend
e067c2d
raw
history blame
5.62 kB
"""A* Search algorithm."""
from typing import Tuple, Optional, List, Generator, Callable, TYPE_CHECKING
if TYPE_CHECKING:
from ..core.generic_search import GenericSearch
from ..core.node import SearchNode
from ..core.frontier import PriorityQueueFrontier
from ..models.state import PathResult, SearchStep
def astar_search(
    problem: 'GenericSearch',
    heuristic: Callable[[Tuple[int, int], Tuple[int, int]], float],
    visualize: bool = False
) -> Tuple[PathResult, Optional[List[SearchStep]]]:
    """
    A* search using f(n) = g(n) + h(n).

    This is the graph-search variant: a state is expanded at most once and
    is never re-opened after it has been added to the explored set. With
    that design the returned path is guaranteed optimal when the heuristic
    is consistent (monotone); an admissible but inconsistent heuristic may
    yield a sub-optimal path.

    If ``problem`` has no ``goal`` attribute (or it is ``None``), h(n) is
    taken to be 0 for every state, so nodes are ordered by path cost alone.

    Args:
        problem: The search problem to solve. Must provide
            ``initial_state()``, ``goal_test(state)``, ``actions(state)``,
            ``result(state, action)`` and
            ``step_cost(state, action, next_state)``.
        heuristic: Function(state, goal) -> estimated cost to goal
        visualize: If True, collect visualization steps

    Returns:
        Tuple of (PathResult, Optional[List[SearchStep]]). The second
        element is None when ``visualize`` is False. When no solution
        exists the PathResult has an empty plan and path and a cost of
        ``float('inf')``.
    """
    frontier = PriorityQueueFrontier()
    start = problem.initial_state()

    # The goal is only needed to evaluate the heuristic; goal detection
    # itself is delegated to problem.goal_test().
    goal = getattr(problem, 'goal', None)
    # Compare against None explicitly: a falsy-but-valid goal state
    # (e.g. 0 or an empty tuple) must not switch the heuristic off.
    has_goal = goal is not None

    # g(start) is 0, so f(start) is just the heuristic estimate.
    f_value = heuristic(start, goal) if has_goal else 0
    frontier.push(
        SearchNode(state=start, path_cost=0, depth=0, priority=f_value)
    )

    explored: set = set()
    nodes_expanded = 0
    steps: Optional[List[SearchStep]] = [] if visualize else None

    while not frontier.is_empty():
        node = frontier.pop()

        # Record step for visualization. This happens for every popped
        # node, including stale duplicates that are skipped further down.
        if steps is not None:
            steps.append(SearchStep(
                step_number=nodes_expanded,
                current_node=node.state,
                action=node.action,
                frontier=frontier.get_states(),
                explored=list(explored),
                current_path=node.get_path(),
                path_cost=node.path_cost
            ))

        # Goal test on pop (not on generation), as A* requires.
        if problem.goal_test(node.state):
            return PathResult(
                plan=node.get_solution(),
                cost=node.path_cost,
                nodes_expanded=nodes_expanded,
                path=node.get_path()
            ), steps

        # The same state can sit in the frontier several times; once it
        # has been expanded, later copies are discarded here.
        if node.state in explored:
            continue

        explored.add(node.state)
        nodes_expanded += 1

        # Expand node
        for action in problem.actions(node.state):
            child_state = problem.result(node.state, action)
            if child_state in explored:
                continue

            step_cost = problem.step_cost(node.state, action, child_state)
            g_value = node.path_cost + step_cost
            h_value = heuristic(child_state, goal) if has_goal else 0
            frontier.push(SearchNode(
                state=child_state,
                parent=node,
                action=action,
                path_cost=g_value,
                depth=node.depth + 1,
                priority=g_value + h_value  # f(n) = g(n) + h(n)
            ))

    # No solution found
    return PathResult(
        plan="",
        cost=float('inf'),
        nodes_expanded=nodes_expanded,
        path=[]
    ), steps
def astar_search_generator(
    problem: 'GenericSearch',
    heuristic: Callable[[Tuple[int, int], Tuple[int, int]], float]
) -> Generator[SearchStep, None, PathResult]:
    """
    Generator version of A* search that yields steps during execution.

    One SearchStep is yielded for every node popped from the frontier,
    before the goal test and before the duplicate check, so the goal node
    and stale duplicate entries are both reported to the consumer.

    If ``problem`` has no ``goal`` attribute (or it is ``None``), h(n) is
    taken to be 0 for every state, so nodes are ordered by path cost alone.

    Args:
        problem: The search problem to solve
        heuristic: Function(state, goal) -> estimated cost to goal

    Yields:
        SearchStep objects

    Returns:
        Final PathResult. Because this is a generator, the value is
        delivered through ``StopIteration.value`` (or as the result of a
        ``yield from`` expression). When no solution exists the PathResult
        has an empty plan and path and a cost of ``float('inf')``.
    """
    frontier = PriorityQueueFrontier()
    start = problem.initial_state()

    goal = getattr(problem, 'goal', None)
    # Compare against None explicitly: a falsy-but-valid goal state
    # (e.g. 0 or an empty tuple) must not switch the heuristic off.
    has_goal = goal is not None

    # g(start) is 0, so f(start) is just the heuristic estimate.
    f_value = heuristic(start, goal) if has_goal else 0
    frontier.push(
        SearchNode(state=start, path_cost=0, depth=0, priority=f_value)
    )

    explored: set = set()
    nodes_expanded = 0

    while not frontier.is_empty():
        node = frontier.pop()

        yield SearchStep(
            step_number=nodes_expanded,
            current_node=node.state,
            action=node.action,
            frontier=frontier.get_states(),
            explored=list(explored),
            current_path=node.get_path(),
            path_cost=node.path_cost
        )

        # Goal test on pop (not on generation), as A* requires.
        if problem.goal_test(node.state):
            return PathResult(
                plan=node.get_solution(),
                cost=node.path_cost,
                nodes_expanded=nodes_expanded,
                path=node.get_path()
            )

        # The same state can sit in the frontier several times; once it
        # has been expanded, later copies are discarded here.
        if node.state in explored:
            continue

        explored.add(node.state)
        nodes_expanded += 1

        for action in problem.actions(node.state):
            child_state = problem.result(node.state, action)
            if child_state in explored:
                continue

            step_cost = problem.step_cost(node.state, action, child_state)
            g_value = node.path_cost + step_cost
            h_value = heuristic(child_state, goal) if has_goal else 0
            frontier.push(SearchNode(
                state=child_state,
                parent=node,
                action=action,
                path_cost=g_value,
                depth=node.depth + 1,
                priority=g_value + h_value  # f(n) = g(n) + h(n)
            ))

    # Frontier exhausted without reaching a goal state.
    return PathResult(
        plan="",
        cost=float('inf'),
        nodes_expanded=nodes_expanded,
        path=[]
    )