"""Grid search algorithms (BFS and A*) used to route the vacuum to dirty cells."""

import heapq
import math
import time
from collections import deque

from environment import Direction

# Each move is (row delta, column delta, heading after the move, base step cost).
_ORTHOGONAL_MOVES = (
    (-1, 0, Direction.UP, 1),
    (0, 1, Direction.RIGHT, 1),
    (1, 0, Direction.DOWN, 1),
    (0, -1, Direction.LEFT, 1),
)

_DIAGONAL_COST = math.sqrt(2)

# Diagonal steps are tagged with the heading of their vertical component
# (UP or DOWN) because only four headings are used for turn costs.
_DIAGONAL_MOVES = (
    (-1, -1, Direction.UP, _DIAGONAL_COST),
    (-1, 1, Direction.UP, _DIAGONAL_COST),
    (1, -1, Direction.DOWN, _DIAGONAL_COST),
    (1, 1, Direction.DOWN, _DIAGONAL_COST),
)

# Turn penalty keyed by the absolute difference of the two heading values.
# A difference of 3 is the wrap-around case and counts as a 90-degree turn.
_TURN_COST_BY_DIFF = {1: 0.5, 2: 1.0, 3: 0.5}


class Node:
    """A search node: a grid position plus the link back to its predecessor.

    Nodes compare equal (and hash) by ``position`` only, and order by ``f`` so
    they can be stored directly in a ``heapq`` priority queue.
    """

    def __init__(self, position, parent=None, direction=None):
        self.position = position
        self.parent = parent
        self.direction = direction  # Direction member used to arrive here, or None
        self.g = 0  # Cost from start to current node
        self.h = 0  # Heuristic cost from current node to goal
        self.f = 0  # Total cost (g + h)

    def __eq__(self, other):
        if not isinstance(other, Node):
            return NotImplemented
        return self.position == other.position

    def __hash__(self):
        # Defining __eq__ alone would make Node unhashable; keep the hash
        # consistent with equality by hashing the position.
        return hash(self.position)

    def __lt__(self, other):
        return self.f < other.f

    def __repr__(self):
        return (
            f"{type(self).__name__}(position={self.position!r}, "
            f"direction={self.direction!r}, g={self.g!r}, h={self.h!r})"
        )


