# File size: 8,704 Bytes
# 80b67bf
import heapq
import math
import time
from collections import deque
from environment import Direction
class Node:
    """Search-tree node describing how one grid position was reached.

    Attributes:
        position: The grid cell this node represents.
        parent: The node this one was reached from, or None for the root.
        direction: The move that led here (expected to be a Direction enum
            member), or None when unknown / for the root.
        g: Cost from the start to this node.
        h: Heuristic cost from this node to the goal.
        f: Total cost (g + h); this is what orders nodes in a priority queue.

    Equality and hashing consider ``position`` only, so two nodes for the
    same cell compare equal regardless of cost or parent.
    """

    def __init__(self, position, parent=None, direction=None):
        self.position = position
        self.parent = parent
        self.direction = direction  # This should be a Direction enum
        self.g = 0  # Cost from start to current node
        self.h = 0  # Heuristic cost from current node to goal
        self.f = 0  # Total cost (g + h)

    def __eq__(self, other):
        # Returning NotImplemented lets Python fall back to its default
        # comparison instead of raising AttributeError on foreign types.
        if not isinstance(other, Node):
            return NotImplemented
        return self.position == other.position

    def __hash__(self):
        # Defining __eq__ alone would make instances unhashable; hash on the
        # same key that __eq__ compares so nodes work in sets and dict keys.
        return hash(self.position)

    def __lt__(self, other):
        # Only the total cost matters for heap ordering.
        if not isinstance(other, Node):
            return NotImplemented
        return self.f < other.f

    def __repr__(self):
        return "Node(position={!r}, g={!r}, h={!r}, f={!r})".format(
            self.position, self.g, self.h, self.f
        )
