| | import heapq |
| | import math |
| | from collections import deque |
| | from environment import Direction |
| |
|
class Node:
    """Search-tree node: a grid position plus path-reconstruction bookkeeping.

    Equality and hashing are based on ``position`` only, while ordering
    (``<``) is based on ``f`` so instances can be stored directly in a
    ``heapq`` priority queue.

    Attributes:
        position: The cell this node represents.
        parent: The node this one was reached from, or ``None`` for a root.
        direction: The move direction used to reach this node, or ``None``.
        g: Cost accumulated from the start node (initially 0).
        h: Heuristic estimate to the goal (initially 0).
        f: Priority used for ordering, normally ``g + h`` (initially 0).
    """

    def __init__(self, position, parent=None, direction=None):
        self.position = position
        self.parent = parent
        self.direction = direction
        self.g = 0
        self.h = 0
        self.f = 0

    def __eq__(self, other):
        # Returning NotImplemented (rather than touching other.position)
        # lets comparisons such as ``node == None`` evaluate to False
        # instead of raising AttributeError.
        if not isinstance(other, Node):
            return NotImplemented
        return self.position == other.position

    def __hash__(self):
        # Defining __eq__ alone would make the class unhashable; hash on the
        # same key that equality uses so nodes can live in sets and dicts.
        return hash(self.position)

    def __lt__(self, other):
        if not isinstance(other, Node):
            return NotImplemented
        return self.f < other.f

    def __repr__(self):
        return (
            f"{type(self).__name__}(position={self.position!r}, "
            f"direction={self.direction!r}, g={self.g!r}, h={self.h!r}, "
            f"f={self.f!r})"
        )
