|
|
import heapq |
|
|
import math |
|
|
import time |
|
|
from collections import deque |
|
|
from environment import Direction |
|
|
|
|
|
class Node: |
|
|
def __init__(self, position, parent=None, direction=None): |
|
|
self.position = position |
|
|
self.parent = parent |
|
|
self.direction = direction |
|
|
self.g = 0 |
|
|
self.h = 0 |
|
|
self.f = 0 |
|
|
|
|
|
def __eq__(self, other): |
|
|
return self.position == other.position |
|
|
|
|
|
def __lt__(self, other): |
|
|
return self.f < other.f |
|
|
|
|
|
class SearchAlgorithms: |
|
|
def __init__(self, environment, turn_cost_enabled=False): |
|
|
self.env = environment |
|
|
self.turn_cost_enabled = turn_cost_enabled |
|
|
|
|
|
def calculate_turn_cost(self, current_dir, new_dir): |
|
|
"""Calculate turn cost between directions (0.5 for 90° turns)""" |
|
|
if not self.turn_cost_enabled or current_dir is None or new_dir is None: |
|
|
return 0 |
|
|
|
|
|
if current_dir == new_dir: |
|
|
return 0 |
|
|
|
|
|
|
|
|
diff = abs(current_dir.value - new_dir.value) |
|
|
|
|
|
|
|
|
if diff == 3: |
|
|
diff = 1 |
|
|
|
|
|
if diff == 1: |
|
|
return 0.5 |
|
|
elif diff == 2: |
|
|
return 1.0 |
|
|
return 0 |
|
|
|
|
|
def get_neighbors(self, position, direction=None): |
|
|
row, col = position |
|
|
neighbors = [] |
|
|
|
|
|
|
|
|
moves = [ |
|
|
(-1, 0, Direction.UP), |
|
|
(0, 1, Direction.RIGHT), |
|
|
(1, 0, Direction.DOWN), |
|
|
(0, -1, Direction.LEFT) |
|
|
] |
|
|
|
|
|
for dr, dc, new_dir in moves: |
|
|
new_row, new_col = row + dr, col + dc |
|
|
if self.env.is_valid_position(new_row, new_col): |
|
|
turn_cost = self.calculate_turn_cost(direction, new_dir) |
|
|
move_cost = 1 + turn_cost |
|
|
neighbors.append(((new_row, new_col), new_dir, move_cost)) |
|
|
|
|
|
return neighbors |
|
|
|
|
|
def get_diagonal_neighbors(self, position, direction=None): |
|
|
row, col = position |
|
|
neighbors = [] |
|
|
|
|
|
|
|
|
moves = [ |
|
|
(-1, 0, Direction.UP, 1), |
|
|
(0, 1, Direction.RIGHT, 1), |
|
|
(1, 0, Direction.DOWN, 1), |
|
|
(0, -1, Direction.LEFT, 1), |
|
|
(-1, -1, Direction.UP, math.sqrt(2)), |
|
|
(-1, 1, Direction.UP, math.sqrt(2)), |
|
|
(1, -1, Direction.DOWN, math.sqrt(2)), |
|
|
(1, 1, Direction.DOWN, math.sqrt(2)) |
|
|
] |
|
|
|
|
|
for dr, dc, new_dir, base_cost in moves: |
|
|
new_row, new_col = row + dr, col + dc |
|
|
if self.env.is_valid_position(new_row, new_col): |
|
|
turn_cost = self.calculate_turn_cost(direction, new_dir) |
|
|
move_cost = base_cost + turn_cost |
|
|
neighbors.append(((new_row, new_col), new_dir, move_cost)) |
|
|
|
|
|
return neighbors |
|
|
|
|
|
def manhattan_distance(self, pos1, pos2): |
|
|
return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1]) |
|
|
|
|
|
def euclidean_distance(self, pos1, pos2): |
|
|
return math.sqrt((pos1[0] - pos2[0])**2 + (pos1[1] - pos2[1])**2) |
|
|
|
|
|
def chebyshev_distance(self, pos1, pos2): |
|
|
return max(abs(pos1[0] - pos2[0]), abs(pos1[1] - pos2[1])) |
|
|
|
|
|
def bfs(self, start, goals): |
|
|
"""Breadth-First Search""" |
|
|
start_time = time.time() |
|
|
if not goals: |
|
|
return None, set(), 0, 0 |
|
|
|
|
|
queue = deque([Node(start)]) |
|
|
visited = set([start]) |
|
|
explored = set([start]) |
|
|
nodes_expanded = 0 |
|
|
|
|
|
while queue: |
|
|
current_node = queue.popleft() |
|
|
nodes_expanded += 1 |
|
|
|
|
|
|
|
|
if current_node.position in goals: |
|
|
path = [] |
|
|
temp_node = current_node |
|
|
while temp_node: |
|
|
path.append(temp_node.position) |
|
|
temp_node = temp_node.parent |
|
|
computation_time = time.time() - start_time |
|
|
return path[::-1], explored, nodes_expanded, computation_time |
|
|
|
|
|
for neighbor_pos, new_dir, move_cost in self.get_neighbors(current_node.position): |
|
|
if neighbor_pos not in visited: |
|
|
visited.add(neighbor_pos) |
|
|
explored.add(neighbor_pos) |
|
|
new_node = Node(neighbor_pos, current_node, new_dir) |
|
|
queue.append(new_node) |
|
|
|
|
|
computation_time = time.time() - start_time |
|
|
return None, explored, nodes_expanded, computation_time |
|
|
|
|
|
def a_star(self, start, goals, heuristic_type="manhattan", allow_diagonals=False): |
|
|
"""A* Search with different heuristics""" |
|
|
start_time = time.time() |
|
|
if not goals: |
|
|
return None, set(), 0, 0 |
|
|
|
|
|
open_list = [] |
|
|
start_node = Node(start) |
|
|
heapq.heappush(open_list, start_node) |
|
|
closed_list = set() |
|
|
explored = set([start]) |
|
|
nodes_expanded = 0 |
|
|
|
|
|
|
|
|
g_costs = {start: 0} |
|
|
|
|
|
directions = {start: None} |
|
|
|
|
|
while open_list: |
|
|
current_node = heapq.heappop(open_list) |
|
|
nodes_expanded += 1 |
|
|
|
|
|
|
|
|
if current_node.position in goals: |
|
|
path = [] |
|
|
temp_node = current_node |
|
|
while temp_node: |
|
|
path.append(temp_node.position) |
|
|
temp_node = temp_node.parent |
|
|
computation_time = time.time() - start_time |
|
|
return path[::-1], explored, nodes_expanded, computation_time |
|
|
|
|
|
closed_list.add(current_node.position) |
|
|
current_dir = directions[current_node.position] |
|
|
|
|
|
|
|
|
if allow_diagonals: |
|
|
neighbors = self.get_diagonal_neighbors(current_node.position, current_dir) |
|
|
else: |
|
|
neighbors = self.get_neighbors(current_node.position, current_dir) |
|
|
|
|
|
for neighbor_pos, direction, move_cost in neighbors: |
|
|
if neighbor_pos in closed_list: |
|
|
continue |
|
|
|
|
|
|
|
|
new_g = g_costs[current_node.position] + move_cost |
|
|
|
|
|
if neighbor_pos not in g_costs or new_g < g_costs[neighbor_pos]: |
|
|
|
|
|
min_h = float('inf') |
|
|
for goal in goals: |
|
|
if heuristic_type == "manhattan": |
|
|
h = self.manhattan_distance(neighbor_pos, goal) |
|
|
elif heuristic_type == "euclidean": |
|
|
h = self.euclidean_distance(neighbor_pos, goal) |
|
|
elif heuristic_type == "chebyshev": |
|
|
h = self.chebyshev_distance(neighbor_pos, goal) |
|
|
min_h = min(min_h, h) |
|
|
|
|
|
|
|
|
new_node = Node(neighbor_pos, current_node, direction) |
|
|
new_node.g = new_g |
|
|
new_node.h = min_h |
|
|
new_node.f = new_g + min_h |
|
|
|
|
|
g_costs[neighbor_pos] = new_g |
|
|
directions[neighbor_pos] = direction |
|
|
explored.add(neighbor_pos) |
|
|
heapq.heappush(open_list, new_node) |
|
|
|
|
|
computation_time = time.time() - start_time |
|
|
return None, explored, nodes_expanded, computation_time |
|
|
|
|
|
def find_path_to_nearest_dirty(self, vacuum_pos, algorithm="a_star", heuristic="manhattan"): |
|
|
"""Find path to the nearest dirty cell""" |
|
|
dirty_cells = list(self.env.dirty_cells) |
|
|
|
|
|
if not dirty_cells: |
|
|
return None, set(), 0, 0 |
|
|
|
|
|
if algorithm == "bfs": |
|
|
return self.bfs(vacuum_pos, dirty_cells) |
|
|
else: |
|
|
|
|
|
allow_diagonals = heuristic in ["euclidean", "chebyshev"] |
|
|
return self.a_star(vacuum_pos, dirty_cells, heuristic, allow_diagonals) |