Spaces:
Runtime error
Runtime error
| from algs.alg_sipps_functions import * | |
def run_sipps_insert_node(
        node: SIPPSNode,
        Q: List[SIPPSNode],
        P: List[SIPPSNode],
        ident_dict: DefaultDict[str, List[SIPPSNode]],
        goal_node: Node,
        goal_np: np.ndarray,
        T: int,
        T_tag: int,
        ec_soft_np: np.ndarray,  # x, y, x, y, t -> bool (0/1)
        si_table: Dict[str, List[Tuple[int, int, str]]],
        agent=None,
) -> None:
    """Score ``node`` and push it onto ``Q`` unless an identical node dominates it.

    The node's values are first filled in by ``compute_c_g_h_f_values``. It is
    then compared against every node returned by ``get_identical_nodes``:

    * if an existing node ``q`` has ``q.low <= node.low`` and ``q.c <= node.c``,
      ``node`` is discarded and nothing is inserted;
    * if ``node`` has ``low`` and ``c`` no larger than ``q``'s, ``q`` is removed
      from ``Q`` / ``P`` (and from ``ident_dict``);
    * otherwise, if the two ``[low, high)`` ranges overlap, the one that starts
      earlier gets its ``high`` cut down to the other's ``low``.

    Args:
        node: Candidate node; mutated in place (values computed, ``high`` may
            be shortened).
        Q: Open list, maintained as a ``heapq`` heap; mutated in place.
        P: Closed list; mutated in place.
        ident_dict: Maps ``ident_str`` to the nodes registered under it;
            mutated in place.
        goal_node, goal_np, T, T_tag, ec_soft_np, si_table: Forwarded
            unchanged to ``compute_c_g_h_f_values``.
        agent: Not used by this function.
    """
    compute_c_g_h_f_values(node, goal_node, goal_np, T, T_tag, ec_soft_np, si_table)

    # Iterate over a snapshot: the loop removes entries from ident_dict, and
    # get_identical_nodes may hand back that very list (NOTE(review): confirm).
    # Mutating a list while iterating it silently skips elements.
    identical_nodes = list(get_identical_nodes(node, Q, P, ident_dict))

    for q in identical_nodes:
        if q.low <= node.low and q.c <= node.c:
            # An existing node is at least as good: drop the candidate.
            return
        elif node.low <= q.low and node.c <= q.c:
            # The candidate is at least as good: evict the existing node.
            if q in Q:
                Q.remove(q)
                # list.remove() deletes from the middle of the heap list and
                # breaks the heap invariant; restore it right away so that any
                # later heappush/heappop (including after an early return
                # above) still yields the smallest node.
                heapq.heapify(Q)
                ident_dict[q.ident_str].remove(q)
            if q in P:
                P.remove(q)
                ident_dict[q.ident_str].remove(q)
        elif node.low < q.high and q.low < node.high:
            # Overlapping ranges: shorten whichever one starts first.
            if node.low < q.low:
                node.set_high(q.low)
            else:
                q.set_high(node.low)

    heapq.heappush(Q, node)
    ident_dict[node.ident_str].append(node)
    return
