"""Frontier data structures for search algorithms.""" from abc import ABC, abstractmethod from collections import deque import heapq from typing import List, Optional, Set, Dict from .node import SearchNode class Frontier(ABC): """Abstract base class for frontier data structures.""" @abstractmethod def push(self, node: SearchNode) -> None: """Add a node to the frontier.""" pass @abstractmethod def pop(self) -> SearchNode: """Remove and return the next node from the frontier.""" pass @abstractmethod def is_empty(self) -> bool: """Check if the frontier is empty.""" pass @abstractmethod def __len__(self) -> int: """Return the number of nodes in the frontier.""" pass @abstractmethod def contains_state(self, state) -> bool: """Check if a state is in the frontier.""" pass def get_states(self) -> List: """Get all states in the frontier (for visualization).""" return [] class QueueFrontier(Frontier): """FIFO queue frontier for Breadth-First Search.""" def __init__(self): self._queue: deque[SearchNode] = deque() self._states: Set = set() def push(self, node: SearchNode) -> None: self._queue.append(node) self._states.add(node.state) def pop(self) -> SearchNode: node = self._queue.popleft() self._states.discard(node.state) return node def is_empty(self) -> bool: return len(self._queue) == 0 def __len__(self) -> int: return len(self._queue) def contains_state(self, state) -> bool: return state in self._states def get_states(self) -> List: return [node.state for node in self._queue] class StackFrontier(Frontier): """LIFO stack frontier for Depth-First Search.""" def __init__(self): self._stack: List[SearchNode] = [] self._states: Set = set() def push(self, node: SearchNode) -> None: self._stack.append(node) self._states.add(node.state) def pop(self) -> SearchNode: node = self._stack.pop() self._states.discard(node.state) return node def is_empty(self) -> bool: return len(self._stack) == 0 def __len__(self) -> int: return len(self._stack) def contains_state(self, state) -> bool: return state in self._states def get_states(self) -> List: return [node.state for node in self._stack] class PriorityQueueFrontier(Frontier): """ Priority queue frontier for UCS, Greedy, and A* search. Uses heapq with a counter to break ties and ensure FIFO ordering for nodes with equal priority. """ def __init__(self): self._heap: List[tuple] = [] # (priority, counter, node) self._counter: int = 0 self._states: Dict = {} # state -> (priority, node) for updates self._removed: Set = set() # Track removed entries def push(self, node: SearchNode) -> None: """ Add a node to the priority queue. If a node with the same state already exists with higher priority, it will be updated. """ state = node.state priority = node.priority # If state exists with higher or equal priority, skip if state in self._states: existing_priority, _ = self._states[state] if existing_priority <= priority: return # Mark old entry as removed self._removed.add((existing_priority, state)) # Add new entry entry = (priority, self._counter, node) heapq.heappush(self._heap, entry) self._states[state] = (priority, node) self._counter += 1 def pop(self) -> SearchNode: """Remove and return the node with lowest priority.""" while self._heap: priority, _, node = heapq.heappop(self._heap) # Skip removed entries if (priority, node.state) in self._removed: self._removed.discard((priority, node.state)) continue # Skip if this is not the current entry for this state if node.state in self._states: current_priority, current_node = self._states[node.state] if current_priority != priority: continue del self._states[node.state] return node raise IndexError("Pop from empty frontier") def is_empty(self) -> bool: # Account for lazy deletions return len(self._states) == 0 def __len__(self) -> int: return len(self._states) def contains_state(self, state) -> bool: return state in self._states def get_node(self, state) -> Optional[SearchNode]: """Get the node for a given state if it exists.""" if state in self._states: _, node = self._states[state] return node return None def get_states(self) -> List: return list(self._states.keys()) def get_priority(self, state) -> Optional[float]: """Get the priority of a state if it exists.""" if state in self._states: priority, _ = self._states[state] return priority return None