# NOTE: import order is kept exactly as in the original file, because the
# wildcard imports (`typing`, `functions`) may shadow earlier names and
# reordering them could change which binding wins.
import re
import os
import math
import json
import time
import heapq
import random
import pstats
import cProfile
import itertools
from itertools import combinations, permutations, tee, pairwise
from datetime import datetime
from typing import *
from collections import deque, defaultdict

import numpy as np
import matplotlib.pyplot as plt
import matplotlib
import concurrent.futures

from functions import *


class Node:
    """A free cell of the grid map, optionally tagged with a time step.

    Attributes:
        ID: ``new_ID`` if given (and truthy), otherwise ``'{x}_{y}_{t}'``.
        x, y: grid coordinates.
        t: time step; also the initial value of ``g``.
        neighbours: list of ``xy_name`` strings of adjacent nodes.
        neighbours_nodes: the same neighbours as ``Node`` objects.
        h, g, parent: search bookkeeping fields.
        g_dict: maps a target's ``xy_name`` to the cost stored for this node
            with respect to that target.
    """

    def __init__(self, x, y, t=0, neighbours=None, new_ID=None):
        if new_ID:
            self.ID = new_ID
        else:
            self.ID = f'{x}_{y}_{t}'
        self.x = x
        self.y = y
        self.t = t
        # A fresh list per instance; never share a default between nodes.
        self.neighbours = [] if neighbours is None else neighbours
        self.h = 0
        self.g = t
        self.parent = None
        self.g_dict = {}
        self.neighbours_nodes = []

    @property
    def xy_name(self):
        """Time-independent name of the cell: ``'{x}_{y}'``."""
        return f'{self.x}_{self.y}'

    @property
    def f(self):
        """Priority value: time step plus heuristic."""
        return self.t + self.h

    def reset(self, target_nodes=None, **kwargs):
        """Restore the search fields to their initial state.

        Args:
            target_nodes: if not None, ``g_dict`` is re-created with a 0 entry
                for every target's ``xy_name``; otherwise it is emptied.
            **kwargs: ``start_time`` (optional) becomes the new ``t``;
                defaults to 0.
        """
        self.t = kwargs.get('start_time', 0)
        self.h = 0
        self.g = self.t
        self.ID = f'{self.x}_{self.y}_{self.t}'
        self.parent = None
        if target_nodes is not None:
            self.g_dict = {target_node.xy_name: 0 for target_node in target_nodes}
        else:
            self.g_dict = {}


class ListNodes:
    """Priority queue of nodes backed by ``heapq`` plus a key -> node dict.

    Two modes, selected by ``target_name``:

    * with a target name: entries are ``(node.g_dict[target_name], xy_name)``
      and the dict is keyed by ``xy_name``;
    * without: entries are ``((node.f, node.h), node.ID)`` and the dict is
      keyed by ``ID``.
    """

    def __init__(self, target_name=None):
        self.heap_list = []
        self.dict = {}
        self.h_func_bool = bool(target_name)
        self.target_name = target_name

    def __len__(self):
        return len(self.heap_list)

    def _heap_entry(self, node):
        """Return the ``(priority, key)`` tuple stored in the heap for *node*."""
        if self.h_func_bool:
            return node.g_dict[self.target_name], node.xy_name
        return (node.f, node.h), node.ID

    def remove(self, node):
        """Remove *node* from the queue.

        Must be called BEFORE the node's priority fields are modified, since
        the heap entry is located by its current priority.

        Raises:
            RuntimeError: (ID mode) the node's ID is not in the queue.
            ValueError: the node's heap entry is not present.
        """
        if not self.h_func_bool and node.ID not in self.dict:
            raise RuntimeError('node.ID not in self.dict')
        entry = self._heap_entry(node)
        self.heap_list.remove(entry)
        # list.remove() deletes from an arbitrary position and can break the
        # heap invariant; restore it so that later pops still return the
        # smallest entry.
        heapq.heapify(self.heap_list)
        del self.dict[entry[1]]

    def add(self, node):
        """Push *node* with its current priority."""
        entry = self._heap_entry(node)
        heapq.heappush(self.heap_list, entry)
        self.dict[entry[1]] = node

    def pop(self):
        """Remove and return the node with the smallest heap entry."""
        _, key = heapq.heappop(self.heap_list)
        return self.dict.pop(key)

    def get(self, ID):
        """Return the node stored under key *ID* (``KeyError`` if absent)."""
        return self.dict[ID]

    def get_nodes_list(self):
        """Return the queued nodes in the heap's internal (not sorted) order."""
        return [self.dict[item[1]] for item in self.heap_list]


