# Uploaded by TroglodyteDerivations
# Commit message: "Upload 47 files"
# Commit: 80b67bf (verified)
import heapq
import math
import time
from collections import deque
from environment import Direction
class Node:
    """A search-tree node: a grid position plus the bookkeeping for one path.

    Attributes:
        position: The cell this node represents (compared and hashed by
            this value).
        parent: The node this one was reached from, or ``None`` for the
            start node; following ``parent`` links yields the path back
            to the start.
        direction: The direction of the move that led to this node
            (intended to be a ``Direction`` enum member), or ``None``.
        g: Cost from the start to this node.
        h: Heuristic cost from this node to the goal.
        f: Total cost (``g + h``); this is what ``<`` orders nodes by.
    """

    def __init__(self, position, parent=None, direction=None):
        self.position = position
        self.parent = parent
        self.direction = direction
        self.g = 0
        self.h = 0
        self.f = 0

    def __eq__(self, other):
        """Two nodes are equal when they refer to the same position."""
        try:
            return self.position == other.position
        except AttributeError:
            # Operand has no position (e.g. ``None``): let Python fall back
            # to its default comparison instead of raising.
            return NotImplemented

    def __hash__(self):
        # Defining __eq__ alone would make instances unhashable; hash on the
        # same key that equality uses so nodes can live in sets and dicts.
        # NOTE: the hash follows ``position``; do not reassign it while the
        # node is stored in a hashed container.
        return hash(self.position)

    def __lt__(self, other):
        """Order nodes by total cost ``f`` (used by ``heapq``)."""
        try:
            return self.f < other.f
        except AttributeError:
            return NotImplemented

    def __repr__(self):
        return (
            f"{type(self).__name__}(position={self.position!r}, "
            f"direction={self.direction!r}, g={self.g!r}, h={self.h!r}, "
            f"f={self.f!r})"
        )
