File size: 7,007 Bytes
80b67bf | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 | import heapq
import math
from collections import deque
from environment import Direction
class Node:
def __init__(self, position, parent=None, direction=None):
self.position = position
self.parent = parent
self.direction = direction
self.g = 0 # Cost from start to current node
self.h = 0 # Heuristic cost from current node to goal
self.f = 0 # Total cost (g + h)
def __eq__(self, other):
return self.position == other.position
def __lt__(self, other):
return self.f < other.f
class SearchAlgorithms:
def __init__(self, environment, turn_cost_enabled=False):
self.env = environment
self.turn_cost_enabled = turn_cost_enabled
def get_neighbors(self, position, direction=None):
row, col = position
neighbors = []
# Possible moves: up, right, down, left
moves = [(-1, 0, Direction.UP), (0, 1, Direction.RIGHT),
(1, 0, Direction.DOWN), (0, -1, Direction.LEFT)]
for dr, dc, new_dir in moves:
new_row, new_col = row + dr, col + dc
if self.env.is_valid_position(new_row, new_col):
turn_cost = 0
if self.turn_cost_enabled and direction is not None and direction != new_dir:
# Calculate turn cost (0.5 for 90° turns)
turn_cost = 0.5
neighbors.append(((new_row, new_col), new_dir, 1 + turn_cost))
return neighbors
def get_diagonal_neighbors(self, position, direction=None):
row, col = position
neighbors = []
# Possible moves including diagonals
moves = [
(-1, 0, Direction.UP, 1), (0, 1, Direction.RIGHT, 1),
(1, 0, Direction.DOWN, 1), (0, -1, Direction.LEFT, 1),
(-1, -1, None, math.sqrt(2)), (-1, 1, None, math.sqrt(2)),
(1, -1, None, math.sqrt(2)), (1, 1, None, math.sqrt(2))
]
for dr, dc, new_dir, cost in moves:
new_row, new_col = row + dr, col + dc
if self.env.is_valid_position(new_row, new_col):
turn_cost = 0
if self.turn_cost_enabled and direction is not None and direction != new_dir and new_dir is not None:
turn_cost = 0.5
neighbors.append(((new_row, new_col), new_dir, cost + turn_cost))
return neighbors
def manhattan_distance(self, pos1, pos2):
return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1])
def euclidean_distance(self, pos1, pos2):
return math.sqrt((pos1[0] - pos2[0])**2 + (pos1[1] - pos2[1])**2)
def chebyshev_distance(self, pos1, pos2):
return max(abs(pos1[0] - pos2[0]), abs(pos1[1] - pos2[1]))
def bfs(self, start, goals):
"""Breadth-First Search"""
if not goals:
return None, set()
queue = deque([Node(start)])
visited = set([start])
explored = set([start])
while queue:
current_node = queue.popleft()
# Check if we reached any goal
if current_node.position in goals:
path = []
while current_node:
path.append(current_node.position)
current_node = current_node.parent
return path[::-1], explored
for neighbor, _, _ in self.get_neighbors(current_node.position):
if neighbor not in visited:
visited.add(neighbor)
explored.add(neighbor)
queue.append(Node(neighbor, current_node))
return None, explored
def a_star(self, start, goals, heuristic_type="manhattan", allow_diagonals=False):
"""A* Search with different heuristics"""
if not goals:
return None, set()
open_list = []
heapq.heappush(open_list, Node(start))
closed_list = set()
explored = set([start])
# Cost from start to node
g_costs = {start: 0}
while open_list:
current_node = heapq.heappop(open_list)
# Check if we reached any goal
if current_node.position in goals:
path = []
while current_node:
path.append(current_node.position)
current_node = current_node.parent
return path[::-1], explored
closed_list.add(current_node.position)
# Get neighbors based on whether diagonals are allowed
if allow_diagonals:
neighbors = self.get_diagonal_neighbors(current_node.position, current_node.direction)
else:
neighbors = self.get_neighbors(current_node.position, current_node.direction)
for neighbor_pos, direction, move_cost in neighbors:
if neighbor_pos in closed_list:
continue
# Calculate new g cost
new_g = g_costs[current_node.position] + move_cost
if neighbor_pos not in g_costs or new_g < g_costs[neighbor_pos]:
# Calculate heuristic to the closest goal
min_h = float('inf')
for goal in goals:
if heuristic_type == "manhattan":
h = self.manhattan_distance(neighbor_pos, goal)
elif heuristic_type == "euclidean":
h = self.euclidean_distance(neighbor_pos, goal)
elif heuristic_type == "chebyshev":
h = self.chebyshev_distance(neighbor_pos, goal)
min_h = min(min_h, h)
# Create new node
new_node = Node(neighbor_pos, current_node, direction)
new_node.g = new_g
new_node.h = min_h
new_node.f = new_g + min_h
g_costs[neighbor_pos] = new_g
explored.add(neighbor_pos)
heapq.heappush(open_list, new_node)
return None, explored
def find_path_to_nearest_dirty(self, vacuum_pos, algorithm="a_star", heuristic="manhattan"):
"""Find path to the nearest dirty cell"""
dirty_cells = list(self.env.dirty_cells)
if algorithm == "bfs":
return self.bfs(vacuum_pos, dirty_cells)
else:
# For A*, we need to decide whether to allow diagonals based on heuristic
allow_diagonals = heuristic in ["euclidean", "chebyshev"]
return self.a_star(vacuum_pos, dirty_cells, heuristic, allow_diagonals) |