from globals import *


def load_locations(loc_string: str):
    """
    Parse a JSON string into a Python object.
    :param loc_string: JSON-encoded text.
    :return: the decoded object, or the literal string 'None' (not the None singleton)
             when the text is not valid JSON.
    """
    try:
        loc_dict = json.loads(loc_string)
        return loc_dict
    except json.JSONDecodeError:
        return 'None'


# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# HELP FUNCS
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #

def save_results(logs_dict: dict):
    """
    Dump the logs dictionary as indented JSON into the 'logs_for_experiments' folder.
    The file name is built from the experiment type, the current time, the number of algorithms,
    the value of 'i_problems' and the map name (the 'img_dir' value without its last 4 characters).
    :param logs_dict: must contain the keys 'alg_names', 'i_problems', 'img_dir' and 'expr_type'.
    :return: the path of the written file.
    """
    alg_names = logs_dict['alg_names']
    i_problems = logs_dict['i_problems']
    img_dir = logs_dict['img_dir']
    expr_type = logs_dict['expr_type']
    file_dir = f'logs_for_experiments/{expr_type}_{datetime.now().strftime("%Y-%m-%d--%H-%M")}_ALGS-{len(alg_names)}_RUNS-{i_problems}_MAP-{img_dir[:-4]}.json'
    # Serializing json
    json_object = json.dumps(logs_dict, indent=4)
    with open(file_dir, "w") as outfile:
        outfile.write(json_object)
    print(f'Results saved in: {file_dir}')
    return file_dir


def set_seed(random_seed_bool, seed=1):
    """
    Seed both `random` and `np.random` and print the seed that was used.
    :param random_seed_bool: if truthy, the given seed is ignored and a seed is drawn from [0, 10000].
    :param seed: the seed to use when random_seed_bool is falsy.
    """
    if random_seed_bool:
        seed = random.randint(0, 10000)
    random.seed(seed)
    np.random.seed(seed)
    print(f'[SEED]: --- {seed} ---')


def get_dims_from_pic(file_dir):
    """
    Read the map dimensions from a map file.
    The first integer on the second line is taken as the height and
    the first integer on the third line is taken as the width.
    :return: (height, width)
    """
    with open(f'{file_dir}') as f:
        lines = f.readlines()
    height = int(re.search(r'\d+', lines[1]).group())
    width = int(re.search(r'\d+', lines[2]).group())
    return height, width


def get_np_from_dot_map(file_dir):
    """
    Convert a map file into a 2D array: cells that contain '.' become 1, everything else stays 0.
    The first 4 lines of the file are treated as a header and are skipped.
    :return: (img_np, (height, width))
    """
    with open(f'{file_dir}') as f:
        lines = f.readlines()
    height, width = get_dims_from_pic(file_dir)
    img_np = np.zeros((height, width))
    for height_index, line in enumerate(lines[4:]):
        for width_index, curr_str in enumerate(line):
            if curr_str == '.':
                img_np[height_index, width_index] = 1
    return img_np, (height, width)