class SearchAlgorithms:
    """Grid path-finding routines (BFS and A*) over an environment object.

    The environment must provide ``is_valid_position(row, col)``;
    ``find_path_to_nearest_dirty`` additionally reads its ``dirty_cells``
    attribute. Positions are ``(row, col)`` tuples.

    Every search method returns a 4-tuple
    ``(path, explored, nodes_expanded, computation_time)`` where ``path`` is
    a list of positions from start to the reached goal (``None`` when no
    goal is reachable), ``explored`` is the set of positions discovered,
    ``nodes_expanded`` counts nodes taken off the frontier, and
    ``computation_time`` is the elapsed time in seconds.
    """

    # Step cost of a diagonal move.
    _DIAGONAL_COST = math.sqrt(2)

    def __init__(self, environment, turn_cost_enabled=False):
        self.env = environment
        self.turn_cost_enabled = turn_cost_enabled

    def calculate_turn_cost(self, current_dir, new_dir):
        """Return the extra cost of turning from ``current_dir`` to ``new_dir``.

        Returns 0 when turn costs are disabled, when either direction is
        ``None``, or when the direction does not change; 0.5 for a 90° turn
        and 1.0 for a 180° turn. Directions are compared through their
        ``.value`` attribute.
        """
        if not self.turn_cost_enabled or current_dir is None or new_dir is None:
            return 0
        if current_dir == new_dir:
            return 0

        diff = abs(current_dir.value - new_dir.value)
        # In a 4-direction system (UP=0 ... LEFT=3) a difference of 3 is the
        # wrap-around between the first and last direction: a 90° turn.
        if diff == 3:
            diff = 1

        if diff == 1:
            return 0.5
        if diff == 2:
            return 1.0
        return 0

    def _collect_neighbors(self, position, direction, moves):
        """Apply each ``(d_row, d_col, new_dir, base_cost)`` move to ``position``.

        Returns a list of ``(neighbor_position, new_dir, move_cost)`` for the
        moves that land on a valid cell; ``move_cost`` is the base cost plus
        the turn cost relative to ``direction``.
        """
        row, col = position
        neighbors = []
        for d_row, d_col, new_dir, base_cost in moves:
            target = (row + d_row, col + d_col)
            if self.env.is_valid_position(target[0], target[1]):
                move_cost = base_cost + self.calculate_turn_cost(direction, new_dir)
                neighbors.append((target, new_dir, move_cost))
        return neighbors

    def get_neighbors(self, position, direction=None):
        """Return the valid 4-connected neighbors of ``position``.

        Each entry is ``(neighbor_position, new_direction, move_cost)`` with
        a base cost of 1 per step, in the order up, right, down, left.
        """
        moves = [
            (-1, 0, Direction.UP, 1),
            (0, 1, Direction.RIGHT, 1),
            (1, 0, Direction.DOWN, 1),
            (0, -1, Direction.LEFT, 1),
        ]
        return self._collect_neighbors(position, direction, moves)

    def get_diagonal_neighbors(self, position, direction=None):
        """Return the valid 8-connected neighbors of ``position``.

        Straight moves cost 1 and diagonal moves cost sqrt(2), plus any turn
        cost. Diagonal moves are labelled with the vertical direction of the
        move (UP or DOWN) for turn-cost purposes.
        """
        diagonal = self._DIAGONAL_COST
        moves = [
            (-1, 0, Direction.UP, 1),
            (0, 1, Direction.RIGHT, 1),
            (1, 0, Direction.DOWN, 1),
            (0, -1, Direction.LEFT, 1),
            (-1, -1, Direction.UP, diagonal),
            (-1, 1, Direction.UP, diagonal),
            (1, -1, Direction.DOWN, diagonal),
            (1, 1, Direction.DOWN, diagonal),
        ]
        return self._collect_neighbors(position, direction, moves)

    def manhattan_distance(self, pos1, pos2):
        """Return the sum of the absolute row and column differences."""
        return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1])

    def euclidean_distance(self, pos1, pos2):
        """Return the straight-line distance between two positions."""
        return math.sqrt((pos1[0] - pos2[0]) ** 2 + (pos1[1] - pos2[1]) ** 2)

    def chebyshev_distance(self, pos1, pos2):
        """Return the larger of the absolute row and column differences."""
        return max(abs(pos1[0] - pos2[0]), abs(pos1[1] - pos2[1]))

    @staticmethod
    def _reconstruct_path(node):
        """Follow ``parent`` links from ``node`` and return the start-to-node path."""
        path = []
        while node:
            path.append(node.position)
            node = node.parent
        path.reverse()
        return path

    def bfs(self, start, goals):
        """Breadth-First Search from ``start`` to the first goal reached.

        Neighbors are generated without a current direction, so turn costs
        do not influence this search.

        Args:
            start: Starting position.
            goals: Collection of goal positions; an empty collection yields
                ``(None, set(), 0, 0)``.

        Returns:
            ``(path, explored, nodes_expanded, computation_time)``.
        """
        if not goals:
            return None, set(), 0, 0

        start_time = time.perf_counter()
        goal_set = set(goals)  # O(1) goal test per dequeued node
        queue = deque([Node(start)])
        # Positions are marked when enqueued, so each enters the queue once.
        explored = {start}
        nodes_expanded = 0

        while queue:
            current_node = queue.popleft()
            nodes_expanded += 1

            if current_node.position in goal_set:
                path = self._reconstruct_path(current_node)
                return path, explored, nodes_expanded, time.perf_counter() - start_time

            for neighbor_pos, new_dir, _move_cost in self.get_neighbors(current_node.position):
                if neighbor_pos not in explored:
                    explored.add(neighbor_pos)
                    queue.append(Node(neighbor_pos, current_node, new_dir))

        return None, explored, nodes_expanded, time.perf_counter() - start_time

    def a_star(self, start, goals, heuristic_type="manhattan", allow_diagonals=False):
        """A* Search from ``start`` to the cheapest-to-reach goal.

        Args:
            start: Starting position.
            goals: Collection of goal positions; an empty collection yields
                ``(None, set(), 0, 0)``.
            heuristic_type: ``"manhattan"``, ``"euclidean"`` or
                ``"chebyshev"``; the heuristic of a cell is its distance to
                the closest goal under that metric.
            allow_diagonals: When true, expand with 8-connected neighbors
                instead of 4-connected ones.

        Returns:
            ``(path, explored, nodes_expanded, computation_time)``.

        Raises:
            ValueError: If ``heuristic_type`` is not a supported name.
        """
        if not goals:
            return None, set(), 0, 0

        distance = {
            "manhattan": self.manhattan_distance,
            "euclidean": self.euclidean_distance,
            "chebyshev": self.chebyshev_distance,
        }.get(heuristic_type)
        if distance is None:
            # Fail early and clearly rather than hitting an unbound local
            # halfway through the search.
            raise ValueError(
                f"Unknown heuristic_type {heuristic_type!r}; expected "
                "'manhattan', 'euclidean' or 'chebyshev'"
            )

        start_time = time.perf_counter()
        goal_set = set(goals)
        expand = self.get_diagonal_neighbors if allow_diagonals else self.get_neighbors

        open_list = [Node(start)]
        closed_list = set()
        explored = {start}
        nodes_expanded = 0
        # Best known cost from start to each discovered position.
        g_costs = {start: 0}
        # Direction of arrival on the best known route, for turn costs.
        # NOTE(review): direction is tracked per position rather than being
        # part of the search state, so with turn costs enabled the result
        # may not be the true minimum-cost path - confirm this is acceptable.
        directions = {start: None}

        while open_list:
            current_node = heapq.heappop(open_list)
            position = current_node.position

            # A position can be pushed several times as cheaper routes are
            # found; once it has been expanded, later (stale) heap entries
            # must be dropped instead of being expanded and counted again.
            if position in closed_list:
                continue
            nodes_expanded += 1

            if position in goal_set:
                path = self._reconstruct_path(current_node)
                return path, explored, nodes_expanded, time.perf_counter() - start_time

            closed_list.add(position)
            current_g = g_costs[position]

            for neighbor_pos, direction, move_cost in expand(position, directions[position]):
                if neighbor_pos in closed_list:
                    continue

                new_g = current_g + move_cost
                if neighbor_pos in g_costs and new_g >= g_costs[neighbor_pos]:
                    continue  # not an improvement over the known route

                min_h = min(distance(neighbor_pos, goal) for goal in goal_set)

                new_node = Node(neighbor_pos, current_node, direction)
                new_node.g = new_g
                new_node.h = min_h
                new_node.f = new_g + min_h

                g_costs[neighbor_pos] = new_g
                directions[neighbor_pos] = direction
                explored.add(neighbor_pos)
                heapq.heappush(open_list, new_node)

        return None, explored, nodes_expanded, time.perf_counter() - start_time

    def find_path_to_nearest_dirty(self, vacuum_pos, algorithm="a_star", heuristic="manhattan"):
        """Find a path from ``vacuum_pos`` to the nearest dirty cell.

        Args:
            vacuum_pos: Starting position of the vacuum.
            algorithm: ``"bfs"`` selects breadth-first search; any other
                value selects A*.
            heuristic: Heuristic name passed to A*. The ``"euclidean"`` and
                ``"chebyshev"`` heuristics also enable diagonal moves.

        Returns:
            ``(path, explored, nodes_expanded, computation_time)``;
            ``(None, set(), 0, 0)`` when the environment has no dirty cells.
        """
        dirty_cells = list(self.env.dirty_cells)
        if not dirty_cells:
            return None, set(), 0, 0

        if algorithm == "bfs":
            return self.bfs(vacuum_pos, dirty_cells)

        allow_diagonals = heuristic in ("euclidean", "chebyshev")
        return self.a_star(vacuum_pos, dirty_cells, heuristic, allow_diagonals)