def run_sipps_expand_node(
        node: SIPPSNode,
        nodes_dict: Dict[str, Node],
        Q: List[SIPPSNode],
        P: List[SIPPSNode],
        ident_dict: DefaultDict[str, List[SIPPSNode]],
        si_table: Dict[str, List[Tuple[int, int, str]]],
        goal_node: Node,
        goal_np: np.ndarray,
        T: int,
        T_tag: int,
        ec_hard_np: np.ndarray,  # x, y, x, y, t -> bool (0/1)
        ec_soft_np: np.ndarray,  # x, y, x, y, t -> bool (0/1)
        agent=None
):
    """Generate the successors of ``node`` and insert each one via ``run_sipps_insert_node``.

    For every ``(vertex, interval index)`` pair produced by ``get_I_group``:

    1. ``get_low_without_hard_ec`` yields the lower bound ``new_low``; the
       pair is skipped when that call returns ``None``.
    2. ``get_low_without_hard_and_soft_ec`` yields a second bound
       ``new_low_tag``.
    3. If ``new_low < new_low_tag < init_high``, two successors are inserted,
       covering ``(new_low, new_low_tag)`` and ``(new_low_tag, init_high)``,
       in that order. Otherwise a single successor covering
       ``(new_low, init_high)`` is inserted.

    ``Q``, ``P`` and ``ident_dict`` are mutated by the insert calls.
    Returns ``None``.
    """
    successors: List[Tuple[Node, int]] = get_I_group(node, nodes_dict, si_table, agent)

    for v_node, si_id in successors:
        init_low, init_high, init_type = si_table[v_node.xy_name][si_id]

        new_low = get_low_without_hard_ec(node, node.n, v_node, init_low, init_high, ec_hard_np, agent)
        if new_low is None:
            continue

        new_low_tag = get_low_without_hard_and_soft_ec(
            node, node.n, v_node, new_low, init_high, ec_hard_np, ec_soft_np
        )

        # Decide whether the interval is split at new_low_tag or kept whole.
        must_split = new_low_tag is not None and new_low < new_low_tag < init_high
        if must_split:
            bounds = [(new_low, new_low_tag), (new_low_tag, init_high)]
        else:
            bounds = [(new_low, init_high)]

        # Build and insert one child at a time so the second child is created
        # only after the first one has been inserted.
        for lo, hi in bounds:
            child = SIPPSNode(v_node, (lo, hi, init_type), si_id, False, parent=node)
            run_sipps_insert_node(child, Q, P, ident_dict, goal_node, goal_np, T, T_tag, ec_soft_np, si_table)