def build_graph_from_np(img_np: np.ndarray, show_map: bool = False) -> Tuple[List[Node], Dict[str, Node]]:
    """
    Build a 4-connected grid graph out of a 2D map array.
    Every cell equal to 1 becomes a Node. Each node gets the names of its orthogonal neighbours plus
    its own name in `neighbours` (heapified), and the matching Node objects in `neighbours_nodes`.
    :param img_np: 2D array, 0 - wall, 1 - free space.
    :param show_map: if True, the map is displayed with matplotlib.
    :return: (list of nodes in row-major order, dict from node.xy_name to node)
    """
    # 0 - wall, 1 - free space
    nodes = []
    nodes_dict = {}
    x_size, y_size = img_np.shape
    # CREATE NODES
    for i_x in range(x_size):
        for i_y in range(y_size):
            if img_np[i_x, i_y] == 1:
                node = Node(i_x, i_y)
                nodes.append(node)
                nodes_dict[node.xy_name] = node
    # CREATE NEIGHBOURS
    # A direct coordinate lookup replaces the scan over all pairs of nodes (linear instead of quadratic).
    # The offsets are ordered so that the names are appended in the same order the pairwise scan over
    # the row-major node list produced them, which keeps the heapified layout unchanged.
    # NOTE(review): this ordering assumes node.x / node.y equal the indices passed to Node(...) - confirm.
    node_by_xy = {(node.x, node.y): node for node in nodes}
    for node in nodes:
        for d_x, d_y in ((-1, 0), (0, -1), (0, 1), (1, 0)):
            nei_node = node_by_xy.get((node.x + d_x, node.y + d_y))
            if nei_node is not None:
                node.neighbours.append(nei_node.xy_name)
    for node in nodes:
        # every node is also a neighbour of itself (staying in place)
        node.neighbours.append(node.xy_name)
        heapq.heapify(node.neighbours)
    for node in nodes:
        for nei in node.neighbours:
            node.neighbours_nodes.append(nodes_dict[nei])
    if show_map:
        plt.imshow(img_np, cmap='gray', origin='lower')
        plt.show()
    return nodes, nodes_dict


def exctract_h_dict(img_dir) -> Dict[str, np.ndarray]:
    """
    Fetch the stored heuristic tables of a map from `perfect_heuristics_col`.
    The document whose "name" equals img_dir is looked up and every value of its 'h_dict'
    is converted to a numpy array in place.
    NOTE(review): presumably `find_one` returns None when no document matches, in which case
    the subscript below fails with a TypeError - confirm against the collection API.
    :return: dict from key to np.ndarray
    """
    doc = perfect_heuristics_col.find_one({"name": img_dir})
    h_dict = doc['h_dict']
    for k, v in h_dict.items():
        h_dict[k] = np.array(v)
    return h_dict


def get_blocked_sv_map(img_dir: str, folder_dir: str = 'logs_for_freedom_maps'):
    """
    Load the array stored in '<folder_dir>/blocked_v2_<map name>.npy',
    where the map name is img_dir without its last 4 characters.
    Raises AssertionError if the file does not exist.
    """
    possible_dir = f'{folder_dir}/blocked_v2_{img_dir[:-4]}.npy'
    assert os.path.exists(possible_dir)
    with open(possible_dir, 'rb') as f:
        blocked_sv_map = np.load(f)
    return blocked_sv_map


def get_sv_map(img_dir: str, folder_dir: str = 'logs_for_freedom_maps'):
    """
    Load the array stored in '<folder_dir>/<map name>.npy',
    where the map name is img_dir without its last 4 characters.
    Raises AssertionError if the file does not exist.
    """
    possible_dir = f'{folder_dir}/{img_dir[:-4]}.npy'
    assert os.path.exists(possible_dir)
    with open(possible_dir, 'rb') as f:
        sv_map = np.load(f)
    return sv_map


def create_constraints(
        paths: List[List[Node]], map_dim: Tuple[int, int], max_path_len: int
) -> Tuple[np.ndarray | None, np.ndarray | None, np.ndarray | None]:
    """
    Allocate the constraint tables and fill them with the given paths.
    vc_np: vertex constraints [x, y, t] = bool
    ec_np: edge constraints [x, y, x, y, t] = bool
    pc_np: permanent constraints [x, y] = int or -1
    :return: (vc_np, ec_np, pc_np), or (None, None, None) if there are no paths or max_path_len is 0.
    """
    if len(paths) == 0:
        return None, None, None
    if max_path_len == 0:
        return None, None, None
    vc_np = np.zeros((map_dim[0], map_dim[1], max_path_len))
    ec_np = np.zeros((map_dim[0], map_dim[1], map_dim[0], map_dim[1], max_path_len))
    pc_np = np.ones((map_dim[0], map_dim[1])) * -1
    for path in paths:
        update_constraints(path, vc_np, ec_np, pc_np)
    return vc_np, ec_np, pc_np


