Spaces:
Sleeping
Sleeping
File size: 6,022 Bytes
e067c2d 47bba68 e067c2d 47bba68 e067c2d 47bba68 e067c2d 47bba68 e067c2d 47bba68 e067c2d 47bba68 e067c2d 47bba68 e067c2d 47bba68 e067c2d 47bba68 e067c2d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
"""Generic search problem abstract base class."""
from abc import ABC, abstractmethod
from typing import List, Tuple, Optional
from .node import SearchNode
from .frontier import Frontier
from ..models.state import PathResult, SearchStep
class GenericSearch(ABC):
    """
    Abstract base class for search problems.

    A concrete problem supplies the five abstract hooks listed below.
    ``heuristic`` is optional and evaluates to 0.0 unless overridden.
    ``solve`` picks a search algorithm from a short strategy code and
    runs it on this problem instance.

    Hooks every subclass must implement:
    - initial_state(): the state the search starts from
    - goal_test(state): whether ``state`` is a goal
    - actions(state): the actions that are valid in ``state``
    - result(state, action): the state reached by applying ``action``
    - step_cost(state, action, next_state): the cost of that transition
    """

    @abstractmethod
    def initial_state(self) -> Tuple[int, int]:
        """Return the state the search starts from."""

    @abstractmethod
    def goal_test(self, state: Tuple[int, int]) -> bool:
        """Return True when ``state`` is a goal state."""

    @abstractmethod
    def actions(self, state: Tuple[int, int]) -> List[str]:
        """Return the actions that are valid in ``state``."""

    @abstractmethod
    def result(self, state: Tuple[int, int], action: str) -> Tuple[int, int]:
        """Return the state reached by applying ``action`` in ``state``."""

    @abstractmethod
    def step_cost(
        self, state: Tuple[int, int], action: str, next_state: Tuple[int, int]
    ) -> float:
        """Return the cost of moving from ``state`` to ``next_state`` via ``action``."""

    def heuristic(self, state: Tuple[int, int]) -> float:
        """
        Estimate h(n), the remaining cost from ``state`` to a goal.

        The base implementation always returns 0.0; subclasses override it
        to enable informed search (it is the heuristic used by 'AS2').
        """
        return 0.0

    def solve(
        self, strategy: str, visualize: bool = False
    ) -> Tuple[PathResult, Optional[List[SearchStep]]]:
        """
        Run the search algorithm selected by ``strategy`` on this problem.

        Strategy codes and what they dispatch to:
        - 'BF', 'DF', 'ID', 'UC': bfs_search, dfs_search, ids_search,
          ucs_search (called with the problem and ``visualize`` only)
        - 'GR1', 'GR2': greedy_search with manhattan_heuristic /
          euclidean_heuristic
        - 'AS1': astar_search with manhattan_heuristic
        - 'AS2': astar_search with this instance's own ``heuristic``

        Args:
            strategy: One of 'BF', 'DF', 'ID', 'UC', 'GR1', 'GR2', 'AS1', 'AS2'
            visualize: Forwarded unchanged to the selected search function

        Returns:
            Whatever the selected search function returns, annotated as
            Tuple of (PathResult, Optional[List[SearchStep]])

        Raises:
            ValueError: If ``strategy`` is not one of the codes above.
        """
        # Imported inside the method rather than at module level
        # (presumably to avoid a circular import -- TODO confirm).
        from ..algorithms import (
            bfs_search,
            dfs_search,
            ids_search,
            ucs_search,
            greedy_search,
            astar_search,
        )
        from ..heuristics import (
            manhattan_heuristic,
            euclidean_heuristic,
        )

        def tunnel_aware_wrapper(state, goal):
            """Adapt self.heuristic(state) to the (state, goal) call shape."""
            return self.heuristic(state)

        # Strategies that take only the problem and the visualize flag.
        blind_searches = {
            "BF": bfs_search,
            "DF": dfs_search,
            "ID": ids_search,
            "UC": ucs_search,
        }
        # Strategies that additionally take a heuristic callable.
        guided_searches = {
            "GR1": (greedy_search, manhattan_heuristic),
            "GR2": (greedy_search, euclidean_heuristic),
            "AS1": (astar_search, manhattan_heuristic),
            "AS2": (astar_search, tunnel_aware_wrapper),  # Tunnel-aware
        }

        if strategy in blind_searches:
            return blind_searches[strategy](self, visualize)
        if strategy in guided_searches:
            search_fn, heuristic_fn = guided_searches[strategy]
            return search_fn(self, heuristic_fn, visualize)
        raise ValueError(f"Unknown strategy: {strategy}")
def graph_search(
    problem: GenericSearch, frontier: Frontier, visualize: bool = False
) -> Tuple[PathResult, Optional[List[SearchStep]]]:
    """
    Generic graph search driven by the supplied frontier.

    Nodes are expanded in whatever order ``frontier.pop()`` returns them,
    so the frontier implementation determines the search strategy. The
    goal test is applied when a node is popped, not when it is generated.

    Args:
        problem: The search problem to solve
        frontier: The frontier data structure (Queue, Stack, or PriorityQueue)
        visualize: If True, record one SearchStep per popped node

    Returns:
        Tuple of (PathResult, Optional[List[SearchStep]]). The second item
        is the list of recorded steps when ``visualize`` is truthy and
        None otherwise. When the frontier empties without reaching a goal,
        the PathResult has an empty plan, an empty path and infinite cost.
    """
    # Seed the frontier with the root node.
    frontier.push(SearchNode(state=problem.initial_state(), path_cost=0, depth=0))

    visited: set = set()
    expansions = 0
    if visualize:
        trace = []
    else:
        trace = None

    while not frontier.is_empty():
        current = frontier.pop()

        # The snapshot is taken before the goal test and the visited
        # check, so every popped node produces a step.
        if visualize:
            snapshot = SearchStep(
                step_number=expansions,
                current_node=current.state,
                action=current.action,
                frontier=frontier.get_states(),
                explored=list(visited),
                current_path=current.get_path(),
                path_cost=current.path_cost,
            )
            trace.append(snapshot)

        if problem.goal_test(current.state):
            solution = PathResult(
                plan=current.get_solution(),
                cost=current.path_cost,
                nodes_expanded=expansions,
                path=current.get_path(),
            )
            return solution, trace

        # A state is expanded at most once.
        if current.state in visited:
            continue
        visited.add(current.state)
        expansions += 1

        for action in problem.actions(current.state):
            successor = problem.result(current.state, action)
            # NOTE(review): a state already on the frontier is never
            # pushed again, even if this path to it is cheaper -- confirm
            # that is acceptable for cost-ordered frontiers.
            if successor in visited or frontier.contains_state(successor):
                continue
            edge_cost = problem.step_cost(current.state, action, successor)
            frontier.push(
                SearchNode(
                    state=successor,
                    parent=current,
                    action=action,
                    path_cost=current.path_cost + edge_cost,
                    depth=current.depth + 1,
                )
            )

    # Frontier exhausted without reaching a goal.
    failure = PathResult(
        plan="", cost=float("inf"), nodes_expanded=expansions, path=[]
    )
    return failure, trace
|