def run_sipps(
        start_node: Node,
        goal_node: Node,
        nodes: List[Node],
        nodes_dict: Dict[str, Node],
        h_dict: Dict[str, np.ndarray],
        vc_hard_np: np.ndarray | None,  # x, y, t -> bool (0/1)
        ec_hard_np: np.ndarray | None,  # x, y, x, y, t -> bool (0/1)
        pc_hard_np: np.ndarray | None,  # x, y -> time (int)
        vc_soft_np: np.ndarray | None,  # x, y, t -> bool (0/1)
        ec_soft_np: np.ndarray | None,  # x, y, x, y, t -> bool (0/1)
        pc_soft_np: np.ndarray | None,  # x, y -> time (int)
        inf_num: int = int(1e10),
        max_final_time: int = int(1e10),
        flag_k_limit: bool = False,
        k_limit: int = int(1e10),
        agent=None,
        si_table: Dict[str, List[Tuple[int, int, str]]] | None = None,
        **kwargs,
) -> Tuple[List[Node] | None, dict]:
    """Run the SIPPS best-first search from ``start_node`` towards ``goal_node``.

    Args:
        start_node: Vertex the root search node is built on.
        goal_node: Target vertex.
        nodes: Not used directly by this function.
        nodes_dict: Forwarded to ``run_sipps_expand_node``.
        h_dict: ``h_dict[goal_node.xy_name]`` is used as the heuristic array.
        vc_hard_np, pc_hard_np, vc_soft_np, pc_soft_np: Not used directly by
            this function.
        ec_hard_np: Forwarded to ``run_sipps_expand_node``.
        ec_soft_np: Forwarded to the value computation, insertion and
            expansion helpers.
        inf_num: If ``get_T`` returns a value ``>= inf_num`` the search is
            abandoned immediately.
        max_final_time: Not used by this function.
        flag_k_limit: Not used by this function (see note in the loop).
        k_limit: A popped node whose ``low`` is ``>= k_limit`` ends the search
            and its path is returned.
        agent: Forwarded to ``extract_path`` and ``run_sipps_expand_node``.
        si_table: Interval table indexed by ``xy_name``; required even though
            the signature defaults it to ``None``.
        **kwargs: Accepted and ignored.

    Returns:
        ``(nodes_path, info)`` when a path is found, where ``info`` carries
        ``'T'``, ``'T_tag'``, ``'Q'``, ``'P'``, ``'si_table'``, ``'r_type'``,
        ``'sipps_path'``, ``'sipps_path_names'`` and ``'c'``.
        ``(None, {})`` when ``T >= inf_num``.
        ``(None, info)`` without the path keys when ``Q`` runs empty.

    Raises:
        TypeError: If ``si_table`` is ``None``.
    """
    if si_table is None:
        # The table is indexed a few lines below; fail fast with a message
        # that names the missing argument instead of an opaque
        # "'NoneType' object is not subscriptable".
        raise TypeError('run_sipps() requires si_table, got None')

    T = get_T(goal_node, si_table)
    if T >= inf_num:
        return None, {}

    root = SIPPSNode(start_node, si_table[start_node.xy_name][0], 0, False)
    T_tag = get_T_tag(goal_node, si_table)
    goal_np: np.ndarray = h_dict[goal_node.xy_name]
    compute_c_g_h_f_values(root, goal_node, goal_np, T, T_tag, ec_soft_np, si_table)

    Q: List[SIPPSNode] = []  # open list (heap)
    P: List[SIPPSNode] = []  # closed list
    ident_dict: DefaultDict[str, List[SIPPSNode]] = defaultdict(list)
    heapq.heappush(Q, root)
    ident_dict[root.ident_str].append(root)

    def _finish(last: SIPPSNode, r_type: str) -> Tuple[List[Node], dict]:
        # Build the success result for the path that ends in `last`.
        nodes_path, sipps_path = extract_path(last, agent=agent)
        sipps_path_names = [n.to_print() for n in sipps_path]
        return nodes_path, {
            'T': T, 'T_tag': T_tag, 'Q': Q, 'P': P, 'si_table': si_table, 'r_type': r_type,
            'sipps_path': sipps_path, 'sipps_path_names': sipps_path_names, 'c': last.c,
        }

    while len(Q) > 0:
        next_n: SIPPSNode = heapq.heappop(Q)

        # NOTE(review): flag_k_limit is never consulted; the k_limit cut-off
        # applies unconditionally and is also reported as 'is_goal'.
        if next_n.is_goal or next_n.low >= k_limit:
            return _finish(next_n, 'is_goal')

        if next_n.n == goal_node and next_n.low >= T:
            c_future = get_c_future(goal_node, next_n.low, si_table)
            if c_future == 0:
                return _finish(next_n, 'c_future=0')
            # Otherwise queue a goal-flagged copy whose cost includes c_future.
            n_tag = duplicate_sipps_node(next_n)
            n_tag.is_goal = True
            n_tag.c += c_future
            run_sipps_insert_node(n_tag, Q, P, ident_dict, goal_node, goal_np, T, T_tag, ec_soft_np, si_table)

        run_sipps_expand_node(next_n, nodes_dict, Q, P, ident_dict, si_table, goal_node, goal_np, T, T_tag,
                              ec_hard_np, ec_soft_np, agent)
        heapq.heappush(P, next_n)
        # NOTE(review): next_n was already appended to ident_dict when it
        # entered Q, so this registers it a second time - confirm intended.
        ident_dict[next_n.ident_str].append(next_n)

    # Open list exhausted without reaching the goal.
    return None, {
        'T': T, 'T_tag': T_tag, 'Q': Q, 'P': P, 'si_table': si_table,
    }