def init_constraints(
        map_dim: Tuple[int, int], max_path_len: int
) -> Tuple[np.ndarray | None, np.ndarray | None, np.ndarray | None]:
    """
    Allocate empty constraint tables.
    vc_np: vertex constraints [x, y, t] = bool
    ec_np: edge constraints [x, y, x, y, t] = bool
    pc_np: permanent constraints [x, y] = int or -1
    :return: (vc_np, ec_np, pc_np), or (None, None, None) if max_path_len is 0.
    """
    if max_path_len == 0:
        return None, None, None
    vc_np = np.zeros((map_dim[0], map_dim[1], max_path_len))
    ec_np = np.zeros((map_dim[0], map_dim[1], map_dim[0], map_dim[1], max_path_len))
    pc_np = np.ones((map_dim[0], map_dim[1])) * -1
    return vc_np, ec_np, pc_np


def init_ec_table(
        map_dim: Tuple[int, int], max_path_len: int
) -> np.ndarray | None:
    """
    Allocate an empty edge-constraints table.
    ec_np: edge constraints [x, y, x, y, t] = bool
    :return: ec_np, or None if max_path_len is 0.
    """
    if max_path_len == 0:
        # a single table is returned by this function, so the empty case is a single None
        return None
    ec_np = np.zeros((map_dim[0], map_dim[1], map_dim[0], map_dim[1], max_path_len))
    return ec_np


def update_ec_table(
        path: List[Node], ec_np: np.ndarray
) -> np.ndarray | None:
    """
    Mark every move of the path in the edge-constraints table (in place).
    ec_np: edge constraints [x, y, x, y, t] = bool
    At t = 0 the "move" is from the first node to itself.
    :return: the same ec_np object.
    """
    prev_n = path[0]
    for t, n in enumerate(path):
        # ec
        ec_np[prev_n.x, prev_n.y, n.x, n.y, t] = 1
        prev_n = n
    return ec_np


def update_constraints(
        path: List[Node], vc_np: np.ndarray, ec_np: np.ndarray, pc_np: np.ndarray
) -> Tuple[np.ndarray | None, np.ndarray | None, np.ndarray | None]:
    """
    Add one path to the constraint tables (in place).
    vc_np: vertex constraints [x, y, t] = bool
    ec_np: edge constraints [x, y, x, y, t] = bool
    pc_np: permanent constraints [x, y] = int or -1
    :return: the same (vc_np, ec_np, pc_np) objects.
    """
    # pc: the last node of the path stays occupied from the last time step on;
    # the latest such time is kept for the cell
    last_node = path[-1]
    last_time = len(path) - 1
    pc_np[last_node.x, last_node.y] = max(int(pc_np[last_node.x, last_node.y]), last_time)
    prev_n = path[0]
    for t, n in enumerate(path):
        # vc
        vc_np[n.x, n.y, t] = 1
        # ec
        ec_np[prev_n.x, prev_n.y, n.x, n.y, t] = 1
        prev_n = n
    return vc_np, ec_np, pc_np


def align_all_paths(agents: List, flag_k_limit: bool = False) -> int:
    """
    Pad the paths of all agents (in place) to the length of the longest one by repeating the last node.
    :param agents: objects with a `path` (or `k_path`) list.
    :param flag_k_limit: if True, `k_path` is aligned instead of `path`.
    :return: the common length, or 1 if there are no agents.
    """
    if len(agents) == 0:
        return 1
    if flag_k_limit:
        max_len = max([len(a.k_path) for a in agents])
        for a in agents:
            while len(a.k_path) < max_len:
                a.k_path.append(a.k_path[-1])
        return max_len
    max_len = max([len(a.path) for a in agents])
    for a in agents:
        while len(a.path) < max_len:
            a.path.append(a.path[-1])
    return max_len


