Spaces:
Sleeping
Sleeping
| """Iterative Deepening Search algorithm.""" | |
| from typing import Tuple, Optional, List, Generator, TYPE_CHECKING | |
| if TYPE_CHECKING: | |
| from ..core.generic_search import GenericSearch | |
| from ..core.node import SearchNode | |
| from ..models.state import PathResult, SearchStep | |
| # Sentinel values for DLS results | |
| CUTOFF = "cutoff" | |
| FAILURE = "failure" | |
| def depth_limited_search( | |
| problem: 'GenericSearch', | |
| limit: int, | |
| visualize: bool = False, | |
| steps: Optional[List[SearchStep]] = None, | |
| base_expanded: int = 0 | |
| ) -> Tuple[Optional[PathResult], str, int, Optional[List[SearchStep]]]: | |
| """ | |
| Depth-limited search - DFS with depth limit. | |
| Args: | |
| problem: The search problem | |
| limit: Maximum depth to search | |
| visualize: If True, collect visualization steps | |
| steps: Existing steps list to append to | |
| base_expanded: Starting count for nodes expanded | |
| Returns: | |
| Tuple of (PathResult or None, status, nodes_expanded, steps) | |
| status is CUTOFF if limit reached, FAILURE if no solution exists | |
| """ | |
| start = problem.initial_state() | |
| start_node = SearchNode(state=start, path_cost=0, depth=0) | |
| return _recursive_dls( | |
| problem, start_node, limit, set(), visualize, | |
| steps if steps is not None else ([] if visualize else None), | |
| base_expanded | |
| ) | |
| def _recursive_dls( | |
| problem: 'GenericSearch', | |
| node: SearchNode, | |
| limit: int, | |
| explored: set, | |
| visualize: bool, | |
| steps: Optional[List[SearchStep]], | |
| nodes_expanded: int | |
| ) -> Tuple[Optional[PathResult], str, int, Optional[List[SearchStep]]]: | |
| """Recursive helper for depth-limited search.""" | |
| # Record step for visualization | |
| if visualize and steps is not None: | |
| steps.append(SearchStep( | |
| step_number=nodes_expanded, | |
| current_node=node.state, | |
| action=node.action, | |
| frontier=[], # DLS doesn't maintain explicit frontier | |
| explored=list(explored), | |
| current_path=node.get_path(), | |
| path_cost=node.path_cost | |
| )) | |
| # Goal test | |
| if problem.goal_test(node.state): | |
| return PathResult( | |
| plan=node.get_solution(), | |
| cost=node.path_cost, | |
| nodes_expanded=nodes_expanded, | |
| path=node.get_path() | |
| ), "success", nodes_expanded, steps | |
| # Depth limit reached | |
| if node.depth >= limit: | |
| return None, CUTOFF, nodes_expanded, steps | |
| # Mark as explored for this path | |
| explored.add(node.state) | |
| nodes_expanded += 1 | |
| cutoff_occurred = False | |
| # Expand node | |
| for action in problem.actions(node.state): | |
| child_state = problem.result(node.state, action) | |
| if child_state not in explored: | |
| step_cost = problem.step_cost(node.state, action, child_state) | |
| child = SearchNode( | |
| state=child_state, | |
| parent=node, | |
| action=action, | |
| path_cost=node.path_cost + step_cost, | |
| depth=node.depth + 1 | |
| ) | |
| result, status, nodes_expanded, steps = _recursive_dls( | |
| problem, child, limit, explored.copy(), visualize, steps, nodes_expanded | |
| ) | |
| if status == "success": | |
| return result, status, nodes_expanded, steps | |
| elif status == CUTOFF: | |
| cutoff_occurred = True | |
| if cutoff_occurred: | |
| return None, CUTOFF, nodes_expanded, steps | |
| else: | |
| return None, FAILURE, nodes_expanded, steps | |
| def ids_search( | |
| problem: 'GenericSearch', | |
| visualize: bool = False, | |
| max_depth: int = 1000 | |
| ) -> Tuple[PathResult, Optional[List[SearchStep]]]: | |
| """ | |
| Iterative deepening search - repeated DLS with increasing depth. | |
| Combines BFS's completeness and optimality (for unweighted) | |
| with DFS's space efficiency. | |
| Args: | |
| problem: The search problem to solve | |
| visualize: If True, collect visualization steps | |
| max_depth: Maximum depth to search (prevents infinite loops) | |
| Returns: | |
| Tuple of (PathResult, Optional[List[SearchStep]]) | |
| """ | |
| total_expanded = 0 | |
| all_steps: List[SearchStep] = [] if visualize else None | |
| for depth in range(max_depth): | |
| result, status, expanded, steps = depth_limited_search( | |
| problem, depth, visualize, all_steps, total_expanded | |
| ) | |
| total_expanded = expanded | |
| if visualize and steps: | |
| all_steps = steps | |
| if status == "success" and result is not None: | |
| result.nodes_expanded = total_expanded | |
| return result, all_steps | |
| elif status == FAILURE: | |
| # No solution exists | |
| break | |
| # No solution found within max_depth | |
| return PathResult( | |
| plan="", | |
| cost=float('inf'), | |
| nodes_expanded=total_expanded, | |
| path=[] | |
| ), all_steps | |
| def ids_search_generator( | |
| problem: 'GenericSearch', | |
| max_depth: int = 1000 | |
| ) -> Generator[SearchStep, None, PathResult]: | |
| """ | |
| Generator version of IDS that yields steps during execution. | |
| Args: | |
| problem: The search problem to solve | |
| max_depth: Maximum depth to search | |
| Yields: | |
| SearchStep objects | |
| Returns: | |
| Final PathResult | |
| """ | |
| total_expanded = 0 | |
| for depth in range(max_depth): | |
| # Run DLS and yield steps | |
| for step in _dls_generator(problem, depth, total_expanded): | |
| yield step | |
| total_expanded = step.step_number | |
| # Check if solution was found at this depth | |
| result, status, expanded, _ = depth_limited_search( | |
| problem, depth, False, None, total_expanded | |
| ) | |
| total_expanded = expanded | |
| if status == "success" and result is not None: | |
| result.nodes_expanded = total_expanded | |
| return result | |
| elif status == FAILURE: | |
| break | |
| return PathResult( | |
| plan="", | |
| cost=float('inf'), | |
| nodes_expanded=total_expanded, | |
| path=[] | |
| ) | |
| def _dls_generator( | |
| problem: 'GenericSearch', | |
| limit: int, | |
| base_expanded: int | |
| ) -> Generator[SearchStep, None, None]: | |
| """Generator helper for DLS.""" | |
| start = problem.initial_state() | |
| start_node = SearchNode(state=start, path_cost=0, depth=0) | |
| stack = [(start_node, set())] | |
| nodes_expanded = base_expanded | |
| while stack: | |
| node, explored = stack.pop() | |
| yield SearchStep( | |
| step_number=nodes_expanded, | |
| current_node=node.state, | |
| action=node.action, | |
| frontier=[n.state for n, _ in stack], | |
| explored=list(explored), | |
| current_path=node.get_path(), | |
| path_cost=node.path_cost | |
| ) | |
| if problem.goal_test(node.state): | |
| return | |
| if node.depth >= limit: | |
| continue | |
| explored = explored | {node.state} | |
| nodes_expanded += 1 | |
| for action in reversed(problem.actions(node.state)): | |
| child_state = problem.result(node.state, action) | |
| if child_state not in explored: | |
| step_cost = problem.step_cost(node.state, action, child_state) | |
| child = SearchNode( | |
| state=child_state, | |
| parent=node, | |
| action=action, | |
| path_cost=node.path_cost + step_cost, | |
| depth=node.depth + 1 | |
| ) | |
| stack.append((child, explored)) | |