def main():
    """Demo: build hard/soft constraints from fixed paths on a map, run SIPPS, and plot.

    Loads the map and heuristic tables from disk, turns two hand-written paths
    into hard constraints and one into soft constraints, calls ``run_sipps``
    from ``6_2`` to ``25_25`` and, when rendering is enabled, prints and plots
    the resulting path.
    """
    # set_seed(random_seed_bool=False, seed=7310)
    # set_seed(random_seed_bool=False, seed=123)
    set_seed(random_seed_bool=True)

    # img_dir = '10_10_my_rand.map'
    img_dir = 'empty-32-32.map'
    # img_dir = 'random-32-32-10.map'
    # img_dir = 'random-32-32-20.map'
    # img_dir = 'room-32-32-4.map'
    # img_dir = 'maze-32-32-2.map'
    # img_dir = 'maze-32-32-4.map'

    to_render: bool = True
    # final_render: bool = False

    path_to_maps: str = '../maps'
    path_to_heuristics: str = '../logs_for_heuristics'

    img_np, (height, width) = get_np_from_dot_map(img_dir, path_to_maps)
    map_dim = (height, width)
    nodes, nodes_dict = build_graph_from_np(img_np, show_map=False)
    h_dict = exctract_h_dict(img_dir, path_to_heuristics)

    # Paths whose occupancy becomes hard constraints.
    path1 = [
        nodes_dict['7_1'],
        nodes_dict['7_1'],
        nodes_dict['6_1'],
        nodes_dict['5_1'],
        nodes_dict['4_1'],
    ]
    path2 = [
        nodes_dict['6_0'],
        nodes_dict['6_0'],
        # nodes_dict['6_0'],
        # nodes_dict['6_0'],
        # nodes_dict['6_1'],
        # nodes_dict['6_1'],
        # nodes_dict['6_1'],
        # nodes_dict['6_0'],
    ]
    h_paths = [path1, path2]
    max_path_len = max(map(len, h_paths))
    vc_hard_np, ec_hard_np, pc_hard_np = create_constraints(h_paths, map_dim, max_path_len)

    # Path whose occupancy becomes soft constraints.
    path = [
        nodes_dict['7_2'],
        nodes_dict['7_2'],
        nodes_dict['6_2'],
        nodes_dict['5_2'],
        nodes_dict['4_2'],
    ]
    s_paths = [path]
    vc_soft_np, ec_soft_np, pc_soft_np = create_constraints(s_paths, map_dim, max_path_len)

    start_node = nodes_dict['6_2']
    goal_node = nodes_dict['25_25']

    # NOTE(review): no si_table is passed, although run_sipps indexes si_table
    # immediately; build one and pass it before relying on this demo.
    result, info = run_sipps(
        start_node, goal_node, nodes, nodes_dict, h_dict,
        vc_hard_np, ec_hard_np, pc_hard_np, vc_soft_np, ec_soft_np, pc_soft_np
    )

    if not to_render:
        return

    # run_sipps returns None as the path when the search fails; iterating it
    # below would raise, so report and stop instead.
    if result is None:
        print('No path found')
        return

    # plot
    print(f'{[n.xy_name for n in result]}')
    plot_np = np.ones(img_np.shape) * -2
    # nodes
    for n in nodes:
        plot_np[n.x, n.y] = 0
    # hard
    for h_path in h_paths:
        for n in h_path:
            plot_np[n.x, n.y] = -1
    # soft
    for s_path in s_paths:
        for n in s_path:
            plot_np[n.x, n.y] = -0.5
    # result
    for n in result:
        plot_np[n.x, n.y] = 1

    plt.imshow(plot_np, cmap='binary', origin='lower')
    plt.show()
# Script entry point: run the demo only when this file is executed directly.
if __name__ == "__main__":
    main()
| # if vc_hard_np is not None and vc_soft_np is not None: | |
| # assert vc_hard_np.shape[2] == vc_soft_np.shape[2] | |
| # assert ec_hard_np.shape[4] == ec_soft_np.shape[4] | |
| # assert int(max(np.max(pc_hard_np), np.max(pc_soft_np))) + 1 == vc_hard_np.shape[2] | |
| # assert int(max(np.max(pc_hard_np), np.max(pc_soft_np))) + 1 == ec_hard_np.shape[4] | |
| # if pc_hard_np is not None: | |
| # assert pc_hard_np[goal_node.x, goal_node.y] == -1 | |