def shorten_back_all_paths(agents: List) -> None:
    """
    Drop the repeated nodes at the end of every agent's path.
    A path is never cut below 2 nodes; the agent's `path` is rebound to the shortened list.
    """
    for a in agents:
        if len(a.path) <= 2:
            continue
        minus_1_node = a.path[-1]
        minus_2_node = a.path[-2]
        while minus_1_node == minus_2_node:
            a.path = a.path[:-1]
            if len(a.path) <= 2:
                break
            minus_1_node = a.path[-1]
            minus_2_node = a.path[-2]


def check_one_vc_ec_neic_iter(path: List[Node], agent_name, other_paths: Dict[str, List[Node]], iteration: int) -> None:
    """
    Assert that the step of `path` at `iteration` has no vertex conflict and no edge (swap) conflict
    with any of the other paths, and that the step goes to a neighbour of the previous node.
    Raises AssertionError on the first violation found.
    :param agent_name: not used by the checks.
    """
    prev_node1 = path[max(0, iteration - 1)]
    curr_node1 = path[iteration]
    edge1 = (prev_node1.x, prev_node1.y, curr_node1.x, curr_node1.y)
    for other_path in other_paths.values():
        # vertex conf
        assert curr_node1 != other_path[iteration], f'[i: {iteration}] vertex conf: {curr_node1.xy_name}'
        # edge conf
        prev_node2 = other_path[max(0, iteration - 1)]
        curr_node2 = other_path[iteration]
        edge2 = (curr_node2.x, curr_node2.y, prev_node2.x, prev_node2.y)
        # nei conf
        assert curr_node1.xy_name in prev_node1.neighbours, f'[i: {iteration}] wow wow wow! Not nei pos!'
        assert edge1 != edge2, f'[i: {iteration}] edge collision: {edge1}'
    # the neighbour check has to run even if there are no other paths
    assert curr_node1.xy_name in prev_node1.neighbours, f'[i: {iteration}] wow wow wow! Not nei pos!'


def check_vc_ec_neic_iter(agents: list | Deque, iteration: int, to_count: bool = False) -> int:
    """
    Check all pairs of agents for vertex and edge (swap) conflicts at `iteration`,
    and every agent for moving to a neighbour of its previous node.
    :param to_count: if False, a conflict raises AssertionError; if True, conflicts are counted instead.
                     The neighbour check always asserts.
    :return: the number of counted conflicts (always 0 if to_count is False).
    """
    collisions: int = 0
    for a1, a2 in combinations(agents, 2):
        # vertex conf
        if not to_count:
            assert a1.path[iteration] != a2.path[iteration], f'[i: {iteration}] vertex conf: {a1.name}-{a2.name} in {a1.path[iteration].xy_name}'
        else:
            if a1.path[iteration] == a2.path[iteration]:
                collisions += 1
        # edge conf
        prev_node1 = a1.path[max(0, iteration - 1)]
        curr_node1 = a1.path[iteration]
        prev_node2 = a2.path[max(0, iteration - 1)]
        curr_node2 = a2.path[iteration]
        edge1 = (prev_node1.x, prev_node1.y, curr_node1.x, curr_node1.y)
        edge2 = (curr_node2.x, curr_node2.y, prev_node2.x, prev_node2.y)
        # nei conf
        assert a1.path[iteration].xy_name in a1.path[max(0, iteration - 1)].neighbours, f'[i: {iteration}] wow wow wow! Not nei pos!'
        if not to_count:
            assert edge1 != edge2, f'[i: {iteration}] edge collision: {a1.name}-{a2.name} in {edge1}'
        else:
            if edge1 == edge2:
                collisions += 1
    # the last agent is never the first element of a pair, so it is checked separately
    assert agents[-1].path[iteration].xy_name in agents[-1].path[max(0, iteration - 1)].neighbours, f'[i: {iteration}] wow wow wow! Not nei pos!'
    return collisions


