Spaces:
Sleeping
Sleeping
| """Frontier data structures for search algorithms.""" | |
| from abc import ABC, abstractmethod | |
| from collections import deque | |
| import heapq | |
| from typing import List, Optional, Set, Dict | |
| from .node import SearchNode | |
class Frontier(ABC):
    """Abstract base class for frontier data structures.

    A frontier holds the search nodes that have been generated but not yet
    expanded.  Concrete subclasses decide the order in which nodes come back
    out of ``pop``.

    The five core operations are declared abstract so that an incomplete
    subclass fails at instantiation time instead of silently returning
    ``None`` from an inherited stub.  ``get_states`` stays concrete with an
    empty-list default.
    """

    @abstractmethod
    def push(self, node: SearchNode) -> None:
        """Add a node to the frontier."""

    @abstractmethod
    def pop(self) -> SearchNode:
        """Remove and return the next node from the frontier."""

    @abstractmethod
    def is_empty(self) -> bool:
        """Check if the frontier is empty."""

    @abstractmethod
    def __len__(self) -> int:
        """Return the number of nodes in the frontier."""

    @abstractmethod
    def contains_state(self, state) -> bool:
        """Check if a state is in the frontier."""

    def get_states(self) -> List:
        """Get all states in the frontier (for visualization).

        Returns an empty list unless a subclass overrides it.
        """
        return []
class QueueFrontier(Frontier):
    """FIFO queue frontier for Breadth-First Search.

    Nodes are returned in the order they were pushed.  State membership is
    tracked with a per-state count rather than a plain set, so that when the
    same state is queued more than once, ``contains_state`` keeps reporting
    it until the last node carrying that state has been popped.
    """

    def __init__(self):
        self._queue: deque[SearchNode] = deque()
        # state -> number of queued nodes currently carrying that state
        self._states: Dict = {}

    def push(self, node: SearchNode) -> None:
        """Append ``node`` to the back of the queue."""
        self._queue.append(node)
        self._states[node.state] = self._states.get(node.state, 0) + 1

    def pop(self) -> SearchNode:
        """Remove and return the oldest node.

        Raises:
            IndexError: if the frontier is empty.
        """
        if not self._queue:
            raise IndexError("Pop from empty frontier")
        node = self._queue.popleft()
        remaining = self._states.get(node.state, 0) - 1
        if remaining > 0:
            self._states[node.state] = remaining
        else:
            # Last node with this state has left the queue.
            self._states.pop(node.state, None)
        return node

    def is_empty(self) -> bool:
        """Return True when no nodes are queued."""
        return not self._queue

    def __len__(self) -> int:
        """Return the number of queued nodes."""
        return len(self._queue)

    def contains_state(self, state) -> bool:
        """Return True if at least one queued node has ``state``."""
        return state in self._states

    def get_states(self) -> List:
        """Return the states of all queued nodes, oldest first."""
        return [node.state for node in self._queue]
class StackFrontier(Frontier):
    """LIFO stack frontier for Depth-First Search.

    The most recently pushed node is returned first.  State membership is
    tracked with a per-state count rather than a plain set, so that when the
    same state is stacked more than once, ``contains_state`` keeps reporting
    it until the last node carrying that state has been popped.
    """

    def __init__(self):
        self._stack: List[SearchNode] = []
        # state -> number of stacked nodes currently carrying that state
        self._states: Dict = {}

    def push(self, node: SearchNode) -> None:
        """Place ``node`` on top of the stack."""
        self._stack.append(node)
        self._states[node.state] = self._states.get(node.state, 0) + 1

    def pop(self) -> SearchNode:
        """Remove and return the most recently pushed node.

        Raises:
            IndexError: if the frontier is empty.
        """
        if not self._stack:
            raise IndexError("Pop from empty frontier")
        node = self._stack.pop()
        remaining = self._states.get(node.state, 0) - 1
        if remaining > 0:
            self._states[node.state] = remaining
        else:
            # Last node with this state has left the stack.
            self._states.pop(node.state, None)
        return node

    def is_empty(self) -> bool:
        """Return True when no nodes are stacked."""
        return not self._stack

    def __len__(self) -> int:
        """Return the number of stacked nodes."""
        return len(self._stack)

    def contains_state(self, state) -> bool:
        """Return True if at least one stacked node has ``state``."""
        return state in self._states

    def get_states(self) -> List:
        """Return the states of all stacked nodes, bottom of the stack first."""
        return [node.state for node in self._stack]
class PriorityQueueFrontier(Frontier):
    """Priority queue frontier for UCS, Greedy, and A* search.

    Backed by ``heapq``.  Each heap entry is ``(priority, counter, node)``;
    the monotonically increasing counter breaks ties so that nodes with equal
    priority come out in FIFO order and node objects are never compared.

    At most one entry per state is *live* at any time; it is the one recorded
    in ``_states``.  Pushing a strictly better node for an already-present
    state replaces the record and leaves the old heap entry behind (lazy
    deletion).  ``pop`` recognises such leftovers because they no longer
    match the record in ``_states`` and discards them.
    """

    def __init__(self):
        self._heap: List[tuple] = []  # (priority, counter, node)
        self._counter: int = 0  # insertion sequence number used as tie-break
        self._states: Dict = {}  # state -> (priority, node) of the live entry

    def push(self, node: SearchNode) -> None:
        """Add a node to the priority queue.

        If the node's state is already present with a priority that is lower
        than or equal to the new one, the call is a no-op.  If it is present
        with a strictly higher priority value, the new node supersedes it.
        """
        state = node.state
        priority = node.priority

        current = self._states.get(state)
        if current is not None and current[0] <= priority:
            return

        # Any previous heap entry for this state becomes stale simply because
        # ``_states`` now points at the new node.
        heapq.heappush(self._heap, (priority, self._counter, node))
        self._states[state] = (priority, node)
        self._counter += 1

    def pop(self) -> SearchNode:
        """Remove and return the live node with the lowest priority value.

        Raises:
            IndexError: if no live node remains.
        """
        while self._heap:
            priority, _, node = heapq.heappop(self._heap)
            current = self._states.get(node.state)
            # Stale entry: the state was already popped, or it has been
            # superseded by a different (better) node since this was pushed.
            if current is None or current[1] is not node or current[0] != priority:
                continue
            del self._states[node.state]
            if not self._states:
                # Whatever is left in the heap is stale; drop it now.
                self._heap.clear()
            return node
        raise IndexError("Pop from empty frontier")

    def is_empty(self) -> bool:
        """Return True when no live entries remain (stale heap entries ignored)."""
        return not self._states

    def __len__(self) -> int:
        """Return the number of live entries, i.e. distinct states present."""
        return len(self._states)

    def contains_state(self, state) -> bool:
        """Return True if ``state`` has a live entry."""
        return state in self._states

    def get_node(self, state) -> Optional[SearchNode]:
        """Get the node for a given state if it exists, else ``None``."""
        entry = self._states.get(state)
        return None if entry is None else entry[1]

    def get_states(self) -> List:
        """Return all states that currently have a live entry."""
        return list(self._states)

    def get_priority(self, state) -> Optional[float]:
        """Get the priority of a state if it exists, else ``None``."""
        entry = self._states.get(state)
        return None if entry is None else entry[0]