def reset_nodes(nodes, target_nodes=None):
    """Call ``reset(target_nodes)`` on every node."""
    for node in nodes:
        node.reset(target_nodes)


def parallel_update_h_table(node, nodes, map_dim, to_save, plotter, middle_plot, h_dict, node_index):
    """Worker: build the table for *node* and store it in ``h_dict[node.xy_name]``."""
    print(f'[HEURISTIC]: Thread {node_index} started.')
    h_table = build_heuristic_for_one_target(node, nodes, map_dim, to_save, plotter, middle_plot)
    h_dict[node.xy_name] = h_table
    print(f'[HEURISTIC]: Thread {node_index} finished.')


def get_node(successor_xy_name, node_current, nodes_dict):
    """Look up a successor by name; return None for the node's self-loop."""
    if node_current.xy_name == successor_xy_name:
        return None
    return nodes_dict[successor_xy_name]


def build_heuristic_for_one_target(target_node, nodes, map_dim, to_save=True, plotter=None, middle_plot=False):
    """Compute step distances from every node to *target_node*.

    Expands outward from the target with unit edge cost, storing each node's
    distance in ``node.g_dict[target_name]`` (and overwriting ``node.parent``).

    Args:
        target_node: node whose ``xy_name`` identifies the target.
        nodes: all nodes of the graph.
        map_dim: shape of the returned array.
        to_save: unused; kept for interface compatibility.
        plotter: optional object with a ``plot_lists`` method.
        middle_plot: when truthy (and a plotter is given) plot every 1000
            iterations and once at the end.

    Returns:
        ``np.ndarray`` of shape *map_dim* with ``h_table[x, y]`` equal to the
        distance of node ``(x, y)``. Cells without a node, and nodes never
        reached from the target, hold 0.
    """
    nodes_dict = {node.xy_name: node for node in nodes}
    target_name = target_node.xy_name
    target_node = nodes_dict[target_name]
    # Seed the target's own distance. Previously this relied on the caller
    # having run reset_nodes(nodes, nodes) beforehand and raised KeyError
    # otherwise.
    target_node.g_dict[target_name] = 0

    open_nodes = ListNodes(target_name=target_name)
    closed_nodes = ListNodes(target_name=target_name)
    open_nodes.add(target_node)

    def _plot():
        plotter.plot_lists(open_list=open_nodes.get_nodes_list(),
                           closed_list=closed_nodes.get_nodes_list(),
                           start=target_node, nodes=nodes)

    iteration = 0
    while len(open_nodes) > 0:
        iteration += 1
        node_current = open_nodes.pop()

        for successor_xy_name in node_current.neighbours:
            node_successor = get_node(successor_xy_name, node_current, nodes_dict)
            if node_successor is None:
                continue  # self-loop entry of the neighbours list

            successor_current_g = node_current.g_dict[target_name] + 1

            # INSIDE OPEN LIST
            if node_successor.xy_name in open_nodes.dict:
                if node_successor.g_dict[target_name] <= successor_current_g:
                    continue
                # Remove with the OLD priority before updating it.
                open_nodes.remove(node_successor)
            # INSIDE CLOSED LIST
            elif node_successor.xy_name in closed_nodes.dict:
                if node_successor.g_dict[target_name] <= successor_current_g:
                    continue
                closed_nodes.remove(node_successor)
            # In every remaining case (including a never-seen node) the
            # successor is (re)opened with the improved distance.
            node_successor.g_dict[target_name] = successor_current_g
            node_successor.parent = node_current
            open_nodes.add(node_successor)

        closed_nodes.add(node_current)

        if plotter and middle_plot and iteration % 1000 == 0:
            _plot()
        if iteration % 100 == 0:
            print(f'\riter: {iteration}', end='')

    if plotter and middle_plot:
        _plot()

    h_table = np.zeros(map_dim)
    for node in nodes:
        # Unreached nodes have no entry unless the caller reset them to 0;
        # default to 0 so both situations yield the same table.
        h_table[node.x, node.y] = node.g_dict.get(target_name, 0)
    return h_table