def check_configs(
        agents: List[AgentAlg], config_from: Dict[str, Node], config_to: Dict[str, Node], final_check: bool = True
) -> None:
    """
    Assert that the transition config_from -> config_to is valid for all pairs of agents:
    no vertex conflicts, no edge (swap) conflicts and every agent moves between neighbouring nodes.
    :param config_from: agent name -> node before the step.
    :param config_to: agent name -> node after the step.
    :param final_check: if True, every agent must appear in both configs.
                        Pairs with an agent missing from config_to are skipped.
    """
    if final_check:
        for agent in agents:
            assert agent.name in config_from
            assert agent.name in config_to
    for a1, a2 in combinations(agents, 2):
        if a1.name not in config_to or a2.name not in config_to:
            continue
        # vertex conf
        from_node_1: Node = config_from[a1.name]
        to_node_1: Node = config_to[a1.name]
        from_node_2: Node = config_from[a2.name]
        to_node_2: Node = config_to[a2.name]
        # vc
        assert from_node_1 != from_node_2, f' vc: {a1.name}-{a2.name} in {from_node_1.xy_name}'
        assert to_node_1 != to_node_2, f' vc: {a1.name}-{a2.name} in {to_node_2.xy_name}'
        # edge conf
        edge1 = (from_node_1.x, from_node_1.y, to_node_1.x, to_node_1.y)
        edge2 = (to_node_2.x, to_node_2.y, from_node_2.x, from_node_2.y)
        assert edge1 != edge2, f'ec: {a1.name}-{a2.name} in {edge1}'
        # nei conf
        assert from_node_1.xy_name in to_node_1.neighbours, f'neic {a1.name}: {from_node_1.xy_name} not nei of {to_node_1.xy_name}'
        assert from_node_2.xy_name in to_node_2.neighbours, f'neic {a2.name}: {from_node_2.xy_name} not nei of {to_node_2.xy_name}'


def ranges_intersect(range1, range2):
    """
    :param range1: (start, end), both ends inclusive.
    :param range2: (start, end), both ends inclusive.
    :return: True if the two ranges share at least one point.
    """
    start1, end1 = range1
    start2, end2 = range2
    return start1 <= end2 and start2 <= end1


def use_profiler(save_dir):
    """
    Decorator factory: profile every call of the decorated function with cProfile and
    dump the stats (sorted by 'cumtime') to save_dir after a successful call.
    :param save_dir: path of the stats file; it is overwritten on every call.
    """
    import functools

    def decorator(func):
        @functools.wraps(func)  # keep the name and the docstring of the wrapped function
        def inner1(*args, **kwargs):
            profiler = cProfile.Profile()
            profiler.enable()
            try:
                # getting the returned value
                returned_value = func(*args, **kwargs)
            finally:
                # the profiler must not stay enabled if func raises
                profiler.disable()
            stats = pstats.Stats(profiler).sort_stats('cumtime')
            stats.dump_stats(save_dir)
            # returning the value to the original frame
            return returned_value
        return inner1
    return decorator


def two_plans_have_no_confs(path1: List[Node], path2: List[Node]):
    """
    Check two paths (possibly of different lengths) for conflicts.
    :return: False if there is a vertex conflict or a swap conflict in the common prefix, or if the last
             node of the shorter path appears in the longer path from that time step on; True otherwise.
    """
    min_len = min(len(path1), len(path2))
    prev1 = None
    prev2 = None
    bigger_path = path1 if len(path1) >= len(path2) else path2
    smaller_path = path1 if len(path1) < len(path2) else path2
    # the shorter path stays at its last node, so the longer one must not pass through it afterwards
    if smaller_path[-1] in bigger_path[len(smaller_path) - 1:]:
        return False
    for i, (vertex1, vertex2) in enumerate(zip(path1[:min_len], path2[:min_len])):
        # vertex conf
        if vertex1.x == vertex2.x and vertex1.y == vertex2.y:
            return False
        if i > 0:
            # edge conf: the two agents swap their positions
            if prev1.x == vertex2.x and prev1.y == vertex2.y and vertex1.x == prev2.x and vertex1.y == prev2.y:
                return False
        prev1 = vertex1
        prev2 = vertex2
    return True