| |
|
class SearchAlgorithms:
    """Grid path-finding (BFS and A*) over an environment object.

    The environment must provide ``is_valid_position(row, col)``;
    :meth:`find_path_to_nearest_dirty` additionally reads its iterable
    ``dirty_cells`` attribute. Positions are ``(row, col)`` pairs and must be
    hashable, since they are stored in sets and used as dictionary keys.
    """

    #: Extra cost charged for a change of direction when turn costs are on.
    DEFAULT_TURN_COST = 0.5

    #: Cost of a single diagonal step.
    _DIAGONAL_COST = math.sqrt(2)

    def __init__(self, environment, turn_cost_enabled=False,
                 turn_cost=DEFAULT_TURN_COST):
        """Create a searcher bound to *environment*.

        Args:
            environment: Object exposing ``is_valid_position(row, col)``.
            turn_cost_enabled: When true, a move whose direction differs from
                the direction the current cell was entered with costs extra.
            turn_cost: Size of that extra cost (defaults to 0.5).

        Raises:
            ValueError: If ``turn_cost`` is negative; negative step costs
                would invalidate the A* search.
        """
        if turn_cost < 0:
            raise ValueError(f"turn_cost must be non-negative, got {turn_cost!r}")
        self.env = environment
        self.turn_cost_enabled = turn_cost_enabled
        self.turn_cost = turn_cost

    # ------------------------------------------------------------------
    # Neighbour generation
    # ------------------------------------------------------------------

    @staticmethod
    def _cardinal_moves():
        """Return the four axis-aligned moves as (d_row, d_col, direction, cost)."""
        # Built lazily so that Direction is only looked up when a search
        # actually runs, exactly as in the per-call lists it replaces.
        return (
            (-1, 0, Direction.UP, 1),
            (0, 1, Direction.RIGHT, 1),
            (1, 0, Direction.DOWN, 1),
            (0, -1, Direction.LEFT, 1),
        )

    def _expand(self, position, direction, moves):
        """Apply *moves* to *position*, keeping only valid target cells.

        Returns a list of ``(new_position, new_direction, step_cost)`` tuples
        in the same order as *moves*.
        """
        row, col = position
        charge_turns = self.turn_cost_enabled and direction is not None
        neighbors = []
        for d_row, d_col, new_dir, base_cost in moves:
            new_row, new_col = row + d_row, col + d_col
            if not self.env.is_valid_position(new_row, new_col):
                continue
            penalty = 0
            # Diagonal moves carry no direction (None) and are never charged.
            if charge_turns and new_dir is not None and direction != new_dir:
                penalty = self.turn_cost
            neighbors.append(((new_row, new_col), new_dir, base_cost + penalty))
        return neighbors

    def get_neighbors(self, position, direction=None):
        """Return valid 4-connected neighbours of *position*.

        Args:
            position: ``(row, col)`` of the cell to expand.
            direction: Direction the cell was entered with, or ``None``.

        Returns:
            List of ``(position, direction, cost)``; cost is 1, plus the turn
            cost when turn costs are enabled and the direction changes.
        """
        return self._expand(position, direction, self._cardinal_moves())

    def get_diagonal_neighbors(self, position, direction=None):
        """Return valid 8-connected neighbours of *position*.

        Cardinal moves behave as in :meth:`get_neighbors`. Diagonal moves
        cost ``sqrt(2)``, report ``None`` as their direction and never incur
        a turn cost.
        """
        diagonal = self._DIAGONAL_COST
        moves = self._cardinal_moves() + (
            (-1, -1, None, diagonal),
            (-1, 1, None, diagonal),
            (1, -1, None, diagonal),
            (1, 1, None, diagonal),
        )
        return self._expand(position, direction, moves)

    # ------------------------------------------------------------------
    # Distance heuristics
    # ------------------------------------------------------------------

    def manhattan_distance(self, pos1, pos2):
        """Return the sum of absolute row and column differences."""
        return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1])

    def euclidean_distance(self, pos1, pos2):
        """Return the straight-line distance between two positions."""
        return math.sqrt((pos1[0] - pos2[0])**2 + (pos1[1] - pos2[1])**2)

    def chebyshev_distance(self, pos1, pos2):
        """Return the larger of the absolute row and column differences."""
        return max(abs(pos1[0] - pos2[0]), abs(pos1[1] - pos2[1]))

    def _heuristic_for(self, heuristic_type):
        """Map a heuristic name to the matching distance method."""
        table = {
            "manhattan": self.manhattan_distance,
            "euclidean": self.euclidean_distance,
            "chebyshev": self.chebyshev_distance,
        }
        try:
            return table[heuristic_type]
        except KeyError:
            raise ValueError(
                f"Unknown heuristic_type {heuristic_type!r}; "
                f"expected one of {sorted(table)}"
            ) from None

    # ------------------------------------------------------------------
    # Searches
    # ------------------------------------------------------------------

    @staticmethod
    def _reconstruct_path(node):
        """Follow parent links from *node* and return positions start-first."""
        path = []
        while node is not None:
            path.append(node.position)
            node = node.parent
        path.reverse()
        return path

    def bfs(self, start, goals):
        """Breadth-first search from *start* to the first goal reached.

        Uses 4-connected moves and ignores step costs.

        Args:
            start: Starting ``(row, col)``.
            goals: Collection of goal positions.

        Returns:
            ``(path, explored)``. ``path`` is the list of positions from
            start to the goal, or ``None`` when no goal is reachable (or
            *goals* is empty). ``explored`` is the set of positions that were
            discovered, including ones queued but not yet expanded.
        """
        if not goals:
            return None, set()

        goal_set = set(goals)  # O(1) membership instead of scanning a list
        queue = deque([Node(start)])
        explored = {start}

        while queue:
            current = queue.popleft()
            if current.position in goal_set:
                return self._reconstruct_path(current), explored

            for neighbor, _, _ in self.get_neighbors(current.position):
                if neighbor not in explored:
                    explored.add(neighbor)
                    queue.append(Node(neighbor, current))

        return None, explored

    def a_star(self, start, goals, heuristic_type="manhattan", allow_diagonals=False):
        """A* search from *start* to the nearest of *goals*.

        Args:
            start: Starting ``(row, col)``.
            goals: Collection of goal positions; the heuristic is the minimum
                distance to any of them.
            heuristic_type: ``"manhattan"``, ``"euclidean"`` or
                ``"chebyshev"``.
            allow_diagonals: Use 8-connected moves instead of 4-connected.
                Note that the Manhattan heuristic can overestimate when
                diagonal moves are allowed.

        Returns:
            ``(path, explored)`` with the same meaning as in :meth:`bfs`.

        Raises:
            ValueError: If *heuristic_type* is not a known name.
        """
        if not goals:
            return None, set()

        heuristic = self._heuristic_for(heuristic_type)
        goal_set = set(goals)
        expand = self.get_diagonal_neighbors if allow_diagonals else self.get_neighbors

        open_heap = [Node(start)]
        closed = set()
        explored = {start}
        best_g = {start: 0}

        while open_heap:
            current = heapq.heappop(open_heap)

            # A position is pushed again whenever a cheaper route to it is
            # found, so older entries linger in the heap. Expanding one of
            # them would pair the best known cost with the stale entry's
            # direction and parent chain; skip it instead.
            if current.position in closed:
                continue

            if current.position in goal_set:
                return self._reconstruct_path(current), explored

            closed.add(current.position)

            for neighbor_pos, direction, move_cost in expand(
                    current.position, current.direction):
                if neighbor_pos in closed:
                    continue

                new_g = current.g + move_cost
                if new_g >= best_g.get(neighbor_pos, math.inf):
                    continue

                estimate = min(heuristic(neighbor_pos, goal) for goal in goal_set)

                successor = Node(neighbor_pos, current, direction)
                successor.g = new_g
                successor.h = estimate
                successor.f = new_g + estimate

                best_g[neighbor_pos] = new_g
                explored.add(neighbor_pos)
                heapq.heappush(open_heap, successor)

        return None, explored

    def find_path_to_nearest_dirty(self, vacuum_pos, algorithm="a_star", heuristic="manhattan"):
        """Find a path from *vacuum_pos* to the nearest cell in ``env.dirty_cells``.

        Args:
            vacuum_pos: Current ``(row, col)`` of the vacuum.
            algorithm: ``"bfs"`` selects breadth-first search; any other
                value selects A*.
            heuristic: Heuristic name passed to :meth:`a_star`. Diagonal
                moves are enabled only for ``"euclidean"`` and
                ``"chebyshev"``.

        Returns:
            The ``(path, explored)`` pair produced by the chosen search.
        """
        dirty_cells = list(self.env.dirty_cells)

        if algorithm == "bfs":
            return self.bfs(vacuum_pos, dirty_cells)

        allow_diagonals = heuristic in ("euclidean", "chebyshev")
        return self.a_star(vacuum_pos, dirty_cells, heuristic, allow_diagonals)