def parallel_build_heuristic_for_entire_map(nodes: List, nodes_dict: Dict[str, Any], map_dim: Tuple[int, int], **kwargs) -> Dict[str, np.ndarray]:
    """Build one heuristic table per node using a thread pool.

    Args:
        nodes: all nodes of the graph; each one is used as a target.
        nodes_dict: unused; kept for interface compatibility.
        map_dim: shape of every table.
        **kwargs: ``max_workers`` (optional) overrides the pool size.

    Returns:
        Dict mapping each node's ``xy_name`` to its table. Empty for an empty
        ``nodes``.

    Raises:
        Any exception raised inside a worker is re-raised here instead of
        being silently dropped.
    """
    h_dict = {}
    if not nodes:
        # ThreadPoolExecutor rejects max_workers=0.
        return h_dict

    max_workers = kwargs.get('max_workers')
    if max_workers is None:
        # One thread per node (the previous behaviour) means thousands of
        # threads on a real map; use a bounded pool instead.
        max_workers = min(len(nodes), 32, (os.cpu_count() or 1) + 4)

    # NOTE(review): workers share the Node objects. Each writes only its own
    # g_dict key, but node.parent is overwritten by all of them and is not
    # meaningful after this call.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(parallel_update_h_table, node, nodes, map_dim, False, None, False, h_dict, node_index)
            for node_index, node in enumerate(nodes)
        ]
        for future in concurrent.futures.as_completed(futures):
            future.result()  # propagate worker exceptions

    return h_dict


def build_graph_from_np(img_np: np.ndarray, show_map: bool = False) -> Tuple[List[Node], Dict[str, Node]]:
    """Create a 4-connected graph from a 2-D map array.

    Args:
        img_np: 2-D array; 0 - wall, 1 - free space.
        show_map: when True, display the map with matplotlib.

    Returns:
        ``(nodes, nodes_dict)``: the nodes in row-major order and a dict from
        ``xy_name`` to node. Every node's ``neighbours`` also contains the
        node's own name, and the list is heapified.
    """
    nodes = []
    nodes_dict = {}
    x_size, y_size = img_np.shape

    # CREATE NODES
    for i_x in range(x_size):
        for i_y in range(y_size):
            if img_np[i_x, i_y] == 1:
                node = Node(i_x, i_y)
                nodes.append(node)
                nodes_dict[node.xy_name] = node

    # CREATE NEIGHBOURS
    # Direct lookups replace the pairwise scan over all node pairs. The offsets
    # are listed in row-major order of the neighbouring cell, which is the
    # order in which the pairwise scan appended them.
    offsets = ((-1, 0), (0, -1), (0, 1), (1, 0))
    for node in nodes:
        for d_x, d_y in offsets:
            nei_name = f'{node.x + d_x}_{node.y + d_y}'
            if nei_name in nodes_dict:
                node.neighbours.append(nei_name)

    for node in nodes:
        node.neighbours.append(node.xy_name)
        heapq.heapify(node.neighbours)

    for node in nodes:
        for nei in node.neighbours:
            node.neighbours_nodes.append(nodes_dict[nei])

    if show_map:
        plt.imshow(img_np, cmap='gray', origin='lower')
        plt.show()

    return nodes, nodes_dict


if __name__ == '__main__':
    file_dir = 'maps/room-32-32-4.map'
    img_np, (height, width) = get_np_from_dot_map(file_dir)
    nodes, nodes_dict = build_graph_from_np(img_np, show_map=False)
    h_dict = parallel_build_heuristic_for_entire_map(nodes, nodes_dict, (height, width))
    print()