class SearchAlgorithms:
    """Path-finding routines over a grid environment.

    The environment object must provide ``is_valid_position(row, col)`` and,
    for ``find_path_to_nearest_dirty``, an iterable ``dirty_cells`` attribute.
    """

    def __init__(self, environment, turn_cost_enabled=False):
        self.env = environment
        self.turn_cost_enabled = turn_cost_enabled

    def calculate_turn_cost(self, current_dir, new_dir):
        """Return the penalty for changing heading from current_dir to new_dir.

        Returns 0 when turn costs are disabled, when either heading is None,
        or when the heading does not change; 0.5 for a 90-degree turn and 1.0
        for a 180-degree turn.
        """
        if not self.turn_cost_enabled or current_dir is None or new_dir is None:
            return 0
        if current_dir == new_dir:
            return 0

        # NOTE(review): assumes Direction values are consecutive integers in
        # rotational order (UP=0 ... LEFT=3) -- confirm against environment.
        diff = abs(current_dir.value - new_dir.value)
        return _TURN_COST_BY_DIFF.get(diff, 0)

    def _expand(self, position, direction, moves):
        """Return the valid successors of position for the given move table."""
        row, col = position
        neighbors = []
        for d_row, d_col, new_dir, base_cost in moves:
            new_row, new_col = row + d_row, col + d_col
            if self.env.is_valid_position(new_row, new_col):
                move_cost = base_cost + self.calculate_turn_cost(direction, new_dir)
                neighbors.append(((new_row, new_col), new_dir, move_cost))
        return neighbors

    def get_neighbors(self, position, direction=None):
        """Return 4-connected successors as (position, heading, cost) tuples.

        Each step costs 1 plus the turn cost relative to ``direction``.
        """
        return self._expand(position, direction, _ORTHOGONAL_MOVES)

    def get_diagonal_neighbors(self, position, direction=None):
        """Return 8-connected successors as (position, heading, cost) tuples.

        Orthogonal steps cost 1 and diagonal steps cost sqrt(2), each plus the
        turn cost relative to ``direction``. Diagonal steps report UP or DOWN
        as their heading.
        """
        return self._expand(
            position, direction, _ORTHOGONAL_MOVES + _DIAGONAL_MOVES
        )

    def manhattan_distance(self, pos1, pos2):
        """Return the L1 distance between two (row, col) positions."""
        return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1])

    def euclidean_distance(self, pos1, pos2):
        """Return the straight-line distance between two (row, col) positions."""
        return math.sqrt((pos1[0] - pos2[0]) ** 2 + (pos1[1] - pos2[1]) ** 2)

    def chebyshev_distance(self, pos1, pos2):
        """Return the L-infinity distance between two (row, col) positions."""
        return max(abs(pos1[0] - pos2[0]), abs(pos1[1] - pos2[1]))

    @staticmethod
    def _reconstruct_path(node):
        """Follow parent links from node and return positions start-to-node."""
        path = []
        while node is not None:
            path.append(node.position)
            node = node.parent
        path.reverse()
        return path

    def bfs(self, start, goals):
        """Breadth-first search from start to the first goal reached.

        Turn costs are ignored. Returns a tuple
        ``(path, explored, nodes_expanded, computation_time)`` where ``path``
        is a list of positions from start to goal, or None if no goal is
        reachable. With no goals the result is ``(None, set(), 0, 0)``.
        """
        started = time.perf_counter()

        if not goals:
            return None, set(), 0, 0

        # A set gives O(1) goal tests regardless of how many goals there are.
        goal_set = set(goals)
        queue = deque([Node(start)])
        explored = {start}  # every position ever enqueued
        nodes_expanded = 0

        while queue:
            current = queue.popleft()
            nodes_expanded += 1

            if current.position in goal_set:
                path = self._reconstruct_path(current)
                return path, explored, nodes_expanded, time.perf_counter() - started

            for neighbor_pos, new_dir, _ in self.get_neighbors(current.position):
                if neighbor_pos not in explored:
                    explored.add(neighbor_pos)
                    queue.append(Node(neighbor_pos, current, new_dir))

        return None, explored, nodes_expanded, time.perf_counter() - started

    def a_star(self, start, goals, heuristic_type="manhattan", allow_diagonals=False):
        """A* search from start to the cheapest goal.

        ``heuristic_type`` is one of "manhattan", "euclidean" or "chebyshev";
        the heuristic of a cell is its minimum distance to any goal.
        ``allow_diagonals`` switches to 8-connected moves.

        Returns ``(path, explored, nodes_expanded, computation_time)`` with
        ``path`` set to None when no goal is reachable. With no goals the
        result is ``(None, set(), 0, 0)``.

        Raises:
            ValueError: if ``heuristic_type`` is not a supported name.
        """
        started = time.perf_counter()

        if not goals:
            return None, set(), 0, 0

        heuristics = {
            "manhattan": self.manhattan_distance,
            "euclidean": self.euclidean_distance,
            "chebyshev": self.chebyshev_distance,
        }
        try:
            heuristic = heuristics[heuristic_type]
        except KeyError:
            raise ValueError(
                f"Unknown heuristic_type {heuristic_type!r}; "
                f"expected one of {sorted(heuristics)}"
            ) from None

        goal_set = set(goals)
        expand = self.get_diagonal_neighbors if allow_diagonals else self.get_neighbors

        open_list = [Node(start)]
        closed = set()
        explored = {start}
        nodes_expanded = 0

        # Best known cost and arrival heading per position.
        # NOTE: the search state is the position alone, so with turn costs
        # enabled only the heading of the cheapest known arrival is kept.
        g_costs = {start: 0}
        directions = {start: None}

        while open_list:
            current = heapq.heappop(open_list)
            position = current.position

            # A position may be queued several times when a cheaper route is
            # found later; drop the outdated entries rather than re-expanding.
            if position in closed:
                continue

            nodes_expanded += 1

            if position in goal_set:
                path = self._reconstruct_path(current)
                return path, explored, nodes_expanded, time.perf_counter() - started

            closed.add(position)

            for neighbor_pos, direction, move_cost in expand(
                position, directions[position]
            ):
                if neighbor_pos in closed:
                    continue

                new_g = g_costs[position] + move_cost
                if new_g >= g_costs.get(neighbor_pos, math.inf):
                    continue

                min_h = min(heuristic(neighbor_pos, goal) for goal in goal_set)

                new_node = Node(neighbor_pos, current, direction)
                new_node.g = new_g
                new_node.h = min_h
                new_node.f = new_g + min_h

                g_costs[neighbor_pos] = new_g
                directions[neighbor_pos] = direction
                explored.add(neighbor_pos)
                heapq.heappush(open_list, new_node)

        return None, explored, nodes_expanded, time.perf_counter() - started

    def find_path_to_nearest_dirty(self, vacuum_pos, algorithm="a_star", heuristic="manhattan"):
        """Find a path from vacuum_pos to the nearest cell in env.dirty_cells.

        Uses BFS when ``algorithm`` is "bfs" and A* for any other value. For
        A*, diagonal moves are enabled for the "euclidean" and "chebyshev"
        heuristics. Returns the same 4-tuple as the search methods, or
        ``(None, set(), 0, 0)`` when there are no dirty cells.
        """
        dirty_cells = list(self.env.dirty_cells)
        if not dirty_cells:
            return None, set(), 0, 0

        if algorithm == "bfs":
            return self.bfs(vacuum_pos, dirty_cells)

        allow_diagonals = heuristic in ("euclidean", "chebyshev")
        return self.a_star(vacuum_pos, dirty_cells, heuristic, allow_diagonals)