Spaces:
Runtime error
Runtime error
| import re | |
| import os | |
| import math | |
| import json | |
| import time | |
| import heapq | |
| import random | |
| import pstats | |
| import cProfile | |
| import itertools | |
| from itertools import combinations, permutations, tee, pairwise | |
| from datetime import datetime | |
| from typing import * | |
| from collections import deque, defaultdict | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| import matplotlib | |
| import concurrent.futures | |
| from functions import * | |
| class Node: | |
| def __init__(self, x, y, t=0, neighbours=None, new_ID=None): | |
| if new_ID: | |
| self.ID = new_ID | |
| else: | |
| self.ID = f'{x}_{y}_{t}' | |
| self.x = x | |
| self.y = y | |
| self.t = t | |
| if neighbours is None: | |
| self.neighbours = [] | |
| else: | |
| self.neighbours = neighbours | |
| # self.neighbours = neighbours | |
| self.h = 0 | |
| self.g = t | |
| self.parent = None | |
| self.g_dict = {} | |
| self.neighbours_nodes = [] | |
| def xy_name(self): | |
| return f'{self.x}_{self.y}' | |
| # @property | |
| # def ID(self): | |
| # return f'{self.x}_{self.y}_{self.t}' | |
| def f(self): | |
| return self.t + self.h | |
| # return self.g + self.h | |
| def reset(self, target_nodes=None, **kwargs): | |
| if 'start_time' in kwargs: | |
| self.t = kwargs['start_time'] | |
| else: | |
| self.t = 0 | |
| self.h = 0 | |
| self.g = self.t | |
| self.ID = f'{self.x}_{self.y}_{self.t}' | |
| self.parent = None | |
| if target_nodes is not None: | |
| self.g_dict = {target_node.xy_name: 0 for target_node in target_nodes} | |
| else: | |
| self.g_dict = {} | |
| class ListNodes: | |
| def __init__(self, target_name=None): | |
| self.heap_list = [] | |
| # self.nodes_list = [] | |
| self.dict = {} | |
| self.h_func_bool = False | |
| if target_name: | |
| self.h_func_bool = True | |
| self.target_name = target_name | |
| def __len__(self): | |
| return len(self.heap_list) | |
| def remove(self, node): | |
| if self.h_func_bool: | |
| self.heap_list.remove((node.g_dict[self.target_name], node.xy_name)) | |
| del self.dict[node.xy_name] | |
| else: | |
| if node.ID not in self.dict: | |
| raise RuntimeError('node.ID not in self.dict') | |
| self.heap_list.remove(((node.f, node.h), node.ID)) | |
| del self.dict[node.ID] | |
| # self.nodes_list.remove(node) | |
| def add(self, node): | |
| if self.h_func_bool: | |
| heapq.heappush(self.heap_list, (node.g_dict[self.target_name], node.xy_name)) | |
| self.dict[node.xy_name] = node | |
| else: | |
| heapq.heappush(self.heap_list, ((node.f, node.h), node.ID)) | |
| self.dict[node.ID] = node | |
| # self.nodes_list.append(node) | |
| def pop(self): | |
| heap_tuple = heapq.heappop(self.heap_list) | |
| node = self.dict[heap_tuple[1]] | |
| if self.h_func_bool: | |
| del self.dict[node.xy_name] | |
| else: | |
| del self.dict[node.ID] | |
| # self.nodes_list.remove(node) | |
| return node | |
| def get(self, ID): | |
| return self.dict[ID] | |
| def get_nodes_list(self): | |
| return [self.dict[item[1]] for item in self.heap_list] | |
| def reset_nodes(nodes, target_nodes=None): | |
| _ = [node.reset(target_nodes) for node in nodes] | |
| def parallel_update_h_table(node, nodes, map_dim, to_save, plotter, middle_plot, h_dict, node_index): | |
| print(f'[HEURISTIC]: Thread {node_index} started.') | |
| h_table = build_heuristic_for_one_target(node, nodes, map_dim, to_save, plotter, middle_plot) | |
| h_dict[node.xy_name] = h_table | |
| print(f'[HEURISTIC]: Thread {node_index} finished.') | |
| def get_node(successor_xy_name, node_current, nodes_dict): | |
| if node_current.xy_name == successor_xy_name: | |
| return None | |
| return nodes_dict[successor_xy_name] | |
| def build_heuristic_for_one_target(target_node, nodes, map_dim, to_save=True, plotter=None, middle_plot=False): | |
| # print('Started to build heuristic...') | |
| copy_nodes = nodes | |
| nodes_dict = {node.xy_name: node for node in copy_nodes} | |
| target_name = target_node.xy_name | |
| target_node = nodes_dict[target_name] | |
| # target_node = [node for node in copy_nodes if node.xy_name == target_node.xy_name][0] | |
| # open_list = [] | |
| # close_list = [] | |
| open_nodes = ListNodes(target_name=target_node.xy_name) | |
| closed_nodes = ListNodes(target_name=target_node.xy_name) | |
| # open_list.append(target_node) | |
| open_nodes.add(target_node) | |
| iteration = 0 | |
| # while len(open_list) > 0: | |
| while len(open_nodes) > 0: | |
| iteration += 1 | |
| # node_current = get_node_from_open(open_list, target_name) | |
| node_current = open_nodes.pop() | |
| # if node_current.xy_name == '30_12': | |
| # print() | |
| for successor_xy_name in node_current.neighbours: | |
| node_successor = get_node(successor_xy_name, node_current, nodes_dict) | |
| if node_successor: | |
| successor_current_g = node_current.g_dict[target_name] + 1 # h(now, next) | |
| # INSIDE OPEN LIST | |
| if node_successor.xy_name in open_nodes.dict: | |
| if node_successor.g_dict[target_name] <= successor_current_g: | |
| continue | |
| open_nodes.remove(node_successor) | |
| node_successor.g_dict[target_name] = successor_current_g | |
| node_successor.parent = node_current | |
| open_nodes.add(node_successor) | |
| # INSIDE CLOSED LIST | |
| elif node_successor.xy_name in closed_nodes.dict: | |
| if node_successor.g_dict[target_name] <= successor_current_g: | |
| continue | |
| closed_nodes.remove(node_successor) | |
| node_successor.g_dict[target_name] = successor_current_g | |
| node_successor.parent = node_current | |
| open_nodes.add(node_successor) | |
| # NOT IN CLOSED AND NOT IN OPEN LISTS | |
| else: | |
| node_successor.g_dict[target_name] = successor_current_g | |
| node_successor.parent = node_current | |
| open_nodes.add(node_successor) | |
| # node_successor.g_dict[target_name] = successor_current_g | |
| # node_successor.parent = node_current | |
| # open_nodes.remove(node_current, target_name=target_node.xy_name) | |
| closed_nodes.add(node_current) | |
| if plotter and middle_plot and iteration % 1000 == 0: | |
| plotter.plot_lists(open_list=open_nodes.get_nodes_list(), | |
| closed_list=closed_nodes.get_nodes_list(), start=target_node, nodes=copy_nodes) | |
| if iteration % 100 == 0: | |
| print(f'\riter: {iteration}', end='') | |
| if plotter and middle_plot: | |
| plotter.plot_lists(open_list=open_nodes.get_nodes_list(), | |
| closed_list=closed_nodes.get_nodes_list(), start=target_node, nodes=copy_nodes) | |
| h_table = np.zeros(map_dim) | |
| for node in copy_nodes: | |
| h_table[node.x, node.y] = node.g_dict[target_name] | |
| # h_dict = {target_node.xy_name: h_table} | |
| # print(f'\rFinished to build heuristic at iter {iteration}.') | |
| return h_table | |
| def parallel_build_heuristic_for_entire_map(nodes: List, nodes_dict: Dict[str, Any], map_dim: Tuple[int, int], | |
| **kwargs) -> Dict[str, np.ndarray]: | |
| # print(f'Started to build heuristic for {kwargs['img_dir'][:-4]}...') | |
| # path = kwargs['path'] | |
| # possible_dir = f"{path}/h_dict_of_{kwargs['img_dir'][:-4]}.json" | |
| # else, create one | |
| h_dict = {} | |
| # reset_nodes(nodes, nodes) | |
| # for node_index, node in enumerate(nodes): | |
| # h_table = build_heuristic_for_one_target(node, nodes, map_dim, False, None, False) | |
| # h_dict[node.xy_name] = h_table | |
| with concurrent.futures.ThreadPoolExecutor(max_workers=len(nodes)) as executor: | |
| for node_index, node in enumerate(nodes): | |
| # parallel_update_h_table(node, nodes, map_dim, to_save, plotter, middle_plot, h_dict, node_index) | |
| executor.submit(parallel_update_h_table, node, nodes, map_dim, False, None, False, h_dict, node_index) | |
| return h_dict | |
| def build_graph_from_np(img_np: np.ndarray, show_map: bool = False) -> Tuple[List[Node], Dict[str, Node]]: | |
| # 0 - wall, 1 - free space | |
| nodes = [] | |
| nodes_dict = {} | |
| x_size, y_size = img_np.shape | |
| # CREATE NODES | |
| for i_x in range(x_size): | |
| for i_y in range(y_size): | |
| if img_np[i_x, i_y] == 1: | |
| node = Node(i_x, i_y) | |
| nodes.append(node) | |
| nodes_dict[node.xy_name] = node | |
| # CREATE NEIGHBOURS | |
| for node1, node2 in combinations(nodes, 2): | |
| if abs(node1.x - node2.x) > 1 or abs(node1.y - node2.y) > 1: | |
| continue | |
| if abs(node1.x - node2.x) == 1 and abs(node1.y - node2.y) == 1: | |
| continue | |
| node1.neighbours.append(node2.xy_name) | |
| node2.neighbours.append(node1.xy_name) | |
| # dist = distance_nodes(node1, node2) | |
| # if dist == 1: | |
| for node in nodes: | |
| node.neighbours.append(node.xy_name) | |
| heapq.heapify(node.neighbours) | |
| for node in nodes: | |
| for nei in node.neighbours: | |
| node.neighbours_nodes.append(nodes_dict[nei]) | |
| if show_map: | |
| plt.imshow(img_np, cmap='gray', origin='lower') | |
| plt.show() | |
| # plt.pause(1) | |
| # plt.close() | |
| return nodes, nodes_dict | |
| if __name__ == '__main__': | |
| file_dir = 'maps/room-32-32-4.map' | |
| img_np, (height, width) = get_np_from_dot_map(file_dir) | |
| nodes, nodes_dict = build_graph_from_np(img_np, show_map=False) | |
| h_dict = parallel_build_heuristic_for_entire_map(nodes, nodes_dict, (height, width)) | |
| print() | |