def two_equal_paths_have_confs(path1: List[Node], path2: List[Node]):
    """
    Check two paths of the same length for conflicts.
    Raises AssertionError if the lengths differ.
    :return: True if there is a vertex conflict or a swap conflict, False otherwise.
    """
    assert len(path1) == len(path2)
    from1 = None
    from2 = None
    for i, (to1, to2) in enumerate(zip(path1, path2)):
        # vertex conf
        if to1.x == to2.x and to1.y == to2.y:
            return True
        if i > 0:
            # edge conf: the two agents swap their positions
            if from1.x == to2.x and from1.y == to2.y and to1.x == from2.x and to1.y == from2.y:
                return True
        from1 = to1
        from2 = to2
    return False


def create_agents(
        start_nodes: List[Node], goal_nodes: List[Node]
) -> Tuple[List[AgentAlg], Dict[str, AgentAlg]]:
    """
    Create one AgentAlg per (start, goal) pair; the index of the pair is passed as the first argument.
    :return: (list of agents, dict from agent.name to agent)
    """
    agents: List[AgentAlg] = []
    agents_dict: Dict[str, AgentAlg] = {}
    for num, (s_node, g_node) in enumerate(zip(start_nodes, goal_nodes)):
        new_agent = AgentAlg(num, s_node, g_node)
        agents.append(new_agent)
        agents_dict[new_agent.name] = new_agent
    return agents, agents_dict


def time_is_good(start_time: int | float, max_time: int) -> bool:
    """
    :param start_time: a value of time.time() taken at the start.
    :param max_time: the time limit in seconds.
    :return: True while less than max_time seconds have passed since start_time.
    """
    runtime = time.time() - start_time
    return runtime < max_time


def align_path(new_path: List[Node], k_limit: int) -> List[Node]:
    """
    Bring the path to exactly k_limit nodes: a shorter path is padded in place with its last node,
    a longer one is cut.
    :return: a new list with the first k_limit nodes.
    """
    while len(new_path) < k_limit:
        new_path.append(new_path[-1])
    return new_path[:k_limit]


def manhattan_dist(n1: Node, n2: Node):
    """
    The Manhattan Distance between two points (X1, Y1) and (X2, Y2) is given by |X1 - X2| + |Y1 - Y2|.
    :return: the distance between n1 and n2, computed with np.abs.
    """
    return np.abs(n1.x - n2.x) + np.abs(n1.y - n2.y)


# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# LIFELONG / k-LIMITED FUNCS
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #

def add_k_paths_to_agents(agents: List[AgentAlg] | list) -> None:
    """
    Append the k_path of every agent to its full path (in place).
    An empty path first gets the start node; the first node of k_path is skipped.
    """
    for agent in agents:
        if len(agent.path) == 0:
            agent.path.append(agent.start_node)
        agent.path.extend(agent.k_path[1:])


def update_goal_nodes(agents: List[AgentAlg] | list, nodes: List[Node]) -> int:
    """
    Give a new random goal to every agent that has reached its goal or has no goal.
    New goals are drawn from the nodes that were nobody's goal when the function was called,
    and no node is handed out twice.
    :return: the number of agents that were standing on their goal.
    """
    finished_goals = 0
    # a set gives constant-time membership tests; it is used only for lookups
    goal_names = {a.goal_node.xy_name for a in agents if a.goal_node is not None}
    free_nodes: List[Node] = [n for n in nodes if n.xy_name not in goal_names]
    for agent in agents:
        to_change_goal: bool = False
        if agent.goal_node is not None and agent.curr_node == agent.goal_node:
            finished_goals += 1
            to_change_goal = True
        if agent.goal_node is None:
            to_change_goal = True
        if to_change_goal:
            next_goal_node = random.choice(free_nodes)
            # redraw while the node was already handed out during this call
            while next_goal_node.xy_name in goal_names:
                next_goal_node = random.choice(free_nodes)
            agent.goal_node = next_goal_node
            goal_names.add(next_goal_node.xy_name)
    return finished_goals


