Spaces:
Sleeping
Sleeping
| """Breadth-First Search algorithm.""" | |
| from typing import Tuple, Optional, List, Generator, TYPE_CHECKING | |
| if TYPE_CHECKING: | |
| from ..core.generic_search import GenericSearch | |
| from ..core.node import SearchNode | |
| from ..core.frontier import QueueFrontier | |
| from ..models.state import PathResult, SearchStep | |
| def bfs_search( | |
| problem: 'GenericSearch', | |
| visualize: bool = False | |
| ) -> Tuple[PathResult, Optional[List[SearchStep]]]: | |
| """ | |
| Breadth-first search using FIFO queue. | |
| Finds path with minimum number of steps (not minimum cost). | |
| Complete and optimal for unweighted graphs. | |
| Args: | |
| problem: The search problem to solve | |
| visualize: If True, collect visualization steps | |
| Returns: | |
| Tuple of (PathResult, Optional[List[SearchStep]]) | |
| """ | |
| frontier = QueueFrontier() | |
| start = problem.initial_state() | |
| start_node = SearchNode(state=start, path_cost=0, depth=0) | |
| frontier.push(start_node) | |
| explored: set = set() | |
| nodes_expanded = 0 | |
| steps: List[SearchStep] = [] if visualize else None | |
| while not frontier.is_empty(): | |
| node = frontier.pop() | |
| # Record step for visualization | |
| if visualize: | |
| steps.append(SearchStep( | |
| step_number=nodes_expanded, | |
| current_node=node.state, | |
| action=node.action, | |
| frontier=frontier.get_states(), | |
| explored=list(explored), | |
| current_path=node.get_path(), | |
| path_cost=node.path_cost | |
| )) | |
| # Goal test after pop (standard BFS) | |
| if problem.goal_test(node.state): | |
| return PathResult( | |
| plan=node.get_solution(), | |
| cost=node.path_cost, | |
| nodes_expanded=nodes_expanded, | |
| path=node.get_path() | |
| ), steps | |
| # Skip if already explored | |
| if node.state in explored: | |
| continue | |
| explored.add(node.state) | |
| nodes_expanded += 1 | |
| # Expand node | |
| for action in problem.actions(node.state): | |
| child_state = problem.result(node.state, action) | |
| if child_state not in explored and not frontier.contains_state(child_state): | |
| step_cost = problem.step_cost(node.state, action, child_state) | |
| child = SearchNode( | |
| state=child_state, | |
| parent=node, | |
| action=action, | |
| path_cost=node.path_cost + step_cost, | |
| depth=node.depth + 1 | |
| ) | |
| frontier.push(child) | |
| # No solution found | |
| return PathResult( | |
| plan="", | |
| cost=float('inf'), | |
| nodes_expanded=nodes_expanded, | |
| path=[] | |
| ), steps | |
| def bfs_search_generator( | |
| problem: 'GenericSearch' | |
| ) -> Generator[SearchStep, None, PathResult]: | |
| """ | |
| Generator version of BFS that yields steps during execution. | |
| Args: | |
| problem: The search problem to solve | |
| Yields: | |
| SearchStep objects | |
| Returns: | |
| Final PathResult | |
| """ | |
| frontier = QueueFrontier() | |
| start = problem.initial_state() | |
| start_node = SearchNode(state=start, path_cost=0, depth=0) | |
| frontier.push(start_node) | |
| explored: set = set() | |
| nodes_expanded = 0 | |
| while not frontier.is_empty(): | |
| node = frontier.pop() | |
| yield SearchStep( | |
| step_number=nodes_expanded, | |
| current_node=node.state, | |
| action=node.action, | |
| frontier=frontier.get_states(), | |
| explored=list(explored), | |
| current_path=node.get_path(), | |
| path_cost=node.path_cost | |
| ) | |
| if problem.goal_test(node.state): | |
| return PathResult( | |
| plan=node.get_solution(), | |
| cost=node.path_cost, | |
| nodes_expanded=nodes_expanded, | |
| path=node.get_path() | |
| ) | |
| if node.state in explored: | |
| continue | |
| explored.add(node.state) | |
| nodes_expanded += 1 | |
| for action in problem.actions(node.state): | |
| child_state = problem.result(node.state, action) | |
| if child_state not in explored and not frontier.contains_state(child_state): | |
| step_cost = problem.step_cost(node.state, action, child_state) | |
| child = SearchNode( | |
| state=child_state, | |
| parent=node, | |
| action=action, | |
| path_cost=node.path_cost + step_cost, | |
| depth=node.depth + 1 | |
| ) | |
| frontier.push(child) | |
| return PathResult( | |
| plan="", | |
| cost=float('inf'), | |
| nodes_expanded=nodes_expanded, | |
| path=[] | |
| ) | |