class SearchAlgorithms:
    """BFS and A* path-finding over a grid environment.

    Positions are ``(row, col)`` pairs. The environment object must provide
    ``is_valid_position(row, col)``; ``find_path_to_nearest_dirty`` also reads
    its ``dirty_cells`` attribute.

    Every search returns ``(path, explored, nodes_expanded, seconds)``:
    ``path`` is the start-to-goal list of positions (``None`` when no goal
    was reached), ``explored`` is the set of positions that were queued,
    ``nodes_expanded`` counts the nodes taken off the frontier and processed,
    and ``seconds`` is the elapsed search time.
    """

    def __init__(self, environment, turn_cost_enabled=False):
        self.env = environment
        self.turn_cost_enabled = turn_cost_enabled

    def calculate_turn_cost(self, current_dir, new_dir):
        """Calculate turn cost between directions.

        Returns 0 when turn costs are disabled, when either direction is
        None, or when the direction does not change; 0.5 for a 90° turn and
        1.0 for a 180° turn. Directions are compared through their ``value``
        attribute, which the wrap-around handling below expects to run 0..3
        in rotational order.
        """
        if not self.turn_cost_enabled or current_dir is None or new_dir is None:
            return 0
        if current_dir == new_dir:
            return 0

        # Calculate the absolute difference in direction values
        diff = abs(current_dir.value - new_dir.value)
        # For 4-direction system, handle wrap-around (UP=0, LEFT=3)
        if diff == 3:  # This means UP to LEFT or LEFT to UP
            diff = 1

        if diff == 1:  # 90° turn
            return 0.5
        if diff == 2:  # 180° turn
            return 1.0
        return 0

    def _expand_moves(self, position, direction, moves):
        """Apply ``(d_row, d_col, new_dir, base_cost)`` moves to ``position``.

        Keeps only the targets the environment accepts and returns them as
        ``(neighbor_position, new_dir, move_cost)`` tuples, where the cost is
        the move's base cost plus the turn cost relative to ``direction``.
        """
        row, col = position
        neighbors = []
        for d_row, d_col, new_dir, base_cost in moves:
            new_row, new_col = row + d_row, col + d_col
            if self.env.is_valid_position(new_row, new_col):
                move_cost = base_cost + self.calculate_turn_cost(direction, new_dir)
                neighbors.append(((new_row, new_col), new_dir, move_cost))
        return neighbors

    def get_neighbors(self, position, direction=None):
        """Return the valid 4-connected neighbors of ``position``.

        Each entry is ``(neighbor_position, new_dir, move_cost)`` with a base
        cost of 1 per step plus any turn cost relative to ``direction``.
        """
        # Possible moves: up, right, down, left
        moves = (
            (-1, 0, Direction.UP, 1),
            (0, 1, Direction.RIGHT, 1),
            (1, 0, Direction.DOWN, 1),
            (0, -1, Direction.LEFT, 1),
        )
        return self._expand_moves(position, direction, moves)

    def get_diagonal_neighbors(self, position, direction=None):
        """Return the valid 8-connected neighbors of ``position``.

        Straight steps cost 1 and diagonal steps cost sqrt(2), plus any turn
        cost relative to ``direction``.

        NOTE(review): diagonal moves are tagged Direction.UP / Direction.DOWN,
        so their turn cost is computed as if they were vertical moves —
        confirm this is the intended model.
        """
        diagonal = math.sqrt(2)
        # Possible moves including diagonals
        moves = (
            (-1, 0, Direction.UP, 1),
            (0, 1, Direction.RIGHT, 1),
            (1, 0, Direction.DOWN, 1),
            (0, -1, Direction.LEFT, 1),
            (-1, -1, Direction.UP, diagonal),
            (-1, 1, Direction.UP, diagonal),
            (1, -1, Direction.DOWN, diagonal),
            (1, 1, Direction.DOWN, diagonal),
        )
        return self._expand_moves(position, direction, moves)

    def manhattan_distance(self, pos1, pos2):
        """Return the sum of the absolute row and column differences."""
        return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1])

    def euclidean_distance(self, pos1, pos2):
        """Return the straight-line distance between two positions."""
        return math.sqrt((pos1[0] - pos2[0]) ** 2 + (pos1[1] - pos2[1]) ** 2)

    def chebyshev_distance(self, pos1, pos2):
        """Return the larger of the absolute row and column differences."""
        return max(abs(pos1[0] - pos2[0]), abs(pos1[1] - pos2[1]))

    @staticmethod
    def _goal_lookup(goals):
        """Return a container suited to repeated ``position in goals`` tests."""
        try:
            return frozenset(goals)
        except TypeError:
            # Unhashable goal entries: keep the caller's container and accept
            # linear-time membership tests rather than failing.
            return goals

    @staticmethod
    def _reconstruct_path(node):
        """Follow parent links from ``node`` and return the start-to-node path."""
        path = []
        while node is not None:
            path.append(node.position)
            node = node.parent
        path.reverse()
        return path

    def bfs(self, start, goals):
        """Breadth-First Search from ``start`` to the first goal reached.

        Uses 4-connected moves and ignores move costs. Returns
        ``(path, explored, nodes_expanded, seconds)``; when ``goals`` is empty
        the result is ``(None, set(), 0, 0)``.
        """
        start_time = time.perf_counter()

        if not goals:
            return None, set(), 0, 0

        goal_lookup = self._goal_lookup(goals)
        queue = deque([Node(start)])
        # Positions are marked when queued, so this set is both the
        # "already seen" guard and the reported explored set.
        explored = {start}
        nodes_expanded = 0

        while queue:
            current_node = queue.popleft()
            nodes_expanded += 1

            # Check if we reached any goal
            if current_node.position in goal_lookup:
                path = self._reconstruct_path(current_node)
                return path, explored, nodes_expanded, time.perf_counter() - start_time

            for neighbor_pos, new_dir, _move_cost in self.get_neighbors(current_node.position):
                if neighbor_pos not in explored:
                    explored.add(neighbor_pos)
                    queue.append(Node(neighbor_pos, current_node, new_dir))

        return None, explored, nodes_expanded, time.perf_counter() - start_time

    def a_star(self, start, goals, heuristic_type="manhattan", allow_diagonals=False):
        """A* Search with different heuristics.

        Args:
            start: Starting position.
            goals: Collection of goal positions; the heuristic for a cell is
                the minimum estimate over all goals.
            heuristic_type: "manhattan", "euclidean" or "chebyshev".
            allow_diagonals: Use 8-connected moves instead of 4-connected.

        Returns:
            ``(path, explored, nodes_expanded, seconds)``; when ``goals`` is
            empty the result is ``(None, set(), 0, 0)``.

        Raises:
            ValueError: If ``heuristic_type`` is not a supported name.

        NOTE(review): turn costs depend on the arrival direction, but nodes
        are closed per position only, so with turn costs enabled the returned
        path is not guaranteed to be the cheapest one.
        """
        start_time = time.perf_counter()

        if not goals:
            return None, set(), 0, 0

        heuristics = {
            "manhattan": self.manhattan_distance,
            "euclidean": self.euclidean_distance,
            "chebyshev": self.chebyshev_distance,
        }
        if heuristic_type not in heuristics:
            # Fail fast instead of hitting an unbound heuristic value later.
            raise ValueError(
                "Unknown heuristic_type {!r}; expected one of {}".format(
                    heuristic_type, sorted(heuristics)
                )
            )
        estimate = heuristics[heuristic_type]
        # Get neighbors based on whether diagonals are allowed
        expand = self.get_diagonal_neighbors if allow_diagonals else self.get_neighbors
        goal_lookup = self._goal_lookup(goals)

        open_list = [Node(start)]
        closed_list = set()
        explored = {start}
        nodes_expanded = 0
        # Cost from start to node
        g_costs = {start: 0}

        while open_list:
            current_node = heapq.heappop(open_list)
            position = current_node.position

            # A position is re-pushed whenever a cheaper route to it is found,
            # so the heap can hold outdated entries; skip them rather than
            # expanding (and counting) the same position again.
            if position in closed_list or current_node.g > g_costs[position]:
                continue
            nodes_expanded += 1

            # Check if we reached any goal
            if position in goal_lookup:
                path = self._reconstruct_path(current_node)
                return path, explored, nodes_expanded, time.perf_counter() - start_time

            closed_list.add(position)

            # The node's own arrival direction drives the turn cost.
            for neighbor_pos, direction, move_cost in expand(position, current_node.direction):
                if neighbor_pos in closed_list:
                    continue

                # Calculate new g cost
                new_g = current_node.g + move_cost
                if new_g < g_costs.get(neighbor_pos, math.inf):
                    # Calculate heuristic to the closest goal
                    min_h = min(estimate(neighbor_pos, goal) for goal in goals)

                    new_node = Node(neighbor_pos, current_node, direction)
                    new_node.g = new_g
                    new_node.h = min_h
                    new_node.f = new_g + min_h

                    g_costs[neighbor_pos] = new_g
                    explored.add(neighbor_pos)
                    heapq.heappush(open_list, new_node)

        return None, explored, nodes_expanded, time.perf_counter() - start_time

    def find_path_to_nearest_dirty(self, vacuum_pos, algorithm="a_star", heuristic="manhattan"):
        """Find path to the nearest dirty cell.

        Searches from ``vacuum_pos`` towards the environment's
        ``dirty_cells``. ``algorithm="bfs"`` runs breadth-first search; any
        other value runs A* with ``heuristic``. Returns the search tuple
        ``(path, explored, nodes_expanded, seconds)``, or
        ``(None, set(), 0, 0)`` when there are no dirty cells.
        """
        dirty_cells = list(self.env.dirty_cells)
        if not dirty_cells:
            return None, set(), 0, 0

        if algorithm == "bfs":
            return self.bfs(vacuum_pos, dirty_cells)

        # For A*, we need to decide whether to allow diagonals based on heuristic
        allow_diagonals = heuristic in ("euclidean", "chebyshev")
        return self.a_star(vacuum_pos, dirty_cells, heuristic, allow_diagonals)