def stay_k_path_agent(agent: AgentAlg | Any, n: Node, k_limit: int) -> None:
    """
    Set the k_path of the agent to k_limit repetitions of the node n (the agent stays in place).
    """
    agent.k_path = [n]
    while len(agent.k_path) < k_limit:
        agent.k_path.append(n)
    agent.k_path = agent.k_path[:k_limit]


def exceeds_k_dist(n1: Node, n2: Node, k_limit: int) -> bool:
    """
    :return: True if the Manhattan distance between n1 and n2 is bigger than k_limit.
    """
    # both terms are non-negative, so the sum also covers the case of a single axis exceeding the limit
    return bool(abs(n1.x - n2.x) + abs(n1.y - n2.y) > k_limit)


def repair_agents_k_paths(agents: List[AgentAlg] | list, k_limit: int) -> None:
    """
    Make the k_paths of all agents conflict-free by forcing conflicting agents to stay in place.
    Agents without a k_path stay in place from the start. Then, while some pair of close enough agents
    has conflicting k_paths, both agents of the pair are set to stay at their current nodes
    (k_limit + 1 repetitions), and the scan over the pairs restarts.
    """
    standby_agents_dict: Dict[str, bool] = {}
    for agent in agents:
        if agent.k_path is None or len(agent.k_path) == 0:
            stay_k_path_agent(agent, agent.curr_node, k_limit + 1)
            standby_agents_dict[agent.name] = True
        else:
            standby_agents_dict[agent.name] = False
    all_good = False
    while not all_good:
        all_good = True
        for a1, a2 in combinations(agents, 2):
            # two standing agents are not compared again
            if standby_agents_dict[a1.name] and standby_agents_dict[a2.name]:
                continue
            # agents that are too far apart are skipped
            if exceeds_k_dist(a1.curr_node, a2.curr_node, k_limit + 1):
                continue
            if two_equal_paths_have_confs(a1.k_path, a2.k_path):
                stay_k_path_agent(a1, a1.curr_node, k_limit + 1)
                stay_k_path_agent(a2, a2.curr_node, k_limit + 1)
                standby_agents_dict[a1.name] = True
                standby_agents_dict[a2.name] = True
                all_good = False
                break
    print(' | repaired')
    return


# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# METRICS FUNCS
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #

def shorten_back_path(input_path: List[Node]) -> List[Node]:
    """
    Return a copy of the path without the repeated nodes at its end.
    The result is never cut below 2 nodes; the input list is not modified.
    """
    path = input_path[:]
    if len(path) <= 2:
        return path
    minus_1_node = path[-1]
    minus_2_node = path[-2]
    while minus_1_node == minus_2_node:
        path = path[:-1]
        if len(path) <= 2:
            break
        minus_1_node = path[-1]
        minus_2_node = path[-2]
    return path


def get_makespan_metric(paths_dict: Dict[str, List[Node]]) -> int:
    """
    :param paths_dict: agent name -> path.
    :return: the length of the longest path after the repeated end nodes are dropped,
             or 0 if there are no paths.
    """
    makespan: int = max((len(shorten_back_path(path)) for path in paths_dict.values()), default=0)
    return makespan


def get_soc_metric(paths_dict: Dict[str, List[Node]]) -> int:
    """
    :param paths_dict: agent name -> path.
    :return: the sum of the lengths of all paths after the repeated end nodes are dropped.
    """
    soc: int = sum(len(shorten_back_path(path)) for path in paths_dict.values())
    return soc